You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/02/13 20:20:19 UTC

tez git commit: TEZ-2093. Add events to MockDAGAppMaster and add e2e test for event routing (bikas)

Repository: tez
Updated Branches:
  refs/heads/master ada11cc4d -> 48fd4d6b5


TEZ-2093. Add events to MockDAGAppMaster and add e2e test for event routing (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/48fd4d6b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/48fd4d6b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/48fd4d6b

Branch: refs/heads/master
Commit: 48fd4d6b5bca8782821e70ba0689ad43ca27e442
Parents: ada11cc
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Feb 13 11:20:05 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Feb 13 11:20:05 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  3 -
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  7 ++
 .../apache/tez/dag/app/MockDAGAppMaster.java    | 26 ++++++-
 .../tez/dag/app/TestMockDAGAppMaster.java       | 82 ++++++++++++++++++++
 5 files changed, 115 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/48fd4d6b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9f33a85..d316290 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2093. Add events to MockDAGAppMaster and add e2e test for event routing
   TEZ-2075. Incompatible issue caused by TEZ-1233 that TezConfiguration.TEZ_SITE_XML is made private
   TEZ-2082. Race condition in TaskAttemptListenerImpTezDag.getTask()
   TEZ-1233. Allow configuration of framework parameters per vertex.

http://git-wip-us.apache.org/repos/asf/tez/blob/48fd4d6b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index d8e24c0..4cb5e99 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -62,7 +62,6 @@ import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
 import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
 import org.apache.tez.common.security.JobTokenSecretManager;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
 @SuppressWarnings("unchecked")
@@ -223,8 +222,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
           LOG.info("Container with id: " + containerId
               + " is valid, but no longer registered, and will be killed. Race condition.");          
         } else {
-          Preconditions.checkState(task != null && task != TASK_FOR_INVALID_JVM, "CId: "
-              + containerId);
           context.getEventHandler().handle(
               new TaskAttemptEventStartedRemotely(task.getTaskSpec()
                   .getTaskAttemptID(), containerId, context

http://git-wip-us.apache.org/repos/asf/tez/blob/48fd4d6b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 3f9e2cf..125eeed 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -37,6 +37,7 @@ import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -1443,6 +1444,12 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       this.writeLock.unlock();
     }
   }
+  
+  @Private
+  @VisibleForTesting
+  public List<TezEvent> getTaskEvents() {
+    return tezEventsForTaskAttempts;
+  }
 
   private static class KillTransition
     implements SingleArcTransition<TaskImpl, TaskEvent> {

http://git-wip-us.apache.org/repos/asf/tez/blob/48fd4d6b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index d34532b..04a47c6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -31,7 +31,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@@ -54,9 +53,14 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventType;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 
@@ -69,6 +73,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
   MockContainerLauncher containerLauncher;
   boolean initFailFlag;
   boolean startFailFlag;
+  boolean sendDMEvents;
 
   // mock container launcher does not launch real tasks.
   // Upon, launch of a container is simulates the container asking for tasks
@@ -99,6 +104,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
       ContainerId cId;
       TezTaskAttemptID taId;
       String vName;
+      TaskSpec taskSpec;
       ContainerLaunchContext launchContext;
       int numUpdates = 0;
       boolean completed;
@@ -111,6 +117,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
       void clear() {
         taId = null;
         vName = null;
+        taskSpec = null;
         completed = false;
         launchContext = null;
       }
@@ -215,7 +222,9 @@ public class MockDAGAppMaster extends DAGAppMaster {
     @Override
     public void run() {
       // wait for test to sync with us and get a reference to us. Go when sync is done
+      LOG.info("Waiting to go");
       waitToGo();
+      LOG.info("Signal to go");
       while(true) {
         if (!startScheduling.get()) { // schedule when asked to do so by the test code
           continue;
@@ -236,6 +245,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
               } else {
                 cData.taId = cTask.getTaskSpec().getTaskAttemptID();
                 cData.vName = cTask.getTaskSpec().getVertexName();
+                cData.taskSpec = cTask.getTaskSpec();
               }
             } catch (IOException e) {
               e.printStackTrace();
@@ -261,6 +271,20 @@ public class MockDAGAppMaster extends DAGAppMaster {
               // send a done notification
               TezVertexID vertexId = cData.taId.getTaskID().getVertexID();
               cData.completed = true;
+              if (sendDMEvents) {
+                Event event = null;
+                for (OutputSpec output : cData.taskSpec.getOutputs()) {
+                  if (output.getPhysicalEdgeCount() == 1) {
+                    event = DataMovementEvent.create(0, null);
+                  } else {
+                    event = CompositeDataMovementEvent.create(0, output.getPhysicalEdgeCount(), null);
+                  }
+                  getContext().getEventHandler().handle(
+                      new VertexEventRouteEvent(vertexId, Collections.singletonList(new TezEvent(
+                          event, new EventMetaData(EventProducerConsumerType.OUTPUT, cData.vName,
+                              output.getDestinationVertexName(), cData.taId)))));
+                }
+              }
               getContext().getEventHandler().handle(
                   new VertexEventRouteEvent(vertexId, Collections.singletonList(new TezEvent(
                       new TaskAttemptCompletedEvent(), new EventMetaData(

http://git-wip-us.apache.org/repos/asf/tez/blob/48fd4d6b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index a46821a..f4734d5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -31,9 +32,16 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -43,9 +51,13 @@ import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
 import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.app.dag.impl.TaskImpl;
+import org.apache.tez.dag.app.dag.impl.VertexImpl;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.impl.TezEvent;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -140,6 +152,76 @@ public class TestMockDAGAppMaster {
     tezClient.stop();
   }
   
+  @Test (timeout = 5000)
+  public void testBasicEvents() throws Exception {
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    
+    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
+    tezClient.start();
+    
+    MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+    MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
+    mockLauncher.startScheduling(false);
+    mockApp.sendDMEvents = true;
+    DAG dag = DAG.create("test");
+    Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 2);
+    Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 2);
+    Vertex vC = Vertex.create("C", ProcessorDescriptor.create("Proc.class"), 2);
+    Vertex vD = Vertex.create("D", ProcessorDescriptor.create("Proc.class"), 2);
+    dag.addVertex(vA)
+        .addVertex(vB)
+        .addVertex(vC)
+        .addVertex(vD)
+        .addEdge(
+            Edge.create(vA, vB, EdgeProperty.create(DataMovementType.BROADCAST,
+                DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+                OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
+        .addEdge(
+            Edge.create(vA, vC, EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+                DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+                OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
+        .addEdge(
+            Edge.create(vA, vD, EdgeProperty.create(DataMovementType.ONE_TO_ONE,
+                DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+                OutputDescriptor.create("Out"), InputDescriptor.create("In"))));
+
+    DAGClient dagClient = tezClient.submitDAG(dag);
+    mockLauncher.waitTillContainersLaunched();
+    DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+    mockLauncher.startScheduling(true);
+    dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+    VertexImpl vImpl = (VertexImpl) dagImpl.getVertex(vB.getName());
+    TaskImpl tImpl = (TaskImpl) vImpl.getTask(1);
+    List<TezEvent> tEvents = tImpl.getTaskEvents();
+    Assert.assertEquals(2, tEvents.size()); // 2 from vA
+    Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
+    Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex());
+    Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
+    Assert.assertEquals(vA.getName(), tEvents.get(1).getDestinationInfo().getEdgeVertexName());
+    Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(1).getEvent()).getTargetIndex());
+    Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(1).getEvent()).getSourceIndex());
+    vImpl = (VertexImpl) dagImpl.getVertex(vC.getName());
+    tImpl = (TaskImpl) vImpl.getTask(1);
+    tEvents = tImpl.getTaskEvents();
+    Assert.assertEquals(2, tEvents.size()); // 2 from vA
+    Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
+    Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex());
+    Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
+    Assert.assertEquals(vA.getName(), tEvents.get(1).getDestinationInfo().getEdgeVertexName());
+    Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(1).getEvent()).getTargetIndex());
+    Assert.assertEquals(1, ((DataMovementEvent)tEvents.get(1).getEvent()).getSourceIndex());
+    vImpl = (VertexImpl) dagImpl.getVertex(vD.getName());
+    tImpl = (TaskImpl) vImpl.getTask(1);
+    tEvents = tImpl.getTaskEvents();
+    Assert.assertEquals(1, tEvents.size()); // 1 from vA
+    Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
+    Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex());
+    Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
+
+    tezClient.stop();
+  }
+  
   @Test (timeout = 10000)
   public void testMultipleSubmissions() throws Exception {
     Map<String, LocalResource> lrDAG = Maps.newHashMap();