You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/17 00:15:04 UTC

git commit: TEZ-816. Add unit test for cascading input failure (Tassapol Athiapinya via bikas)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 59b9bbaba -> b3b981a2f


TEZ-816. Add unit test for cascading input failure (Tassapol Athiapinya via bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/b3b981a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/b3b981a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/b3b981a2

Branch: refs/heads/master
Commit: b3b981a2fac36ab6e5cf6e21313cba3dfea967bd
Parents: 59b9bba
Author: Bikas Saha <bi...@apache.org>
Authored: Sun Feb 16 15:14:54 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Sun Feb 16 15:14:54 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/tez/test/SimpleTestDAG.java |  8 ++
 .../apache/tez/test/SimpleTestDAG3Vertices.java | 78 +++++++++++++++++++
 .../org/apache/tez/test/TestFaultTolerance.java | 79 ++++++++++++++++++++
 .../java/org/apache/tez/test/TestInput.java     | 21 ++++++
 4 files changed, 186 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3b981a2/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
index 85c5d8b..5dfd179 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
@@ -29,6 +29,14 @@ import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 
+/**
+ * Simple Test DAG with 2 vertices using TestProcessor/TestInput/TestOutput.
+ * 
+ * v1
+ * |
+ * v2
+ *
+ */
 public class SimpleTestDAG {
   static Resource defaultResource = Resource.newInstance(100, 0);
   public static String TEZ_SIMPLE_DAG_NUM_TASKS =

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3b981a2/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG3Vertices.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG3Vertices.java b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG3Vertices.java
new file mode 100644
index 0000000..fd29afc
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG3Vertices.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+
+/**
+ * Simple Test DAG with 3 vertices using TestProcessor/TestInput/TestOutput.
+ * The vertices form a linear chain joined by SCATTER_GATHER edges:
+ * v1
+ * |
+ * v2
+ * |
+ * v3
+ * All vertices run the same number of tasks (TEZ_SIMPLE_DAG_NUM_TASKS, default 2).
+ */
+public class SimpleTestDAG3Vertices {
+  static Resource defaultResource = Resource.newInstance(100, 0); // resource given to every vertex
+  public static String TEZ_SIMPLE_DAG_NUM_TASKS = // conf key for the per-vertex task count
+      "tez.simple-test-dag-3-vertices.num-tasks";
+  public static int TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT = 2;
+  /** Builds the v1 -> v2 -> v3 DAG called {@code name}; {@code conf} may be null. */
+  public static DAG createDAG(String name, 
+      Configuration conf) throws Exception {
+    byte[] payload = null; // stays null when conf is null
+    int taskCount = TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT;
+    if (conf != null) {
+      taskCount = conf.getInt(TEZ_SIMPLE_DAG_NUM_TASKS, TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT);
+      payload = TezUtils.createUserPayloadFromConf(conf);
+    }
+    DAG dag = new DAG(name);
+    Vertex v1 = new Vertex("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v2 = new Vertex("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v3 = new Vertex("v3", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, // edge v1 -> v2
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, 
+            SchedulingType.SEQUENTIAL, 
+            TestOutput.getOutputDesc(payload), 
+            TestInput.getInputDesc(payload))));
+    dag.addVertex(v3).addEdge(new Edge(v2, v3, // edge v2 -> v3, same properties as v1 -> v2
+            new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+                DataSourceType.PERSISTED, 
+                SchedulingType.SEQUENTIAL, 
+                TestOutput.getOutputDesc(payload), 
+                TestInput.getInputDesc(payload))));
+    return dag;
+  }
+  /** Same as {@code createDAG(String, Configuration)} with the name "SimpleTestDAG3Vertices". */
+  public static DAG createDAG(Configuration conf) throws Exception {
+    return createDAG("SimpleTestDAG3Vertices", conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3b981a2/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index db259b1..0be60ee 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -368,5 +368,84 @@ public class TestFaultTolerance {
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
   }
   
+  /**
+   * Populates the per-vertex TestInput failure settings for "v2" and "v3" that
+   * the cascading input failure tests on SimpleTestDAG3Vertices rely on.
+   * @param testConf configuration to populate; modified in place
+   * @param failAndExit value stored under TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT for both vertices
+   */
+  private void setCascadingInputFailureConfig(Configuration testConf, 
+                                              boolean failAndExit) {
+    // v2 attempt0 succeeds (only attempt "1" is configured to fail below).
+    // v2 task0 attempt1 input0 fails up to input version 0.
+    testConf.setBoolean(TestInput.getVertexConfName(
+            TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
+    testConf.setBoolean(TestInput.getVertexConfName(
+            TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v2"), failAndExit);
+    testConf.set(TestInput.getVertexConfName(
+            TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0");
+    testConf.set(TestInput.getVertexConfName(
+            TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "1");
+    testConf.set(TestInput.getVertexConfName(
+            TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
+    testConf.setInt(TestInput.getVertexConfName(
+            TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"),
+            0);
+
+    // v3 all-tasks (task index "-1") attempt0 input0 fails up to input version 0.
+    testConf.setBoolean(TestInput.getVertexConfName(
+            TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v3"), true);
+    testConf.setBoolean(TestInput.getVertexConfName(
+            TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, "v3"), failAndExit);
+    testConf.set(TestInput.getVertexConfName(
+            TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v3"), "-1");
+    testConf.set(TestInput.getVertexConfName(
+            TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v3"), "0");
+    testConf.set(TestInput.getVertexConfName(
+            TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), "0");
+    testConf.setInt(TestInput.getVertexConfName(
+            TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"),
+            0);
+  }
+  
+  /**
+   * Test cascading input failure without exit. Expecting success.
+   * v1 -- v2 -- v3
+   * v3 all-tasks attempt0 input0 fails; the attempt waits, triggering a v2 rerun.
+   * v2 task0 attempt1 input0 fails; the attempt waits, triggering a v1 rerun.
+   * v1 attempt1 reruns and succeeds. v2 accepts v1 attempt1 output. v2 attempt1 succeeds.
+   * v3 attempt0 accepts v2 attempt1 output.
+   * 
+   * AM vertex succeeded order is v1, v2, v1, v2, v3.
+   * @throws Exception if creating the DAG or running and verifying it fails
+   */
+  @Test (timeout=60000)
+  public void testCascadingInputFailureWithoutExitSuccess() throws Exception {
+    Configuration testConf = new Configuration(false);
+    setCascadingInputFailureConfig(testConf, false); // failAndExit = false: fail without exiting
+    DAG dag = SimpleTestDAG3Vertices.createDAG(
+              "testCascadingInputFailureWithoutExitSuccess", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
+  
+  /**
+   * Test cascading input failure with exit. Expecting success.
+   * v1 -- v2 -- v3
+   * v3 all-tasks attempt0 input0 fails. v3 attempt0 exits, triggering a v2 rerun.
+   * v2 task0 attempt1 input0 fails. v2 attempt1 exits, triggering a v1 rerun.
+   * v1 attempt1 reruns and succeeds. v2 accepts v1 attempt1 output. v2 attempt2 succeeds.
+   * v3 attempt1 accepts v2 attempt2 output.
+   * 
+   * AM vertex succeeded order is v1, v2, v3, v1, v2, v3.
+   * @throws Exception if creating the DAG or running and verifying it fails
+   */
+  @Test (timeout=60000)
+  public void testCascadingInputFailureWithExitSuccess() throws Exception {
+    Configuration testConf = new Configuration(false);
+    setCascadingInputFailureConfig(testConf, true); // failAndExit = true: failing attempts exit
+    DAG dag = SimpleTestDAG3Vertices.createDAG(
+              "testCascadingInputFailureWithExitSuccess", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3b981a2/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index 5ed9faa..a49f5b2 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -170,7 +170,28 @@ public class TestInput implements LogicalInput {
           } else {
             done = false;
           }
+        } else if ((failingTaskIndices.contains(failAll) ||
+            failingTaskIndices.contains(inputContext.getTaskIndex()))){
+          boolean previousAttemptReadFailed = false;
+          if (failingTaskAttempts.contains(failAll)) {
+            previousAttemptReadFailed = true;
+          } else {
+            for (int i=0 ; i<inputContext.getTaskAttemptNumber(); ++i) {
+              if (failingTaskAttempts.contains(new Integer(i))) {
+                previousAttemptReadFailed = true;
+                break;
+              }
+            }
+          }
+          if (previousAttemptReadFailed && 
+              (lastInputReadyValue <= failingInputUpto)) {
+            // if any previous attempt has failed then don't be done when we see
+            // a previously failed input
+            LOG.info("Previous task attempt failed and input version less than failing upto version");
+            done = false;
+          }
         }
+        
       }
     } while (!done);
     return numInputs;