You are viewing a plain-text version of this content. The canonical (HTML) version is available in the original mailing-list archive.
Posted to commits@tez.apache.org by je...@apache.org on 2014/11/17 20:22:47 UTC

[38/50] [abbrv] tez git commit: TEZ-1642. TestAMRecovery sometimes fails. (zjffdu)

TEZ-1642. TestAMRecovery sometimes fails. (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6dda1d77
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6dda1d77
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6dda1d77

Branch: refs/heads/TEZ-8
Commit: 6dda1d77bcd6460780144ae2aab905c2a255e94a
Parents: a7d55e4
Author: Jeff Zhang <zj...@apache.org>
Authored: Tue Nov 11 09:08:04 2014 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Tue Nov 11 09:08:04 2014 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/test/TestAMRecovery.java     | 62 ++++++++++++++------
 2 files changed, 44 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/6dda1d77/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 57a0c7f..57ca0a4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -21,6 +21,7 @@ ALL CHANGES:
   TEZ-1749. Increase test timeout for TestLocalMode.testMultipleClientsWithSession
   TEZ-1750. Add a DAGScheduler which schedules tasks only when sources have been scheduled.
   TEZ-1761. TestRecoveryParser::testGetLastInProgressDAG fails in similar manner to TEZ-1686.
+  TEZ-1642. TestAMRecovery sometimes fail.
 
 Release 0.5.2: 2014-11-07
 

http://git-wip-us.apache.org/repos/asf/tez/blob/6dda1d77/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
index 42d6f5c..867c7ab 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
@@ -104,7 +104,7 @@ public class TestAMRecovery {
     }
     if (miniTezCluster == null) {
       miniTezCluster =
-          new MiniTezCluster(TestAMRecovery.class.getName(), 1, 1, 1);
+          new MiniTezCluster(TestAMRecovery.class.getName(), 2, 1, 1);
       Configuration miniTezconf = new Configuration(conf);
       miniTezconf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, MAX_AM_ATTEMPT);
       miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
@@ -127,6 +127,7 @@ public class TestAMRecovery {
       try {
         LOG.info("Stopping MiniTezCluster");
         miniTezCluster.stop();
+        miniTezCluster = null;
       } catch (Exception e) {
         e.printStackTrace();
       }
@@ -165,6 +166,7 @@ public class TestAMRecovery {
     tezConf.setBoolean(
         RecoveryService.TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED,
         true);
+    tezConf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, 100);
     tezSession = TezClient.create("TestDAGRecovery", tezConf);
     tezSession.start();
   }
@@ -182,6 +184,16 @@ public class TestAMRecovery {
     tezSession = null;
   }
 
+  private void printHistoryEvents(List<HistoryEvent> historyEvents, int attemptId) {
+    LOG.info("RecoveryLogs from attempt:" + attemptId);
+    for(HistoryEvent historyEvent : historyEvents) {
+      LOG.info("Parsed event from recovery stream"
+          + ", eventType=" + historyEvent.getEventType()
+          + ", event=" + historyEvent);
+    }
+    LOG.info("");
+  }
+
   /**
    * Fine-grained recovery task-level, In a vertex (v1), task 0 is done task 1
    * is running. History flush happens. AM dies. Once AM is recovered, task 0 is
@@ -195,14 +207,16 @@ public class TestAMRecovery {
         createDAG(ControlledInputReadyVertexManager.class,
             DataMovementType.BROADCAST, true);
     TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+    List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
+    List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
+    printHistoryEvents(historyEvents1, 1);
+    printHistoryEvents(historyEvents2, 2);
+
     assertEquals(5, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
     assertEquals(1, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
     assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
     assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
 
-    List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
-    List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
     // task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
     // attempt 1
     assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
@@ -227,14 +241,16 @@ public class TestAMRecovery {
         createDAG(ControlledInputReadyVertexManager.class,
             DataMovementType.BROADCAST, false);
     TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+    List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
+    List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
+    printHistoryEvents(historyEvents1, 1);
+    printHistoryEvents(historyEvents2, 2);
+
     assertEquals(4, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
     assertEquals(0, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
     assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
     assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
 
-    List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
-    List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
     // task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
     // attempt 1
     assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
@@ -259,14 +275,16 @@ public class TestAMRecovery {
         createDAG(ControlledInputReadyVertexManager.class,
             DataMovementType.ONE_TO_ONE, true);
     TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+    List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
+    List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
+    printHistoryEvents(historyEvents1, 1);
+    printHistoryEvents(historyEvents2, 2);
+
     assertEquals(5, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
     assertEquals(1, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
     assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
     assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
 
-    List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
-    List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
     // task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
     // attempt 1
     assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
@@ -292,14 +310,16 @@ public class TestAMRecovery {
         createDAG(ControlledInputReadyVertexManager.class,
             DataMovementType.ONE_TO_ONE, false);
     TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+    List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
+    List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
+    printHistoryEvents(historyEvents1, 1);
+    printHistoryEvents(historyEvents2, 2);
+
     assertEquals(4, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
     assertEquals(0, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
     assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
     assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
 
-    List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
-    List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
     // task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
     // attempt 1
     assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
@@ -325,14 +345,16 @@ public class TestAMRecovery {
         createDAG(ControlledShuffleVertexManager.class,
             DataMovementType.SCATTER_GATHER, true);
     TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+    List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
+    List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
+    printHistoryEvents(historyEvents1, 1);
+    printHistoryEvents(historyEvents2, 2);
+
     assertEquals(5, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
     assertEquals(1, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
     assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
     assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
 
-    List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
-    List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
     // task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
     // attempt 1
     assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
@@ -358,14 +380,16 @@ public class TestAMRecovery {
         createDAG(ControlledShuffleVertexManager.class,
             DataMovementType.SCATTER_GATHER, false);
     TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+    List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
+    List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
+    printHistoryEvents(historyEvents1, 1);
+    printHistoryEvents(historyEvents2, 2);
+
     assertEquals(4, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
     assertEquals(0, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
     assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
     assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
 
-    List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
-    List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
     // task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
     // attempt 1
     assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());