You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/06 04:34:06 UTC
git commit: TEZ-781. Add unit test for fault tolerance (input failure
causes re-run of previous task under allowed maximum failed attempt)
(Tassapol Athiapinya via bikas)
Updated Branches:
refs/heads/master 2c5343998 -> f272fbf0a
TEZ-781. Add unit test for fault tolerance (input failure causes re-run of previous task under allowed maximum failed attempt) (Tassapol Athiapinya via bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/f272fbf0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/f272fbf0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/f272fbf0
Branch: refs/heads/master
Commit: f272fbf0a543bb9eab369e8cc61487b7f2254ebb
Parents: 2c53439
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Feb 5 19:33:45 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Feb 5 19:33:45 2014 -0800
----------------------------------------------------------------------
.../org/apache/tez/test/TestFaultTolerance.java | 49 ++++++++++++++++++++
.../java/org/apache/tez/test/TestInput.java | 9 ++--
2 files changed, 55 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f272fbf0/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index 8b24264..63c3dd9 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -308,5 +308,54 @@ public class TestFaultTolerance {
DAG dag = ThreeLevelsFailingDAG.createDAG("testThreeLevelsFailingDAG2VerticesHaveFailedAttemptsDAGSucceeds", testConf);
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
+
+ /**
+ * Test input failure.
+ * v1-task0 v1-task1
+ * | \ / |
+ * v2-task0 v2-task1
+ *
+ * Uses the maximum allowed number of failed attempts, 4 (default value during session creation).
+ * v1-task1-attempt0 fails. Attempt 1 succeeds.
+ * v2-task0-attempt0 runs. Its input1-inputversion0 fails.
+ * This will trigger a rerun of v1-task1.
+ * v1-task1-attempt2 is re-run and succeeds.
+ * v2-task0-attempt0 (no attempt bump) runs. Check its input1.
+ * The input version is now 1. The attempt will now succeed.
+ * @throws Exception
+ */
+ @Test (timeout=60000)
+ public void testInputFailureCausesRerunAttemptWithinMaxAttemptSuccess() throws Exception {
+ Configuration testConf = new Configuration();
+ //At v1, task 1 attempt 0 fails. Attempt 1 succeeds. 1 attempt has failed so far.
+ testConf.setBoolean(TestProcessor.getVertexConfName(
+ TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
+ testConf.set(TestProcessor.getVertexConfName(
+ TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "1");
+ testConf.setInt(TestProcessor.getVertexConfName(
+ TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 0);
+ //At v2, task 0 attempt 0 input 1 input-version 0 fails.
+ //This will trigger a re-run of v1's task 1.
+ //At v1, attempt 2 will kick off. This attempt is still ok because
+ //the number of failed attempts so far at v1-task1 is 1 (not greater than 4).
+ testConf.setBoolean(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0");
+ //At v2, attempt 0 has input failures.
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
+ testConf.set(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "1");
+ //v2-task0-attempt0/1-input1 has an input failure at input version 0 only. NOTE(review): the value set below is 1 - confirm.
+ testConf.setInt(TestInput.getVertexConfName(
+ TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), 1);
+
+ DAG dag = SimpleTestDAG.createDAG(
+ "testInputFailureCausesRerunAttemptWithinMaxAttemptSuccess", testConf);
+ //Job should succeed.
+ runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f272fbf0/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index aa8655b..8da0c96 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -128,8 +128,8 @@ public class TestInput implements LogicalInput {
e.printStackTrace();
}
}
- LOG.info("Done for inputReady: " + inputReady.get());
lastInputReadyValue = inputReady.get();
+ LOG.info("Done for inputReady: " + lastInputReadyValue);
}
if (doFail) {
if (
@@ -250,16 +250,19 @@ public class TestInput implements LogicalInput {
}
}
if (numCompletedInputs == numInputs) {
+ int maxInputVersionSeen = -1;
for (int i=0; i<numInputs; ++i) {
if (completedInputVersion[i] < 0) {
LOG.info("Not received completion for input " + i);
return;
+ } else if (maxInputVersionSeen < completedInputVersion[i]) {
+ maxInputVersionSeen = completedInputVersion[i];
}
}
LOG.info("Received all inputs");
synchronized (inputReady) {
- int newVal = inputReady.incrementAndGet();
- LOG.info("Notifying done with " + newVal);
+ inputReady.set(maxInputVersionSeen);
+ LOG.info("Notifying done with " + maxInputVersionSeen);
inputReady.notifyAll();
}
}