You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/12/10 04:30:27 UTC

[17/27] tez git commit: TEZ-1060 Add randomness to fault tolerance tests

TEZ-1060 Add randomness to fault tolerance tests


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ac26ade4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ac26ade4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ac26ade4

Branch: refs/heads/TEZ-8
Commit: ac26ade401d49a5c772e9f85312f1c5b5f952e8d
Parents: 6b53107
Author: Tassapol Athiapinya <ta...@hortonworks.com>
Authored: Wed Dec 3 16:35:20 2014 -0800
Committer: Tassapol Athiapinya <ta...@hortonworks.com>
Committed: Wed Dec 3 16:35:20 2014 -0800

----------------------------------------------------------------------
 .../org/apache/tez/test/TestFaultTolerance.java |  20 ++-
 .../java/org/apache/tez/test/TestInput.java     | 152 ++++++++++++-------
 .../java/org/apache/tez/test/TestProcessor.java |  65 ++++++--
 3 files changed, 174 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ac26ade4/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index 1c1e846..c834dee 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -713,5 +713,23 @@ public class TestFaultTolerance {
     DAG dag = SimpleTestDAG.createDAG("testTwoTasksHaveInputFailuresSuccess", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
   }
-
+  
+  @Test (timeout=60000)
+  public void testRandomFailingTasks() throws Exception {
+    Configuration testConf = new Configuration(false);
+    testConf.setBoolean(TestProcessor.TEZ_FAILING_PROCESSOR_DO_RANDOM_FAIL, true);
+    testConf.setFloat(TestProcessor.TEZ_FAILING_PROCESSOR_RANDOM_FAIL_PROBABILITY, 0.5f);
+    DAG dag = SixLevelsFailingDAG.createDAG("testRandomFailingTasks", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
+  
+  @Test (timeout=60000)
+  public void testRandomFailingInputs() throws Exception {
+    Configuration testConf = new Configuration(false);
+    testConf.setBoolean(TestInput.TEZ_FAILING_INPUT_DO_RANDOM_FAIL, true);
+    testConf.setFloat(TestInput.TEZ_FAILING_INPUT_RANDOM_FAIL_PROBABILITY, 0.5f);
+    DAG dag = SixLevelsFailingDAG.createDAG("testRandomFailingInputs", testConf);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ac26ade4/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index 465dd9c..0050961 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
@@ -65,6 +66,8 @@ public class TestInput extends AbstractLogicalInput {
   int failingInputUpto = 0;
   
   boolean doFail = false;
+  boolean doRandomFail = false;
+  float randomFailProbability = 0.0f;
   boolean doFailAndExit = false;
   Set<Integer> failingTaskIndices = Sets.newHashSet();
   Set<Integer> failingTaskAttempts = Sets.newHashSet();
@@ -78,6 +81,16 @@ public class TestInput extends AbstractLogicalInput {
   public static String TEZ_FAILING_INPUT_DO_FAIL =
       "tez.failing-input.do-fail";
   /**
+   * Enable random failure for this logical input. The config is set per DAG.
+   */
+  public static String TEZ_FAILING_INPUT_DO_RANDOM_FAIL =
+      "tez.failing-input.do-random-fail";
+  /**
+   * Probability of randomly failing an input. Range is 0 to 1. The number is set per DAG.
+   */
+  public static String TEZ_FAILING_INPUT_RANDOM_FAIL_PROBABILITY =
+      "tez.failing-input.random-fail-probability";
+  /**
    * Logical input will exit (and cause task failure) after reporting failure to 
    * read.
    */
@@ -146,66 +159,96 @@ public class TestInput extends AbstractLogicalInput {
         lastInputReadyValue = inputReady.get();
         LOG.info("Done for inputReady: " + lastInputReadyValue);
       }
-      if (doFail) {
-        if (
-            (failingTaskIndices.contains(failAll) ||
-            failingTaskIndices.contains(getContext().getTaskIndex())) &&
-            (failingTaskAttempts.contains(failAll) || 
-             failingTaskAttempts.contains(getContext().getTaskAttemptNumber())) &&
-             (lastInputReadyValue <= failingInputUpto)) {
-          List<Event> events = Lists.newLinkedList();
-          if (failingInputIndices.contains(failAll)) {
-            for (int i=0; i<getNumPhysicalInputs(); ++i) {
-              String msg = ("FailingInput: " + getContext().getUniqueIdentifier() + 
-                  " index: " + i + " version: " + lastInputReadyValue);
-              events.add(InputReadErrorEvent.create(msg, i, lastInputReadyValue));
-              LOG.info("Failing input: " + msg);
-            }
-          } else {
-            for (Integer index : failingInputIndices) {
-              if (index.intValue() >= getNumPhysicalInputs()) {
-                throwException("InputIndex: " + index.intValue() + 
-                    " should be less than numInputs: " + getNumPhysicalInputs());
+      if (!doRandomFail) {
+        // not random fail
+        if (doFail) {
+          if (
+              (failingTaskIndices.contains(failAll) ||
+              failingTaskIndices.contains(getContext().getTaskIndex())) &&
+              (failingTaskAttempts.contains(failAll) || 
+               failingTaskAttempts.contains(getContext().getTaskAttemptNumber())) &&
+               (lastInputReadyValue <= failingInputUpto)) {
+            List<Event> events = Lists.newLinkedList();
+            if (failingInputIndices.contains(failAll)) {
+              for (int i=0; i<getNumPhysicalInputs(); ++i) {
+                String msg = ("FailingInput: " + getContext().getUniqueIdentifier() + 
+                    " index: " + i + " version: " + lastInputReadyValue);
+                events.add(InputReadErrorEvent.create(msg, i, lastInputReadyValue));
+                LOG.info("Failing input: " + msg);
               }
-              if (completedInputVersion[index.intValue()] < lastInputReadyValue) {
-                continue; // dont fail a previous version now.
+            } else {
+              for (Integer index : failingInputIndices) {
+                if (index.intValue() >= getNumPhysicalInputs()) {
+                  throwException("InputIndex: " + index.intValue() + 
+                      " should be less than numInputs: " + getNumPhysicalInputs());
+                }
+                if (completedInputVersion[index.intValue()] < lastInputReadyValue) {
+                  continue; // dont fail a previous version now.
+                }
+                String msg = ("FailingInput: " + getContext().getUniqueIdentifier() + 
+                    " index: " + index.intValue() + " version: " + lastInputReadyValue);
+                events.add(InputReadErrorEvent.create(msg, index.intValue(), lastInputReadyValue));
+                LOG.info("Failing input: " + msg);
               }
-              String msg = ("FailingInput: " + getContext().getUniqueIdentifier() + 
-                  " index: " + index.intValue() + " version: " + lastInputReadyValue);
-              events.add(InputReadErrorEvent.create(msg, index.intValue(), lastInputReadyValue));
-              LOG.info("Failing input: " + msg);
             }
-          }
-          getContext().sendEvents(events);
-          if (doFailAndExit) {
-            String msg = "FailingInput exiting: " + getContext().getUniqueIdentifier();
-            LOG.info(msg);
-            throwException(msg);
-          } else {
-            done = false;
-          }
-        } else if ((failingTaskIndices.contains(failAll) ||
-            failingTaskIndices.contains(getContext().getTaskIndex()))){
-          boolean previousAttemptReadFailed = false;
-          if (failingTaskAttempts.contains(failAll)) {
-            previousAttemptReadFailed = true;
-          } else {
-            for (int i=0 ; i<getContext().getTaskAttemptNumber(); ++i) {
-              if (failingTaskAttempts.contains(new Integer(i))) {
-                previousAttemptReadFailed = true;
-                break;
+            getContext().sendEvents(events);
+            if (doFailAndExit) {
+              String msg = "FailingInput exiting: " + getContext().getUniqueIdentifier();
+              LOG.info(msg);
+              throwException(msg);
+            } else {
+              done = false;
+            }
+          } else if ((failingTaskIndices.contains(failAll) ||
+              failingTaskIndices.contains(getContext().getTaskIndex()))){
+            boolean previousAttemptReadFailed = false;
+            if (failingTaskAttempts.contains(failAll)) {
+              previousAttemptReadFailed = true;
+            } else {
+              for (int i=0 ; i<getContext().getTaskAttemptNumber(); ++i) {
+                if (failingTaskAttempts.contains(new Integer(i))) {
+                  previousAttemptReadFailed = true;
+                  break;
+                }
               }
             }
+            if (previousAttemptReadFailed && 
+                (lastInputReadyValue <= failingInputUpto)) {
+              // if any previous attempt has failed then dont be done when we see
+              // a previously failed input
+              LOG.info("Previous task attempt failed and input version less than failing upto version");
+              done = false;
+            }
           }
-          if (previousAttemptReadFailed && 
-              (lastInputReadyValue <= failingInputUpto)) {
-            // if any previous attempt has failed then dont be done when we see
-            // a previously failed input
-            LOG.info("Previous task attempt failed and input version less than failing upto version");
-            done = false;
+          
+        }
+      } else {
+        // random fail
+        List<Event> events = Lists.newLinkedList();
+        for (int index=0; index<getNumPhysicalInputs(); ++index) {
+          // completedInputVersion[index] has DataMovementEvent.getVersion() value.
+          int sourceInputVersion = completedInputVersion[index];
+          int maxFailedAttempt = conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 
+              TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT);
+          if (sourceInputVersion < maxFailedAttempt - 1) {
+            float rollNumber = (float) Math.random();
+            String msg = "FailingInput random fail turned on." +
+                "Do a roll:" + getContext().getUniqueIdentifier() + 
+                " index: " + index + " version: " + sourceInputVersion +
+                " rollNumber: " + rollNumber + 
+                " randomFailProbability " + randomFailProbability;
+            LOG.info(msg);
+            if (rollNumber < randomFailProbability) {
+              // fail the source input
+              msg = "FailingInput: rollNumber < randomFailProbability. Do fail." + 
+                            getContext().getUniqueIdentifier() + 
+                            " index: " + index + " version: " + sourceInputVersion;
+              LOG.info(msg);
+              events.add(InputReadErrorEvent.create(msg, index, sourceInputVersion));
+            }
           }
         }
-        
+        getContext().sendEvents(events);
       }
     } while (!done);
     
@@ -265,6 +308,11 @@ public class TestInput extends AbstractLogicalInput {
           failingInputIndices.add(Integer.valueOf(failingIndex));
         }
       }
+      doRandomFail = conf
+          .getBoolean(TEZ_FAILING_INPUT_DO_RANDOM_FAIL, false);
+      randomFailProbability = conf.getFloat(TEZ_FAILING_INPUT_RANDOM_FAIL_PROBABILITY, 0.0f);
+      LOG.info("doRandomFail: " + doRandomFail);
+      LOG.info("randomFailProbability: " + randomFailProbability);
     }
     return Collections.emptyList();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/ac26ade4/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
index ed37ea9..90a4f13 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
@@ -51,6 +52,8 @@ public class TestProcessor extends AbstractLogicalIOProcessor {
   Configuration conf;
   
   boolean doFail = false;
+  boolean doRandomFail = false;
+  float randomFailProbability = 0.0f;
   long sleepMs;
   Set<Integer> failingTaskIndices = Sets.newHashSet();
   int failingTaskAttemptUpto = 0;
@@ -65,6 +68,15 @@ public class TestProcessor extends AbstractLogicalIOProcessor {
   public static String TEZ_FAILING_PROCESSOR_DO_FAIL =
       "tez.failing-processor.do-fail";
   /**
+   * Enable random failure for all processors.
+   */
+  public static String TEZ_FAILING_PROCESSOR_DO_RANDOM_FAIL = "tez.failing-processor.do-random-fail";
+  /**
+   * Probability of randomly failing a task attempt. Range is 0 to 1. The number is set per DAG.
+   */
+  public static String TEZ_FAILING_PROCESSOR_RANDOM_FAIL_PROBABILITY = "tez.failing-processor.random-fail-probability";
+  
+  /**
    * Time to sleep in the processor in milliseconds.
    */
   public static String TEZ_FAILING_PROCESSOR_SLEEP_MS =
@@ -154,6 +166,12 @@ public class TestProcessor extends AbstractLogicalIOProcessor {
         LOG.info("Adding failing attempt : " + failingTaskAttemptUpto + 
             " dag: " + getContext().getDAGName());
       }
+      
+      doRandomFail = conf
+          .getBoolean(TEZ_FAILING_PROCESSOR_DO_RANDOM_FAIL, false);
+      randomFailProbability = conf.getFloat(TEZ_FAILING_PROCESSOR_RANDOM_FAIL_PROBABILITY, 0.0f);
+      LOG.info("doRandomFail: " + doRandomFail);
+      LOG.info("randomFailProbability: " + randomFailProbability);
     }
   }
 
@@ -179,21 +197,48 @@ public class TestProcessor extends AbstractLogicalIOProcessor {
 
     Thread.sleep(sleepMs);
     
-    if (doFail) {
-      if (
-          (failingTaskIndices.contains(failAll) ||
-          failingTaskIndices.contains(getContext().getTaskIndex())) &&
-          (failingTaskAttemptUpto == failAll.intValue() || 
-           failingTaskAttemptUpto >= getContext().getTaskAttemptNumber())) {
-        String msg = "FailingProcessor: " + getContext().getUniqueIdentifier() + 
+    if (!doRandomFail) {
+      // not random fail
+      if (doFail) {
+        if (
+            (failingTaskIndices.contains(failAll) ||
+            failingTaskIndices.contains(getContext().getTaskIndex())) &&
+            (failingTaskAttemptUpto == failAll.intValue() || 
+             failingTaskAttemptUpto >= getContext().getTaskAttemptNumber())) {
+          String msg = "FailingProcessor: " + getContext().getUniqueIdentifier() + 
+              " dag: " + getContext().getDAGName() +
+              " taskIndex: " + getContext().getTaskIndex() +
+              " taskAttempt: " + getContext().getTaskAttemptNumber();
+          LOG.info(msg);
+          throwException(msg);
+        }
+      }
+    } else {
+      // random fail
+      // If task attempt number is below limit, try to randomly fail the attempt.
+      int taskAttemptNumber = getContext().getTaskAttemptNumber();
+      int maxFailedAttempt = conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 
+                                     TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT);
+      if (taskAttemptNumber < maxFailedAttempt - 1) {
+        float rollNumber = (float) Math.random();
+        String msg = "FailingProcessor random fail turned on." + 
+            " Do a roll: " + getContext().getUniqueIdentifier() + 
             " dag: " + getContext().getDAGName() +
             " taskIndex: " + getContext().getTaskIndex() +
-            " taskAttempt: " + getContext().getTaskAttemptNumber();
+            " taskAttempt: " + taskAttemptNumber +
+            " maxFailedAttempt: " + maxFailedAttempt +
+            " rollNumber: " + rollNumber + 
+            " randomFailProbability " + randomFailProbability;
         LOG.info(msg);
-        throwException(msg);
+        if (rollNumber < randomFailProbability) {
+          // fail the attempt
+          msg = "FailingProcessor: rollNumber < randomFailProbability. Do fail.";
+          LOG.info(msg);
+          throwException(msg);
+        }
       }
     }
-    
+      
     if (inputs.entrySet().size() > 0) {
         String msg = "Reading input of current FailingProcessor: " + getContext().getUniqueIdentifier() + 
             " dag: " + getContext().getDAGName() +