You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:42:53 UTC

[15/43] tez git commit: TEZ-2410. VertexGroupCommitFinishedEvent & VertexCommitStartedEvent is not logged correctly (zjffdu)

TEZ-2410. VertexGroupCommitFinishedEvent & VertexCommitStartedEvent is not logged correctly (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4a6808ce
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4a6808ce
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4a6808ce

Branch: refs/heads/TEZ-2003
Commit: 4a6808ce4c99458653bbe4328dfcad24649a48fb
Parents: 05f77fe
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri May 8 12:42:46 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri May 8 12:42:46 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  61 ++-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   1 -
 .../apache/tez/dag/app/dag/impl/TestCommit.java | 454 ++++++++++++++++++-
 4 files changed, 477 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4a6808ce/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ba8e9d8..3520768 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@ INCOMPATIBLE CHANGES
     Default max limit increased. Should not affect existing users.
 
 ALL CHANGES:
+  TEZ-2410. VertexGroupCommitFinishedEvent & VertexCommitStartedEvent is not logged correctly
   TEZ-776. Reduce AM mem usage caused by storing TezEvents
   TEZ-2423. Tez UI: Remove Attempt Index column from task->attempts page
   TEZ-2416. Tez UI: Make tooltips display faster.

http://git-wip-us.apache.org/repos/asf/tez/blob/4a6808ce/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index f769565..1726c18 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -212,7 +212,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   @VisibleForTesting
   Map<OutputKey, ListenableFuture<Void>> commitFutures
     = new HashMap<OutputKey, ListenableFuture<Void>>();
-  private Set<OutputKey> succeededCommits = new HashSet<OutputKey>();
 
   private static final DiagnosticsUpdateTransition
       DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
@@ -457,7 +456,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     Set<String> outputs;
     Map<String, InputDescriptor> edgeMergedInputs;
     int successfulMembers;
-    boolean committed;
+    int successfulCommits;
+    boolean commitStarted;
+
     VertexGroupInfo(PlanVertexGroupInfo groupInfo) {
       groupName = groupInfo.getGroupName();
       groupMembers = Sets.newHashSet(groupInfo.getGroupMembersList());
@@ -468,10 +469,20 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       }
       outputs = Sets.newHashSet(groupInfo.getOutputsList());
       successfulMembers = 0;
-      committed = false;
+      successfulCommits = 0;
+      commitStarted = false;
+    }
+
+    public boolean isInCommitting() {
+      return commitStarted && successfulCommits < outputs.size();
+    }
+
+    public boolean isCommitted() {
+      return commitStarted && successfulCommits == outputs.size();
     }
   }
 
