You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/05 22:15:15 UTC

git commit: TEZ-783. Add standard DAGs using failing processors/inputs for test purpose (Tassapol Athiapinya via bikas)

Updated Branches:
  refs/heads/master c9cef2bf8 -> bd5624179


TEZ-783. Add standard DAGs using failing processors/inputs for test purpose (Tassapol Athiapinya via bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/bd562417
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/bd562417
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/bd562417

Branch: refs/heads/master
Commit: bd562417916598fb9d78a3508d945aca0e24d2e0
Parents: c9cef2b
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Feb 5 13:15:07 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Feb 5 13:15:07 2014 -0800

----------------------------------------------------------------------
 .../org/apache/tez/test/TestFaultTolerance.java |  50 +++++++++
 .../tez/test/dag/SixLevelsFailingDAG.java       |  84 ++++++++++++++
 .../tez/test/dag/ThreeLevelsFailingDAG.java     |  65 +++++++++++
 .../tez/test/dag/TwoLevelsFailingDAG.java       | 112 +++++++++++++++++++
 4 files changed, 311 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd562417/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index b56365e..8b24264 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -44,6 +44,9 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.test.dag.SixLevelsFailingDAG;
+import org.apache.tez.test.dag.ThreeLevelsFailingDAG;
+import org.apache.tez.test.dag.TwoLevelsFailingDAG;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -258,5 +261,52 @@ public class TestFaultTolerance {
     DAG dag = SimpleTestDAG.createDAG("testMultiVersionInputFailureWithoutExit", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
   }
+  
+  @Test (timeout=60000)
+  public void testTwoLevelsFailingDAGSuccess() throws Exception {
+    Configuration testConf = new Configuration();
+    DAG dag = TwoLevelsFailingDAG.createDAG("testTwoLevelsFailingDAGSuccess", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
+  
+  @Test (timeout=60000)
+  public void testThreeLevelsFailingDAGSuccess() throws Exception {
+    Configuration testConf = new Configuration();
+    DAG dag = ThreeLevelsFailingDAG.createDAG("testThreeLevelsFailingDAGSuccess", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
+  
+  @Test (timeout=60000)
+  public void testSixLevelsFailingDAGSuccess() throws Exception {
+    Configuration testConf = new Configuration();
+    DAG dag = SixLevelsFailingDAG.createDAG("testSixLevelsFailingDAGSuccess", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
+  
+  @Test (timeout=60000)
+  public void testThreeLevelsFailingDAG2VerticesHaveFailedAttemptsDAGSucceeds() throws Exception {
+    Configuration testConf = new Configuration();
+    //set maximum number of task attempts to 4
+    testConf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_ATTEMPTS, 4);
+    //l2v1 failure
+    testConf.setBoolean(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "l2v1"), true);
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "l2v1"), "1");
+    //3 attempts fail
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "l2v1"), 2);
+    
+    //l3v1 failure
+    testConf.setBoolean(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "l3v1"), true);
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "l3v1"), "0");
+    //3 attempts fail
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "l3v1"), 2);
+    DAG dag = ThreeLevelsFailingDAG.createDAG("testThreeLevelsFailingDAG2VerticesHaveFailedAttemptsDAGSucceeds", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd562417/tez-tests/src/test/java/org/apache/tez/test/dag/SixLevelsFailingDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/SixLevelsFailingDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/SixLevelsFailingDAG.java
new file mode 100644
index 0000000..f74d819
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/SixLevelsFailingDAG.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test.dag;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.test.TestProcessor;
+
+/**
+ * A DAG with vertices divided into 6 levels.
+ * Vertex name is "l<level number>v<vertex number>". Level/vertex numbers start at 1.
+ * Each vertex has failing processor and failing inputs. The constructor can accept Tez Configuration to indicate failing patterns.
+ * 
+ * DAG is shown with a diagram below.
+ * Each vertex has its degree of parallelism indicated in a bracket following its name.
+ * Each edge annotates with data movement (s = scatter/gather, b = broadcast)
+ * 
+ * l1v1(1)     l1v2(2)     l1v3(3)     l1v4(2)
+ *   |s          |s          |s          |b
+ *   |           |           |           |
+ * l2v1(1)     l2v2(3)     l2v3(2)     l2v4(3)
+ *    \s        /s    \b     |s       /s
+ *     \       /        \    |      /
+ *      l3v1(4)            l3v2(4)
+ *          \s              /s
+ *            \            /
+ *               l4v1 (10)
+ *              /s   |s   \s
+ *            /      |      \
+ *     l5v1(2)    l5v2(4)     l5v3(1)
+ *           \s      |s      /s
+ *             \     |      /
+ *                l6v1(4)
+ *
+ */
+public class SixLevelsFailingDAG extends ThreeLevelsFailingDAG {
+    
+    protected static Vertex l4v1;
+    protected static Vertex l5v1, l5v2, l5v3;
+    protected static Vertex l6v1;
+    
+    protected static void addDAGVerticesAndEdges() {
+        ThreeLevelsFailingDAG.addDAGVerticesAndEdges();
+        l4v1 = new Vertex("l4v1", TestProcessor.getProcDesc(payload), 10, defaultResource);
+        dag.addVertex(l4v1);
+        addEdge(l3v1, l4v1, DataMovementType.SCATTER_GATHER);
+        addEdge(l3v2, l4v1, DataMovementType.SCATTER_GATHER);
+        l5v1 = new Vertex("l5v1", TestProcessor.getProcDesc(payload), 2, defaultResource);
+        dag.addVertex(l5v1);
+        addEdge(l4v1, l5v1, DataMovementType.SCATTER_GATHER);
+        l5v2 = new Vertex("l5v2", TestProcessor.getProcDesc(payload), 4, defaultResource);
+        dag.addVertex(l5v2);
+        addEdge(l4v1, l5v2, DataMovementType.SCATTER_GATHER);
+        l5v3 = new Vertex("l5v3", TestProcessor.getProcDesc(payload), 1, defaultResource);
+        dag.addVertex(l5v3);
+        addEdge(l4v1, l5v3, DataMovementType.SCATTER_GATHER);
+        l6v1 = new Vertex("l6v1", TestProcessor.getProcDesc(payload), 4, defaultResource);
+        dag.addVertex(l6v1);
+        addEdge(l5v1, l6v1, DataMovementType.SCATTER_GATHER);
+        addEdge(l5v2, l6v1, DataMovementType.SCATTER_GATHER);
+        addEdge(l5v3, l6v1, DataMovementType.SCATTER_GATHER);
+    }
+        
+    public static DAG createDAG(Configuration conf) throws Exception {
+      return createDAG("SixLevelsFailingDAG", conf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd562417/tez-tests/src/test/java/org/apache/tez/test/dag/ThreeLevelsFailingDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/ThreeLevelsFailingDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/ThreeLevelsFailingDAG.java
new file mode 100644
index 0000000..8062285
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/ThreeLevelsFailingDAG.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test.dag;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.test.TestProcessor;
+import org.apache.tez.test.dag.TwoLevelsFailingDAG;
+
+/**
+ * A DAG with vertices divided into 3 levels.
+ * Vertex name is "l<level number>v<vertex number>". Level/vertex numbers start at 1.
+ * Each vertex has failing processor and failing inputs. The constructor can accept Tez Configuration to indicate failing patterns.
+ * 
+ * DAG is shown with a diagram below.
+ * Each vertex has its degree of parallelism indicated in a bracket following its name.
+ * Each edge annotates with data movement (s = scatter/gather, b = broadcast)
+ * 
+ * l1v1(1)     l1v2(2)     l1v3(3)     l1v4(2)
+ *   |s          |s          |s          |b
+ *   |           |           |           |
+ * l2v1(1)     l2v2(3)     l2v3(2)     l2v4(3)
+ *    \s        /s    \b     |s       /s
+ *     \       /        \    |      /
+ *      l3v1(4)            l3v2(4)
+ *
+ */
+public class ThreeLevelsFailingDAG extends TwoLevelsFailingDAG {
+    
+    protected static Vertex l3v1, l3v2;
+    
+    protected static void addDAGVerticesAndEdges() {
+        TwoLevelsFailingDAG.addDAGVerticesAndEdges();
+        l3v1 = new Vertex("l3v1", TestProcessor.getProcDesc(payload), 4, defaultResource);
+        dag.addVertex(l3v1);
+        addEdge(l2v1, l3v1, DataMovementType.SCATTER_GATHER);
+        addEdge(l2v2, l3v1, DataMovementType.SCATTER_GATHER);
+        l3v2 = new Vertex("l3v2", TestProcessor.getProcDesc(payload), 4, defaultResource);
+        dag.addVertex(l3v2);
+        addEdge(l2v2, l3v2, DataMovementType.BROADCAST);
+        addEdge(l2v3, l3v2, DataMovementType.SCATTER_GATHER);
+        addEdge(l2v4, l3v2, DataMovementType.SCATTER_GATHER);
+    }
+        
+    public static DAG createDAG(Configuration conf) throws Exception {
+      return createDAG("ThreeLevelsFailingDAG", conf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd562417/tez-tests/src/test/java/org/apache/tez/test/dag/TwoLevelsFailingDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/TwoLevelsFailingDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/TwoLevelsFailingDAG.java
new file mode 100644
index 0000000..de74931
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/TwoLevelsFailingDAG.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test.dag;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.test.TestInput;
+import org.apache.tez.test.TestOutput;
+import org.apache.tez.test.TestProcessor;
+
+/**
+ * A DAG with vertices divided into 2 levels.
+ * Vertex name is "l<level number>v<vertex number>". Level/vertex numbers start at 1.
+ * Each vertex has failing processor and failing inputs. The constructor can accept Tez Configuration to indicate failing patterns.
+ * 
+ * DAG is shown with a diagram below.
+ * Each vertex has its degree of parallelism indicated in a bracket following its name.
+ * Each edge annotates with data movement (s = scatter/gather, b = broadcast)
+ * 
+ * l1v1(1)     l1v2(2)     l1v3(3)     l1v4(2)
+ *   |s          |s          |s          |b
+ *   |           |           |           |
+ * l2v1(1)     l2v2(3)     l2v3(2)     l2v4(3)
+ *
+ */
+public class TwoLevelsFailingDAG {
+    static Resource defaultResource = Resource.newInstance(100, 0);
+    protected static DAG dag;
+    protected static byte[] payload;
+    protected static Vertex l1v1, l1v2, l1v3, l1v4;
+    protected static Vertex l2v1, l2v2, l2v3, l2v4;
+
+    public static DAG createDAG(String name, 
+            Configuration conf) throws Exception {
+        if (conf != null) {
+          payload = TezUtils.createUserPayloadFromConf(conf);
+        } 
+        dag = new DAG(name);
+        addDAGVerticesAndEdges();
+        return dag;
+    }
+    
+    protected static void addDAGVerticesAndEdges() {
+        l1v1 = new Vertex("l1v1", TestProcessor.getProcDesc(payload), 1, defaultResource);
+        l2v1 = new Vertex("l2v1", TestProcessor.getProcDesc(payload), 1, defaultResource);
+        addVerticesAndEdgeInternal(l1v1, l2v1, DataMovementType.SCATTER_GATHER);
+        l1v2 = new Vertex("l1v2", TestProcessor.getProcDesc(payload), 2, defaultResource);
+        l2v2 = new Vertex("l2v2", TestProcessor.getProcDesc(payload), 3, defaultResource);
+        addVerticesAndEdgeInternal(l1v2, l2v2, DataMovementType.SCATTER_GATHER);
+        l1v3 = new Vertex("l1v3", TestProcessor.getProcDesc(payload), 3, defaultResource);
+        l2v3 = new Vertex("l2v3", TestProcessor.getProcDesc(payload), 2, defaultResource);
+        addVerticesAndEdgeInternal(l1v3, l2v3, DataMovementType.SCATTER_GATHER);
+        l1v4 = new Vertex("l1v4", TestProcessor.getProcDesc(payload), 2, defaultResource);
+        l2v4 = new Vertex("l2v4", TestProcessor.getProcDesc(payload), 3, defaultResource);
+        addVerticesAndEdgeInternal(l1v4, l2v4, DataMovementType.BROADCAST);
+    }
+    
+    /**
+     * Adds 2 vertices and an edge connecting them.
+     * Given two vertices must not exist.
+     * 
+     * @param v1 vertice 1
+     * @param v2 vertice 2
+     * @param dataMovementType Data movement type
+     */
+    protected static void addVerticesAndEdgeInternal(Vertex v1, Vertex v2, DataMovementType dataMovementType) {
+        dag.addVertex(v1).addVertex(v2);
+        addEdge(v1, v2, dataMovementType);
+    }
+    
+    /**
+     * Adds an edge to given 2 vertices.
+     * @param v1 vertice 1
+     * @param v2 vertice 2
+     * @param dataMovementType Data movement type
+     */
+    protected static void addEdge(Vertex v1, Vertex v2, DataMovementType dataMovementType) {
+        dag.addEdge(new Edge(v1, v2, 
+            new EdgeProperty(dataMovementType, 
+                DataSourceType.PERSISTED, 
+                SchedulingType.SEQUENTIAL, 
+                TestOutput.getOutputDesc(payload), 
+                TestInput.getInputDesc(payload))));
+    }
+    
+    public static DAG createDAG(Configuration conf) throws Exception {
+      return createDAG("TwoLevelsFailingDAG", conf);
+    }
+}