You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/06 04:34:06 UTC

git commit: TEZ-781. Add unit test for fault tolerance (input failure causes re-run of previous task under allowed maximum failed attempt) (Tassapol Athiapinya via bikas)

Updated Branches:
  refs/heads/master 2c5343998 -> f272fbf0a


TEZ-781. Add unit test for fault tolerance (input failure causes re-run of previous task under allowed maximum failed attempt) (Tassapol Athiapinya via bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/f272fbf0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/f272fbf0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/f272fbf0

Branch: refs/heads/master
Commit: f272fbf0a543bb9eab369e8cc61487b7f2254ebb
Parents: 2c53439
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Feb 5 19:33:45 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Feb 5 19:33:45 2014 -0800

----------------------------------------------------------------------
 .../org/apache/tez/test/TestFaultTolerance.java | 49 ++++++++++++++++++++
 .../java/org/apache/tez/test/TestInput.java     |  9 ++--
 2 files changed, 55 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f272fbf0/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index 8b24264..63c3dd9 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -308,5 +308,54 @@ public class TestFaultTolerance {
     DAG dag = ThreeLevelsFailingDAG.createDAG("testThreeLevelsFailingDAG2VerticesHaveFailedAttemptsDAGSucceeds", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
   }
+  
+  /**
+   * Test that an input failure re-runs the upstream task within the attempt limit.
+   * v1-task0    v1-task1
+   * |       \ /     |
+   * v2-task0    v2-task1
+   * 
+   * Relies on a maximum of 4 allowed failed attempts (default value during session creation).
+   * v1-task1-attempt0 fails. Attempt 1 succeeds.
+   * v2-task0-attempt0 runs and its input 1 fails.
+   * This triggers a re-run of v1-task1.
+   * v1-task1-attempt2 runs and succeeds.
+   * v2-task0-attempt0 (no attempt bump) re-checks its input 1.
+   * The input version has advanced, so the attempt now succeeds.
+   * @throws Exception propagated from DAG creation or execution
+   */
+  @Test (timeout=60000)
+  public void testInputFailureCausesRerunAttemptWithinMaxAttemptSuccess() throws Exception {
+    Configuration testConf = new Configuration();
+    //At v1, task 1 attempt 0 fails and attempt 1 succeeds (1 failed attempt so far).
+    testConf.setBoolean(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_DO_FAIL, "v1"), true);
+    testConf.set(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_TASK_INDEX, "v1"), "1");
+    testConf.setInt(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 0);
+    //At v2, task 0 attempt 0 fails on input 1.
+    //This triggers a re-run of v1's task 1.
+    //At v1, attempt 2 kicks off. This is still allowed because the number of
+    //failed attempts so far for v1-task1 is 1 (not greater than 4).
+    testConf.setBoolean(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_DO_FAIL, "v2"), true);
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_INDEX, "v2"), "0");
+    //At v2, only attempt 0 has input failures.
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT, "v2"), "0");
+    testConf.set(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "1");
+    //Input 1 fails for input versions up to the value below (NOTE(review): confirm inclusive bound).
+    testConf.setInt(TestInput.getVertexConfName(
+        TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), 1);
+  
+    DAG dag = SimpleTestDAG.createDAG(
+            "testInputFailureCausesRerunAttemptWithinMaxAttemptSuccess", testConf);
+    //The DAG should succeed.
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
+  
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f272fbf0/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index aa8655b..8da0c96 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -128,8 +128,8 @@ public class TestInput implements LogicalInput {
             e.printStackTrace();
           }
         }
-        LOG.info("Done for inputReady: " + inputReady.get());
         lastInputReadyValue = inputReady.get();
+        LOG.info("Done for inputReady: " + lastInputReadyValue);
       }
       if (doFail) {
         if (
@@ -250,16 +250,19 @@ public class TestInput implements LogicalInput {
       }
     }
     if (numCompletedInputs == numInputs) {
+      int maxInputVersionSeen = -1;  
       for (int i=0; i<numInputs; ++i) {
         if (completedInputVersion[i] < 0) {
           LOG.info("Not received completion for input " + i);
           return;
+        } else if (maxInputVersionSeen < completedInputVersion[i]) {
+          maxInputVersionSeen = completedInputVersion[i];
         }
       }
       LOG.info("Received all inputs");
       synchronized (inputReady) {
-        int newVal = inputReady.incrementAndGet();
-        LOG.info("Notifying done with " + newVal);
+        inputReady.set(maxInputVersionSeen);
+        LOG.info("Notifying done with " + maxInputVersionSeen);
         inputReady.notifyAll();
       }
     }