You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zj...@apache.org on 2015/01/22 02:56:01 UTC
[1/2] tez git commit: TEZ-1642. TestAMRecovery sometimes fail (zjffdu)
Repository: tez
Updated Branches:
refs/heads/branch-0.5 6607e9448 -> 119003eff
TEZ-1642. TestAMRecovery sometimes fail (zjffdu)
(cherry picked from commit 1dd725f7976de91a93f7cf25fb922278c4993af8)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c033965e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c033965e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c033965e
Branch: refs/heads/branch-0.5
Commit: c033965e7ae582c91884c86e0bc5edadb75c68ea
Parents: 6607e94
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri Dec 19 10:43:04 2014 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Thu Jan 22 09:45:53 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/test/TestAMRecovery.java | 181 +++++++++++--------
2 files changed, 107 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/c033965e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 81ab0e2..a21afa0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
Release 0.5.4: Unreleased
ALL CHANGES:
+ TEZ-1642. TestAMRecovery sometimes fail.
TEZ-1931. Publish tez version info to Timeline.
TEZ-1942. Number of tasks show in Tez UI with auto-reduce parallelism is misleading.
TEZ-1962. Fix a thread leak in LocalMode.
http://git-wip-us.apache.org/repos/asf/tez/blob/c033965e/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
index 42d6f5c..8c3eff4 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
@@ -47,12 +47,15 @@ import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
@@ -65,7 +68,9 @@ import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.recovery.RecoveryService;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.processor.SimpleProcessor;
import org.junit.After;
import org.junit.AfterClass;
@@ -74,6 +79,8 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import com.google.common.collect.Lists;
+
public class TestAMRecovery {
private static final Log LOG = LogFactory.getLog(TestAMRecovery.class);
@@ -127,6 +134,7 @@ public class TestAMRecovery {
try {
LOG.info("Stopping MiniTezCluster");
miniTezCluster.stop();
+ miniTezCluster = null;
} catch (Exception e) {
e.printStackTrace();
}
@@ -184,7 +192,7 @@ public class TestAMRecovery {
/**
* Fine-grained recovery task-level, In a vertex (v1), task 0 is done task 1
- * is running. History flush happens. AM dies. Once AM is recovered, task 0 is
+ * is not started. History flush happens. AM dies. Once AM is recovered, task 0 is
* not re-run. Task 1 is re-run. (Broadcast)
*
* @throws Exception
@@ -192,17 +200,16 @@ public class TestAMRecovery {
@Test(timeout = 120000)
public void testVertexPartiallyFinished_Broadcast() throws Exception {
DAG dag =
- createDAG(ControlledInputReadyVertexManager.class,
+ createDAG(ControlledImmediateStartVertexManager.class,
DataMovementType.BROADCAST, true);
TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
- assertEquals(5, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
- assertEquals(1, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
+ printHistoryEvents(historyEvents1, 1);
+ printHistoryEvents(historyEvents1, 2);
// task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
// attempt 1
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
@@ -224,17 +231,16 @@ public class TestAMRecovery {
@Test(timeout = 120000)
public void testVertexCompletelyFinished_Broadcast() throws Exception {
DAG dag =
- createDAG(ControlledInputReadyVertexManager.class,
+ createDAG(ControlledImmediateStartVertexManager.class,
DataMovementType.BROADCAST, false);
TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
- assertEquals(4, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
- assertEquals(0, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
+ printHistoryEvents(historyEvents1, 1);
+ printHistoryEvents(historyEvents1, 2);
// task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
// attempt 1
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
@@ -248,7 +254,7 @@ public class TestAMRecovery {
/**
* Fine-grained recovery task-level, In a vertex (v1), task 0 is done task 1
- * is running. History flush happens. AM dies. Once AM is recovered, task 0 is
+ * is not started. History flush happens. AM dies. Once AM is recovered, task 0 is
* not re-run. Task 1 is re-run. (ONE_TO_ONE)
*
* @throws Exception
@@ -259,14 +265,13 @@ public class TestAMRecovery {
createDAG(ControlledInputReadyVertexManager.class,
DataMovementType.ONE_TO_ONE, true);
TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
- assertEquals(5, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
- assertEquals(1, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
+ printHistoryEvents(historyEvents1, 1);
+ printHistoryEvents(historyEvents1, 2);
// task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
// attempt 1
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
@@ -292,14 +297,13 @@ public class TestAMRecovery {
createDAG(ControlledInputReadyVertexManager.class,
DataMovementType.ONE_TO_ONE, false);
TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
- assertEquals(4, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
- assertEquals(0, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
+ printHistoryEvents(historyEvents1, 1);
+ printHistoryEvents(historyEvents1, 2);
// task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
// attempt 1
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
@@ -314,7 +318,7 @@ public class TestAMRecovery {
/**
* Fine-grained recovery task-level, In a vertex (v1), task 0 is done task 1
- * is running. History flush happens. AM dies. Once AM is recovered, task 0 is
+ * is not started. History flush happens. AM dies. Once AM is recovered, task 0 is
* not re-run. Task 1 is re-run. (SCATTER_GATHER)
*
* @throws Exception
@@ -325,14 +329,13 @@ public class TestAMRecovery {
createDAG(ControlledShuffleVertexManager.class,
DataMovementType.SCATTER_GATHER, true);
TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
- assertEquals(5, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
- assertEquals(1, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
+ printHistoryEvents(historyEvents1, 1);
+ printHistoryEvents(historyEvents1, 2);
// task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
// attempt 1
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
@@ -358,14 +361,13 @@ public class TestAMRecovery {
createDAG(ControlledShuffleVertexManager.class,
DataMovementType.SCATTER_GATHER, false);
TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
- assertEquals(4, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
- assertEquals(0, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
-
+ printHistoryEvents(historyEvents1, 1);
+ printHistoryEvents(historyEvents1, 2);
// task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
// attempt 1
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
@@ -407,6 +409,7 @@ public class TestAMRecovery {
/**
* v1 --> v2 <br>
+ * v1 has a customized VM that schedules only one task (task 0) when it is the partiallyFinished test case.
* v2 has a customized VM which could control when to kill AM
*
* @param vertexManagerClass
@@ -425,6 +428,9 @@ public class TestAMRecovery {
DAG dag = DAG.create("dag");
UserPayload payload = UserPayload.create(null);
Vertex v1 = Vertex.create("v1", MyProcessor.getProcDesc(), 2);
+ v1.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(
+ ScheduleControlledVertexManager.class.getName()).setUserPayload(
+ TezUtils.createUserPayloadFromConf(tezConf)));
Vertex v2 = Vertex.create("v2", DoNothingProcessor.getProcDesc(), 2);
v2.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(
vertexManagerClass.getName()).setUserPayload(
@@ -471,6 +477,16 @@ public class TestAMRecovery {
return RecoveryParser.parseDAGRecoveryFile(fs.open(recoveryFilePath));
}
+ private void printHistoryEvents(List<HistoryEvent> historyEvents, int attemptId) {
+ LOG.info("RecoveryLogs from attempt:" + attemptId);
+ for(HistoryEvent historyEvent : historyEvents) {
+ LOG.info("Parsed event from recovery stream"
+ + ", eventType=" + historyEvent.getEventType()
+ + ", event=" + historyEvent);
+ }
+ LOG.info("");
+ }
+
public static class ControlledInputReadyVertexManager extends
InputReadyVertexManager {
@@ -496,29 +512,16 @@ public class TestAMRecovery {
super.onSourceTaskCompleted(srcVertexName, taskId);
if (getContext().getDAGAttemptNumber() == 1) {
if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
- if (taskId == 0) {
+ if (taskId.intValue() == 0) {
System.exit(-1);
}
} else {
- if (taskId == 1) {
+ if (taskId.intValue() == 1) {
System.exit(-1);
}
}
}
}
-
- @Override
- public void onVertexStarted(Map<String, List<Integer>> completions) {
- // sleep for 1 seconds to delay the running of task in v2.
- // this could keep the case that task of v1 is partial finished or completely
- // finished, and at the same time the task of v2 is not started
- try {
- Thread.sleep(1*1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- super.onVertexStarted(completions);
- }
}
public static class ControlledShuffleVertexManager extends
@@ -543,33 +546,19 @@ public class TestAMRecovery {
@Override
public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
- int dagAttemptNumber = getContext().getDAGAttemptNumber();
super.onSourceTaskCompleted(srcVertexName, taskId);
- if (dagAttemptNumber == 1) {
+ if (getContext().getDAGAttemptNumber() == 1) {
if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
- if (taskId == 0) {
+ if (taskId.intValue() == 0) {
System.exit(-1);
}
} else {
- if (taskId == 1) {
+ if (taskId.intValue() == 1) {
System.exit(-1);
}
}
}
}
-
- @Override
- public void onVertexStarted(Map<String, List<Integer>> completions) {
- // sleep for 1 seconds to delay the running of task in v2.
- // this could keep the case that task of v1 is partial finished or completely
- // finished, and at the same time the task of v2 is not started
- try {
- Thread.sleep(1*1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- super.onVertexStarted(completions);
- }
}
public static class ControlledImmediateStartVertexManager extends
@@ -598,28 +587,76 @@ public class TestAMRecovery {
super.onSourceTaskCompleted(srcVertexName, taskId);
if (getContext().getDAGAttemptNumber() == 1) {
if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
- if (taskId == 0) {
+ if (taskId.intValue() == 0) {
System.exit(-1);
}
} else {
- if (taskId == 1) {
+ if (taskId.intValue() == 1) {
System.exit(-1);
}
}
}
}
+ }
+
+
+ /**
+ * VertexManager which schedules only one task when it is the partially-finished test case.
+ *
+ */
+ public static class ScheduleControlledVertexManager extends VertexManagerPlugin {
+
+ private Configuration conf;
+
+ public ScheduleControlledVertexManager(VertexManagerPluginContext context) {
+ super(context);
+ }
@Override
- public void onVertexStarted(Map<String, List<Integer>> completions) {
- // sleep for 1 seconds to delay the running of task in v2.
- // this could keep the case that task of v1 is partial finished or completely
- // finished, and at the same time the task of v2 is not started
+ public void initialize() {
try {
- Thread.sleep(1*1000);
- } catch (InterruptedException e) {
+ conf =
+ TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+ } catch (IOException e) {
e.printStackTrace();
}
- super.onVertexStarted(completions);
+ }
+
+ @Override
+ public void onVertexStarted(Map<String, List<Integer>> completions)
+ throws Exception {
+ if (getContext().getDAGAttemptNumber() == 1) {
+ // only schedule one task if it is partiallyFinished case
+ if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
+ getContext().scheduleVertexTasks(Lists.newArrayList(new TaskWithLocationHint(0, null)));
+ return ;
+ }
+ }
+ // schedule all tasks when it is not partiallyFinished
+ int taskNum = getContext().getVertexNumTasks(getContext().getVertexName());
+ List<TaskWithLocationHint> taskWithLocationHints = new ArrayList<TaskWithLocationHint>();
+ for (int i=0;i<taskNum;++i) {
+ taskWithLocationHints.add(new TaskWithLocationHint(i, null));
+ }
+ getContext().scheduleVertexTasks(taskWithLocationHints);
+ }
+
+ @Override
+ public void onSourceTaskCompleted(String srcVertexName, Integer taskId)
+ throws Exception {
+
+ }
+
+ @Override
+ public void onVertexManagerEventReceived(VertexManagerEvent vmEvent)
+ throws Exception {
+
+ }
+
+ @Override
+ public void onRootVertexInitialized(String inputName,
+ InputDescriptor inputDescriptor, List<Event> events) throws Exception {
+
}
}
@@ -663,11 +700,6 @@ public class TestAMRecovery {
Counter_1,
}
- /**
- * Do nothing if it is in task 0, sleep 3 seconds for other tasks. This enable
- * us to kill AM in VM when some tasks are still running.
- *
- */
public static class MyProcessor extends SimpleProcessor {
public MyProcessor(ProcessorContext context) {
@@ -677,12 +709,6 @@ public class TestAMRecovery {
@Override
public void run() throws Exception {
getContext().getCounters().findCounter(TestCounter.Counter_1).increment(1);
- if (getContext().getTaskIndex() == 0) {
- // keep task_0 running for 1 seconds to wait for task_1 start running
- Thread.sleep(1 * 1000);;
- } else {
- Thread.sleep(3 * 1000);
- }
}
public static ProcessorDescriptor getProcDesc() {
@@ -698,6 +724,11 @@ public class TestAMRecovery {
@Override
public void run() throws Exception {
+ // Sleep 3 seconds in vertex2 to avoid vertex2 completing
+ // before it gets the SourceVertexTaskAttemptCompletedEvent.
+ // SourceVertexTaskAttemptCompletedEvent will be ignored if the vertex is in SUCCEEDED,
+ // so the AM won't be killed in the VM of vertex2
+ Thread.sleep(3000);
}
public static ProcessorDescriptor getProcDesc() {
[2/2] tez git commit: TEZ-1934. TestAMRecovery may fail due to the
execution order is not determined. (zjffdu)
Posted by zj...@apache.org.
TEZ-1934. TestAMRecovery may fail due to the execution order is not determined. (zjffdu)
(cherry picked from commit 880d4f38f4ccb2de987a7c9ca4cbbbe67b0a833e)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/119003ef
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/119003ef
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/119003ef
Branch: refs/heads/branch-0.5
Commit: 119003effd4acfcba6f26397fddc8f4d8b0f4671
Parents: c033965
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri Jan 16 09:47:46 2015 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Thu Jan 22 09:48:41 2015 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../java/org/apache/tez/test/TestAMRecovery.java | 18 ++++++++++++------
2 files changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/119003ef/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a21afa0..84a0a4a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
Release 0.5.4: Unreleased
ALL CHANGES:
+ TEZ-1934. TestAMRecovery may fail due to the execution order is not determined.
TEZ-1642. TestAMRecovery sometimes fail.
TEZ-1931. Publish tez version info to Timeline.
TEZ-1942. Number of tasks show in Tez UI with auto-reduce parallelism is misleading.
http://git-wip-us.apache.org/repos/asf/tez/blob/119003ef/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
index 8c3eff4..b8195a4 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
@@ -491,6 +491,7 @@ public class TestAMRecovery {
InputReadyVertexManager {
private Configuration conf;
+ private int completedTaskNum = 0;
public ControlledInputReadyVertexManager(VertexManagerPluginContext context) {
super(context);
@@ -510,13 +511,14 @@ public class TestAMRecovery {
@Override
public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
super.onSourceTaskCompleted(srcVertexName, taskId);
+ completedTaskNum ++;
if (getContext().getDAGAttemptNumber() == 1) {
if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
- if (taskId.intValue() == 0) {
+ if (completedTaskNum == 1) {
System.exit(-1);
}
} else {
- if (taskId.intValue() == 1) {
+ if (completedTaskNum == getContext().getVertexNumTasks(srcVertexName)) {
System.exit(-1);
}
}
@@ -528,6 +530,7 @@ public class TestAMRecovery {
ShuffleVertexManager {
private Configuration conf;
+ private int completedTaskNum = 0;
public ControlledShuffleVertexManager(VertexManagerPluginContext context) {
super(context);
@@ -547,13 +550,14 @@ public class TestAMRecovery {
@Override
public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
super.onSourceTaskCompleted(srcVertexName, taskId);
+ completedTaskNum ++;
if (getContext().getDAGAttemptNumber() == 1) {
if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
- if (taskId.intValue() == 0) {
+ if (completedTaskNum == 1) {
System.exit(-1);
}
} else {
- if (taskId.intValue() == 1) {
+ if (completedTaskNum == getContext().getVertexNumTasks(srcVertexName)) {
System.exit(-1);
}
}
@@ -565,6 +569,7 @@ public class TestAMRecovery {
ImmediateStartVertexManager {
private Configuration conf;
+ private int completedTaskNum = 0;
public ControlledImmediateStartVertexManager(
VertexManagerPluginContext context) {
@@ -585,13 +590,14 @@ public class TestAMRecovery {
@Override
public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
super.onSourceTaskCompleted(srcVertexName, taskId);
+ completedTaskNum ++;
if (getContext().getDAGAttemptNumber() == 1) {
if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
- if (taskId.intValue() == 0) {
+ if (completedTaskNum == 1) {
System.exit(-1);
}
} else {
- if (taskId.intValue() == 1) {
+ if (completedTaskNum == getContext().getVertexNumTasks(srcVertexName)) {
System.exit(-1);
}
}