You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/17 00:15:04 UTC
git commit: TEZ-816. Add unit test for cascading input failure
(Tassapol Athiapinya via bikas)
Repository: incubator-tez
Updated Branches:
refs/heads/master 59b9bbaba -> b3b981a2f
TEZ-816. Add unit test for cascading input failure (Tassapol Athiapinya via bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/b3b981a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/b3b981a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/b3b981a2
Branch: refs/heads/master
Commit: b3b981a2fac36ab6e5cf6e21313cba3dfea967bd
Parents: 59b9bba
Author: Bikas Saha <bi...@apache.org>
Authored: Sun Feb 16 15:14:54 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Sun Feb 16 15:14:54 2014 -0800
----------------------------------------------------------------------
.../java/org/apache/tez/test/SimpleTestDAG.java | 8 ++
.../apache/tez/test/SimpleTestDAG3Vertices.java | 78 +++++++++++++++++++
.../org/apache/tez/test/TestFaultTolerance.java | 79 ++++++++++++++++++++
.../java/org/apache/tez/test/TestInput.java | 21 ++++++
4 files changed, 186 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3b981a2/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
index 85c5d8b..5dfd179 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
@@ -29,6 +29,14 @@ import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+/**
+ * Simple Test DAG with 2 vertices using TestProcessor/TestInput/TestOutput.
+ *
+ * v1
+ * |
+ * v2
+ *
+ */
public class SimpleTestDAG {
static Resource defaultResource = Resource.newInstance(100, 0);
public static String TEZ_SIMPLE_DAG_NUM_TASKS =
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3b981a2/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG3Vertices.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG3Vertices.java b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG3Vertices.java
new file mode 100644
index 0000000..fd29afc
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG3Vertices.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+
+/**
+ * Simple Test DAG with 3 vertices using TestProcessor/TestInput/TestOutput.
+ *
+ *   v1
+ *   |
+ *   v2
+ *   |
+ *   v3
+ */
+public class SimpleTestDAG3Vertices {
+  static Resource defaultResource = Resource.newInstance(100, 0);
+  /** Configuration key holding the number of tasks per vertex. */
+  public static final String TEZ_SIMPLE_DAG_NUM_TASKS =
+      "tez.simple-test-dag-3-vertices.num-tasks";
+  /** Tasks per vertex when the key is unset or no configuration is given. */
+  public static final int TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT = 2;
+
+  /**
+   * Builds the DAG; a non-null conf sets the task count and becomes the payload.
+   */
+  public static DAG createDAG(String name, Configuration conf) throws Exception {
+    byte[] payload = null;
+    int taskCount = TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT;
+    if (conf != null) {
+      taskCount = conf.getInt(TEZ_SIMPLE_DAG_NUM_TASKS, TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT);
+      payload = TezUtils.createUserPayloadFromConf(conf);
+    }
+    DAG dag = new DAG(name);
+    Vertex v1 = new Vertex("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v2 = new Vertex("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v3 = new Vertex("v3", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    dag.addVertex(v1).addVertex(v2).addEdge(new Edge(v1, v2, newEdgeProperty(payload)));
+    dag.addVertex(v3).addEdge(new Edge(v2, v3, newEdgeProperty(payload)));
+    return dag;
+  }
+
+  /** Returns a new scatter-gather, persisted, sequential edge property. */
+  private static EdgeProperty newEdgeProperty(byte[] payload) {
+    return new EdgeProperty(DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL, TestOutput.getOutputDesc(payload),
+        TestInput.getInputDesc(payload));
+  }
+
+  public static DAG createDAG(Configuration conf) throws Exception {
+    return createDAG("SimpleTestDAG3Vertices", conf);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3b981a2/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index db259b1..0be60ee 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -368,5 +368,84 @@ public class TestFaultTolerance {
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
+  /**
+   * Configures the input failures used by the cascading input failure tests
+   * that run on SimpleTestDAG3Vertices.
+   * @param testConf configuration to populate
+   * @param failAndExit whether input failure should trigger attempt exit
+   */
+  private void setCascadingInputFailureConfig(Configuration testConf,
+      boolean failAndExit) {
+    // v2 attempt0 succeeds.
+    // v2 task0 attempt1 input0 fails up to version 0.
+    setCascadingInputFailureConfigForVertex(testConf, "v2", failAndExit, "0", "1");
+    // v3 all-tasks attempt0 input0 fails up to version 0.
+    setCascadingInputFailureConfigForVertex(testConf, "v3", failAndExit, "-1", "0");
+  }
+
+  /**
+   * Writes the TestInput failure settings for one vertex: input0 fails up to
+   * input version 0 for the given task index and task attempt.
+   * @param testConf configuration to populate
+   * @param vertexName vertex whose TestInput should fail
+   * @param failAndExit whether input failure should trigger attempt exit
+   * @param taskIndex failing task index ("-1" is used for all tasks)
+   * @param taskAttempt failing task attempt
+   */
+  private void setCascadingInputFailureConfigForVertex(Configuration testConf,
+      String vertexName, boolean failAndExit, String taskIndex, String taskAttempt) {
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL, vertexName), true);
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, vertexName), failAndExit);
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, vertexName), taskIndex);
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, vertexName), taskAttempt);
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, vertexName), "0");
+    testConf.setInt(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, vertexName), 0);
+  }
+
+  /**
+   * Cascading input failure without attempt exit; the DAG is expected to succeed.
+   * v1 -- v2 -- v3
+   * v3 all-tasks attempt0 input0 fails. Wait. Triggering v2 rerun.
+   * v2 task0 attempt1 input0 fails. Wait. Triggering v1 rerun.
+   * v1 attempt1 reruns and succeeds. v2 accepts v1 attempt1 output. v2 attempt1 succeeds.
+   * v3 attempt0 accepts v2 attempt1 output.
+   *
+   * AM vertex succeeded order is v1, v2, v1, v2, v3.
+   * @throws Exception if DAG creation or execution throws
+   */
+  @Test (timeout=60000)
+  public void testCascadingInputFailureWithoutExitSuccess() throws Exception {
+    final String dagName = "testCascadingInputFailureWithoutExitSuccess";
+    final boolean failAndExit = false;
+    Configuration conf = new Configuration(false);
+    setCascadingInputFailureConfig(conf, failAndExit);
+    runDAGAndVerify(SimpleTestDAG3Vertices.createDAG(dagName, conf), DAGStatus.State.SUCCEEDED);
+  }
+
+  /**
+   * Cascading input failure with attempt exit; the DAG is expected to succeed.
+   * v1 -- v2 -- v3
+   * v3 all-tasks attempt0 input0 fails. v3 attempt0 exits. Triggering v2 rerun.
+   * v2 task0 attempt1 input0 fails. v2 attempt1 exits. Triggering v1 rerun.
+   * v1 attempt1 reruns and succeeds. v2 accepts v1 attempt1 output. v2 attempt2 succeeds.
+   * v3 attempt1 accepts v2 attempt2 output.
+   *
+   * AM vertex succeeded order is v1, v2, v3, v1, v2, v3.
+   * @throws Exception if DAG creation or execution throws
+   */
+  @Test (timeout=60000)
+  public void testCascadingInputFailureWithExitSuccess() throws Exception {
+    final String dagName = "testCascadingInputFailureWithExitSuccess";
+    final boolean failAndExit = true;
+    Configuration conf = new Configuration(false);
+    setCascadingInputFailureConfig(conf, failAndExit);
+    runDAGAndVerify(SimpleTestDAG3Vertices.createDAG(dagName, conf), DAGStatus.State.SUCCEEDED);
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b3b981a2/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index 5ed9faa..a49f5b2 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -170,7 +170,28 @@ public class TestInput implements LogicalInput {
} else {
done = false;
}
+ } else if ((failingTaskIndices.contains(failAll) ||
+ failingTaskIndices.contains(inputContext.getTaskIndex()))){
+ boolean previousAttemptReadFailed = false;
+ if (failingTaskAttempts.contains(failAll)) {
+ previousAttemptReadFailed = true;
+ } else {
+ for (int i=0 ; i<inputContext.getTaskAttemptNumber(); ++i) {
+ if (failingTaskAttempts.contains(new Integer(i))) {
+ previousAttemptReadFailed = true;
+ break;
+ }
+ }
+ }
+ if (previousAttemptReadFailed &&
+ (lastInputReadyValue <= failingInputUpto)) {
+ // if any previous attempt has failed then do not report done when we see
+ // a previously failed input
+ LOG.info("Previous task attempt failed and input version less than failing upto version");
+ done = false;
+ }
}
+
}
} while (!done);
return numInputs;