You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/05 22:15:15 UTC
git commit: TEZ-783. Add standard DAGs using failing
processors/inputs for test purpose (Tassapol Athiapinya via bikas)
Updated Branches:
refs/heads/master c9cef2bf8 -> bd5624179
TEZ-783. Add standard DAGs using failing processors/inputs for test purpose (Tassapol Athiapinya via bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/bd562417
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/bd562417
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/bd562417
Branch: refs/heads/master
Commit: bd562417916598fb9d78a3508d945aca0e24d2e0
Parents: c9cef2b
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Feb 5 13:15:07 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Feb 5 13:15:07 2014 -0800
----------------------------------------------------------------------
.../org/apache/tez/test/TestFaultTolerance.java | 50 +++++++++
.../tez/test/dag/SixLevelsFailingDAG.java | 84 ++++++++++++++
.../tez/test/dag/ThreeLevelsFailingDAG.java | 65 +++++++++++
.../tez/test/dag/TwoLevelsFailingDAG.java | 112 +++++++++++++++++++
4 files changed, 311 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd562417/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index b56365e..8b24264 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -44,6 +44,9 @@ import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.test.dag.SixLevelsFailingDAG;
+import org.apache.tez.test.dag.ThreeLevelsFailingDAG;
+import org.apache.tez.test.dag.TwoLevelsFailingDAG;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -258,5 +261,52 @@ public class TestFaultTolerance {
DAG dag = SimpleTestDAG.createDAG("testMultiVersionInputFailureWithoutExit", testConf);
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
+
+ /**
+  * Requests a DAG from {@code TwoLevelsFailingDAG.createDAG} using a fresh
+  * {@link Configuration} (this test adds no settings to it) and passes it to
+  * {@code runDAGAndVerify} with {@code SUCCEEDED} as the expected state.
+  */
+ @Test (timeout=60000)
+ public void testTwoLevelsFailingDAGSuccess() throws Exception {
+   final DAG twoLevelDag =
+       TwoLevelsFailingDAG.createDAG("testTwoLevelsFailingDAGSuccess", new Configuration());
+   runDAGAndVerify(twoLevelDag, DAGStatus.State.SUCCEEDED);
+ }
+
+ /**
+  * Requests a DAG from {@code ThreeLevelsFailingDAG.createDAG} using a fresh
+  * {@link Configuration} (this test adds no settings to it) and passes it to
+  * {@code runDAGAndVerify} with {@code SUCCEEDED} as the expected state.
+  */
+ @Test (timeout=60000)
+ public void testThreeLevelsFailingDAGSuccess() throws Exception {
+   final DAG threeLevelDag =
+       ThreeLevelsFailingDAG.createDAG("testThreeLevelsFailingDAGSuccess", new Configuration());
+   runDAGAndVerify(threeLevelDag, DAGStatus.State.SUCCEEDED);
+ }
+
+ /**
+  * Requests a DAG from {@code SixLevelsFailingDAG.createDAG} using a fresh
+  * {@link Configuration} (this test adds no settings to it) and passes it to
+  * {@code runDAGAndVerify} with {@code SUCCEEDED} as the expected state.
+  */
+ @Test (timeout=60000)
+ public void testSixLevelsFailingDAGSuccess() throws Exception {
+   final DAG sixLevelDag =
+       SixLevelsFailingDAG.createDAG("testSixLevelsFailingDAGSuccess", new Configuration());
+   runDAGAndVerify(sixLevelDag, DAGStatus.State.SUCCEEDED);
+ }
+
+ /**
+  * Configures processor failures for vertices "l2v1" and "l3v1", raises the maximum number of
+  * task attempts to 4, and expects the DAG returned by {@code ThreeLevelsFailingDAG.createDAG}
+  * to be verified as {@code SUCCEEDED} by {@code runDAGAndVerify}.
+  */
+ @Test (timeout=60000)
+ public void testThreeLevelsFailingDAG2VerticesHaveFailedAttemptsDAGSucceeds() throws Exception {
+   final Configuration failureConf = new Configuration();
+   // Maximum number of task attempts is 4.
+   failureConf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_ATTEMPTS, 4);
+
+   // l2v1: enable failure for task index "1".
+   final String l2v1DoFailKey = TestProcessor.getVertexConfName(
+       TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "l2v1");
+   failureConf.setBoolean(l2v1DoFailKey, true);
+   final String l2v1TaskIndexKey = TestProcessor.getVertexConfName(
+       TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "l2v1");
+   failureConf.set(l2v1TaskIndexKey, "1");
+   // Per the original author's note this makes 3 attempts fail (presumably attempts 0..2,
+   // i.e. the bound is inclusive) -- confirm against TestProcessor.
+   final String l2v1UptoAttemptKey = TestProcessor.getVertexConfName(
+       TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "l2v1");
+   failureConf.setInt(l2v1UptoAttemptKey, 2);
+
+   // l3v1: enable failure for task index "0", with the same attempt bound as l2v1.
+   final String l3v1DoFailKey = TestProcessor.getVertexConfName(
+       TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "l3v1");
+   failureConf.setBoolean(l3v1DoFailKey, true);
+   final String l3v1TaskIndexKey = TestProcessor.getVertexConfName(
+       TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "l3v1");
+   failureConf.set(l3v1TaskIndexKey, "0");
+   final String l3v1UptoAttemptKey = TestProcessor.getVertexConfName(
+       TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "l3v1");
+   failureConf.setInt(l3v1UptoAttemptKey, 2);
+
+   final DAG failingDag = ThreeLevelsFailingDAG.createDAG(
+       "testThreeLevelsFailingDAG2VerticesHaveFailedAttemptsDAGSucceeds", failureConf);
+   runDAGAndVerify(failingDag, DAGStatus.State.SUCCEEDED);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd562417/tez-tests/src/test/java/org/apache/tez/test/dag/SixLevelsFailingDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/SixLevelsFailingDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/SixLevelsFailingDAG.java
new file mode 100644
index 0000000..f74d819
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/SixLevelsFailingDAG.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test.dag;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.test.TestProcessor;
+
+/**
+ * A DAG with vertices divided into 6 levels.
+ * Vertex name is {@code l<level number>v<vertex number>}. Level/vertex numbers start at 1.
+ * Every vertex runs {@link TestProcessor}. There is no constructor to call: the
+ * {@code Configuration} passed to {@code createDAG} is serialized into the payload handed to
+ * every processor and edge, and tests use it to indicate failing patterns.
+ *
+ * The DAG is shown in the diagram below.
+ * Each vertex has its degree of parallelism in brackets following its name.
+ * Each edge is annotated with its data movement (s = scatter/gather, b = broadcast).
+ * <pre>
+ *   l1v1(1)   l1v2(2)   l1v3(3)   l1v4(2)
+ *     |s        |s        |s        |b
+ *     |         |         |         |
+ *   l2v1(1)   l2v2(3)   l2v3(2)   l2v4(3)
+ *       \s     /s  \b     |s       /s
+ *        \    /     \     |       /
+ *        l3v1(4)       l3v2(4)
+ *            \s         /s
+ *             \        /
+ *              l4v1(10)
+ *            /s   |s   \s
+ *           /     |     \
+ *     l5v1(2)  l5v2(4)  l5v3(1)
+ *           \s    |s    /s
+ *            \    |    /
+ *              l6v1(4)
+ * </pre>
+ * All state lives in static fields shared with the parent classes, so DAGs must not be built
+ * concurrently.
+ */
+public class SixLevelsFailingDAG extends ThreeLevelsFailingDAG {
+
+  protected static Vertex l4v1;
+  protected static Vertex l5v1, l5v2, l5v3;
+  protected static Vertex l6v1;
+
+  /**
+   * Creates the six-level DAG.
+   *
+   * This method must be declared here: static methods are hidden rather than overridden, so
+   * the inherited {@code createDAG(String, Configuration)} would call the parent's
+   * {@code addDAGVerticesAndEdges()} and silently leave out levels 4 to 6.
+   *
+   * @param name name of the DAG
+   * @param conf configuration serialized into the payload; may be null
+   * @return the newly built DAG
+   * @throws Exception if the configuration cannot be converted into a payload
+   */
+  public static DAG createDAG(String name, Configuration conf) throws Exception {
+    // Fully qualified so that this file needs no additional import.
+    // Always reassigned: payload is static, so skipping the assignment for a null conf would
+    // reuse the payload of an earlier call.
+    payload = (conf != null)
+        ? org.apache.tez.common.TezUtils.createUserPayloadFromConf(conf)
+        : null;
+    dag = new DAG(name);
+    addDAGVerticesAndEdges();
+    return dag;
+  }
+
+  /**
+   * Adds levels 1 to 3 through the parent class, then levels 4 to 6 and their edges.
+   */
+  protected static void addDAGVerticesAndEdges() {
+    ThreeLevelsFailingDAG.addDAGVerticesAndEdges();
+
+    // level 4: a single vertex gathering both level-3 vertices
+    l4v1 = new Vertex("l4v1", TestProcessor.getProcDesc(payload), 10, defaultResource);
+    dag.addVertex(l4v1);
+    addEdge(l3v1, l4v1, DataMovementType.SCATTER_GATHER);
+    addEdge(l3v2, l4v1, DataMovementType.SCATTER_GATHER);
+
+    // level 5: three vertices, each fed by l4v1
+    l5v1 = new Vertex("l5v1", TestProcessor.getProcDesc(payload), 2, defaultResource);
+    dag.addVertex(l5v1);
+    addEdge(l4v1, l5v1, DataMovementType.SCATTER_GATHER);
+    l5v2 = new Vertex("l5v2", TestProcessor.getProcDesc(payload), 4, defaultResource);
+    dag.addVertex(l5v2);
+    addEdge(l4v1, l5v2, DataMovementType.SCATTER_GATHER);
+    l5v3 = new Vertex("l5v3", TestProcessor.getProcDesc(payload), 1, defaultResource);
+    dag.addVertex(l5v3);
+    addEdge(l4v1, l5v3, DataMovementType.SCATTER_GATHER);
+
+    // level 6: a single vertex gathering all level-5 vertices
+    l6v1 = new Vertex("l6v1", TestProcessor.getProcDesc(payload), 4, defaultResource);
+    dag.addVertex(l6v1);
+    addEdge(l5v1, l6v1, DataMovementType.SCATTER_GATHER);
+    addEdge(l5v2, l6v1, DataMovementType.SCATTER_GATHER);
+    addEdge(l5v3, l6v1, DataMovementType.SCATTER_GATHER);
+  }
+
+  /**
+   * Creates the six-level DAG under the default name "SixLevelsFailingDAG".
+   *
+   * @param conf configuration serialized into the payload; may be null
+   * @return the newly built DAG
+   * @throws Exception if the configuration cannot be converted into a payload
+   */
+  public static DAG createDAG(Configuration conf) throws Exception {
+    return createDAG("SixLevelsFailingDAG", conf);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd562417/tez-tests/src/test/java/org/apache/tez/test/dag/ThreeLevelsFailingDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/ThreeLevelsFailingDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/ThreeLevelsFailingDAG.java
new file mode 100644
index 0000000..8062285
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/ThreeLevelsFailingDAG.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test.dag;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.test.TestProcessor;
+import org.apache.tez.test.dag.TwoLevelsFailingDAG;
+
+/**
+ * A DAG with vertices divided into 3 levels.
+ * Vertex name is {@code l<level number>v<vertex number>}. Level/vertex numbers start at 1.
+ * Every vertex runs {@link TestProcessor}. There is no constructor to call: the
+ * {@code Configuration} passed to {@code createDAG} is serialized into the payload handed to
+ * every processor and edge, and tests use it to indicate failing patterns.
+ *
+ * The DAG is shown in the diagram below.
+ * Each vertex has its degree of parallelism in brackets following its name.
+ * Each edge is annotated with its data movement (s = scatter/gather, b = broadcast).
+ * <pre>
+ *   l1v1(1)   l1v2(2)   l1v3(3)   l1v4(2)
+ *     |s        |s        |s        |b
+ *     |         |         |         |
+ *   l2v1(1)   l2v2(3)   l2v3(2)   l2v4(3)
+ *       \s     /s  \b     |s       /s
+ *        \    /     \     |       /
+ *        l3v1(4)       l3v2(4)
+ * </pre>
+ * All state lives in static fields shared with the parent class, so DAGs must not be built
+ * concurrently.
+ */
+public class ThreeLevelsFailingDAG extends TwoLevelsFailingDAG {
+
+  protected static Vertex l3v1, l3v2;
+
+  /**
+   * Creates the three-level DAG.
+   *
+   * This method must be declared here: static methods are hidden rather than overridden, so
+   * the inherited {@code createDAG(String, Configuration)} would call the parent's
+   * {@code addDAGVerticesAndEdges()} and silently leave out level 3.
+   *
+   * @param name name of the DAG
+   * @param conf configuration serialized into the payload; may be null
+   * @return the newly built DAG
+   * @throws Exception if the configuration cannot be converted into a payload
+   */
+  public static DAG createDAG(String name, Configuration conf) throws Exception {
+    // Fully qualified so that this file needs no additional import.
+    // Always reassigned: payload is static, so skipping the assignment for a null conf would
+    // reuse the payload of an earlier call.
+    payload = (conf != null)
+        ? org.apache.tez.common.TezUtils.createUserPayloadFromConf(conf)
+        : null;
+    dag = new DAG(name);
+    addDAGVerticesAndEdges();
+    return dag;
+  }
+
+  /**
+   * Adds levels 1 and 2 through the parent class, then the two level-3 vertices and their edges.
+   */
+  protected static void addDAGVerticesAndEdges() {
+    TwoLevelsFailingDAG.addDAGVerticesAndEdges();
+
+    // l3v1 gathers l2v1 and l2v2
+    l3v1 = new Vertex("l3v1", TestProcessor.getProcDesc(payload), 4, defaultResource);
+    dag.addVertex(l3v1);
+    addEdge(l2v1, l3v1, DataMovementType.SCATTER_GATHER);
+    addEdge(l2v2, l3v1, DataMovementType.SCATTER_GATHER);
+
+    // l3v2 receives a broadcast from l2v2 and gathers l2v3 and l2v4
+    l3v2 = new Vertex("l3v2", TestProcessor.getProcDesc(payload), 4, defaultResource);
+    dag.addVertex(l3v2);
+    addEdge(l2v2, l3v2, DataMovementType.BROADCAST);
+    addEdge(l2v3, l3v2, DataMovementType.SCATTER_GATHER);
+    addEdge(l2v4, l3v2, DataMovementType.SCATTER_GATHER);
+  }
+
+  /**
+   * Creates the three-level DAG under the default name "ThreeLevelsFailingDAG".
+   *
+   * @param conf configuration serialized into the payload; may be null
+   * @return the newly built DAG
+   * @throws Exception if the configuration cannot be converted into a payload
+   */
+  public static DAG createDAG(Configuration conf) throws Exception {
+    return createDAG("ThreeLevelsFailingDAG", conf);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd562417/tez-tests/src/test/java/org/apache/tez/test/dag/TwoLevelsFailingDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/TwoLevelsFailingDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/TwoLevelsFailingDAG.java
new file mode 100644
index 0000000..de74931
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/TwoLevelsFailingDAG.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test.dag;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.test.TestInput;
+import org.apache.tez.test.TestOutput;
+import org.apache.tez.test.TestProcessor;
+
+/**
+ * A DAG with vertices divided into 2 levels.
+ * Vertex name is {@code l<level number>v<vertex number>}. Level/vertex numbers start at 1.
+ * Every vertex runs {@link TestProcessor} and every edge uses {@link TestOutput} and
+ * {@link TestInput}. There is no constructor to call: the {@code Configuration} passed to
+ * {@code createDAG} is serialized into the payload handed to each of them, and tests use it
+ * to indicate failing patterns.
+ *
+ * The DAG is shown in the diagram below.
+ * Each vertex has its degree of parallelism in brackets following its name.
+ * Each edge is annotated with its data movement (s = scatter/gather, b = broadcast).
+ * <pre>
+ *   l1v1(1)   l1v2(2)   l1v3(3)   l1v4(2)
+ *     |s        |s        |s        |b
+ *     |         |         |         |
+ *   l2v1(1)   l2v2(3)   l2v3(2)   l2v4(3)
+ * </pre>
+ * All state lives in static fields, so DAGs must not be built concurrently.
+ */
+public class TwoLevelsFailingDAG {
+  /** Resource passed to every vertex constructor. */
+  static Resource defaultResource = Resource.newInstance(100, 0);
+  /** DAG currently being built; replaced on every {@code createDAG} call. */
+  protected static DAG dag;
+  /** Serialized configuration handed to processors, inputs and outputs; may be null. */
+  protected static byte[] payload;
+  protected static Vertex l1v1, l1v2, l1v3, l1v4;
+  protected static Vertex l2v1, l2v2, l2v3, l2v4;
+
+  /**
+   * Creates the two-level DAG.
+   *
+   * Note for subclasses: static methods are hidden rather than overridden, so the call to
+   * {@code addDAGVerticesAndEdges()} below always runs this class's version. A subclass that
+   * adds levels has to declare its own {@code createDAG(String, Configuration)}.
+   *
+   * @param name name of the DAG
+   * @param conf configuration serialized into the payload; may be null
+   * @return the newly built DAG
+   * @throws Exception if the configuration cannot be converted into a payload
+   */
+  public static DAG createDAG(String name,
+      Configuration conf) throws Exception {
+    // Always reassigned: payload is static, so skipping the assignment for a null conf would
+    // reuse the payload of an earlier call.
+    payload = (conf != null) ? TezUtils.createUserPayloadFromConf(conf) : null;
+    dag = new DAG(name);
+    addDAGVerticesAndEdges();
+    return dag;
+  }
+
+  /**
+   * Adds the four level-1 vertices, the four level-2 vertices and one edge per column.
+   */
+  protected static void addDAGVerticesAndEdges() {
+    l1v1 = new Vertex("l1v1", TestProcessor.getProcDesc(payload), 1, defaultResource);
+    l2v1 = new Vertex("l2v1", TestProcessor.getProcDesc(payload), 1, defaultResource);
+    addVerticesAndEdgeInternal(l1v1, l2v1, DataMovementType.SCATTER_GATHER);
+    l1v2 = new Vertex("l1v2", TestProcessor.getProcDesc(payload), 2, defaultResource);
+    l2v2 = new Vertex("l2v2", TestProcessor.getProcDesc(payload), 3, defaultResource);
+    addVerticesAndEdgeInternal(l1v2, l2v2, DataMovementType.SCATTER_GATHER);
+    l1v3 = new Vertex("l1v3", TestProcessor.getProcDesc(payload), 3, defaultResource);
+    l2v3 = new Vertex("l2v3", TestProcessor.getProcDesc(payload), 2, defaultResource);
+    addVerticesAndEdgeInternal(l1v3, l2v3, DataMovementType.SCATTER_GATHER);
+    l1v4 = new Vertex("l1v4", TestProcessor.getProcDesc(payload), 2, defaultResource);
+    l2v4 = new Vertex("l2v4", TestProcessor.getProcDesc(payload), 3, defaultResource);
+    addVerticesAndEdgeInternal(l1v4, l2v4, DataMovementType.BROADCAST);
+  }
+
+  /**
+   * Adds 2 vertices and an edge connecting them.
+   * The two given vertices must not already exist in the DAG.
+   *
+   * @param v1 vertex 1 (source of the edge)
+   * @param v2 vertex 2 (destination of the edge)
+   * @param dataMovementType Data movement type
+   */
+  protected static void addVerticesAndEdgeInternal(Vertex v1, Vertex v2, DataMovementType dataMovementType) {
+    dag.addVertex(v1).addVertex(v2);
+    addEdge(v1, v2, dataMovementType);
+  }
+
+  /**
+   * Adds an edge between the 2 given vertices. The edge is always created with
+   * {@code DataSourceType.PERSISTED} and {@code SchedulingType.SEQUENTIAL}.
+   *
+   * @param v1 vertex 1 (source of the edge)
+   * @param v2 vertex 2 (destination of the edge)
+   * @param dataMovementType Data movement type
+   */
+  protected static void addEdge(Vertex v1, Vertex v2, DataMovementType dataMovementType) {
+    dag.addEdge(new Edge(v1, v2,
+        new EdgeProperty(dataMovementType,
+            DataSourceType.PERSISTED,
+            SchedulingType.SEQUENTIAL,
+            TestOutput.getOutputDesc(payload),
+            TestInput.getInputDesc(payload))));
+  }
+
+  /**
+   * Creates the two-level DAG under the default name "TwoLevelsFailingDAG".
+   *
+   * @param conf configuration serialized into the payload; may be null
+   * @return the newly built DAG
+   * @throws Exception if the configuration cannot be converted into a payload
+   */
+  public static DAG createDAG(Configuration conf) throws Exception {
+    return createDAG("TwoLevelsFailingDAG", conf);
+  }
+}