You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/01 01:23:55 UTC

git commit: TEZ-784. Add TestDriver to allow cmd line submission of tests to a cluster (bikas)

Updated Branches:
  refs/heads/master f9094d469 -> 00aecc826


TEZ-784. Add TestDriver to allow cmd line submission of tests to a cluster (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/00aecc82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/00aecc82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/00aecc82

Branch: refs/heads/master
Commit: 00aecc826c6a2c5050fee085d579fa7127db099c
Parents: f9094d4
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Jan 31 16:22:55 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Jan 31 16:23:23 2014 -0800

----------------------------------------------------------------------
 tez-tests/pom.xml                               |  11 ++
 .../tez/test/FaultToleranceTestRunner.java      | 164 +++++++++++++++++
 .../java/org/apache/tez/test/SimpleTestDAG.java |  61 +++++++
 .../java/org/apache/tez/test/TestDriver.java    |  39 ++++
 .../org/apache/tez/test/TestFaultTolerance.java | 183 +++++--------------
 .../java/org/apache/tez/test/TestInput.java     |  38 ++--
 .../java/org/apache/tez/test/TestOutput.java    |   6 +
 .../java/org/apache/tez/test/TestProcessor.java |  30 +--
 8 files changed, 363 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00aecc82/tez-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tez-tests/pom.xml b/tez-tests/pom.xml
index 7941da1..ea4ec49 100644
--- a/tez-tests/pom.xml
+++ b/tez-tests/pom.xml
@@ -84,6 +84,17 @@
   <build>
     <plugins>
       <plugin>
+		    <groupId>org.apache.maven.plugins</groupId>
+		     <artifactId>maven-jar-plugin</artifactId>
+		      <configuration>
+		       <archive>
+		         <manifest>
+		           <mainClass>org.apache.tez.test.TestDriver</mainClass>
+		         </manifest>
+		       </archive>
+		     </configuration>
+		    </plugin>
+      <plugin>
         <groupId>org.apache.rat</groupId>
         <artifactId>apache-rat-plugin</artifactId>
       </plugin>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00aecc82/tez-tests/src/test/java/org/apache/tez/test/FaultToleranceTestRunner.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/FaultToleranceTestRunner.java b/tez-tests/src/test/java/org/apache/tez/test/FaultToleranceTestRunner.java
new file mode 100644
index 0000000..fb12352
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/FaultToleranceTestRunner.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+
+/**
+ * Run a DAG on a cluster with the given configuration. Starts a TezSession
+ * using default cluster configuration from installation. Then uses reflection
+ * to get the class from the first class-name argument. That class must have a
+ * static method - createDAG(org.apache.hadoop.conf.Configuration) that returns
+ * a DAG. Configuration is picked up by reading the file specified via the
+ * second path argument. The static method is invoked to get the DAG. The DAG is
+ * then executed in the session. Returns success if DAG succeeds.
+ */
+public class FaultToleranceTestRunner {
+  
+  static String TEST_ROOT_DIR = "tmp";
+  
+  TezSession tezSession = null;
+  Resource defaultResource = Resource.newInstance(100, 0);
+  
+  
+  void setup() throws Exception {
+    TezConfiguration tezConf = new TezConfiguration(new YarnConfiguration());
+    FileSystem defaultFs = FileSystem.get(tezConf);
+    
+    Path remoteStagingDir = defaultFs.makeQualified(new Path(TEST_ROOT_DIR, String
+        .valueOf(new Random().nextInt(100000))));
+    TezClientUtils.ensureStagingDirExists(tezConf, remoteStagingDir);
+    
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+        remoteStagingDir.toString());
+
+    AMConfiguration amConfig = new AMConfiguration(
+        new HashMap<String, String>(), new HashMap<String, LocalResource>(),
+        tezConf, null);
+    TezSessionConfiguration tezSessionConfig =
+        new TezSessionConfiguration(amConfig, tezConf);
+    tezSession = new TezSession("FaultToleranceTestRunner", tezSessionConfig);
+    tezSession.start();
+  }
+  
+  void tearDown() throws Exception {
+    if (tezSession != null) {
+      tezSession.stop();
+    }
+  }
+  
+  DAG getDAG(String className, String confFilePath) throws Exception {
+    Class<?> clazz = Class.forName(className);
+    Method method = clazz.getMethod("createDAG", Configuration.class);
+    
+    Configuration testConf = new Configuration(false);
+    if (confFilePath != null) {
+      Path confPath = new Path(confFilePath);
+      testConf.addResource(confPath);
+    }
+    
+    DAG dag = (DAG) method.invoke(null, testConf);
+    
+    return dag;
+  }
+  
+  boolean run(String className, String confFilePath) throws Exception {
+    setup();
+    
+    try {
+      TezSessionStatus status = tezSession.getSessionStatus();
+      while (status != TezSessionStatus.READY && status != TezSessionStatus.SHUTDOWN) {
+        System.out.println("Waiting for session to be ready. Current: " + status);
+        Thread.sleep(500);
+        status = tezSession.getSessionStatus();
+      }
+      if (status == TezSessionStatus.SHUTDOWN) {
+        throw new TezUncheckedException("Unexpected Session shutdown");
+      }
+      
+      DAG dag = getDAG(className, confFilePath);
+      
+      DAGClient dagClient = tezSession.submitDAG(dag);
+      DAGStatus dagStatus = dagClient.getDAGStatus(null);
+      while (!dagStatus.isCompleted()) {
+        System.out.println("Waiting for dag to complete. Sleeping for 500ms."
+            + " DAG name: " + dag.getName()
+            + " DAG appId: " + dagClient.getApplicationId()
+            + " Current state: " + dagStatus.getState());
+        Thread.sleep(500);
+        dagStatus = dagClient.getDAGStatus(null);
+      }
+      
+      if (dagStatus.getState() == DAGStatus.State.SUCCEEDED) {
+        return true;
+      }
+      
+    } finally {
+      tearDown();
+    }
+    
+    return false;
+  }
+  
+  static void printUsage() {
+    System.err.println(
+        "Usage: " + " FaultToleranceTestRunner <dag-class-name> <test-conf-path>");
+  }
+  
+  public static void main(String[] args) throws Exception {
+    String className = null;
+    String confFilePath = null;
+    if (args.length == 1) {
+      className = args[0];
+    } else if (args.length == 2) {
+      className = args[0];
+      confFilePath = args[1];
+    } else {
+      printUsage();
+      System.exit(1);
+    }
+    
+    FaultToleranceTestRunner job = new FaultToleranceTestRunner();
+    if (job.run(className, confFilePath)) {
+      System.out.println("Succeeded.");
+    } else {
+      System.out.println("Failed.");
+      System.exit(2);
+    } 
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00aecc82/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
new file mode 100644
index 0000000..85c5d8b
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+
+public class SimpleTestDAG {
+  static Resource defaultResource = Resource.newInstance(100, 0);
+  public static String TEZ_SIMPLE_DAG_NUM_TASKS =
+      "tez.simple-test-dag.num-tasks";
+  public static int TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT = 2;
+  
+  public static DAG createDAG(String name, 
+      Configuration conf) throws Exception {
+    byte[] payload = null;
+    int taskCount = TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT;
+    if (conf != null) {
+      taskCount = conf.getInt(TEZ_SIMPLE_DAG_NUM_TASKS, TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT);
+      payload = TezUtils.createUserPayloadFromConf(conf);
+    }
+    DAG dag = new DAG(name);
+    Vertex v1 = new Vertex("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v2 = new Vertex("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, 
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, 
+            SchedulingType.SEQUENTIAL, 
+            TestOutput.getOutputDesc(payload), 
+            TestInput.getInputDesc(payload))));
+    return dag;
+  }
+  
+  public static DAG createDAG(Configuration conf) throws Exception {
+    return createDAG("SimpleTestDAG", conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00aecc82/tez-tests/src/test/java/org/apache/tez/test/TestDriver.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDriver.java b/tez-tests/src/test/java/org/apache/tez/test/TestDriver.java
new file mode 100644
index 0000000..bf04fd5
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDriver.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import org.apache.hadoop.util.ProgramDriver;
+
+public class TestDriver {
+
+  public static void main(String argv[]){
+    int exitCode = -1;
+    ProgramDriver pgd = new ProgramDriver();
+    try {
+      pgd.addClass("FaultToleranceTestRunner", FaultToleranceTestRunner.class,
+          "Run different DAGs for fault tolerance testing");
+      exitCode = pgd.run(argv);
+    }
+    catch(Throwable e){
+      e.printStackTrace();
+    }
+
+    System.exit(exitCode);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00aecc82/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index 3a43eca..0ce2bed 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.client.AMConfiguration;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezSession;
@@ -40,15 +39,11 @@ import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -63,7 +58,6 @@ public class TestFaultTolerance {
   private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
       + TestFaultTolerance.class.getName() + "-tmpDir";
   
-  private static Resource defaultResource = Resource.newInstance(100, 0);
   private static TezSession tezSession = null;
   
   @BeforeClass
@@ -135,226 +129,133 @@ public class TestFaultTolerance {
     Assert.assertEquals(finalState, dagStatus.getState());
   }
   
-  ProcessorDescriptor getProcDesc(byte[] payload) {
-    return new ProcessorDescriptor(TestProcessor.class.getName()).
-        setUserPayload(payload);
-  }
-  
-  InputDescriptor getInputDesc(byte[] payload) {
-    return new InputDescriptor(TestInput.class.getName()).
-        setUserPayload(payload);
-  }
-  
-  OutputDescriptor getOutputDesc(byte[] payload) {
-    return new OutputDescriptor(TestOutput.class.getName()).
-        setUserPayload(payload);
-  }
-  
   @Test (timeout=60000)
   public void testBasicSuccessScatterGather() throws Exception {
-    DAG dag = new DAG("testBasicSuccessScatterGather");
-    Vertex v1 = new Vertex("v1", getProcDesc(null), 2, defaultResource);
-    Vertex v2 = new Vertex("v2", getProcDesc(null), 2, defaultResource);
-    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, 
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, 
-            SchedulingType.SEQUENTIAL, 
-            getOutputDesc(null), 
-            getInputDesc(null))));
+    DAG dag = SimpleTestDAG.createDAG("testBasicSuccessScatterGather", null);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
   }
   
   @Test (timeout=60000)
   public void testBasicSuccessBroadcast() throws Exception {
     DAG dag = new DAG("testBasicSuccessBroadcast");
-    Vertex v1 = new Vertex("v1", getProcDesc(null), 2, defaultResource);
-    Vertex v2 = new Vertex("v2", getProcDesc(null), 2, defaultResource);
+    Vertex v1 = new Vertex("v1", TestProcessor.getProcDesc(null), 2, SimpleTestDAG.defaultResource);
+    Vertex v2 = new Vertex("v2", TestProcessor.getProcDesc(null), 2, SimpleTestDAG.defaultResource);
     dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, 
         new EdgeProperty(DataMovementType.BROADCAST, 
             DataSourceType.PERSISTED, 
             SchedulingType.SEQUENTIAL, 
-            getOutputDesc(null), 
-            getInputDesc(null))));
+            TestOutput.getOutputDesc(null), 
+            TestInput.getInputDesc(null))));
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
   }
   
   @Test (timeout=60000)
   public void testBasicTaskFailure() throws Exception {
-    Configuration testConf = new Configuration();
+    Configuration testConf = new Configuration(false);
     testConf.setBoolean(TestProcessor.getVertexConfName(
-        TestProcessor.TEZ_AM_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
+        TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
     testConf.set(TestProcessor.getVertexConfName(
-        TestProcessor.TEZ_AM_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0");
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0");
     testConf.setInt(TestProcessor.getVertexConfName(
-        TestProcessor.TEZ_AM_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 0);
-    
-    byte[] payload = MRHelpers.createUserPayloadFromConf(testConf);
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 0);
     
-    DAG dag = new DAG("testBasicTaskFailure");
-    Vertex v1 = new Vertex("v1", getProcDesc(payload), 2, defaultResource);
-    Vertex v2 = new Vertex("v2", getProcDesc(payload), 2, defaultResource);
-    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, 
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, 
-            SchedulingType.SEQUENTIAL, 
-            getOutputDesc(null), 
-            getInputDesc(null))));
+    DAG dag = SimpleTestDAG.createDAG("testBasicTaskFailure", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
   }
   
   @Test (timeout=60000)
   public void testTaskMultipleFailures() throws Exception {
-    Configuration testConf = new Configuration();
+    Configuration testConf = new Configuration(false);
     testConf.setBoolean(TestProcessor.getVertexConfName(
-        TestProcessor.TEZ_AM_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
+        TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
     testConf.set(TestProcessor.getVertexConfName(
-        TestProcessor.TEZ_AM_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0,1");
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0,1");
     testConf.setInt(TestProcessor.getVertexConfName(
-        TestProcessor.TEZ_AM_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 1);
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 1);
     
-    byte[] payload = MRHelpers.createUserPayloadFromConf(testConf);
-    
-    DAG dag = new DAG("testTaskMultipleFailures");
-    Vertex v1 = new Vertex("v1", getProcDesc(payload), 2, defaultResource);
-    Vertex v2 = new Vertex("v2", getProcDesc(payload), 2, defaultResource);
-    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, 
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, 
-            SchedulingType.SEQUENTIAL, 
-            getOutputDesc(null), 
-            getInputDesc(null))));
+    DAG dag = SimpleTestDAG.createDAG("testTaskMultipleFailures", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
   }
   
   @Test (timeout=60000)
   public void testTaskMultipleFailuresDAGFail() throws Exception {
-    Configuration testConf = new Configuration();
+    Configuration testConf = new Configuration(false);
     testConf.setBoolean(TestProcessor.getVertexConfName(
-        TestProcessor.TEZ_AM_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
+        TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
     testConf.set(TestProcessor.getVertexConfName(
-        TestProcessor.TEZ_AM_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0");
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "0");
     testConf.setInt(TestProcessor.getVertexConfName(
-        TestProcessor.TEZ_AM_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), -1);
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), -1);
     
-    byte[] payload = MRHelpers.createUserPayloadFromConf(testConf);
-    
-    DAG dag = new DAG("testTaskMultipleFailuresDAGFail");
-    Vertex v1 = new Vertex("v1", getProcDesc(payload), 2, defaultResource);
-    Vertex v2 = new Vertex("v2", getProcDesc(payload), 2, defaultResource);
-    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, 
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, 
-            SchedulingType.SEQUENTIAL, 
-            getOutputDesc(null), 
-            getInputDesc(null))));
+    DAG dag = SimpleTestDAG.createDAG("testTaskMultipleFailuresDAGFail", testConf);
     runDAGAndVerify(dag, DAGStatus.State.FAILED);
   }
   
   @Test (timeout=60000)
   public void testBasicInputFailureWithExit() throws Exception {
-    Configuration testConf = new Configuration();
+    Configuration testConf = new Configuration(false);
     testConf.setBoolean(TestInput.getVertexConfName(
-        TestInput.TEZ_AM_FAILING_INPUT_DO_FAIL, "v2"), true);
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
     testConf.setBoolean(TestInput.getVertexConfName(
-        TestInput.TEZ_AM_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), true);
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), true);
     testConf.set(TestInput.getVertexConfName(
         TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "1");
     testConf.set(TestInput.getVertexConfName(
-        TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
     testConf.set(TestInput.getVertexConfName(
-        TestInput.TEZ_AM_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
-    
-    byte[] payload = MRHelpers.createUserPayloadFromConf(testConf);
+        TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
     
-    DAG dag = new DAG("testBasicInputFailureWithExit");
-    Vertex v1 = new Vertex("v1", getProcDesc(null), 2, defaultResource);
-    Vertex v2 = new Vertex("v2", getProcDesc(null), 2, defaultResource);
-    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, 
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, 
-            SchedulingType.SEQUENTIAL, 
-            getOutputDesc(null), 
-            getInputDesc(payload))));
+    DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithExit", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
   }
   
   @Test (timeout=60000)
   public void testBasicInputFailureWithoutExit() throws Exception {
-    Configuration testConf = new Configuration();
+    Configuration testConf = new Configuration(false);
     testConf.setBoolean(TestInput.getVertexConfName(
-        TestInput.TEZ_AM_FAILING_INPUT_DO_FAIL, "v2"), true);
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
     testConf.set(TestInput.getVertexConfName(
         TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "1");
     testConf.set(TestInput.getVertexConfName(
-        TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
     testConf.set(TestInput.getVertexConfName(
-        TestInput.TEZ_AM_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
-    
-    byte[] payload = MRHelpers.createUserPayloadFromConf(testConf);
+        TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
     
-    DAG dag = new DAG("testBasicInputFailureWithoutExit");
-    Vertex v1 = new Vertex("v1", getProcDesc(null), 2, defaultResource);
-    Vertex v2 = new Vertex("v2", getProcDesc(null), 2, defaultResource);
-    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, 
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, 
-            SchedulingType.SEQUENTIAL, 
-            getOutputDesc(null), 
-            getInputDesc(payload))));
+    DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithoutExit", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
   }
   
   @Test (timeout=60000)
   public void testMultipleInputFailureWithoutExit() throws Exception {
-    Configuration testConf = new Configuration();
+    Configuration testConf = new Configuration(false);
     testConf.setBoolean(TestInput.getVertexConfName(
-        TestInput.TEZ_AM_FAILING_INPUT_DO_FAIL, "v2"), true);
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
     testConf.set(TestInput.getVertexConfName(
         TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0,1");
     testConf.set(TestInput.getVertexConfName(
-        TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
     testConf.set(TestInput.getVertexConfName(
-        TestInput.TEZ_AM_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "-1");
-    
-    byte[] payload = MRHelpers.createUserPayloadFromConf(testConf);
+        TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "-1");
     
-    DAG dag = new DAG("testMultipleInputFailureWithoutExit");
-    Vertex v1 = new Vertex("v1", getProcDesc(null), 2, defaultResource);
-    Vertex v2 = new Vertex("v2", getProcDesc(null), 2, defaultResource);
-    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, 
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, 
-            SchedulingType.SEQUENTIAL, 
-            getOutputDesc(null), 
-            getInputDesc(payload))));
+    DAG dag = SimpleTestDAG.createDAG("testMultipleInputFailureWithoutExit", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
   }
   
   @Test (timeout=60000)
   public void testMultiVersionInputFailureWithoutExit() throws Exception {
-    Configuration testConf = new Configuration();
+    Configuration testConf = new Configuration(false);
     testConf.setBoolean(TestInput.getVertexConfName(
-        TestInput.TEZ_AM_FAILING_INPUT_DO_FAIL, "v2"), true);
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
     testConf.set(TestInput.getVertexConfName(
         TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "1");
     testConf.set(TestInput.getVertexConfName(
-        TestInput.TEZ_AM_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
     testConf.set(TestInput.getVertexConfName(
-        TestInput.TEZ_AM_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
+        TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
     testConf.setInt(TestInput.getVertexConfName(
-        TestInput.TEZ_AM_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), 1);
+        TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), 1);
     
-    byte[] payload = MRHelpers.createUserPayloadFromConf(testConf);
-    
-    DAG dag = new DAG("testMultiVersionInputFailureWithoutExit");
-    Vertex v1 = new Vertex("v1", getProcDesc(null), 2, defaultResource);
-    Vertex v2 = new Vertex("v2", getProcDesc(null), 2, defaultResource);
-    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, 
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, 
-            SchedulingType.SEQUENTIAL, 
-            getOutputDesc(null), 
-            getInputDesc(payload))));
+    DAG dag = SimpleTestDAG.createDAG("testMultiVersionInputFailureWithoutExit", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00aecc82/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index 5d0ac71..b5decb5 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -72,41 +73,46 @@ public class TestInput implements LogicalInput {
   /**
    * Enable failure for this logical input
    */
-  public static String TEZ_AM_FAILING_INPUT_DO_FAIL =
-      "tez.am.failing-input.do-fail";
+  public static String TEZ_FAILING_INPUT_DO_FAIL =
+      "tez.failing-input.do-fail";
   /**
    * Logical input will exit (and cause task failure) after reporting failure to 
    * read.
    */
-  public static String TEZ_AM_FAILING_INPUT_DO_FAIL_AND_EXIT =
-      "tez.am.failing-input.do-fail-and-exit";
+  public static String TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT =
+      "tez.failing-input.do-fail-and-exit";
   /**
    * Which physical inputs to fail. This is a comma separated list of +ve integers.
    * -1 means fail all.
    */
-  public static String TEZ_AM_FAILING_INPUT_FAILING_INPUT_INDEX =
-      "tez.am.failing-input.failing-input-index";
+  public static String TEZ_FAILING_INPUT_FAILING_INPUT_INDEX =
+      "tez.failing-input.failing-input-index";
   /**
    * Up to which version of the above physical inputs to fail. 0 will fail the 
    * first version. 1 will fail the first and second versions. And so on.
    */
-  public static String TEZ_AM_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT =
-      "tez.am.failing-input.failing-upto-input-attempt";
+  public static String TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT =
+      "tez.failing-input.failing-upto-input-attempt";
   /**
    * Indices of the tasks in the first for which this input will fail. Comma 
    * separated list of +ve integers. -1 means all tasks. E.g. 0 means the first
    * task in the vertex will have failing inputs.
    */
   public static String TEZ_AM_FAILING_INPUT_FAILING_TASK_INDEX =
-      "tez.am.failing-input.failing-task-index";
+      "tez.failing-input.failing-task-index";
   /**
    * Which task attempts will fail the input. This is a comma separated list of
    * +ve integers. -1 means all will fail. E.g. specifying 1 means the first
    * attempt will not fail the input but a re-run (the second attempt) will
    * trigger input failure. So this can be used to simulate cascading failures.
    */
-  public static String TEZ_AM_FAILING_INPUT_FAILING_TASK_ATTEMPT =
-      "tez.am.failing-input.failing-task-attempt";
+  public static String TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT =
+      "tez.failing-input.failing-task-attempt";
+  
+  public static InputDescriptor getInputDesc(byte[] payload) {
+    return new InputDescriptor(TestInput.class.getName()).
+        setUserPayload(payload);
+  }
 
   public int doRead() {
     boolean done = true;
@@ -185,9 +191,9 @@ public class TestInput implements LogicalInput {
     if (inputContext.getUserPayload() != null) {
       String vName = inputContext.getTaskVertexName();
       conf = MRHelpers.createConfFromUserPayload(inputContext.getUserPayload());
-      doFail = conf.getBoolean(getVertexConfName(TEZ_AM_FAILING_INPUT_DO_FAIL, vName), false);
+      doFail = conf.getBoolean(getVertexConfName(TEZ_FAILING_INPUT_DO_FAIL, vName), false);
       doFailAndExit = conf.getBoolean(
-          getVertexConfName(TEZ_AM_FAILING_INPUT_DO_FAIL_AND_EXIT, vName), false);
+          getVertexConfName(TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, vName), false);
       LOG.info("doFail: " + doFail + " doFailAndExit: " + doFailAndExit);
       if (doFail) {
         for (String failingIndex : 
@@ -198,16 +204,16 @@ public class TestInput implements LogicalInput {
         }
         for (String failingIndex : 
           conf.getTrimmedStringCollection(
-              getVertexConfName(TEZ_AM_FAILING_INPUT_FAILING_TASK_ATTEMPT, vName))) {
+              getVertexConfName(TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, vName))) {
           LOG.info("Adding failing task attempt: " + failingIndex);
           failingTaskAttempts.add(Integer.valueOf(failingIndex));
         }
         failingInputUpto = conf.getInt(
-            getVertexConfName(TEZ_AM_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, vName), 0);
+            getVertexConfName(TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, vName), 0);
         LOG.info("Adding failing input upto: " + failingInputUpto);
         for (String failingIndex : 
           conf.getTrimmedStringCollection(
-              getVertexConfName(TEZ_AM_FAILING_INPUT_FAILING_INPUT_INDEX, vName))) {
+              getVertexConfName(TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, vName))) {
           LOG.info("Adding failing input index: " + failingIndex);
           failingInputIndices.add(Integer.valueOf(failingIndex));
         }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00aecc82/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
index bb4f7b5..40e56f1 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.TezOutputContext;
@@ -33,6 +34,11 @@ import com.google.common.collect.Lists;
 public class TestOutput implements LogicalOutput {
   private static final Log LOG = LogFactory.getLog(TestOutput.class);
   
+  public static OutputDescriptor getOutputDesc(byte[] payload) {
+    return new OutputDescriptor(TestOutput.class.getName()).
+        setUserPayload(payload);
+  }
+  
   int numOutputs;
   TezOutputContext outputContext;
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00aecc82/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
index d539585..c314042 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalIOProcessor;
@@ -59,25 +60,30 @@ public class TestProcessor implements LogicalIOProcessor {
   /**
    * Enable failure for this processor
    */
-  public static String TEZ_AM_FAILING_PROCESSOR_DO_FAIL =
-      "tez.am.failing-processor.do-fail";
+  public static String TEZ_FAILING_PROCESSOR_DO_FAIL =
+      "tez.failing-processor.do-fail";
   /**
    * Time to sleep in the processor in milliseconds.
    */
-  public static String TEZ_AM_FAILING_PROCESSOR_SLEEP_MS =
-      "tez.am.failing-processor.sleep-ms";
+  public static String TEZ_FAILING_PROCESSOR_SLEEP_MS =
+      "tez.failing-processor.sleep-ms";
   /**
    * The indices of tasks in the vertex for which the processor will fail. This 
    * is a comma-separated list of +ve integers. -1 means all fail.
    */
-  public static String TEZ_AM_FAILING_PROCESSOR_FAILING_TASK_INDEX =
-      "tez.am.failing-processor.failing-task-index";
+  public static String TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX =
+      "tez.failing-processor.failing-task-index";
   /**
    * Up to which attempt of the tasks will fail. Specifying 0 means the first
    * attempt will fail. 1 means first and second attempt will fail. And so on.
    */
-  public static String TEZ_AM_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT =
-      "tez.am.failing-processor.failing-upto-task-attempt";
+  public static String TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT =
+      "tez.failing-processor.failing-upto-task-attempt";
+  
+  public static ProcessorDescriptor getProcDesc(byte[] payload) {
+    return new ProcessorDescriptor(TestProcessor.class.getName()).
+        setUserPayload(payload);
+  }
 
   void throwException(String msg) {
     RuntimeException e = new RuntimeException(msg);
@@ -97,19 +103,19 @@ public class TestProcessor implements LogicalIOProcessor {
       conf = MRHelpers.createConfFromUserPayload(processorContext
           .getUserPayload());
       doFail = conf.getBoolean(
-          getVertexConfName(TEZ_AM_FAILING_PROCESSOR_DO_FAIL, vName), false);
+          getVertexConfName(TEZ_FAILING_PROCESSOR_DO_FAIL, vName), false);
       sleepMs = conf.getLong(
-          getVertexConfName(TEZ_AM_FAILING_PROCESSOR_SLEEP_MS, vName), 0);
+          getVertexConfName(TEZ_FAILING_PROCESSOR_SLEEP_MS, vName), 0);
       LOG.info("doFail: " + doFail);
       if (doFail) {
         for (String failingIndex : conf
             .getTrimmedStringCollection(
-                getVertexConfName(TEZ_AM_FAILING_PROCESSOR_FAILING_TASK_INDEX, vName))) {
+                getVertexConfName(TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, vName))) {
           LOG.info("Adding failing task index: " + failingIndex);
           failingTaskIndices.add(Integer.valueOf(failingIndex));
         }
         failingTaskAttemptUpto = conf.getInt(
-            getVertexConfName(TEZ_AM_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, vName), 0);
+            getVertexConfName(TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, vName), 0);
         LOG.info("Adding failing attempt : " + failingTaskAttemptUpto + 
             " dag: " + processorContext.getDAGName());
       }