You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/02/13 20:20:19 UTC
tez git commit: TEZ-2093. Add events to MockDAGAppMaster and add e2e test for event routing (bikas)
Repository: tez
Updated Branches:
refs/heads/master ada11cc4d -> 48fd4d6b5
TEZ-2093. Add events to MockDAGAppMaster and add e2e test for event routing (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/48fd4d6b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/48fd4d6b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/48fd4d6b
Branch: refs/heads/master
Commit: 48fd4d6b5bca8782821e70ba0689ad43ca27e442
Parents: ada11cc
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Feb 13 11:20:05 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Feb 13 11:20:05 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../dag/app/TaskAttemptListenerImpTezDag.java | 3 -
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 7 ++
.../apache/tez/dag/app/MockDAGAppMaster.java | 26 ++++++-
.../tez/dag/app/TestMockDAGAppMaster.java | 82 ++++++++++++++++++++
5 files changed, 115 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/48fd4d6b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9f33a85..d316290 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2093. Add events to MockDAGAppMaster and add e2e test for event routing
TEZ-2075. Incompatible issue caused by TEZ-1233 that TezConfiguration.TEZ_SITE_XML is made private
TEZ-2082. Race condition in TaskAttemptListenerImpTezDag.getTask()
TEZ-1233. Allow configuration of framework parameters per vertex.
http://git-wip-us.apache.org/repos/asf/tez/blob/48fd4d6b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index d8e24c0..4cb5e99 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -62,7 +62,6 @@ import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
import org.apache.tez.common.security.JobTokenSecretManager;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
@SuppressWarnings("unchecked")
@@ -223,8 +222,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
LOG.info("Container with id: " + containerId
+ " is valid, but no longer registered, and will be killed. Race condition.");
} else {
- Preconditions.checkState(task != null && task != TASK_FOR_INVALID_JVM, "CId: "
- + containerId);
context.getEventHandler().handle(
new TaskAttemptEventStartedRemotely(task.getTaskSpec()
.getTaskAttemptID(), containerId, context
http://git-wip-us.apache.org/repos/asf/tez/blob/48fd4d6b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 3f9e2cf..125eeed 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -37,6 +37,7 @@ import com.google.common.collect.Maps;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -1443,6 +1444,12 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
this.writeLock.unlock();
}
}
+
+ @Private
+ @VisibleForTesting
+ public List<TezEvent> getTaskEvents() {
+ return tezEventsForTaskAttempts;
+ }
private static class KillTransition
implements SingleArcTransition<TaskImpl, TaskEvent> {
http://git-wip-us.apache.org/repos/asf/tez/blob/48fd4d6b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index d34532b..04a47c6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -31,7 +31,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@@ -54,9 +53,14 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
@@ -69,6 +73,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
MockContainerLauncher containerLauncher;
boolean initFailFlag;
boolean startFailFlag;
+ boolean sendDMEvents;
// mock container launcher does not launch real tasks.
// Upon, launch of a container is simulates the container asking for tasks
@@ -99,6 +104,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
ContainerId cId;
TezTaskAttemptID taId;
String vName;
+ TaskSpec taskSpec;
ContainerLaunchContext launchContext;
int numUpdates = 0;
boolean completed;
@@ -111,6 +117,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
void clear() {
taId = null;
vName = null;
+ taskSpec = null;
completed = false;
launchContext = null;
}
@@ -215,7 +222,9 @@ public class MockDAGAppMaster extends DAGAppMaster {
@Override
public void run() {
// wait for test to sync with us and get a reference to us. Go when sync is done
+ LOG.info("Waiting to go");
waitToGo();
+ LOG.info("Signal to go");
while(true) {
if (!startScheduling.get()) { // schedule when asked to do so by the test code
continue;
@@ -236,6 +245,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
} else {
cData.taId = cTask.getTaskSpec().getTaskAttemptID();
cData.vName = cTask.getTaskSpec().getVertexName();
+ cData.taskSpec = cTask.getTaskSpec();
}
} catch (IOException e) {
e.printStackTrace();
@@ -261,6 +271,20 @@ public class MockDAGAppMaster extends DAGAppMaster {
// send a done notification
TezVertexID vertexId = cData.taId.getTaskID().getVertexID();
cData.completed = true;
+ if (sendDMEvents) {
+ Event event = null;
+ for (OutputSpec output : cData.taskSpec.getOutputs()) {
+ if (output.getPhysicalEdgeCount() == 1) {
+ event = DataMovementEvent.create(0, null);
+ } else {
+ event = CompositeDataMovementEvent.create(0, output.getPhysicalEdgeCount(), null);
+ }
+ getContext().getEventHandler().handle(
+ new VertexEventRouteEvent(vertexId, Collections.singletonList(new TezEvent(
+ event, new EventMetaData(EventProducerConsumerType.OUTPUT, cData.vName,
+ output.getDestinationVertexName(), cData.taId)))));
+ }
+ }
getContext().getEventHandler().handle(
new VertexEventRouteEvent(vertexId, Collections.singletonList(new TezEvent(
new TaskAttemptCompletedEvent(), new EventMetaData(
http://git-wip-us.apache.org/repos/asf/tez/blob/48fd4d6b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index a46821a..f4734d5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -19,6 +19,7 @@
package org.apache.tez.dag.app;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -31,9 +32,16 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -43,9 +51,13 @@ import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.app.dag.impl.TaskImpl;
+import org.apache.tez.dag.app.dag.impl.VertexImpl;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.impl.TezEvent;
import org.junit.Assert;
import org.junit.Test;
@@ -140,6 +152,76 @@ public class TestMockDAGAppMaster {
tezClient.stop();
}
+ @Test (timeout = 5000)
+ public void testBasicEvents() throws Exception {
+ TezConfiguration tezconf = new TezConfiguration(defaultConf);
+
+ MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
+ tezClient.start();
+
+ MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+ MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
+ mockLauncher.startScheduling(false);
+ mockApp.sendDMEvents = true;
+ DAG dag = DAG.create("test");
+ Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 2);
+ Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 2);
+ Vertex vC = Vertex.create("C", ProcessorDescriptor.create("Proc.class"), 2);
+ Vertex vD = Vertex.create("D", ProcessorDescriptor.create("Proc.class"), 2);
+ dag.addVertex(vA)
+ .addVertex(vB)
+ .addVertex(vC)
+ .addVertex(vD)
+ .addEdge(
+ Edge.create(vA, vB, EdgeProperty.create(DataMovementType.BROADCAST,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
+ .addEdge(
+ Edge.create(vA, vC, EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
+ .addEdge(
+ Edge.create(vA, vD, EdgeProperty.create(DataMovementType.ONE_TO_ONE,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("Out"), InputDescriptor.create("In"))));
+
+ DAGClient dagClient = tezClient.submitDAG(dag);
+ mockLauncher.waitTillContainersLaunched();
+ DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+ mockLauncher.startScheduling(true);
+ dagClient.waitForCompletion();
+ Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+ VertexImpl vImpl = (VertexImpl) dagImpl.getVertex(vB.getName());
+ TaskImpl tImpl = (TaskImpl) vImpl.getTask(1);
+ List<TezEvent> tEvents = tImpl.getTaskEvents();
+ Assert.assertEquals(2, tEvents.size()); // 2 from vA
+ Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
+ Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex());
+ Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
+ Assert.assertEquals(vA.getName(), tEvents.get(1).getDestinationInfo().getEdgeVertexName());
+ Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(1).getEvent()).getTargetIndex());
+ Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(1).getEvent()).getSourceIndex());
+ vImpl = (VertexImpl) dagImpl.getVertex(vC.getName());
+ tImpl = (TaskImpl) vImpl.getTask(1);
+ tEvents = tImpl.getTaskEvents();
+ Assert.assertEquals(2, tEvents.size()); // 2 from vA
+ Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
+ Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex());
+ Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
+ Assert.assertEquals(vA.getName(), tEvents.get(1).getDestinationInfo().getEdgeVertexName());
+ Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(1).getEvent()).getTargetIndex());
+ Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(1).getEvent()).getSourceIndex());
+ vImpl = (VertexImpl) dagImpl.getVertex(vD.getName());
+ tImpl = (TaskImpl) vImpl.getTask(1);
+ tEvents = tImpl.getTaskEvents();
+ Assert.assertEquals(1, tEvents.size()); // 1 from vA
+ Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
+ Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex());
+ Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
+
+ tezClient.stop();
+ }
+
@Test (timeout = 10000)
public void testMultipleSubmissions() throws Exception {
Map<String, LocalResource> lrDAG = Maps.newHashMap();