You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/12/10 04:30:27 UTC
[17/27] tez git commit: TEZ-1060 Add randomness to fault tolerance tests
TEZ-1060 Add randomness to fault tolerance tests
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ac26ade4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ac26ade4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ac26ade4
Branch: refs/heads/TEZ-8
Commit: ac26ade401d49a5c772e9f85312f1c5b5f952e8d
Parents: 6b53107
Author: Tassapol Athiapinya <ta...@hortonworks.com>
Authored: Wed Dec 3 16:35:20 2014 -0800
Committer: Tassapol Athiapinya <ta...@hortonworks.com>
Committed: Wed Dec 3 16:35:20 2014 -0800
----------------------------------------------------------------------
.../org/apache/tez/test/TestFaultTolerance.java | 20 ++-
.../java/org/apache/tez/test/TestInput.java | 152 ++++++++++++-------
.../java/org/apache/tez/test/TestProcessor.java | 65 ++++++--
3 files changed, 174 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/ac26ade4/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index 1c1e846..c834dee 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -713,5 +713,23 @@ public class TestFaultTolerance {
DAG dag = SimpleTestDAG.createDAG("testTwoTasksHaveInputFailuresSuccess", testConf);
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
-
+
+ @Test (timeout=60000)
+ public void testRandomFailingTasks() throws Exception { // runs SixLevelsFailingDAG with TestProcessor in random-failure mode
+ Configuration testConf = new Configuration(false);
+ testConf.setBoolean(TestProcessor.TEZ_FAILING_PROCESSOR_DO_RANDOM_FAIL, true); // TestProcessor takes its random-fail branch instead of the doFail branch
+ testConf.setFloat(TestProcessor.TEZ_FAILING_PROCESSOR_RANDOM_FAIL_PROBABILITY, 0.5f); // an attempt that rolls fails when its roll is below 0.5
+ DAG dag = SixLevelsFailingDAG.createDAG("testRandomFailingTasks", testConf);
+ runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED); // expected final state is SUCCEEDED even with the injected failures
+ }
+
+ @Test (timeout=60000)
+ public void testRandomFailingInputs() throws Exception { // runs SixLevelsFailingDAG with TestInput in random-failure mode
+ Configuration testConf = new Configuration(false);
+ testConf.setBoolean(TestInput.TEZ_FAILING_INPUT_DO_RANDOM_FAIL, true); // TestInput takes its random-fail branch instead of the doFail branch
+ testConf.setFloat(TestInput.TEZ_FAILING_INPUT_RANDOM_FAIL_PROBABILITY, 0.5f); // a physical input that rolls is reported as a read error when its roll is below 0.5
+ DAG dag = SixLevelsFailingDAG.createDAG("testRandomFailingInputs", testConf);
+ runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED); // expected final state is SUCCEEDED even with the injected input failures
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ac26ade4/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index 465dd9c..0050961 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.api.AbstractLogicalInput;
import org.apache.tez.runtime.api.Event;
@@ -65,6 +66,8 @@ public class TestInput extends AbstractLogicalInput {
int failingInputUpto = 0;
boolean doFail = false;
+ boolean doRandomFail = false;
+ float randomFailProbability = 0.0f;
boolean doFailAndExit = false;
Set<Integer> failingTaskIndices = Sets.newHashSet();
Set<Integer> failingTaskAttempts = Sets.newHashSet();
@@ -78,6 +81,16 @@ public class TestInput extends AbstractLogicalInput {
public static String TEZ_FAILING_INPUT_DO_FAIL =
"tez.failing-input.do-fail";
/**
+ * Enable random failure for this logical input. The config is set per DAG.
+ */
+ public static String TEZ_FAILING_INPUT_DO_RANDOM_FAIL =
+ "tez.failing-input.do-random-fail";
+ /**
+ * Probability of randomly failing an input. Range is 0 to 1. The number is set per DAG.
+ */
+ public static String TEZ_FAILING_INPUT_RANDOM_FAIL_PROBABILITY =
+ "tez.failing-input.random-fail-probability";
+ /**
* Logical input will exit (and cause task failure) after reporting failure to
* read.
*/
@@ -146,66 +159,96 @@ public class TestInput extends AbstractLogicalInput {
lastInputReadyValue = inputReady.get();
LOG.info("Done for inputReady: " + lastInputReadyValue);
}
- if (doFail) {
- if (
- (failingTaskIndices.contains(failAll) ||
- failingTaskIndices.contains(getContext().getTaskIndex())) &&
- (failingTaskAttempts.contains(failAll) ||
- failingTaskAttempts.contains(getContext().getTaskAttemptNumber())) &&
- (lastInputReadyValue <= failingInputUpto)) {
- List<Event> events = Lists.newLinkedList();
- if (failingInputIndices.contains(failAll)) {
- for (int i=0; i<getNumPhysicalInputs(); ++i) {
- String msg = ("FailingInput: " + getContext().getUniqueIdentifier() +
- " index: " + i + " version: " + lastInputReadyValue);
- events.add(InputReadErrorEvent.create(msg, i, lastInputReadyValue));
- LOG.info("Failing input: " + msg);
- }
- } else {
- for (Integer index : failingInputIndices) {
- if (index.intValue() >= getNumPhysicalInputs()) {
- throwException("InputIndex: " + index.intValue() +
- " should be less than numInputs: " + getNumPhysicalInputs());
+ if (!doRandomFail) {
+ // not random fail
+ if (doFail) {
+ if (
+ (failingTaskIndices.contains(failAll) ||
+ failingTaskIndices.contains(getContext().getTaskIndex())) &&
+ (failingTaskAttempts.contains(failAll) ||
+ failingTaskAttempts.contains(getContext().getTaskAttemptNumber())) &&
+ (lastInputReadyValue <= failingInputUpto)) {
+ List<Event> events = Lists.newLinkedList();
+ if (failingInputIndices.contains(failAll)) {
+ for (int i=0; i<getNumPhysicalInputs(); ++i) {
+ String msg = ("FailingInput: " + getContext().getUniqueIdentifier() +
+ " index: " + i + " version: " + lastInputReadyValue);
+ events.add(InputReadErrorEvent.create(msg, i, lastInputReadyValue));
+ LOG.info("Failing input: " + msg);
}
- if (completedInputVersion[index.intValue()] < lastInputReadyValue) {
- continue; // dont fail a previous version now.
+ } else {
+ for (Integer index : failingInputIndices) {
+ if (index.intValue() >= getNumPhysicalInputs()) {
+ throwException("InputIndex: " + index.intValue() +
+ " should be less than numInputs: " + getNumPhysicalInputs());
+ }
+ if (completedInputVersion[index.intValue()] < lastInputReadyValue) {
+ continue; // dont fail a previous version now.
+ }
+ String msg = ("FailingInput: " + getContext().getUniqueIdentifier() +
+ " index: " + index.intValue() + " version: " + lastInputReadyValue);
+ events.add(InputReadErrorEvent.create(msg, index.intValue(), lastInputReadyValue));
+ LOG.info("Failing input: " + msg);
}
- String msg = ("FailingInput: " + getContext().getUniqueIdentifier() +
- " index: " + index.intValue() + " version: " + lastInputReadyValue);
- events.add(InputReadErrorEvent.create(msg, index.intValue(), lastInputReadyValue));
- LOG.info("Failing input: " + msg);
}
- }
- getContext().sendEvents(events);
- if (doFailAndExit) {
- String msg = "FailingInput exiting: " + getContext().getUniqueIdentifier();
- LOG.info(msg);
- throwException(msg);
- } else {
- done = false;
- }
- } else if ((failingTaskIndices.contains(failAll) ||
- failingTaskIndices.contains(getContext().getTaskIndex()))){
- boolean previousAttemptReadFailed = false;
- if (failingTaskAttempts.contains(failAll)) {
- previousAttemptReadFailed = true;
- } else {
- for (int i=0 ; i<getContext().getTaskAttemptNumber(); ++i) {
- if (failingTaskAttempts.contains(new Integer(i))) {
- previousAttemptReadFailed = true;
- break;
+ getContext().sendEvents(events);
+ if (doFailAndExit) {
+ String msg = "FailingInput exiting: " + getContext().getUniqueIdentifier();
+ LOG.info(msg);
+ throwException(msg);
+ } else {
+ done = false;
+ }
+ } else if ((failingTaskIndices.contains(failAll) ||
+ failingTaskIndices.contains(getContext().getTaskIndex()))){
+ boolean previousAttemptReadFailed = false;
+ if (failingTaskAttempts.contains(failAll)) {
+ previousAttemptReadFailed = true;
+ } else {
+ for (int i=0 ; i<getContext().getTaskAttemptNumber(); ++i) {
+ if (failingTaskAttempts.contains(new Integer(i))) {
+ previousAttemptReadFailed = true;
+ break;
+ }
}
}
+ if (previousAttemptReadFailed &&
+ (lastInputReadyValue <= failingInputUpto)) {
+ // if any previous attempt has failed then dont be done when we see
+ // a previously failed input
+ LOG.info("Previous task attempt failed and input version less than failing upto version");
+ done = false;
+ }
}
- if (previousAttemptReadFailed &&
- (lastInputReadyValue <= failingInputUpto)) {
- // if any previous attempt has failed then dont be done when we see
- // a previously failed input
- LOG.info("Previous task attempt failed and input version less than failing upto version");
- done = false;
+
+ }
+ } else {
+ // random fail
+ List<Event> events = Lists.newLinkedList();
+ for (int index=0; index<getNumPhysicalInputs(); ++index) {
+ // completedInputVersion[index] has DataMovementEvent.getVersion() value.
+ int sourceInputVersion = completedInputVersion[index];
+ int maxFailedAttempt = conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
+ TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT);
+ if (sourceInputVersion < maxFailedAttempt - 1) {
+ float rollNumber = (float) Math.random();
+ String msg = "FailingInput random fail turned on." +
+ "Do a roll:" + getContext().getUniqueIdentifier() +
+ " index: " + index + " version: " + sourceInputVersion +
+ " rollNumber: " + rollNumber +
+ " randomFailProbability " + randomFailProbability;
+ LOG.info(msg);
+ if (rollNumber < randomFailProbability) {
+ // fail the source input
+ msg = "FailingInput: rollNumber < randomFailProbability. Do fail." +
+ getContext().getUniqueIdentifier() +
+ " index: " + index + " version: " + sourceInputVersion;
+ LOG.info(msg);
+ events.add(InputReadErrorEvent.create(msg, index, sourceInputVersion));
+ }
}
}
-
+ getContext().sendEvents(events);
}
} while (!done);
@@ -265,6 +308,11 @@ public class TestInput extends AbstractLogicalInput {
failingInputIndices.add(Integer.valueOf(failingIndex));
}
}
+ doRandomFail = conf
+ .getBoolean(TEZ_FAILING_INPUT_DO_RANDOM_FAIL, false);
+ randomFailProbability = conf.getFloat(TEZ_FAILING_INPUT_RANDOM_FAIL_PROBABILITY, 0.0f);
+ LOG.info("doRandomFail: " + doRandomFail);
+ LOG.info("randomFailProbability: " + randomFailProbability);
}
return Collections.emptyList();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ac26ade4/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
index ed37ea9..90a4f13 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
@@ -51,6 +52,8 @@ public class TestProcessor extends AbstractLogicalIOProcessor {
Configuration conf;
boolean doFail = false;
+ boolean doRandomFail = false;
+ float randomFailProbability = 0.0f;
long sleepMs;
Set<Integer> failingTaskIndices = Sets.newHashSet();
int failingTaskAttemptUpto = 0;
@@ -65,6 +68,15 @@ public class TestProcessor extends AbstractLogicalIOProcessor {
public static String TEZ_FAILING_PROCESSOR_DO_FAIL =
"tez.failing-processor.do-fail";
/**
+ * Enable random failure for all processors.
+ */
+ public static String TEZ_FAILING_PROCESSOR_DO_RANDOM_FAIL = "tez.failing-processor.do-random-fail";
+ /**
+ * Probability of randomly failing a task attempt. Range is 0 to 1. The number is set per DAG.
+ */
+ public static String TEZ_FAILING_PROCESSOR_RANDOM_FAIL_PROBABILITY = "tez.failing-processor.random-fail-probability";
+
+ /**
* Time to sleep in the processor in milliseconds.
*/
public static String TEZ_FAILING_PROCESSOR_SLEEP_MS =
@@ -154,6 +166,12 @@ public class TestProcessor extends AbstractLogicalIOProcessor {
LOG.info("Adding failing attempt : " + failingTaskAttemptUpto +
" dag: " + getContext().getDAGName());
}
+
+ doRandomFail = conf
+ .getBoolean(TEZ_FAILING_PROCESSOR_DO_RANDOM_FAIL, false);
+ randomFailProbability = conf.getFloat(TEZ_FAILING_PROCESSOR_RANDOM_FAIL_PROBABILITY, 0.0f);
+ LOG.info("doRandomFail: " + doRandomFail);
+ LOG.info("randomFailProbability: " + randomFailProbability);
}
}
@@ -179,21 +197,48 @@ public class TestProcessor extends AbstractLogicalIOProcessor {
Thread.sleep(sleepMs);
- if (doFail) {
- if (
- (failingTaskIndices.contains(failAll) ||
- failingTaskIndices.contains(getContext().getTaskIndex())) &&
- (failingTaskAttemptUpto == failAll.intValue() ||
- failingTaskAttemptUpto >= getContext().getTaskAttemptNumber())) {
- String msg = "FailingProcessor: " + getContext().getUniqueIdentifier() +
+ if (!doRandomFail) {
+ // not random fail
+ if (doFail) {
+ if (
+ (failingTaskIndices.contains(failAll) ||
+ failingTaskIndices.contains(getContext().getTaskIndex())) &&
+ (failingTaskAttemptUpto == failAll.intValue() ||
+ failingTaskAttemptUpto >= getContext().getTaskAttemptNumber())) {
+ String msg = "FailingProcessor: " + getContext().getUniqueIdentifier() +
+ " dag: " + getContext().getDAGName() +
+ " taskIndex: " + getContext().getTaskIndex() +
+ " taskAttempt: " + getContext().getTaskAttemptNumber();
+ LOG.info(msg);
+ throwException(msg);
+ }
+ }
+ } else {
+ // random fail
+ // If task attempt number is below limit, try to randomly fail the attempt.
+ int taskAttemptNumber = getContext().getTaskAttemptNumber();
+ int maxFailedAttempt = conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
+ TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT);
+ if (taskAttemptNumber < maxFailedAttempt - 1) {
+ float rollNumber = (float) Math.random();
+ String msg = "FailingProcessor random fail turned on." +
+ " Do a roll: " + getContext().getUniqueIdentifier() +
" dag: " + getContext().getDAGName() +
" taskIndex: " + getContext().getTaskIndex() +
- " taskAttempt: " + getContext().getTaskAttemptNumber();
+ " taskAttempt: " + taskAttemptNumber +
+ " maxFailedAttempt: " + maxFailedAttempt +
+ " rollNumber: " + rollNumber +
+ " randomFailProbability " + randomFailProbability;
LOG.info(msg);
- throwException(msg);
+ if (rollNumber < randomFailProbability) {
+ // fail the attempt
+ msg = "FailingProcessor: rollNumber < randomFailProbability. Do fail.";
+ LOG.info(msg);
+ throwException(msg);
+ }
}
}
-
+
if (inputs.entrySet().size() > 0) {
String msg = "Reading input of current FailingProcessor: " + getContext().getUniqueIdentifier() +
" dag: " + getContext().getDAGName() +