You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@tez.apache.org by je...@apache.org on 2014/11/17 20:22:47 UTC
[38/50] [abbrv] tez git commit: TEZ-1642. TestAMRecovery sometimes fail. (zjffdu)
TEZ-1642. TestAMRecovery sometimes fail. (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6dda1d77
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6dda1d77
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6dda1d77
Branch: refs/heads/TEZ-8
Commit: 6dda1d77bcd6460780144ae2aab905c2a255e94a
Parents: a7d55e4
Author: Jeff Zhang <zj...@apache.org>
Authored: Tue Nov 11 09:08:04 2014 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Tue Nov 11 09:08:04 2014 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/test/TestAMRecovery.java | 62 ++++++++++++++------
2 files changed, 44 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/6dda1d77/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 57a0c7f..57ca0a4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -21,6 +21,7 @@ ALL CHANGES:
TEZ-1749. Increase test timeout for TestLocalMode.testMultipleClientsWithSession
TEZ-1750. Add a DAGScheduler which schedules tasks only when sources have been scheduled.
TEZ-1761. TestRecoveryParser::testGetLastInProgressDAG fails in similar manner to TEZ-1686.
+ TEZ-1642. TestAMRecovery sometimes fail.
Release 0.5.2: 2014-11-07
http://git-wip-us.apache.org/repos/asf/tez/blob/6dda1d77/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
index 42d6f5c..867c7ab 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
@@ -104,7 +104,7 @@ public class TestAMRecovery {
}
if (miniTezCluster == null) {
miniTezCluster =
- new MiniTezCluster(TestAMRecovery.class.getName(), 1, 1, 1);
+ new MiniTezCluster(TestAMRecovery.class.getName(), 2, 1, 1);
Configuration miniTezconf = new Configuration(conf);
miniTezconf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, MAX_AM_ATTEMPT);
miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
@@ -127,6 +127,7 @@ public class TestAMRecovery {
try {
LOG.info("Stopping MiniTezCluster");
miniTezCluster.stop();
+ miniTezCluster = null;
} catch (Exception e) {
e.printStackTrace();
}
@@ -165,6 +166,7 @@ public class TestAMRecovery {
tezConf.setBoolean(
RecoveryService.TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED,
true);
+ tezConf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, 100);
tezSession = TezClient.create("TestDAGRecovery", tezConf);
tezSession.start();
}
@@ -182,6 +184,16 @@ public class TestAMRecovery {
tezSession = null;
}
+ private void printHistoryEvents(List<HistoryEvent> historyEvents, int attemptId) { // Logs each HistoryEvent in historyEvents, labelled with the given AM attempt id.
+ LOG.info("RecoveryLogs from attempt:" + attemptId); // header line naming the attempt whose recovery log follows
+ for(HistoryEvent historyEvent : historyEvents) { // events are logged in list order
+ LOG.info("Parsed event from recovery stream"
+ + ", eventType=" + historyEvent.getEventType()
+ + ", event=" + historyEvent); // one line per event: its type plus its string form (toString())
+ }
+ LOG.info(""); // empty line, presumably a visual separator between attempts in the test log
+ }
+
/**
* Fine-grained recovery task-level, In a vertex (v1), task 0 is done task 1
* is running. History flush happens. AM dies. Once AM is recovered, task 0 is
@@ -195,14 +207,16 @@ public class TestAMRecovery {
createDAG(ControlledInputReadyVertexManager.class,
DataMovementType.BROADCAST, true);
TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
+ List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
+ printHistoryEvents(historyEvents1, 1);
+ printHistoryEvents(historyEvents2, 2);
+
assertEquals(5, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
assertEquals(1, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
- List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
- List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
// task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
// attempt 1
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
@@ -227,14 +241,16 @@ public class TestAMRecovery {
createDAG(ControlledInputReadyVertexManager.class,
DataMovementType.BROADCAST, false);
TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
+ List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
+ printHistoryEvents(historyEvents1, 1);
+ printHistoryEvents(historyEvents2, 2);
+
assertEquals(4, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
assertEquals(0, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
- List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
- List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
// task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
// attempt 1
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
@@ -259,14 +275,16 @@ public class TestAMRecovery {
createDAG(ControlledInputReadyVertexManager.class,
DataMovementType.ONE_TO_ONE, true);
TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
+ List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
+ printHistoryEvents(historyEvents1, 1);
+ printHistoryEvents(historyEvents2, 2);
+
assertEquals(5, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
assertEquals(1, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
- List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
- List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
// task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
// attempt 1
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
@@ -292,14 +310,16 @@ public class TestAMRecovery {
createDAG(ControlledInputReadyVertexManager.class,
DataMovementType.ONE_TO_ONE, false);
TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
+ List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
+ printHistoryEvents(historyEvents1, 1);
+ printHistoryEvents(historyEvents2, 2);
+
assertEquals(4, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
assertEquals(0, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
- List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
- List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
// task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
// attempt 1
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
@@ -325,14 +345,16 @@ public class TestAMRecovery {
createDAG(ControlledShuffleVertexManager.class,
DataMovementType.SCATTER_GATHER, true);
TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
+ List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
+ printHistoryEvents(historyEvents1, 1);
+ printHistoryEvents(historyEvents2, 2);
+
assertEquals(5, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
assertEquals(1, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
- List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
- List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
// task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
// attempt 1
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
@@ -358,14 +380,16 @@ public class TestAMRecovery {
createDAG(ControlledShuffleVertexManager.class,
DataMovementType.SCATTER_GATHER, false);
TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+ List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
+ List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
+ printHistoryEvents(historyEvents1, 1);
+ printHistoryEvents(historyEvents2, 2);
+
assertEquals(4, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
assertEquals(0, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
- List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
- List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
// task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
// attempt 1
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());