You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/05/08 06:43:22 UTC
tez git commit: TEZ-2410. VertexGroupCommitFinishedEvent &
VertexCommitStartedEvent is not logged correctly (zjffdu)
Repository: tez
Updated Branches:
refs/heads/master 05f77fe2b -> 4a6808ce4
TEZ-2410. VertexGroupCommitFinishedEvent & VertexCommitStartedEvent is not logged correctly (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4a6808ce
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4a6808ce
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4a6808ce
Branch: refs/heads/master
Commit: 4a6808ce4c99458653bbe4328dfcad24649a48fb
Parents: 05f77fe
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri May 8 12:42:46 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri May 8 12:42:46 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 61 ++-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 1 -
.../apache/tez/dag/app/dag/impl/TestCommit.java | 454 ++++++++++++++++++-
4 files changed, 477 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/4a6808ce/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ba8e9d8..3520768 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -20,6 +20,7 @@ INCOMPATIBLE CHANGES
Default max limit increased. Should not affect existing users.
ALL CHANGES:
+ TEZ-2410. VertexGroupCommitFinishedEvent & VertexCommitStartedEvent is not logged correctly
TEZ-776. Reduce AM mem usage caused by storing TezEvents
TEZ-2423. Tez UI: Remove Attempt Index column from task->attempts page
TEZ-2416. Tez UI: Make tooltips display faster.
http://git-wip-us.apache.org/repos/asf/tez/blob/4a6808ce/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index f769565..1726c18 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -212,7 +212,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
@VisibleForTesting
Map<OutputKey, ListenableFuture<Void>> commitFutures
= new HashMap<OutputKey, ListenableFuture<Void>>();
- private Set<OutputKey> succeededCommits = new HashSet<OutputKey>();
private static final DiagnosticsUpdateTransition
DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
@@ -457,7 +456,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
Set<String> outputs;
Map<String, InputDescriptor> edgeMergedInputs;
int successfulMembers;
- boolean committed;
+ int successfulCommits;
+ boolean commitStarted;
+
VertexGroupInfo(PlanVertexGroupInfo groupInfo) {
groupName = groupInfo.getGroupName();
groupMembers = Sets.newHashSet(groupInfo.getGroupMembersList());
@@ -468,10 +469,20 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
outputs = Sets.newHashSet(groupInfo.getOutputsList());
successfulMembers = 0;
- committed = false;
+ successfulCommits = 0;
+ commitStarted = false;
+ }
+
+ public boolean isInCommitting() { // true while this vertex group's commit is still in flight
+ return commitStarted && successfulCommits < outputs.size(); // started, but fewer successful commits than shared outputs
+ }
+
+ public boolean isCommitted() { // true once every shared output of this vertex group has committed
+ return commitStarted && successfulCommits == outputs.size(); // one successful commit counted per shared output
}
}
+
public DAGImpl(TezDAGID dagId,
Configuration amConf,
DAGPlan jobPlan,
@@ -962,7 +973,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
// commit all shared outputs
for (final VertexGroupInfo groupInfo : vertexGroups.values()) {
if (!groupInfo.outputs.isEmpty()) {
- groupInfo.committed = true;
+ groupInfo.commitStarted = true;
final Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
for (final String outputName : groupInfo.outputs) {
final OutputKey outputKey = new OutputKey(outputName, groupInfo.groupName, true);
@@ -1920,7 +1931,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
+ " data, groupName=" + groupInfo.groupName);
continue;
}
- groupInfo.committed = true;
+ groupInfo.commitStarted = true;
final Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
try {
appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(),
@@ -1966,11 +1977,19 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
+ ", vertexId=" + vertex.getVertexId());
if (!commitAllOutputsOnSuccess) {
- // partial output may already have been committed. fail if so
+ // partial output may already have been in committing or committed. fail if so
List<VertexGroupInfo> groupList = vertexGroupInfo.get(vertex.getName());
if (groupList != null) {
for (VertexGroupInfo groupInfo : groupList) {
- if (groupInfo.committed) {
+ if (groupInfo.isInCommitting()) {
+ String msg = "Aborting job as committing vertex: "
+ + vertex.getLogIdentifier() + " is re-running";
+ LOG.info(msg);
+ addDiagnostic(msg);
+ enactKill(DAGTerminationCause.VERTEX_RERUN_IN_COMMITTING,
+ VertexTerminationCause.VERTEX_RERUN_IN_COMMITTING);
+ return true;
+ } else if (groupInfo.isCommitted()) {
String msg = "Aborting job as committed vertex: "
+ vertex.getLogIdentifier() + " is re-running";
LOG.info(msg);
@@ -2091,17 +2110,23 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
boolean recoveryFailed = false;
if (commitCompletedEvent.isSucceeded()) {
LOG.info("Commit succeeded for output:" + commitCompletedEvent.getOutputKey());
- succeededCommits.add(commitCompletedEvent.getOutputKey());
- if (!commitAllOutputsOnSuccess) {
- try {
- appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(),
- new VertexGroupCommitFinishedEvent(getID(), commitCompletedEvent.getOutputKey().getEntityName(),
- clock.getTime())));
- } catch (IOException e) {
- String diag = "Failed to send commit recovery event to handler, " + ExceptionUtils.getStackTrace(e);
- addDiagnostic(diag);
- LOG.error(diag);
- recoveryFailed = true;
+ OutputKey outputKey = commitCompletedEvent.getOutputKey();
+ if (outputKey.isVertexGroupOutput){
+ VertexGroupInfo vertexGroup = vertexGroups.get(outputKey.getEntityName());
+ vertexGroup.successfulCommits++;
+ if (vertexGroup.isCommitted()) {
+ if (!commitAllOutputsOnSuccess) {
+ try {
+ appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(),
+ new VertexGroupCommitFinishedEvent(getID(), commitCompletedEvent.getOutputKey().getEntityName(),
+ clock.getTime())));
+ } catch (IOException e) {
+ String diag = "Failed to send commit recovery event to handler, " + ExceptionUtils.getStackTrace(e);
+ addDiagnostic(diag);
+ LOG.error(diag);
+ recoveryFailed = true;
+ }
+ }
}
}
} else {
http://git-wip-us.apache.org/repos/asf/tez/blob/4a6808ce/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index a16ee0a..3a9558d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1901,7 +1901,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
vertex.trySetTerminationCause(VertexTerminationCause.RECOVERY_ERROR);
return vertex.finished(VertexState.FAILED);
}
- } else {
firstCommit = false;
}
VertexCommitCallback commitCallback = new VertexCommitCallback(vertex, outputName);
http://git-wip-us.apache.org/repos/asf/tez/blob/4a6808ce/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
index 0df8a4f..8fc29c2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
@@ -26,9 +26,9 @@ import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -63,7 +63,6 @@ import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.VertexStatus;
@@ -96,11 +95,13 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.DAGImpl.OutputKey;
import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.*;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.runtime.api.Event;
+import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
@@ -144,7 +145,7 @@ public class TestCommit {
private TaskHeartbeatHandler thh;
private Clock clock = new SystemClock();
private DAGFinishEventHandler dagFinishEventHandler;
- private HistoryEventHandler historyEventHandler;
+ private MockHistoryEventHandler historyEventHandler;
private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
private ExecutorService rawExecutor;
@@ -305,7 +306,7 @@ public class TestCommit {
execService = MoreExecutors.listeningDecorator(rawExecutor);
doReturn(execService).when(appContext).getExecService();
- historyEventHandler = mock(HistoryEventHandler.class);
+ historyEventHandler = new MockHistoryEventHandler(appContext);
aclManager = new ACLManager("amUser");
doReturn(conf).when(appContext).getAMConf();
doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
@@ -441,6 +442,63 @@ public class TestCommit {
return dag.createDag(conf, null, null, null, true);
}
+ // Builds a DAG plan with three single-task vertices where v1->v3 and v2->v3 via one group input edge.
+ // Vertex group uv12 = (v1, v2) has 2 shared outputs (v12Out1, v12Out2); v3 has its own output v3Out.
+ // Each boolean is negated and passed to that output's committer config (presumably its fail-on-commit switch -- TODO confirm).
+ private DAGPlan createDAGPlanWith2VertexGroupOutputs(boolean vertexGroupCommitSucceeded1,
+ boolean vertexGroupCommitSucceeded2, boolean v3CommitSucceeded) throws Exception {
+ LOG.info("Setting up group dag plan");
+ int dummyTaskCount = 1; // every vertex gets exactly one task
+ Resource dummyTaskResource = Resource.newInstance(1, 1);
+ org.apache.tez.dag.api.Vertex v1 = org.apache.tez.dag.api.Vertex.create(
+ "vertex1", ProcessorDescriptor.create("Processor"), dummyTaskCount,
+ dummyTaskResource);
+ org.apache.tez.dag.api.Vertex v2 = org.apache.tez.dag.api.Vertex.create(
+ "vertex2", ProcessorDescriptor.create("Processor"), dummyTaskCount,
+ dummyTaskResource);
+ org.apache.tez.dag.api.Vertex v3 = org.apache.tez.dag.api.Vertex.create(
+ "vertex3", ProcessorDescriptor.create("Processor"), dummyTaskCount,
+ dummyTaskResource);
+
+ DAG dag = DAG.create("testDag");
+ String groupName1 = "uv12";
+ OutputCommitterDescriptor ocd1 = OutputCommitterDescriptor.create( // committer for shared output v12Out1
+ CountingOutputCommitter.class.getName()).setUserPayload(
+ UserPayload.create(ByteBuffer
+ .wrap(new CountingOutputCommitter.CountingOutputCommitterConfig(
+ !vertexGroupCommitSucceeded1, true).toUserPayload()))); // NOTE(review): second arg 'true' presumably makes the commit block until unblockCommit() -- confirm
+ OutputCommitterDescriptor ocd2 = OutputCommitterDescriptor.create( // committer for shared output v12Out2
+ CountingOutputCommitter.class.getName()).setUserPayload(
+ UserPayload.create(ByteBuffer
+ .wrap(new CountingOutputCommitter.CountingOutputCommitterConfig(
+ !vertexGroupCommitSucceeded2, true).toUserPayload())));
+ OutputCommitterDescriptor ocd3 = OutputCommitterDescriptor.create( // committer for v3's own output v3Out
+ CountingOutputCommitter.class.getName()).setUserPayload(
+ UserPayload.create(ByteBuffer
+ .wrap(new CountingOutputCommitter.CountingOutputCommitterConfig(
+ !v3CommitSucceeded, true).toUserPayload())));
+
+ org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1,
+ v1, v2);
+ OutputDescriptor outDesc = OutputDescriptor.create("output.class"); // same output descriptor reused for all three sinks
+ uv12.addDataSink("v12Out1", DataSinkDescriptor.create(outDesc, ocd1, null));
+ uv12.addDataSink("v12Out2", DataSinkDescriptor.create(outDesc, ocd2, null));
+ v3.addDataSink("v3Out", DataSinkDescriptor.create(outDesc, ocd3, null));
+
+ GroupInputEdge e1 = GroupInputEdge.create(uv12, v3, EdgeProperty.create( // single edge from the whole group into v3
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("dummy output class"),
+ InputDescriptor.create("dummy input class")), InputDescriptor
+ .create("merge.class"));
+
+ dag.addVertex(v1);
+ dag.addVertex(v2);
+ dag.addVertex(v3);
+ dag.addEdge(e1);
+ return dag.createDag(conf, null, null, null, true);
+ }
+
private DAGPlan createDAGPlan_SingleVertexWith2Committer(
boolean commit1Succeed, boolean commit2Succeed) throws IOException {
return createDAGPlan_SingleVertexWith2Committer(commit1Succeed, commit2Succeed, false);
@@ -493,7 +551,7 @@ public class TestCommit {
}
@Test(timeout = 5000)
- public void testVertexSucceedWithoutCommit() throws Exception {
+ public void testVertexCommit_OnDAGSuccess() throws Exception {
conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
true);
setupDAG(createDAGPlan_SingleVertexWith2Committer(true, true));
@@ -509,6 +567,9 @@ public class TestCommit {
.getOutputCommitter("v1Out_1");
CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1
.getOutputCommitter("v1Out_2");
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+
Assert.assertEquals(1, v1OutputCommitter_1.initCounter);
Assert.assertEquals(1, v1OutputCommitter_1.setupCounter);
Assert.assertEquals(0, v1OutputCommitter_1.commitCounter);
@@ -544,6 +605,8 @@ public class TestCommit {
waitUntil(v1, VertexState.SUCCEEDED);
Assert.assertNull(v1.getTerminationCause());
Assert.assertTrue(v1.commitFutures.isEmpty());
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
Assert.assertEquals(1, v1OutputCommitter_1.initCounter);
Assert.assertEquals(1, v1OutputCommitter_1.setupCounter);
@@ -576,8 +639,11 @@ public class TestCommit {
Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE,
v1.getTerminationCause());
Assert.assertTrue(v1.commitFutures.isEmpty());
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1
.getOutputCommitter("v1Out_2");
+
Assert.assertEquals(1, v1OutputCommitter_1.initCounter);
Assert.assertEquals(1, v1OutputCommitter_1.setupCounter);
Assert.assertEquals(1, v1OutputCommitter_1.commitCounter);
@@ -606,11 +672,13 @@ public class TestCommit {
v1OutputCommitter_1.unblockCommit();
waitForCommitCompleted(v1, "v1Out_1");
Assert.assertEquals(VertexState.COMMITTING, v1.getState());
-
+
CountingOutputCommitter v1OutputCommitter_2 = (CountingOutputCommitter) v1
.getOutputCommitter("v1Out_2");
v1OutputCommitter_2.unblockCommit();
waitUntil(v1, VertexState.FAILED);
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE,
v1.getTerminationCause());
@@ -647,6 +715,8 @@ public class TestCommit {
Assert.assertEquals(DAGState.KILLED, dag.getState());
Assert
.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1
.getOutputCommitter("v1Out_1");
@@ -685,6 +755,8 @@ public class TestCommit {
Assert.assertEquals(DAGState.FAILED, dag.getState());
Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE,
dag.getTerminationCause());
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1
.getOutputCommitter("v1Out_1");
@@ -727,6 +799,8 @@ public class TestCommit {
Assert.assertEquals(DAGState.FAILED, dag.getState());
Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE,
dag.getTerminationCause());
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1
.getOutputCommitter("v1Out_1");
@@ -764,6 +838,8 @@ public class TestCommit {
Assert.assertEquals(DAGState.ERROR, dag.getState());
Assert.assertEquals(DAGTerminationCause.INTERNAL_ERROR,
dag.getTerminationCause());
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
CountingOutputCommitter v1OutputCommitter_1 = (CountingOutputCommitter) v1
.getOutputCommitter("v1Out_1");
@@ -811,6 +887,20 @@ public class TestCommit {
waitUntil(dag, DAGState.SUCCEEDED);
Assert.assertTrue(dag.commitFutures.isEmpty());
Assert.assertNull(dag.getTerminationCause());
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0);
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("v3", 0);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("v3", 0);
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
+ historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
Assert.assertEquals(1, v12OutputCommitter.initCounter);
Assert.assertEquals(1, v12OutputCommitter.setupCounter);
@@ -857,10 +947,21 @@ public class TestCommit {
Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
Assert.assertEquals(VertexState.SUCCEEDED, v3.getState());
-
Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE,
dag.getTerminationCause());
Assert.assertTrue(dag.commitFutures.isEmpty());
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0);
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
+ historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
Assert.assertEquals(1, v12OutputCommitter.initCounter);
Assert.assertEquals(1, v12OutputCommitter.setupCounter);
@@ -899,6 +1000,8 @@ public class TestCommit {
.getOutputCommitter("v3Out");
v12OutputCommitter.unblockCommit();
waitUntil(dag, DAGState.FAILED);
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
@@ -907,6 +1010,18 @@ public class TestCommit {
Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE,
dag.getTerminationCause());
Assert.assertTrue(dag.commitFutures.isEmpty());
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0);
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
+ historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
Assert.assertEquals(1, v12OutputCommitter.initCounter);
Assert.assertEquals(1, v12OutputCommitter.setupCounter);
@@ -953,6 +1068,18 @@ public class TestCommit {
v12OutputCommitter.unblockCommit();
waitUntil(dag, DAGState.SUCCEEDED);
Assert.assertTrue(dag.commitFutures.isEmpty());
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 1);
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0);
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
+ historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
Assert.assertEquals(1, v12OutputCommitter.initCounter);
Assert.assertEquals(1, v12OutputCommitter.setupCounter);
@@ -1001,6 +1128,18 @@ public class TestCommit {
v3OutputCommitter.unblockCommit();
waitUntil(dag, DAGState.SUCCEEDED);
Assert.assertTrue(dag.commitFutures.isEmpty());
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 1);
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0);
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
+ historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
Assert.assertEquals(1, v12OutputCommitter.initCounter);
Assert.assertEquals(1, v12OutputCommitter.setupCounter);
@@ -1013,6 +1152,71 @@ public class TestCommit {
Assert.assertEquals(0, v3OutputCommitter.abortCounter);
}
+ // Tests that the DAG commit succeeds when a vertex group has multiple shared outputs,
+ @Test(timeout = 5000) // and that the group's commit started/finished events are each verified with count 1 for the 2 outputs
+ public void testDAGCommitSucceeded3_OnVertexSuccess() throws Exception {
+ conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
+ false); // disable commit-all-outputs-on-DAG-success for this test
+ setupDAG(createDAGPlanWith2VertexGroupOutputs(true, true, true)); // all three commits configured to succeed
+ initDAG(dag);
+ startDAG(dag);
+ VertexImpl v1 = (VertexImpl) dag.getVertex("vertex1");
+ VertexImpl v2 = (VertexImpl) dag.getVertex("vertex2");
+ VertexImpl v3 = (VertexImpl) dag.getVertex("vertex3");
+
+ v1.handle(new VertexEventTaskCompleted(v1.getTask(0).getTaskId(), // complete the single task of each vertex
+ TaskState.SUCCEEDED));
+ v2.handle(new VertexEventTaskCompleted(v2.getTask(0).getTaskId(),
+ TaskState.SUCCEEDED));
+ v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(),
+ TaskState.SUCCEEDED));
+ Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
+ Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
+ Assert.assertEquals(VertexState.COMMITTING, v3.getState()); // v3 stays in COMMITTING until its committer is unblocked below
+ Assert.assertEquals(DAGState.RUNNING, dag.getState());
+
+ CountingOutputCommitter v12OutputCommitter1 = (CountingOutputCommitter) v1 // shared-output committers are looked up through group member v1
+ .getOutputCommitter("v12Out1");
+ v12OutputCommitter1.unblockCommit();
+ CountingOutputCommitter v12OutputCommitter2 = (CountingOutputCommitter) v1
+ .getOutputCommitter("v12Out2");
+ v12OutputCommitter2.unblockCommit();
+ Assert.assertEquals(DAGState.RUNNING, dag.getState()); // DAG is still RUNNING at this point; v3's commit has not been unblocked yet
+
+ CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
+ .getOutputCommitter("v3Out");
+ v3OutputCommitter.unblockCommit();
+ waitUntil(dag, DAGState.SUCCEEDED);
+ Assert.assertTrue(dag.commitFutures.isEmpty());
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1); // NOTE(review): second argument is presumably the expected event count -- MockHistoryEventHandler is not shown here
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 1); // verified with 1 although the group has two outputs
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("v1", 0); // "v1" is not the name of the vertex group created by the plan helper
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("v1", 0);
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1); // v3 is the only vertex verified with a commit-started count of 1
+ historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0); // DAG-level commit-started event verified with count 0
+ historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
+
+ Assert.assertEquals(1, v12OutputCommitter1.initCounter); // each committer: init, setup and commit counters are 1, abort counter is 0
+ Assert.assertEquals(1, v12OutputCommitter1.setupCounter);
+ Assert.assertEquals(1, v12OutputCommitter1.commitCounter);
+ Assert.assertEquals(0, v12OutputCommitter1.abortCounter);
+
+ Assert.assertEquals(1, v12OutputCommitter2.initCounter);
+ Assert.assertEquals(1, v12OutputCommitter2.setupCounter);
+ Assert.assertEquals(1, v12OutputCommitter2.commitCounter);
+ Assert.assertEquals(0, v12OutputCommitter2.abortCounter);
+
+ Assert.assertEquals(1, v3OutputCommitter.initCounter);
+ Assert.assertEquals(1, v3OutputCommitter.setupCounter);
+ Assert.assertEquals(1, v3OutputCommitter.commitCounter);
+ Assert.assertEquals(0, v3OutputCommitter.abortCounter);
+ }
+
// commit of vertex group(v1,v2) fail and commit of v3 is not completed
@Test(timeout = 5000)
public void testDAGCommitFail1_OnVertexSuccess() throws Exception {
@@ -1048,6 +1252,16 @@ public class TestCommit {
Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE,
dag.getTerminationCause());
Assert.assertTrue(dag.commitFutures.isEmpty());
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
+ historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
Assert.assertEquals(1, v12OutputCommitter.initCounter);
Assert.assertEquals(1, v12OutputCommitter.setupCounter);
@@ -1097,6 +1311,16 @@ public class TestCommit {
Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE,
dag.getTerminationCause());
Assert.assertTrue(dag.commitFutures.isEmpty());
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
+ historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
.getOutputCommitter("v12Out");
@@ -1152,6 +1376,16 @@ public class TestCommit {
Assert.assertEquals(DAGTerminationCause.VERTEX_FAILURE,
dag.getTerminationCause());
Assert.assertTrue(dag.commitFutures.isEmpty());
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
+ historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
Assert.assertEquals(1, v12OutputCommitter.initCounter);
Assert.assertEquals(1, v12OutputCommitter.setupCounter);
@@ -1202,6 +1436,16 @@ public class TestCommit {
Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE,
dag.getTerminationCause());
Assert.assertTrue(dag.commitFutures.isEmpty());
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
+ historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
Assert.assertEquals(1, v12OutputCommitter.initCounter);
Assert.assertEquals(1, v12OutputCommitter.setupCounter);
@@ -1215,7 +1459,7 @@ public class TestCommit {
}
@Test (timeout = 5000)
- public void testDAGInternalErrorWhileCommiting() throws Exception {
+ public void testDAGInternalErrorWhileCommiting_OnDAGSuccess() throws Exception {
conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
true);
setupDAG(createDAGPlan(true, true));
@@ -1236,6 +1480,17 @@ public class TestCommit {
waitUntil(dag, DAGState.ERROR);
Assert.assertEquals(DAGTerminationCause.INTERNAL_ERROR, dag.getTerminationCause());
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
+ historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
+
CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
.getOutputCommitter("v12Out");
CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
@@ -1282,6 +1537,16 @@ public class TestCommit {
Assert
.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
Assert.assertTrue(dag.commitFutures.isEmpty());
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
+ historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
.getOutputCommitter("v12Out");
@@ -1335,6 +1600,16 @@ public class TestCommit {
Assert
.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
Assert.assertTrue(dag.commitFutures.isEmpty());
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
+ historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
.getOutputCommitter("v12Out");
@@ -1383,7 +1658,16 @@ public class TestCommit {
Assert
.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
Assert.assertTrue(dag.commitFutures.isEmpty());
-
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
+ historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
.getOutputCommitter("v12Out");
@@ -1424,6 +1708,17 @@ public class TestCommit {
Assert.assertEquals(DAGTerminationCause.VERTEX_RERUN_IN_COMMITTING, dag.getTerminationCause());
Assert.assertTrue(dag.commitFutures.isEmpty());
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
+ historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
+
CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
.getOutputCommitter("v12Out");
CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
@@ -1461,6 +1756,17 @@ public class TestCommit {
waitUntil(dag, DAGState.ERROR);
Assert.assertEquals(DAGTerminationCause.INTERNAL_ERROR, dag.getTerminationCause());
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
+ historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
+
CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
.getOutputCommitter("v12Out");
CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
@@ -1476,14 +1782,12 @@ public class TestCommit {
Assert.assertEquals(1, v3OutputCommitter.abortCounter);
}
- @Test(timeout = 5000)
- public void testVertexGroupCommitFinishedEventFail() throws Exception {
+ @Test (timeout = 5000)
+ public void testVertexGroupCommitFinishedEventFail_OnVertexSuccess() throws Exception {
conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
false);
setupDAG(createDAGPlan(true, true));
- MockHistoryEventHandler mockHistoryEventHandler = new MockHistoryEventHandler(appContext);
- doReturn(mockHistoryEventHandler).when(appContext).getHistoryHandler();
- mockHistoryEventHandler.failVertexGroupCommitFinishedEvent = true;
+ historyEventHandler.failVertexGroupCommitFinishedEvent = true;
initDAG(dag);
startDAG(dag);
@@ -1503,6 +1807,16 @@ public class TestCommit {
.getOutputCommitter("v3Out");
v12OutputCommitter.unblockCommit();
waitUntil(dag, DAGState.FAILED);
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
+ historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
Assert.assertEquals(DAGState.FAILED, dag.getState());
Assert.assertEquals(DAGTerminationCause.RECOVERY_FAILURE,
@@ -1524,13 +1838,11 @@ public class TestCommit {
}
@Test(timeout = 5000)
- public void testDAGCommitStartedEventFail() throws Exception {
+ public void testDAGCommitStartedEventFail_OnDAGSuccess() throws Exception {
conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
true);
setupDAG(createDAGPlan(true, true));
- MockHistoryEventHandler mockHistoryEventHandler = new MockHistoryEventHandler(appContext);
- doReturn(mockHistoryEventHandler).when(appContext).getHistoryHandler();
- mockHistoryEventHandler.failDAGCommitStartedEvent = true;
+ historyEventHandler.failDAGCommitStartedEvent = true;
initDAG(dag);
startDAG(dag);
@@ -1547,6 +1859,17 @@ public class TestCommit {
waitUntil(dag, DAGState.FAILED);
Assert.assertEquals(DAGTerminationCause.RECOVERY_FAILURE, dag.getTerminationCause());
Assert.assertTrue(dag.commitFutures.isEmpty());
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 0);
+ historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
+
CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
.getOutputCommitter("v12Out");
CountingOutputCommitter v3OutputCommitter = (CountingOutputCommitter) v3
@@ -1567,7 +1890,7 @@ public class TestCommit {
// test commit will be canceled no matter it is started or still in the threadpool
// ControlledThreadPoolExecutor is used for to not schedule the commits
@Test(timeout = 5000)
- public void testCommitCanceled() throws Exception {
+ public void testCommitCanceled_OnDAGSuccess() throws Exception {
conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
true);
setupDAG(createDAGPlan(true, true));
@@ -1598,6 +1921,16 @@ public class TestCommit {
Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
// mean the commits have been canceled
Assert.assertTrue(dag.commitFutures.isEmpty());
+ historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
+ historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
+ historyEventHandler.verifyVertexCommitStartedEvent(v1.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v1.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v2.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v2.getVertexId(), 1);
+ historyEventHandler.verifyVertexCommitStartedEvent(v3.getVertexId(), 0);
+ historyEventHandler.verifyVertexFinishedEvent(v3.getVertexId(), 1);
+ historyEventHandler.verifyDAGCommitStartedEvent(dag.getID(), 1);
+ historyEventHandler.verifyDAGFinishedEvent(dag.getID(), 1);
CountingOutputCommitter v12OutputCommitter = (CountingOutputCommitter) v1
.getOutputCommitter("v12Out");
@@ -1635,7 +1968,7 @@ public class TestCommit {
public boolean failVertexGroupCommitFinishedEvent = false;
public boolean failDAGCommitStartedEvent = false;
-
+ public List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>();
public MockHistoryEventHandler(AppContext context) {
super(context);
}
@@ -1650,6 +1983,85 @@ public class TestCommit {
&& failDAGCommitStartedEvent) {
throw new IOException("fail DAGCommitStartedEvent");
}
+ historyEvents.add(event.getHistoryEvent());
+ }
+ // Asserts that exactly expectedTimes recorded VERTEX_GROUP_COMMIT_STARTED events carry groupName.
+ public void verifyVertexGroupCommitStartedEvent(String groupName, int expectedTimes) {
+ int actualTimes = 0;
+ for (HistoryEvent event : historyEvents) { // historyEvents is the list this mock handler appends to
+ if (event.getEventType() == HistoryEventType.VERTEX_GROUP_COMMIT_STARTED) {
+ VertexGroupCommitStartedEvent startedEvent = (VertexGroupCommitStartedEvent)event;
+ if (startedEvent.getVertexGroupName().equals(groupName)) {
+ actualTimes ++;
+ }
+ }
+ }
+ Assert.assertEquals(expectedTimes, actualTimes); // exact match: passing 0 asserts no such event was recorded
+ }
+ // Asserts that exactly expectedTimes recorded VERTEX_GROUP_COMMIT_FINISHED events carry groupName.
+ public void verifyVertexGroupCommitFinishedEvent(String groupName, int expectedTimes) {
+ int actualTimes = 0;
+ for (HistoryEvent event : historyEvents) { // historyEvents is the list this mock handler appends to
+ if (event.getEventType() == HistoryEventType.VERTEX_GROUP_COMMIT_FINISHED) {
+ VertexGroupCommitFinishedEvent finishedEvent = (VertexGroupCommitFinishedEvent)event;
+ if (finishedEvent.getVertexGroupName().equals(groupName)) {
+ actualTimes ++;
+ }
+ }
+ }
+ Assert.assertEquals(expectedTimes, actualTimes); // exact match: passing 0 asserts no such event was recorded
+ }
+ // Asserts that exactly expectedTimes recorded VERTEX_COMMIT_STARTED events refer to vertexId.
+ public void verifyVertexCommitStartedEvent(TezVertexID vertexId, int expectedTimes) {
+ int actualTimes = 0;
+ for (HistoryEvent event : historyEvents) { // historyEvents is the list this mock handler appends to
+ if (event.getEventType() == HistoryEventType.VERTEX_COMMIT_STARTED) {
+ VertexCommitStartedEvent startedEvent = (VertexCommitStartedEvent)event;
+ if (startedEvent.getVertexID().equals(vertexId)) {
+ actualTimes ++;
+ }
+ }
+ }
+ Assert.assertEquals(expectedTimes, actualTimes); // exact match: passing 0 asserts no such event was recorded
+ }
+ // Asserts that exactly expectedTimes recorded VERTEX_FINISHED events refer to vertexId.
+ public void verifyVertexFinishedEvent(TezVertexID vertexId, int expectedTimes) {
+ int actualTimes = 0;
+ for (HistoryEvent event : historyEvents) { // historyEvents is the list this mock handler appends to
+ if (event.getEventType() == HistoryEventType.VERTEX_FINISHED) {
+ VertexFinishedEvent finishedEvent = (VertexFinishedEvent)event;
+ if (finishedEvent.getVertexID().equals(vertexId)) {
+ actualTimes ++;
+ }
+ }
+ }
+ Assert.assertEquals(expectedTimes, actualTimes); // exact match: passing 0 asserts no such event was recorded
+ }
+ // Asserts that exactly expectedTimes recorded DAG_COMMIT_STARTED events refer to dagId.
+ public void verifyDAGCommitStartedEvent(TezDAGID dagId, int expectedTimes) {
+ int actualTimes = 0;
+ for (HistoryEvent event : historyEvents) { // NOTE(review): handler appears to throw before recording when failDAGCommitStartedEvent is set - confirm
+ if (event.getEventType() == HistoryEventType.DAG_COMMIT_STARTED) {
+ DAGCommitStartedEvent startedEvent = (DAGCommitStartedEvent)event;
+ if (startedEvent.getDagID().equals(dagId)) {
+ actualTimes ++;
+ }
+ }
+ }
+ Assert.assertEquals(expectedTimes, actualTimes); // exact match: passing 0 asserts no such event was recorded
+ }
+ // Asserts that exactly expectedTimes recorded DAG_FINISHED events refer to dagId.
+ public void verifyDAGFinishedEvent(TezDAGID dagId, int expectedTimes) {
+ int actualTimes = 0;
+ for (HistoryEvent event : historyEvents) { // historyEvents is the list this mock handler appends to
+ if (event.getEventType() == HistoryEventType.DAG_FINISHED) {
+ DAGFinishedEvent startedEvent = (DAGFinishedEvent)event; // NOTE(review): despite its name, this local holds the DAGFinishedEvent
+ if (startedEvent.getDagID().equals(dagId)) {
+ actualTimes ++;
+ }
+ }
+ }
+ Assert.assertEquals(expectedTimes, actualTimes); // exact match: passing 0 asserts no such event was recorded
}
}