You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/19 22:57:00 UTC
git commit: TEZ-823. Add DAGs with a vertex connecting with 2
downstream/upstream vertices and unit tests for fault tolerance on these DAGs
(Tassapol Athiapinya via bikas)
Repository: incubator-tez
Updated Branches:
refs/heads/master 87f3ea351 -> 89e6ab246
TEZ-823. Add DAGs with a vertex connecting with 2 downstream/upstream vertices and unit tests for fault tolerance on these DAGs (Tassapol Athiapinya via bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/89e6ab24
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/89e6ab24
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/89e6ab24
Branch: refs/heads/master
Commit: 89e6ab2462a464ee43f8589de14b0841c39beea9
Parents: 87f3ea3
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Feb 19 13:56:48 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Feb 19 13:56:48 2014 -0800
----------------------------------------------------------------------
.../org/apache/tez/test/TestFaultTolerance.java | 100 +++++++++++++++++++
.../tez/test/dag/SimpleReverseVTestDAG.java | 79 +++++++++++++++
.../org/apache/tez/test/dag/SimpleVTestDAG.java | 79 +++++++++++++++
3 files changed, 258 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/89e6ab24/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index 0be60ee..b45aec4 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -45,6 +45,8 @@ import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.test.dag.SimpleReverseVTestDAG;
+import org.apache.tez.test.dag.SimpleVTestDAG;
import org.apache.tez.test.dag.SixLevelsFailingDAG;
import org.apache.tez.test.dag.ThreeLevelsFailingDAG;
import org.apache.tez.test.dag.TwoLevelsFailingDAG;
@@ -447,5 +449,103 @@ public class TestFaultTolerance {
"testCascadingInputFailureWithExitSuccess", testConf);
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
+
+ /**
+ * Input failure of v3 causes rerun of both v1 and v2 vertices.
+ * v1 v2
+ * \ /
+ * v3
+ * Input failure (without exit) is enabled for v3; the DAG must still succeed.
+ * @throws Exception
+ */
+ @Test (timeout=60000)
+ public void testInputFailureCausesRerunOfTwoVerticesWithoutExit() throws Exception {
+ Configuration testConf = new Configuration(false);
+ testConf.setBoolean(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v3"), true);
+ testConf.setBoolean(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v3"), false);
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"), "0,1");
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v3"), "0");
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), "-1");
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"), "1");
+
+ DAG dag = SimpleVTestDAG.createDAG(
+ "testInputFailureCausesRerunOfTwoVerticesWithoutExit", testConf);
+ runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ }
+
+ /**
+ * Downstream(v3) attempt failure of a vertex connected with
+ * 2 upstream vertices.
+ * v1 v2
+ * \ /
+ * v3
+ * Processor failure is enabled for v3 task indices "0,1"; the DAG must still succeed.
+ * @throws Exception
+ */
+ @Test (timeout=60000)
+ public void testAttemptOfDownstreamVertexConnectedWithTwoUpstreamVerticesFailure() throws Exception {
+ Configuration testConf = new Configuration(false);
+
+ testConf.setBoolean(TestProcessor.getVertexConfName(
+ TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v3"), true);
+ testConf.set(TestProcessor.getVertexConfName(
+ TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v3"), "0,1");
+ testConf.setInt(TestProcessor.getVertexConfName(
+ TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v3"), 1);
+
+ DAG dag = SimpleVTestDAG.createDAG(
+ "testAttemptOfDownstreamVertexConnectedWithTwoUpstreamVerticesFailure", testConf);
+ runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ }
+
+ /**
+ * Input failures of v2 and v3 trigger a v1 rerun.
+ * Reruns can send output to 2 downstream vertices.
+ * v1
+ * / \
+ * v2 v3
+ *
+ * Also covers multiple consumer vertices reporting failure against the same producer task.
+ * @throws Exception
+ */
+ @Test (timeout=60000)
+ public void testInputFailureRerunCanSendOutputToTwoDownstreamVertices() throws Exception {
+ Configuration testConf = new Configuration(false);
+ testConf.setBoolean(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
+ testConf.setBoolean(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), false);
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "-1");
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "-1");
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), "0");
+
+ testConf.setBoolean(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v3"), true);
+ testConf.setBoolean(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v3"), false);
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"), "-1");
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v3"), "0");
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), "-1");
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"), "0");
+
+ DAG dag = SimpleReverseVTestDAG.createDAG(
+ "testInputFailureRerunCanSendOutputToTwoDownstreamVertices", testConf);
+ runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/89e6ab24/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleReverseVTestDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleReverseVTestDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleReverseVTestDAG.java
new file mode 100644
index 0000000..6c2bfbf
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleReverseVTestDAG.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test.dag;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.test.TestInput;
+import org.apache.tez.test.TestOutput;
+import org.apache.tez.test.TestProcessor;
+
+/**
+ * A DAG with 3 vertices whose graph can be depicted as a reverse V shape.
+ * v1
+ * / \
+ * v2 v3
+ * Both edges are SCATTER_GATHER, PERSISTED, SEQUENTIAL; each vertex defaults to 2 tasks.
+ */
+public class SimpleReverseVTestDAG {
+ static Resource defaultResource = Resource.newInstance(100, 0);
+ public static String TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS =
+ "tez.simple-reverse-v-test-dag.num-tasks";
+ public static int TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS_DEFAULT = 2;
+
+ public static DAG createDAG(String name,
+ Configuration conf) throws Exception {
+ byte[] payload = null;
+ int taskCount = TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS_DEFAULT;
+ if (conf != null) {
+ taskCount = conf.getInt(TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS, TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS_DEFAULT);
+ payload = TezUtils.createUserPayloadFromConf(conf);
+ }
+ DAG dag = new DAG(name);
+ Vertex v1 = new Vertex("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+ Vertex v2 = new Vertex("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+ Vertex v3 = new Vertex("v3", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+ dag.addVertex(v1).addVertex(v2).addVertex(v3);
+ dag.addEdge(new Edge(v1, v2,
+ new EdgeProperty(DataMovementType.SCATTER_GATHER,
+ DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL,
+ TestOutput.getOutputDesc(payload),
+ TestInput.getInputDesc(payload))));
+ dag.addEdge(new Edge(v1, v3,
+ new EdgeProperty(DataMovementType.SCATTER_GATHER,
+ DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL,
+ TestOutput.getOutputDesc(payload),
+ TestInput.getInputDesc(payload))));
+ return dag;
+ }
+
+ public static DAG createDAG(Configuration conf) throws Exception {
+ return createDAG("SimpleReverseVTestDAG", conf);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/89e6ab24/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleVTestDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleVTestDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleVTestDAG.java
new file mode 100644
index 0000000..9fcfe11
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleVTestDAG.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test.dag;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.test.TestInput;
+import org.apache.tez.test.TestOutput;
+import org.apache.tez.test.TestProcessor;
+
+/**
+ * A DAG with 3 vertices whose graph can be depicted as a V shape.
+ * v1 v2
+ * \ /
+ * v3
+ * Both edges are SCATTER_GATHER, PERSISTED, SEQUENTIAL; each vertex defaults to 2 tasks.
+ */
+public class SimpleVTestDAG {
+ static Resource defaultResource = Resource.newInstance(100, 0);
+ public static String TEZ_SIMPLE_V_DAG_NUM_TASKS =
+ "tez.simple-v-test-dag.num-tasks";
+ public static int TEZ_SIMPLE_V_DAG_NUM_TASKS_DEFAULT = 2;
+
+ public static DAG createDAG(String name,
+ Configuration conf) throws Exception {
+ byte[] payload = null;
+ int taskCount = TEZ_SIMPLE_V_DAG_NUM_TASKS_DEFAULT;
+ if (conf != null) {
+ taskCount = conf.getInt(TEZ_SIMPLE_V_DAG_NUM_TASKS, TEZ_SIMPLE_V_DAG_NUM_TASKS_DEFAULT);
+ payload = TezUtils.createUserPayloadFromConf(conf);
+ }
+ DAG dag = new DAG(name);
+ Vertex v1 = new Vertex("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+ Vertex v2 = new Vertex("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+ Vertex v3 = new Vertex("v3", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+ dag.addVertex(v1).addVertex(v2).addVertex(v3);
+ dag.addEdge(new Edge(v1, v3,
+ new EdgeProperty(DataMovementType.SCATTER_GATHER,
+ DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL,
+ TestOutput.getOutputDesc(payload),
+ TestInput.getInputDesc(payload))));
+ dag.addEdge(new Edge(v2, v3,
+ new EdgeProperty(DataMovementType.SCATTER_GATHER,
+ DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL,
+ TestOutput.getOutputDesc(payload),
+ TestInput.getInputDesc(payload))));
+ return dag;
+ }
+
+ public static DAG createDAG(Configuration conf) throws Exception {
+ return createDAG("SimpleVTestDAG", conf);
+ }
+}