You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/09/25 19:35:49 UTC

tez git commit: TEZ-2398. Flaky test: TestFaultTolerance (bikas) (cherry picked from commit 406721ab17b58e29e5bf3585d556700c2ef04f05)

Repository: tez
Updated Branches:
  refs/heads/branch-0.7 91fd5a607 -> 7dac26e75


TEZ-2398. Flaky test: TestFaultTolerance (bikas)
(cherry picked from commit 406721ab17b58e29e5bf3585d556700c2ef04f05)

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7dac26e7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7dac26e7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7dac26e7

Branch: refs/heads/branch-0.7
Commit: 7dac26e75f094f0486d4ce4390885dc468703799
Parents: 91fd5a6
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Sep 25 10:30:53 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Sep 25 10:34:53 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                            |  1 +
 .../java/org/apache/tez/test/TestFaultTolerance.java   |  5 +----
 .../src/test/java/org/apache/tez/test/TestInput.java   | 13 +++++++++----
 3 files changed, 11 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/7dac26e7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 74a83f1..fa2e62d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES
+  TEZ-2398. Flaky test: TestFaultTolerance
   TEZ-2808. Race condition between preemption and container assignment
   TEZ-2853. Tez UI: task attempt page is coming empty
   TEZ-2716. DefaultSorter.isRleNeeded not thread safe

http://git-wip-us.apache.org/repos/asf/tez/blob/7dac26e7/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index 0d27032..ec89c4b 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -74,7 +74,7 @@ public class TestFaultTolerance {
     }
     if (miniTezCluster == null) {
       miniTezCluster = new MiniTezCluster(TestFaultTolerance.class.getName(),
-          4, 1, 1);
+          3, 1, 1);
       Configuration miniTezconf = new Configuration(conf);
       miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
       miniTezCluster.init(miniTezconf);
@@ -242,9 +242,6 @@ public class TestFaultTolerance {
             TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v2"), "0,1");
     testConf.setInt(TestProcessor.getVertexConfName(
             TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 1), 5);
-    //v2 task0 attempt 0 succeeds instantly.
-    testConf.setInt(TestProcessor.getVertexConfName(
-            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 0), 3);
     
     DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithExit", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);

http://git-wip-us.apache.org/repos/asf/tez/blob/7dac26e7/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index fb42c8e..ec27a45 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -75,7 +75,6 @@ public class TestInput extends AbstractLogicalInput {
   Set<Integer> failingInputIndices = Sets.newHashSet();
   Integer failAll = new Integer(-1);
   int[] inputValues;
-  AtomicInteger numEventsReceived = new AtomicInteger(0);
   
   /**
    * Enable failure for this logical input
@@ -193,7 +192,6 @@ public class TestInput extends AbstractLogicalInput {
                 LOG.info("Failing input: " + msg);
               }
             }
-            int numEvents = numEventsReceived.get();
             getContext().sendEvents(events);
             if (doFailAndExit) {
               String msg = "FailingInput exiting: " + getContext().getUniqueIdentifier();
@@ -201,7 +199,15 @@ public class TestInput extends AbstractLogicalInput {
               throwException(msg);
             } else {
               try {
-                while (numEvents == numEventsReceived.get()) {
+                // Keep sending the input read error until we receive the new input.
+                // This check breaks the loop when we see a new input version.
+                // When multiple input versions arrive, this method gets triggered
+                // for each version via wait-notify, but all events may have been processed
+                // in handleEvents() before the code reaches this point. Having this loop
+                // makes it exit quickly for an older version if a newer version has been seen.
+                // However, if a newer version is not seen, it keeps sending the input error
+                // indefinitely, by design.
+                while (lastInputReadyValue == inputReady.get()) {
                   // keep sending events
                   Thread.sleep(500);
                   getContext().sendEvents(events);
@@ -341,7 +347,6 @@ public class TestInput extends AbstractLogicalInput {
   @Override
   public void handleEvents(List<Event> inputEvents) throws Exception {
     for (Event event : inputEvents) {
-      numEventsReceived.incrementAndGet();
       if (event instanceof DataMovementEvent) {
         DataMovementEvent dmEvent = (DataMovementEvent) event;
         numCompletedInputs++;