+
   public DAGImpl(TezDAGID dagId,
       Configuration amConf,
       DAGPlan jobPlan,
@@ -962,7 +973,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     // commit all shared outputs
     for (final VertexGroupInfo groupInfo : vertexGroups.values()) {
       if (!groupInfo.outputs.isEmpty()) {
-        groupInfo.committed = true;
+        groupInfo.commitStarted = true;
         final Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
         for (final String outputName : groupInfo.outputs) {
           final OutputKey outputKey = new OutputKey(outputName, groupInfo.groupName, true);
@@ -1920,7 +1931,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
                 + " data, groupName=" + groupInfo.groupName);
             continue;
           }
-          groupInfo.committed = true;
+          groupInfo.commitStarted = true;
           final Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
           try {
             appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(),
@@ -1966,11 +1977,19 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       + ", vertexId=" + vertex.getVertexId());
 
     if (!commitAllOutputsOnSuccess) {
-      // partial output may already have been committed. fail if so
+      // partial output may already have been in committing or committed. fail if so
       List<VertexGroupInfo> groupList = vertexGroupInfo.get(vertex.getName());
       if (groupList != null) {
         for (VertexGroupInfo groupInfo : groupList) {
-          if (groupInfo.committed) {
+          if (groupInfo.isInCommitting()) {
+            String msg = "Aborting job as committing vertex: "
+                + vertex.getLogIdentifier() + " is re-running";
+            LOG.info(msg);
+            addDiagnostic(msg);
+            enactKill(DAGTerminationCause.VERTEX_RERUN_IN_COMMITTING,
+                VertexTerminationCause.VERTEX_RERUN_IN_COMMITTING);
+            return true;
+          } else if (groupInfo.isCommitted()) {
             String msg = "Aborting job as committed vertex: "
                 + vertex.getLogIdentifier() + " is re-running";
             LOG.info(msg);
@@ -2091,17 +2110,23 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     boolean recoveryFailed = false;
     if (commitCompletedEvent.isSucceeded()) {
       LOG.info("Commit succeeded for output:" + commitCompletedEvent.getOutputKey());
-      succeededCommits.add(commitCompletedEvent.getOutputKey());
-      if (!commitAllOutputsOnSuccess) {
-        try {
-          appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(),
-              new VertexGroupCommitFinishedEvent(getID(), commitCompletedEvent.getOutputKey().getEntityName(),
-                  clock.getTime())));
-        } catch (IOException e) {
-          String diag = "Failed to send commit recovery event to handler, " + ExceptionUtils.getStackTrace(e);
-          addDiagnostic(diag);
-          LOG.error(diag);
-          recoveryFailed = true;
+      OutputKey outputKey = commitCompletedEvent.getOutputKey();
+      if (outputKey.isVertexGroupOutput){
+        VertexGroupInfo vertexGroup = vertexGroups.get(outputKey.getEntityName());
+        vertexGroup.successfulCommits++;
+        if (vertexGroup.isCommitted()) {
+          if (!commitAllOutputsOnSuccess) {
+            try {
+              appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(),
+                  new VertexGroupCommitFinishedEvent(getID(), commitCompletedEvent.getOutputKey().getEntityName(),
+                      clock.getTime())));
+            } catch (IOException e) {
+              String diag = "Failed to send commit recovery event to handler, " + ExceptionUtils.getStackTrace(e);
+              addDiagnostic(diag);
+              LOG.error(diag);
+              recoveryFailed = true;
+            }
+          }
         }
       }
     } else {

http://git-wip-us.apache.org/repos/asf/tez/blob/4a6808ce/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index a16ee0a..3a9558d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1901,7 +1901,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
             vertex.trySetTerminationCause(VertexTerminationCause.RECOVERY_ERROR);
             return vertex.finished(VertexState.FAILED);
           }
-        } else {
           firstCommit = false;
         }
         VertexCommitCallback commitCallback = new VertexCommitCallback(vertex, outputName);

http://git-wip-us.apache.org/repos/asf/tez/blob/4a6808ce/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
index 0df8a4f..8fc29c2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
@@ -26,9 +26,9 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -63,7 +63,6 @@ import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.client.VertexStatus;
@@ -96,11 +95,13 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.impl.DAGImpl.OutputKey;
 import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.*;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.runtime.api.Event;
+import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.OutputCommitterContext;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
@@ -144,7 +145,7 @@ public class TestCommit {
   private TaskHeartbeatHandler thh;
   private Clock clock = new SystemClock();
   private DAGFinishEventHandler dagFinishEventHandler;
-  private HistoryEventHandler historyEventHandler;
+  private MockHistoryEventHandler historyEventHandler;
   private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
 
   private ExecutorService rawExecutor;
@@ -305,7 +306,7 @@ public class TestCommit {
     execService = MoreExecutors.listeningDecorator(rawExecutor);
 
     doReturn(execService).when(appContext).getExecService();
-    historyEventHandler = mock(HistoryEventHandler.class);
+    historyEventHandler = new MockHistoryEventHandler(appContext);
     aclManager = new ACLManager("amUser");
     doReturn(conf).when(appContext).getAMConf();
     doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
@@ -441,6 +442,63 @@ public class TestCommit {
     return dag.createDag(conf, null, null, null, true);
   }
 
+  // v1->v3
+  // v2->v3
+  // vertex_group (v1, v2) has 2 shared outputs
+  private DAGPlan createDAGPlanWith2VertexGroupOutputs(boolean vertexGroupCommitSucceeded1,
+    boolean vertexGroupCommitSucceeded2, boolean v3CommitSucceeded) throws Exception {
+    LOG.info("Setting up group dag plan");
+    int dummyTaskCount = 1;
+    Resource dummyTaskResource = Resource.newInstance(1, 1);
+    org.apache.tez.dag.api.Vertex v1 = org.apache.tez.dag.api.Vertex.create(
+            "vertex1", ProcessorDescriptor.create("Processor"), dummyTaskCount,
+            dummyTaskResource);
+    org.apache.tez.dag.api.Vertex v2 = org.apache.tez.dag.api.Vertex.create(
+        "vertex2", ProcessorDescriptor.create("Processor"), dummyTaskCount,
+        dummyTaskResource);
+    org.apache.tez.dag.api.Vertex v3 = org.apache.tez.dag.api.Vertex.create(
+        "vertex3", ProcessorDescriptor.create("Processor"), dummyTaskCount,
+        dummyTaskResource);
+
+    DAG dag = DAG.create("testDag");
+    String groupName1 = "uv12";
+    OutputCommitterDescriptor ocd1 = OutputCommitterDescriptor.create(
+        CountingOutputCommitter.class.getName()).setUserPayload(
+        UserPayload.create(ByteBuffer
+            .wrap(new CountingOutputCommitter.CountingOutputCommitterConfig(
+                !vertexGroupCommitSucceeded1, true).toUserPayload())));
+    OutputCommitterDescriptor ocd2 = OutputCommitterDescriptor.create(
+    CountingOutputCommitter.class.getName()).setUserPayload(
+    UserPayload.create(ByteBuffer
+        .wrap(new CountingOutputCommitter.CountingOutputCommitterConfig(
+                !vertexGroupCommitSucceeded2, true).toUserPayload())));
+    OutputCommitterDescriptor ocd3 = OutputCommitterDescriptor.create(
+        CountingOutputCommitter.class.getName()).setUserPayload(
+        UserPayload.create(ByteBuffer
+            .wrap(new CountingOutputCommitter.CountingOutputCommitterConfig(
+                !v3CommitSucceeded, true).toUserPayload())));
+
+    org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1,
+        v1, v2);
+    OutputDescriptor outDesc = OutputDescriptor.create("output.class");
+    uv12.addDataSink("v12Out1", DataSinkDescriptor.create(outDesc, ocd1, null));
+    uv12.addDataSink("v12Out2", DataSinkDescriptor.create(outDesc, ocd2, null));
+    v3.addDataSink("v3Out", DataSinkDescriptor.create(outDesc, ocd3, null));
+
+    GroupInputEdge e1 = GroupInputEdge.create(uv12, v3, EdgeProperty.create(
+        DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("dummy output class"),
+        InputDescriptor.create("dummy input class")), InputDescriptor
+        .create("merge.class"));
+
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addEdge(e1);
+    return dag.createDag(conf, null, null, null, true);
+  }
+
   private DAGPlan createDAGPlan_SingleVertexWith2Committer(
       boolean commit1Succeed, boolean commit2Succeed) throws IOException {
     return createDAGPlan_SingleVertexWith2Committer(commit1Succeed, commit2Succeed, false);
@@ -493,7 +551,7 @@ public class TestCommit {
   }
 
   @Test(timeout = 5000)
-  public void testVertexSucceedWithoutCommit() throws Exception {
+  public void testVertexCommit_OnDAGSuccess() throws Exception {
     conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
         true);
     setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true));
@@ -509,6 +567,9 @@ public class TestCommit {
         .getOutputCommitter("v1Out_1");
     CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1
         .getOutputCommitter("v1Out_2");
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+
     Assert.assertEquals(1, v1OutputCommitter_1.initCounter);
     Assert.assertEquals(1, v1OutputCommitter_1.setupCounter);
     Assert.assertEquals(0, v1OutputCommitter_1.commitCounter);
@@ -544,6 +605,8 @@ public class TestCommit {
     waitUntil(v1, VertexState.SUCCEEDED);
     Assert.assertNull(v1.getTerminationCause());
     Assert.assertTrue(v1.commitFutures.isEmpty());
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
 
     Assert.assertEquals(1, v1OutputCommitter_1.initCounter);
     Assert.assertEquals(1, v1OutputCommitter_1.setupCounter);
@@ -576,8 +639,11 @@ public class TestCommit {
     Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE,
         v1.getTerminationCause());
     Assert.assertTrue(v1.commitFutures.isEmpty());
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
     CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1
         .getOutputCommitter("v1Out_2");
+
     Assert.assertEquals(1, v1OutputCommitter_1.initCounter);
     Assert.assertEquals(1, v1OutputCommitter_1.setupCounter);
     Assert.assertEquals(1, v1OutputCommitter_1.commitCounter);
@@ -606,11 +672,13 @@ public class TestCommit {
     v1OutputCommitter_1.unblockCommit();
     waitForCommitCompleted(v1, "v1Out_1");
     Assert.assertEquals(VertexState.COMMITTING, v1.getState());
-    
+
     CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1
         .getOutputCommitter("v1Out_2");
     v1OutputCommitter_2.unblockCommit();
     waitUntil(v1, VertexState.FAILED);
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
 
     Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE,
         v1.getTerminationCause());
@@ -647,6 +715,8 @@ public class TestCommit {
     Assert.assertEquals(DAGState.KILLED, dag.getState());
     Assert
         .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
 
     CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1
         .getOutputCommitter("v1Out_1");
@@ -685,6 +755,8 @@ public class TestCommit {
     Assert.assertEquals(DAGState.FAILED, dag.getState());
     Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE,
         dag.getTerminationCause());
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
 
     CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1
         .getOutputCommitter("v1Out_1");
@@ -727,6 +799,8 @@ public class TestCommit {
     Assert.assertEquals(DAGState.FAILED, dag.getState());
     Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE,
         dag.getTerminationCause());
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
 
     CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1
         .getOutputCommitter("v1Out_1");
@@ -764,6 +838,8 @@ public class TestCommit {
     Assert.assertEquals(DAGState.ERROR, dag.getState());
     Assert.assertEquals(DAGTerminationCause.INTERNAL_ERROR,
         dag.getTerminationCause());
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
 
     CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1
         .getOutputCommitter("v1Out_1");
@@ -811,6 +887,20 @@ public class TestCommit {
     waitUntil(dag, DAGState.SUCCEEDED);
     Assert.assertTrue(dag.commitFutures.isEmpty());
     Assert.assertNull(dag.getTerminationCause());
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0);
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("v3", 0);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("v3", 0);
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
+    historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
 
     Assert.assertEquals(1, v12OutputCommitter.initCounter);
     Assert.assertEquals(1, v12OutputCommitter.setupCounter);
@@ -857,10 +947,21 @@ public class TestCommit {
     Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
     Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
     Assert.assertEquals(VertexState.SUCCEEDED, v3.getState());
-
     Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE,
         dag.getTerminationCause());
     Assert.assertTrue(dag.commitFutures.isEmpty());
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0);
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
+    historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
 
     Assert.assertEquals(1, v12OutputCommitter.initCounter);
     Assert.assertEquals(1, v12OutputCommitter.setupCounter);
@@ -899,6 +1000,8 @@ public class TestCommit {
         .getOutputCommitter("v3Out");
     v12OutputCommitter.unblockCommit();
     waitUntil(dag, DAGState.FAILED);
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
 
     Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
     Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
@@ -907,6 +1010,18 @@ public class TestCommit {
     Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE,
         dag.getTerminationCause());
     Assert.assertTrue(dag.commitFutures.isEmpty());
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0);
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
+    historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
 
     Assert.assertEquals(1, v12OutputCommitter.initCounter);
     Assert.assertEquals(1, v12OutputCommitter.setupCounter);
@@ -953,6 +1068,18 @@ public class TestCommit {
     v12OutputCommitter.unblockCommit();
     waitUntil(dag, DAGState.SUCCEEDED);
     Assert.assertTrue(dag.commitFutures.isEmpty());
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 1);
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0);
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
+    historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
 
     Assert.assertEquals(1, v12OutputCommitter.initCounter);
     Assert.assertEquals(1, v12OutputCommitter.setupCounter);
@@ -1001,6 +1128,18 @@ public class TestCommit {
     v3OutputCommitter.unblockCommit();
     waitUntil(dag, DAGState.SUCCEEDED);
     Assert.assertTrue(dag.commitFutures.isEmpty());
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 1);
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0);
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
+    historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
 
     Assert.assertEquals(1, v12OutputCommitter.initCounter);
     Assert.assertEquals(1, v12OutputCommitter.setupCounter);
@@ -1013,6 +1152,71 @@ public class TestCommit {
     Assert.assertEquals(0, v3OutputCommitter.abortCounter);
   }
 
+  // test DAGCommitSucceeded when vertex group has multiple shared outputs
+  @Test(timeout = 5000)
+  public void testDAGCommitSucceeded3_OnVertexSuccess() throws Exception {
+    conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+        false);
+    setupDAG(createDAGPlanWith2VertexGroupOutputs(true, true, true));
+    initDAG(dag);
+    startDAG(dag);
+    VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+    VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2");
+    VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3");
+
+    v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(),
+        TaskState.SUCCEEDED));
+    Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
+    Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
+    Assert.assertEquals(VertexState.COMMITTING, v3.getState());
+    Assert.assertEquals(DAGState.RUNNING, dag.getState());
+
+    CountingOutputCommitter v12OutputCommitter1 = (CountingOutputCommitter) v1
+        .getOutputCommitter("v12Out1");
+    v12OutputCommitter1.unblockCommit();
+    CountingOutputCommitter v12OutputCommitter2 = (CountingOutputCommitter) v1
+        .getOutputCommitter("v12Out2");
+    v12OutputCommitter2.unblockCommit();
+    Assert.assertEquals(DAGState.RUNNING, dag.getState());
+
+    CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
+        .getOutputCommitter("v3Out");
+    v3OutputCommitter.unblockCommit();
+    waitUntil(dag, DAGState.SUCCEEDED);
+    Assert.assertTrue(dag.commitFutures.isEmpty());
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 1);
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0);
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
+    historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
+
+    Assert.assertEquals(1, v12OutputCommitter1.initCounter);
+    Assert.assertEquals(1, v12OutputCommitter1.setupCounter);
+    Assert.assertEquals(1, v12OutputCommitter1.commitCounter);
+    Assert.assertEquals(0, v12OutputCommitter1.abortCounter);
+
+    Assert.assertEquals(1, v12OutputCommitter2.initCounter);
+    Assert.assertEquals(1, v12OutputCommitter2.setupCounter);
+    Assert.assertEquals(1, v12OutputCommitter2.commitCounter);
+    Assert.assertEquals(0, v12OutputCommitter2.abortCounter);
+
+    Assert.assertEquals(1, v3OutputCommitter.initCounter);
+    Assert.assertEquals(1, v3OutputCommitter.setupCounter);
+    Assert.assertEquals(1, v3OutputCommitter.commitCounter);
+    Assert.assertEquals(0, v3OutputCommitter.abortCounter);
+  }
+
   // commit of vertex group(v1,v2) fail and commit of v3 is not completed
   @Test(timeout = 5000)
   public void testDAGCommitFail1_OnVertexSuccess() throws Exception {
@@ -1048,6 +1252,16 @@ public class TestCommit {
     Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE,
         dag.getTerminationCause());
     Assert.assertTrue(dag.commitFutures.isEmpty());
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
+    historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
 
     Assert.assertEquals(1, v12OutputCommitter.initCounter);
     Assert.assertEquals(1, v12OutputCommitter.setupCounter);
@@ -1097,6 +1311,16 @@ public class TestCommit {
     Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE,
         dag.getTerminationCause());
     Assert.assertTrue(dag.commitFutures.isEmpty());
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
+    historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
 
     CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
         .getOutputCommitter("v12Out");
@@ -1152,6 +1376,16 @@ public class TestCommit {
     Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE,
         dag.getTerminationCause());
     Assert.assertTrue(dag.commitFutures.isEmpty());
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
+    historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
 
     Assert.assertEquals(1, v12OutputCommitter.initCounter);
     Assert.assertEquals(1, v12OutputCommitter.setupCounter);
@@ -1202,6 +1436,16 @@ public class TestCommit {
     Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE,
         dag.getTerminationCause());
     Assert.assertTrue(dag.commitFutures.isEmpty());
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
+    historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
 
     Assert.assertEquals(1, v12OutputCommitter.initCounter);
     Assert.assertEquals(1, v12OutputCommitter.setupCounter);
@@ -1215,7 +1459,7 @@ public class TestCommit {
   }
 
   @Test (timeout = 5000)
-  public void testDAGInternalErrorWhileCommiting() throws Exception {
+  public void testDAGInternalErrorWhileCommiting_OnDAGSuccess() throws Exception {
     conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
         true);
     setupDAG(createDAGPlan(true, true));
@@ -1236,6 +1480,17 @@ public class TestCommit {
     waitUntil(dag, DAGState.ERROR);
 
     Assert.assertEquals(DAGTerminationCause.INTERNAL_ERROR, dag.getTerminationCause());
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
+    historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
+
     CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
         .getOutputCommitter("v12Out");
     CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
@@ -1282,6 +1537,16 @@ public class TestCommit {
     Assert
         .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
     Assert.assertTrue(dag.commitFutures.isEmpty());
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
+    historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
 
     CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
         .getOutputCommitter("v12Out");
@@ -1335,6 +1600,16 @@ public class TestCommit {
     Assert
         .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
     Assert.assertTrue(dag.commitFutures.isEmpty());
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
+    historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
 
     CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
         .getOutputCommitter("v12Out");
@@ -1383,7 +1658,16 @@ public class TestCommit {
     Assert
         .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
     Assert.assertTrue(dag.commitFutures.isEmpty());
-
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
+    historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
 
     CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
         .getOutputCommitter("v12Out");
@@ -1424,6 +1708,17 @@ public class TestCommit {
 
     Assert.assertEquals(DAGTerminationCause.VERTEX_RERUN_IN_COMMITTING, dag.getTerminationCause());
     Assert.assertTrue(dag.commitFutures.isEmpty());
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
+    historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
+
     CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
         .getOutputCommitter("v12Out");
     CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
@@ -1461,6 +1756,17 @@ public class TestCommit {
     waitUntil(dag, DAGState.ERROR);
 
     Assert.assertEquals(DAGTerminationCause.INTERNAL_ERROR, dag.getTerminationCause());
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
+    historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
+
     CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
         .getOutputCommitter("v12Out");
     CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
@@ -1476,14 +1782,12 @@ public class TestCommit {
     Assert.assertEquals(1, v3OutputCommitter.abortCounter);
   }
 
-  @Test(timeout = 5000)
-  public void testVertexGroupCommitFinishedEventFail() throws Exception {
+  @Test (timeout = 5000)
+  public void testVertexGroupCommitFinishedEventFail_OnVertexSuccess() throws Exception {
     conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
         false);
     setupDAG(createDAGPlan(true, true));
-    MockHistoryEventHandler mockHistoryEventHandler = new MockHistoryEventHandler(appContext);
-    doReturn(mockHistoryEventHandler).when(appContext).getHistoryHandler();
-    mockHistoryEventHandler.failVertexGroupCommitFinishedEvent = true;
+    historyEventHandler.failVertexGroupCommitFinishedEvent = true;
     
     initDAG(dag);
     startDAG(dag);
@@ -1503,6 +1807,16 @@ public class TestCommit {
         .getOutputCommitter("v3Out");
     v12OutputCommitter.unblockCommit();
     waitUntil(dag, DAGState.FAILED);
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
+    historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
 
     Assert.assertEquals(DAGState.FAILED, dag.getState());
     Assert.assertEquals(DAGTerminationCause.RECOVERY_FAILURE,
@@ -1524,13 +1838,11 @@ public class TestCommit {
   }
 
   @Test(timeout = 5000)
-  public void testDAGCommitStartedEventFail() throws Exception {
+  public void testDAGCommitStartedEventFail_OnDAGSuccess() throws Exception {
     conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
         true);
     setupDAG(createDAGPlan(true, true));
-    MockHistoryEventHandler mockHistoryEventHandler = new MockHistoryEventHandler(appContext);
-    doReturn(mockHistoryEventHandler).when(appContext).getHistoryHandler();
-    mockHistoryEventHandler.failDAGCommitStartedEvent = true;
+    historyEventHandler.failDAGCommitStartedEvent = true;
     
     initDAG(dag);
     startDAG(dag);
@@ -1547,6 +1859,17 @@ public class TestCommit {
     waitUntil(dag, DAGState.FAILED);
     Assert.assertEquals(DAGTerminationCause.RECOVERY_FAILURE, dag.getTerminationCause());
     Assert.assertTrue(dag.commitFutures.isEmpty());
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
+    historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
+
     CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
         .getOutputCommitter("v12Out");
     CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
@@ -1567,7 +1890,7 @@ public class TestCommit {
   // test commit will be canceled no matter it is started or still in the threadpool
   // ControlledThreadPoolExecutor is used for to not schedule the commits
   @Test(timeout = 5000)
-  public void testCommitCanceled() throws Exception {
+  public void testCommitCanceled_OnDAGSuccess() throws Exception {
     conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
         true);
     setupDAG(createDAGPlan(true, true));
@@ -1598,6 +1921,16 @@ public class TestCommit {
     Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
     // mean the commits have been canceled
     Assert.assertTrue(dag.commitFutures.isEmpty());
+    historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
+    historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+    historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+    historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
+    historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+    historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
+    historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
 
     CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
         .getOutputCommitter("v12Out");
@@ -1635,7 +1968,7 @@ public class TestCommit {
 
     public boolean failVertexGroupCommitFinishedEvent = false;
     public boolean failDAGCommitStartedEvent = false;
-
+    public List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>();
     public MockHistoryEventHandler(AppContext context) {
       super(context);
     }
@@ -1650,6 +1983,85 @@ public class TestCommit {
           && failDAGCommitStartedEvent) {
         throw new IOException("fail DAGCommitStartedEvent");
       }
+      historyEvents.add(event.getHistoryEvent());
+    }
+
+    public void verifyVertexGroupCommitStartedEvent(String groupName, int expectedTimes) {
+      int actualTimes = 0;
+      for (HistoryEvent event : historyEvents) {
+        if (event.getEventType() == HistoryEventType.VERTEX_GROUP_COMMIT_STARTED) {
+          VertexGroupCommitStartedEvent startedEvent = (VertexGroupCommitStartedEvent)event;
+          if (startedEvent.getVertexGroupName().equals(groupName)) {
+            actualTimes ++;
+          }
+        }
+      }
+      Assert.assertEquals(expectedTimes, actualTimes);
+    }
+
+    public void verifyVertexGroupCommitFinishedEvent(String groupName, int expectedTimes) {
+      int actualTimes = 0;
+      for (HistoryEvent event : historyEvents) {
+        if (event.getEventType() == HistoryEventType.VERTEX_GROUP_COMMIT_FINISHED) {
+          VertexGroupCommitFinishedEvent finishedEvent = (VertexGroupCommitFinishedEvent)event;
+          if (finishedEvent.getVertexGroupName().equals(groupName)) {
+            actualTimes ++;
+          }
+        }
+      }
+      Assert.assertEquals(expectedTimes, actualTimes);
+    }
+
+    public void verifyVertexCommitStartedEvent(TezVertexID vertexId, int expectedTimes) {
+      int actualTimes = 0;
+      for (HistoryEvent event : historyEvents) {
+        if (event.getEventType() == HistoryEventType.VERTEX_COMMIT_STARTED) {
+          VertexCommitStartedEvent startedEvent = (VertexCommitStartedEvent)event;
+          if (startedEvent.getVertexID().equals(vertexId)) {
+            actualTimes ++;
+          }
+        }
+      }
+      Assert.assertEquals(expectedTimes, actualTimes);
+    }
+
+    public void verifyVertexFinishedEvent(TezVertexID vertexId, int expectedTimes) {
+      int actualTimes = 0;
+      for (HistoryEvent event : historyEvents) {
+        if (event.getEventType() == HistoryEventType.VERTEX_FINISHED) {
+          VertexFinishedEvent finishedEvent = (VertexFinishedEvent)event;
+          if (finishedEvent.getVertexID().equals(vertexId)) {
+            actualTimes ++;
+          }
+        }
+      }
+      Assert.assertEquals(expectedTimes, actualTimes);
+    }
+
+    public void verifyDAGCommitStartedEvent(TezDAGID dagId, int expectedTimes) {
+      int actualTimes = 0;
+      for (HistoryEvent event : historyEvents) {
+        if (event.getEventType() == HistoryEventType.DAG_COMMIT_STARTED) {
+          DAGCommitStartedEvent startedEvent = (DAGCommitStartedEvent)event;
+          if (startedEvent.getDagID().equals(dagId)) {
+            actualTimes ++;
+          }
+        }
+      }
+      Assert.assertEquals(expectedTimes, actualTimes);
+    }
+
+    public void verifyDAGFinishedEvent(TezDAGID dagId, int expectedTimes) {
+      int actualTimes = 0;
+      for (HistoryEvent event : historyEvents) {
+        if (event.getEventType() == HistoryEventType.DAG_FINISHED) {
+          DAGFinishedEvent startedEvent = (DAGFinishedEvent)event;
+          if (startedEvent.getDagID().equals(dagId)) {
+            actualTimes ++;
+          }
+        }
+      }
+      Assert.assertEquals(expectedTimes, actualTimes);
     }
   }