You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/19 22:57:00 UTC

git commit: TEZ-823. Add DAGs with a vertex connecting with 2 downstream/upstream vertices and unit tests for fault tolerance on these DAGs (Tassapol Athiapinya via bikas)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 87f3ea351 -> 89e6ab246


TEZ-823. Add DAGs with a vertex connecting with 2 downstream/upstream vertices and unit tests for fault tolerance on these DAGs (Tassapol Athiapinya via bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/89e6ab24
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/89e6ab24
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/89e6ab24

Branch: refs/heads/master
Commit: 89e6ab2462a464ee43f8589de14b0841c39beea9
Parents: 87f3ea3
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Feb 19 13:56:48 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Feb 19 13:56:48 2014 -0800

----------------------------------------------------------------------
 .../org/apache/tez/test/TestFaultTolerance.java | 100 +++++++++++++++++++
 .../tez/test/dag/SimpleReverseVTestDAG.java     |  79 +++++++++++++++
 .../org/apache/tez/test/dag/SimpleVTestDAG.java |  79 +++++++++++++++
 3 files changed, 258 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/89e6ab24/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index 0be60ee..b45aec4 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -45,6 +45,8 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.test.dag.SimpleReverseVTestDAG;
+import org.apache.tez.test.dag.SimpleVTestDAG;
 import org.apache.tez.test.dag.SixLevelsFailingDAG;
 import org.apache.tez.test.dag.ThreeLevelsFailingDAG;
 import org.apache.tez.test.dag.TwoLevelsFailingDAG;
@@ -447,5 +449,103 @@ public class TestFaultTolerance {
               "testCascadingInputFailureWithExitSuccess", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
   }
+  
+  /**
+   * Input failure of v3 causes rerun of both v1 and v2 vertices.
+   *   v1  v2
+   *    \ /
+   *    v3
+   * 
+   * @throws Exception
+   */
+  @Test (timeout=60000)
+  public void testInputFailureCausesRerunOfTwoVerticesWithoutExit() throws Exception {
+    Configuration testConf = new Configuration(false);
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v3"), true);
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v3"), false);
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"), "0,1");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v3"), "0");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), "-1");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"), "1");
+    
+    DAG dag = SimpleVTestDAG.createDAG(
+            "testInputFailureCausesRerunOfTwoVerticesWithoutExit", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
+  
+  /**
+   * Attempt failure of a downstream vertex (v3) that is connected with
+   * 2 upstream vertices.
+   *   v1  v2
+   *    \ /
+   *    v3
+   * 
+   * @throws Exception
+   */
+  @Test (timeout=60000)
+  public void testAttemptOfDownstreamVertexConnectedWithTwoUpstreamVerticesFailure() throws Exception {
+    Configuration testConf = new Configuration(false);
+    
+    testConf.setBoolean(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v3"), true);
+    testConf.set(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v3"), "0,1");
+    testConf.setInt(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v3"), 1);
+    
+    DAG dag = SimpleVTestDAG.createDAG(
+            "testAttemptOfDownstreamVertexConnectedWithTwoUpstreamVerticesFailure", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
+    
+  /**
+   * Input failures of v2 and v3 trigger a rerun of v1.
+   * Reruns can send output to 2 downstream vertices. 
+   *     v1
+   *    /  \
+   *   v2   v3 
+   * 
+   * Also covers multiple consumer vertices reporting failure against the same producer task.
+   * @throws Exception
+   */
+  @Test (timeout=60000)
+  public void testInputFailureRerunCanSendOutputToTwoDownstreamVertices() throws Exception {
+    Configuration testConf = new Configuration(false);
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), false);
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "-1");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "-1");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), "0");
+    
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v3"), true);
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v3"), false);
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"), "-1");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v3"), "0");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), "-1");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"), "0");
+        
+    DAG dag = SimpleReverseVTestDAG.createDAG(
+            "testInputFailureRerunCanSendOutputToTwoDownstreamVertices", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/89e6ab24/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleReverseVTestDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleReverseVTestDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleReverseVTestDAG.java
new file mode 100644
index 0000000..6c2bfbf
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleReverseVTestDAG.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test.dag;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.test.TestInput;
+import org.apache.tez.test.TestOutput;
+import org.apache.tez.test.TestProcessor;
+
+/**
+ * A DAG with 3 vertices whose graph can be depicted as a reverse V shape.
+ *     v1
+ *   /    \
+ *  v2     v3
+ *
+ */
+public class SimpleReverseVTestDAG {
+  static Resource defaultResource = Resource.newInstance(100, 0);
+  public static String TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS =
+      "tez.simple-reverse-v-test-dag.num-tasks";
+  public static int TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS_DEFAULT = 2;
+  
+  public static DAG createDAG(String name, 
+      Configuration conf) throws Exception {
+    byte[] payload = null;
+    int taskCount = TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS_DEFAULT;
+    if (conf != null) {
+      taskCount = conf.getInt(TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS, TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS_DEFAULT);
+      payload = TezUtils.createUserPayloadFromConf(conf);
+    }
+    DAG dag = new DAG(name);
+    Vertex v1 = new Vertex("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v2 = new Vertex("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v3 = new Vertex("v3", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    dag.addVertex(v1).addVertex(v2).addVertex(v3);
+    dag.addEdge(new Edge(v1, v2, 
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, 
+            SchedulingType.SEQUENTIAL, 
+            TestOutput.getOutputDesc(payload), 
+            TestInput.getInputDesc(payload))));
+    dag.addEdge(new Edge(v1, v3, 
+            new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+                DataSourceType.PERSISTED, 
+                SchedulingType.SEQUENTIAL, 
+                TestOutput.getOutputDesc(payload), 
+                TestInput.getInputDesc(payload))));
+    return dag;
+  }
+  
+  public static DAG createDAG(Configuration conf) throws Exception {
+    return createDAG("SimpleReverseVTestDAG", conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/89e6ab24/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleVTestDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleVTestDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleVTestDAG.java
new file mode 100644
index 0000000..9fcfe11
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleVTestDAG.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test.dag;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.test.TestInput;
+import org.apache.tez.test.TestOutput;
+import org.apache.tez.test.TestProcessor;
+
+/**
+ * A DAG with 3 vertices whose graph can be depicted as a V shape.
+ *  v1     v2
+ *    \   /
+ *     v3
+ *
+ */
+public class SimpleVTestDAG {
+  static Resource defaultResource = Resource.newInstance(100, 0);
+  public static String TEZ_SIMPLE_V_DAG_NUM_TASKS =
+      "tez.simple-v-test-dag.num-tasks";
+  public static int TEZ_SIMPLE_V_DAG_NUM_TASKS_DEFAULT = 2;
+  
+  public static DAG createDAG(String name, 
+      Configuration conf) throws Exception {
+    byte[] payload = null;
+    int taskCount = TEZ_SIMPLE_V_DAG_NUM_TASKS_DEFAULT;
+    if (conf != null) {
+      taskCount = conf.getInt(TEZ_SIMPLE_V_DAG_NUM_TASKS, TEZ_SIMPLE_V_DAG_NUM_TASKS_DEFAULT);
+      payload = TezUtils.createUserPayloadFromConf(conf);
+    }
+    DAG dag = new DAG(name);
+    Vertex v1 = new Vertex("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v2 = new Vertex("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v3 = new Vertex("v3", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    dag.addVertex(v1).addVertex(v2).addVertex(v3);
+    dag.addEdge(new Edge(v1, v3, 
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, 
+            SchedulingType.SEQUENTIAL, 
+            TestOutput.getOutputDesc(payload), 
+            TestInput.getInputDesc(payload))));
+    dag.addEdge(new Edge(v2, v3, 
+            new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+                DataSourceType.PERSISTED, 
+                SchedulingType.SEQUENTIAL, 
+                TestOutput.getOutputDesc(payload), 
+                TestInput.getInputDesc(payload))));
+    return dag;
+  }
+  
+  public static DAG createDAG(Configuration conf) throws Exception {
+    return createDAG("SimpleVTestDAG", conf);
+  }
+}