You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by sr...@apache.org on 2016/02/18 10:58:15 UTC
[21/24] tez git commit: TEZ-3029. Add an onError method to service plugin contexts. (sseth)
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
index 28670ff..fd56495 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
@@ -84,6 +84,7 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
+import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
@@ -711,11 +712,11 @@ public class TestCommit {
TaskState.SUCCEEDED));
Assert.assertEquals(VertexState.COMMITTING, v1.getState());
// kill dag which will trigger the vertex killed event
- dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
+ dag.handle(new DAGEventTerminateDag(dag.getID(), DAGTerminationCause.DAG_KILL, null));
dispatcher.await();
Assert.assertEquals(VertexState.KILLED, v1.getState());
Assert.assertTrue(v1.commitFutures.isEmpty());
- Assert.assertEquals(VertexTerminationCause.DAG_KILL,
+ Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED,
v1.getTerminationCause());
Assert.assertEquals(DAGState.KILLED, dag.getState());
Assert
@@ -1514,10 +1515,20 @@ public class TestCommit {
// Assert.assertEquals(0, v3OutputCommitter.abortCounter);
}
- // Kill dag while it is in COMMITTING in the case of
- // TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is true
+
@Test(timeout = 5000)
public void testDAGKilledWhileCommitting1_OnDAGSuccess() throws Exception {
+ _testDAGTerminatedWhileCommitting1_OnDAGSuccess(DAGTerminationCause.DAG_KILL);
+ }
+
+ @Test(timeout = 5000)
+ public void testServiceErrorWhileCommitting1_OnDAGSuccess() throws Exception {
+ _testDAGTerminatedWhileCommitting1_OnDAGSuccess(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
+ }
+
+ // Terminate dag (kill or service plugin error) while it is in COMMITTING in the case of
+ // TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is true
+ private void _testDAGTerminatedWhileCommitting1_OnDAGSuccess(DAGTerminationCause terminationCause) throws Exception {
conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
true);
setupDAG(createDAGPlan(true, true));
@@ -1534,14 +1545,14 @@ public class TestCommit {
v3.handle(new VertexEventTaskCompleted(v3.getTask(0).getTaskId(),
TaskState.SUCCEEDED));
waitUntil(dag, DAGState.COMMITTING);
- dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
- waitUntil(dag, DAGState.KILLED);
+ dag.handle(new DAGEventTerminateDag(dag.getID(), terminationCause, null));
+ waitUntil(dag, terminationCause.getFinishedState());
Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
Assert.assertEquals(VertexState.SUCCEEDED, v3.getState());
Assert
- .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+ .assertEquals(terminationCause, dag.getTerminationCause());
Assert.assertTrue(dag.commitFutures.isEmpty());
historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
@@ -1569,10 +1580,20 @@ public class TestCommit {
Assert.assertEquals(1, v3OutputCommitter.abortCounter);
}
- // Kill dag while it is in COMMITTING in the case of
- // TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is false
+
@Test(timeout = 5000)
public void testDAGKilledWhileCommitting1_OnVertexSuccess() throws Exception {
+ _testDAGTerminatedWhileCommitting1_OnVertexSuccess(DAGTerminationCause.DAG_KILL);
+ }
+
+ @Test(timeout = 5000)
+ public void testServiceErrorWhileCommitting1_OnVertexSuccess() throws Exception {
+ _testDAGTerminatedWhileCommitting1_OnVertexSuccess(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
+ }
+
+ // Terminate dag (kill or service plugin error) while it is in COMMITTING in the case of
+ // TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS is false
+ private void _testDAGTerminatedWhileCommitting1_OnVertexSuccess(DAGTerminationCause terminationCause) throws Exception {
conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
false);
setupDAG(createDAGPlan(true, true));
@@ -1596,15 +1617,15 @@ public class TestCommit {
v3OutputCommitter.unblockCommit();
// dag go to COMMITTING due to the pending commit of v12Out
waitUntil(dag, DAGState.COMMITTING);
- dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
- waitUntil(dag, DAGState.KILLED);
+ dag.handle(new DAGEventTerminateDag(dag.getID(), terminationCause, null));
+ waitUntil(dag, terminationCause.getFinishedState());
Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
Assert.assertEquals(VertexState.SUCCEEDED, v3.getState());
- Assert.assertEquals(DAGState.KILLED, dag.getState());
+ Assert.assertEquals(terminationCause.getFinishedState(), dag.getState());
Assert
- .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+ .assertEquals(terminationCause, dag.getTerminationCause());
Assert.assertTrue(dag.commitFutures.isEmpty());
historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 1);
historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
@@ -1631,9 +1652,18 @@ public class TestCommit {
Assert.assertEquals(1, v3OutputCommitter.abortCounter);
}
- // DAG killed while dag is still in RUNNING and vertex is in COMMITTING
@Test(timeout = 5000)
public void testDAGKilledWhileRunning_OnVertexSuccess() throws Exception {
+ _testDAGKilledWhileRunning_OnVertexSuccess(DAGTerminationCause.DAG_KILL);
+ }
+
+ @Test(timeout = 5000)
+ public void testServiceErrorWhileRunning_OnVertexSuccess() throws Exception {
+ _testDAGKilledWhileRunning_OnVertexSuccess(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
+ }
+
+ // DAG terminated (kill or service plugin error) while dag is still in RUNNING and vertex is in COMMITTING
+ private void _testDAGKilledWhileRunning_OnVertexSuccess(DAGTerminationCause terminationCause) throws Exception {
conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
false);
setupDAG(createDAGPlan(true, true));
@@ -1652,17 +1682,17 @@ public class TestCommit {
Assert.assertEquals(VertexState.COMMITTING, v3.getState());
// dag is still in RUNNING because v3 has not completed
Assert.assertEquals(DAGState.RUNNING, dag.getState());
- dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
- waitUntil(dag, DAGState.KILLED);
+ dag.handle(new DAGEventTerminateDag(dag.getID(), terminationCause, null));
+ waitUntil(dag, terminationCause.getFinishedState());
Assert.assertEquals(VertexState.SUCCEEDED, v1.getState());
Assert.assertEquals(VertexState.SUCCEEDED, v2.getState());
Assert.assertEquals(VertexState.KILLED, v3.getState());
- Assert.assertEquals(VertexTerminationCause.DAG_KILL, v3.getTerminationCause());
+ Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED, v3.getTerminationCause());
Assert.assertTrue(v3.commitFutures.isEmpty());
- Assert.assertEquals(DAGState.KILLED, dag.getState());
+ Assert.assertEquals(terminationCause.getFinishedState(), dag.getState());
Assert
- .assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+ .assertEquals(terminationCause, dag.getTerminationCause());
Assert.assertTrue(dag.commitFutures.isEmpty());
// commit uv12 may not have started, so can't verify the VertexGroupCommitStartedEvent
historyEventHandler.verifyVertexGroupCommitFinishedEvent("uv12", 0);
@@ -1903,10 +1933,19 @@ public class TestCommit {
Assert.assertEquals(1, v3OutputCommitter.abortCounter);
}
- // test commit will be canceled no matter it is started or still in the threadpool
- // ControlledThreadPoolExecutor is used for to not schedule the commits
@Test(timeout = 5000)
public void testCommitCanceled_OnDAGSuccess() throws Exception {
+ _testCommitCanceled_OnDAGSuccess(DAGTerminationCause.DAG_KILL);
+ }
+
+ @Test(timeout = 5000)
+ public void testCommitCanceled_OnDAGSuccess2() throws Exception {
+ _testCommitCanceled_OnDAGSuccess(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
+ }
+
+ // Test that commits are canceled regardless of whether they have started or are still queued in the thread pool.
+ // ControlledThreadPoolExecutor is used so that the submitted commits are not actually scheduled.
+ private void _testCommitCanceled_OnDAGSuccess(DAGTerminationCause terminationCause) throws Exception {
conf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
true);
setupDAG(createDAGPlan(true, true));
@@ -1931,10 +1970,10 @@ public class TestCommit {
// mean the commits have been submitted to ThreadPool
Assert.assertEquals(2, dag.commitFutures.size());
- dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
- waitUntil(dag, DAGState.KILLED);
+ dag.handle(new DAGEventTerminateDag(dag.getID(), terminationCause, null));
+ waitUntil(dag, terminationCause.getFinishedState());
- Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+ Assert.assertEquals(terminationCause, dag.getTerminationCause());
// mean the commits have been canceled
Assert.assertTrue(dag.commitFutures.isEmpty());
historyEventHandler.verifyVertexGroupCommitStartedEvent("uv12", 0);
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 2158368..480e3cf 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -21,7 +21,6 @@ package org.apache.tez.dag.app.dag.impl;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -41,6 +40,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.StringUtils;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.slf4j.Logger;
@@ -1641,8 +1641,7 @@ public class TestDAGImpl {
startDAG(dag);
dispatcher.await();
- dispatcher.getEventHandler().handle(
- new DAGEvent(dagId, DAGEventType.DAG_KILL));
+ dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, DAGTerminationCause.DAG_KILL, null));
dispatcher.await();
Assert.assertEquals(DAGState.KILLED, dag.getState());
@@ -1654,9 +1653,18 @@ public class TestDAGImpl {
}
- @SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testKillRunningDAG() {
+ _testTerminateRunningDAG(DAGTerminationCause.DAG_KILL);
+ }
+
+ @Test(timeout = 5000)
+ public void testServiceErrorRunningDAG() {
+ _testTerminateRunningDAG(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void _testTerminateRunningDAG(DAGTerminationCause terminationCause) {
initDAG(dag);
startDAG(dag);
dispatcher.await();
@@ -1674,7 +1682,7 @@ public class TestDAGImpl {
Assert.assertEquals(VertexState.SUCCEEDED, v0.getState());
Assert.assertEquals(VertexState.RUNNING, v1.getState());
- dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+ dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, terminationCause, null));
dispatcher.await();
Assert.assertEquals(DAGState.TERMINATING, dag.getState());
@@ -1817,7 +1825,7 @@ public class TestDAGImpl {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 5), VertexState.FAILED));
} else if (testState == DAGStatus.State.KILLED) {
- dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+ dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, DAGTerminationCause.DAG_KILL, null));
} else if (testState == DAGStatus.State.ERROR) {
dispatcher.getEventHandler().handle(new DAGEventStartDag(dagId, new LinkedList<URL>()));
} else {
@@ -1871,11 +1879,21 @@ public class TestDAGImpl {
}
}
+
+ @Test(timeout = 5000)
+ public void testDAGKill() {
+ _testDAGTerminate(DAGTerminationCause.DAG_KILL);
+ }
+
+ @Test(timeout = 5000)
+ public void testDAGServiceError() {
+ _testDAGTerminate(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
+ }
+
// Couple of vertices succeed. DAG_KILLED processed, which causes the rest of the vertices to be
// marked as KILLED.
@SuppressWarnings("unchecked")
- @Test(timeout = 5000)
- public void testDAGKill() {
+ private void _testDAGTerminate(DAGTerminationCause terminationCause) {
initDAG(dag);
startDAG(dag);
dispatcher.await();
@@ -1887,10 +1905,10 @@ public class TestDAGImpl {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
- dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+ dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, terminationCause, null));
dispatcher.await();
- Assert.assertEquals(DAGState.KILLED, dag.getState());
- Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+ Assert.assertEquals(terminationCause.getFinishedState(), dag.getState());
+ Assert.assertEquals(terminationCause, dag.getTerminationCause());
Assert.assertEquals(2, dag.getSuccessfulVertices());
int killedCount = 0;
@@ -1902,16 +1920,25 @@ public class TestDAGImpl {
Assert.assertEquals(4, killedCount);
for (Vertex v : dag.getVertices().values()) {
- Assert.assertEquals(VertexTerminationCause.DAG_KILL, v.getTerminationCause());
+ Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED, v.getTerminationCause());
}
Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
}
+ @Test(timeout = 5000)
+ public void testDAGKillVertexSuccessAfterTerminated() {
+ _testDAGKillVertexSuccessAfterTerminated(DAGTerminationCause.DAG_KILL);
+ }
+
+ @Test(timeout = 5000)
+ public void testDAGServiceErrorVertexSuccessAfterTerminated() {
+ _testDAGKillVertexSuccessAfterTerminated(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
+ }
+
// Vertices succeed after a DAG kill has been processed. Should be ignored.
@SuppressWarnings("unchecked")
- @Test(timeout = 5000)
- public void testDAGKillVertexSuccessAfterKill() {
+ private void _testDAGKillVertexSuccessAfterTerminated(DAGTerminationCause terminationCause) {
initDAG(dag);
startDAG(dag);
dispatcher.await();
@@ -1923,10 +1950,10 @@ public class TestDAGImpl {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
- dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+ dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, terminationCause, null));
dispatcher.await();
- Assert.assertEquals(DAGState.KILLED, dag.getState());
+ Assert.assertEquals(terminationCause.getFinishedState(), dag.getState());
// Vertex SUCCESS gets processed after the DAG has reached the KILLED state. Should be ignored.
for (int i = 2; i < 6; ++i) {
@@ -1943,18 +1970,27 @@ public class TestDAGImpl {
}
Assert.assertEquals(4, killedCount);
- Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+ Assert.assertEquals(terminationCause, dag.getTerminationCause());
Assert.assertEquals(2, dag.getSuccessfulVertices());
for (Vertex v : dag.getVertices().values()) {
- Assert.assertEquals(VertexTerminationCause.DAG_KILL, v.getTerminationCause());
+ Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED, v.getTerminationCause());
}
Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
}
- // Vertex KILLED after a DAG_KILLED is issued. Termination reason should be DAG_KILLED
- @SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testDAGKillPending() {
+ _testDAGKillPending(DAGTerminationCause.DAG_KILL);
+ }
+
+ @Test(timeout = 5000)
+ public void testDAGServiceErrorPending() {
+ _testDAGKillPending(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
+ }
+
+ // Vertex KILLED after the DAG has been terminated (kill or service plugin error). Vertex termination cause should be DAG_TERMINATED
+ @SuppressWarnings("unchecked")
+ private void _testDAGKillPending(DAGTerminationCause terminationCause) {
initDAG(dag);
startDAG(dag);
dispatcher.await();
@@ -1972,17 +2008,17 @@ public class TestDAGImpl {
TezVertexID.getInstance(dagId, i), VertexState.SUCCEEDED));
}
dispatcher.await();
- dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
+ dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, terminationCause, null));
dispatcher.await();
- Assert.assertEquals(DAGState.KILLED, dag.getState());
+ Assert.assertEquals(terminationCause.getFinishedState(), dag.getState());
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 5), VertexState.KILLED));
dispatcher.await();
- Assert.assertEquals(DAGState.KILLED, dag.getState());
+ Assert.assertEquals(terminationCause.getFinishedState(), dag.getState());
Assert.assertEquals(5, dag.getSuccessfulVertices());
Assert.assertEquals(dag.getVertex(TezVertexID.getInstance(dagId, 5)).getTerminationCause(),
- VertexTerminationCause.DAG_KILL);
+ VertexTerminationCause.DAG_TERMINATED);
Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 986f64d..659d099 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -143,7 +143,6 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
@@ -189,7 +188,6 @@ import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
-import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.test.EdgeManagerForTest;
import org.apache.tez.test.VertexManagerPluginForTest;
@@ -206,7 +204,6 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
-import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.internal.util.collections.Sets;
@@ -2515,10 +2512,10 @@ public class TestVertexImpl {
private void killVertex(VertexImpl v) {
dispatcher.getEventHandler().handle(
- new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_KILL));
+ new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_TERMINATED));
dispatcher.await();
Assert.assertEquals(VertexState.KILLED, v.getState());
- Assert.assertEquals(v.getTerminationCause(), VertexTerminationCause.DAG_KILL);
+ Assert.assertEquals(v.getTerminationCause(), VertexTerminationCause.DAG_TERMINATED);
}
private void startVertex(VertexImpl v,
@@ -3322,7 +3319,7 @@ public class TestVertexImpl {
StringUtils.join(v3.getDiagnostics(), ",").toLowerCase(Locale.ENGLISH);
assertTrue(diagnostics.contains(
"vertex received kill while in running state"));
- Assert.assertEquals(VertexTerminationCause.DAG_KILL, v3.getTerminationCause());
+ Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED, v3.getTerminationCause());
assertTrue(diagnostics.contains(v3.getTerminationCause().name().toLowerCase(Locale.ENGLISH)));
}
@@ -3334,7 +3331,7 @@ public class TestVertexImpl {
startVertex(v);
dispatcher.getEventHandler().handle(
- new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_KILL));
+ new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_TERMINATED));
dispatcher.await();
Assert.assertEquals(VertexState.KILLED, v.getState());
@@ -3359,7 +3356,7 @@ public class TestVertexImpl {
startVertex(v);
dispatcher.getEventHandler().handle(
- new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_KILL));
+ new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_TERMINATED));
dispatcher.await();
Assert.assertEquals(VertexState.KILLED, v.getState());
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
index 1f75afb..b3568eb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -52,15 +53,21 @@ import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
+import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
+import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
import org.apache.tez.dag.app.rm.ContainerLauncherLaunchRequestEvent;
import org.apache.tez.dag.app.rm.ContainerLauncherStopRequestEvent;
+import org.apache.tez.dag.helpers.DagInfoImplForTest;
+import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
import org.apache.tez.serviceplugins.api.ContainerLauncher;
import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
import org.apache.tez.serviceplugins.api.ContainerStopRequest;
+import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
import org.apache.tez.serviceplugins.api.ServicePluginException;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -244,6 +251,75 @@ public class TestContainerLauncherManager {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
+ public void testReportFailureFromContainerLauncher() throws ServicePluginException, TezException {
+ final String dagName = DAG_NAME;
+ final int dagIndex = DAG_INDEX;
+ TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(0, 0), dagIndex);
+ DAG dag = mock(DAG.class);
+ doReturn(dagName).when(dag).getName();
+ doReturn(dagId).when(dag).getID();
+ EventHandler eventHandler = mock(EventHandler.class);
+ AppContext appContext = mock(AppContext.class);
+ doReturn(eventHandler).when(appContext).getEventHandler();
+ doReturn(dag).when(appContext).getCurrentDAG();
+ doReturn("testlauncher").when(appContext).getContainerLauncherName(0);
+
+ NamedEntityDescriptor<TaskCommunicatorDescriptor> taskCommDescriptor =
+ new NamedEntityDescriptor<>("testlauncher", ContainerLauncherForTest.class.getName());
+ List<NamedEntityDescriptor> list = new LinkedList<>();
+ list.add(taskCommDescriptor);
+ ContainerLauncherManager containerLauncherManager =
+ new ContainerLauncherManager(appContext, mock(TaskCommunicatorManagerInterface.class), "",
+ list, false);
+
+ try {
+ ContainerLaunchContext clc1 = mock(ContainerLaunchContext.class);
+ Container container1 = mock(Container.class);
+ ContainerLauncherLaunchRequestEvent launchRequestEvent =
+ new ContainerLauncherLaunchRequestEvent(clc1, container1, 0, 0, 0);
+
+
+ containerLauncherManager.handle(launchRequestEvent);
+
+ ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class);
+ verify(eventHandler, times(1)).handle(argumentCaptor.capture());
+
+ Event rawEvent = argumentCaptor.getValue();
+ assertTrue(rawEvent instanceof DAGAppMasterEventUserServiceFatalError);
+ DAGAppMasterEventUserServiceFatalError event =
+ (DAGAppMasterEventUserServiceFatalError) rawEvent;
+ assertEquals(DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR, event.getType());
+ assertTrue(event.getDiagnosticInfo().contains("ReportedFatalError"));
+ assertTrue(
+ event.getDiagnosticInfo().contains(ServicePluginErrorDefaults.INCONSISTENT_STATE.name()));
+ assertTrue(event.getDiagnosticInfo().contains("[0:testlauncher]"));
+
+ reset(eventHandler);
+ // stop container
+
+ ContainerId containerId2 = mock(ContainerId.class);
+ NodeId nodeId2 = mock(NodeId.class);
+ ContainerLauncherStopRequestEvent stopRequestEvent =
+ new ContainerLauncherStopRequestEvent(containerId2, nodeId2, null, 0, 0, 0);
+
+ argumentCaptor = ArgumentCaptor.forClass(Event.class);
+
+ containerLauncherManager.handle(stopRequestEvent);
+ verify(eventHandler, times(1)).handle(argumentCaptor.capture());
+ rawEvent = argumentCaptor.getValue();
+ assertTrue(rawEvent instanceof DAGEventTerminateDag);
+ DAGEventTerminateDag killEvent = (DAGEventTerminateDag) rawEvent;
+ assertTrue(killEvent.getDiagnosticInfo().contains("ReportError"));
+ assertTrue(killEvent.getDiagnosticInfo()
+ .contains(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE.name()));
+ assertTrue(killEvent.getDiagnosticInfo().contains("[0:testlauncher]"));
+ } finally {
+ containerLauncherManager.stop();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
public void testContainerLauncherUserError() throws ServicePluginException {
ContainerLauncher containerLauncher = mock(ContainerLauncher.class);
@@ -256,7 +332,8 @@ public class TestContainerLauncherManager {
Configuration conf = new Configuration(false);
ContainerLauncherManager containerLauncherManager =
- new ContainerLauncherManager(containerLauncher, appContext);
+ new ContainerLauncherManager(appContext);
+ containerLauncherManager.setContainerLauncher(containerLauncher);
try {
containerLauncherManager.init(conf);
containerLauncherManager.start();
@@ -437,4 +514,26 @@ public class TestContainerLauncherManager {
}
}
+ private static final String DAG_NAME = "dagName";
+ private static final int DAG_INDEX = 1;
+ public static class ContainerLauncherForTest extends ContainerLauncher {
+
+ public ContainerLauncherForTest(
+ ContainerLauncherContext containerLauncherContext) {
+ super(containerLauncherContext);
+ }
+
+ @Override
+ public void launchContainer(ContainerLaunchRequest launchRequest) throws
+ ServicePluginException {
+ getContext().reportError(ServicePluginErrorDefaults.INCONSISTENT_STATE, "ReportedFatalError", null);
+ }
+
+ @Override
+ public void stopContainer(ContainerStopRequest stopRequest) throws ServicePluginException {
+ getContext()
+ .reportError(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE, "ReportError", new DagInfoImplForTest(DAG_INDEX, DAG_NAME));
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index f69d8be..a3e5ff5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -33,6 +33,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
@@ -49,6 +50,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.io.IOExceptionWithCause;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -74,6 +76,7 @@ import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerContextDr
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableContext;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.PreemptionMatcher;
+import org.apache.tez.serviceplugins.api.DagInfo;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus;
import org.junit.After;
@@ -503,10 +506,14 @@ public class TestTaskScheduler {
drainableAppCallback.drain();
verify(mockApp).nodesUpdated(mockUpdatedNodes);
- Exception mockException = mock(Exception.class);
+ ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
+ Exception mockException = new IOException("mockexception");
scheduler.onError(mockException);
drainableAppCallback.drain();
- verify(mockApp).onError(mockException);
+ verify(mockApp)
+ .reportError(eq(YarnTaskSchedulerServiceError.RESOURCEMANAGER_ERROR), argumentCaptor.capture(),
+ any(DagInfo.class));
+ assertTrue(argumentCaptor.getValue().contains("mockexception"));
scheduler.onShutdownRequest();
drainableAppCallback.drain();
@@ -1220,10 +1227,14 @@ public class TestTaskScheduler {
drainableAppCallback.drain();
verify(mockApp).nodesUpdated(mockUpdatedNodes);
- Exception mockException = mock(Exception.class);
+
+ ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
+ Exception mockException = new IOException("mockexception");
scheduler.onError(mockException);
drainableAppCallback.drain();
- verify(mockApp).onError(mockException);
+ verify(mockApp).reportError(eq(YarnTaskSchedulerServiceError.RESOURCEMANAGER_ERROR), argumentCaptor.capture(),
+ any(DagInfo.class));
+ assertTrue(argumentCaptor.getValue().contains("mockexception"));
scheduler.onShutdownRequest();
drainableAppCallback.drain();
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index b54d024..ab85751 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -24,6 +24,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
@@ -68,6 +70,8 @@ import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
+import org.apache.tez.serviceplugins.api.DagInfo;
+import org.apache.tez.serviceplugins.api.ServicePluginError;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
@@ -283,9 +287,10 @@ class TestTaskSchedulerHelpers {
}
@Override
- public void onError(Throwable t) {
+ public void reportError(@Nonnull ServicePluginError servicePluginError, String message,
+ DagInfo dagInfo) {
invocations++;
- real.onError(t);
+ real.reportError(servicePluginError, message, dagInfo);
}
@Override
@@ -327,6 +332,12 @@ class TestTaskSchedulerHelpers {
return real.getApplicationAttemptId();
}
+ @Override
+ @Nullable
+ public DagInfo getCurrentDagInfo() {
+ // Straight pass-through to the wrapped context; unlike reportError it does not bump invocations.
+ DagInfo dagInfo = real.getCurrentDagInfo();
+ return dagInfo;
+ }
+
@Override
public String getAppHostName() {
return real.getAppHostName();
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
index 4d828e2..791bb7f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
@@ -28,11 +28,13 @@ import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
@@ -71,9 +73,11 @@ import org.apache.tez.dag.api.client.DAGClientServer;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
+import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
+import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
import org.apache.tez.dag.app.dag.impl.TaskImpl;
import org.apache.tez.dag.app.dag.impl.VertexImpl;
@@ -84,16 +88,19 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.AMContainerState;
import org.apache.tez.dag.app.web.WebUIService;
+import org.apache.tez.dag.helpers.DagInfoImplForTest;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
import org.apache.tez.serviceplugins.api.ServicePluginException;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
+import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -539,6 +546,81 @@ public class TestTaskSchedulerManager {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
+ public void testReportFailureFromTaskScheduler() {
+ // Drives TaskSchedulerForFailureTest through TaskSchedulerManager and checks how reported
+ // errors are translated into events:
+ // - getTotalResources reports SERVICE_UNAVAILABLE with DAG info -> expect DAGEventTerminateDag
+ // - getAvailableResources reports INCONSISTENT_STATE with null DAG info
+ // -> expect DAGAppMasterEventUserServiceFatalError
+ String dagName = DAG_NAME;
+ Configuration conf = new TezConfiguration();
+ String taskSchedulerName = "testTaskScheduler";
+ // Diagnostics are expected to identify the scheduler as "[<index>:<name>]".
+ String expIdentifier = "[0:" + taskSchedulerName + "]";
+ EventHandler eventHandler = mock(EventHandler.class);
+ AppContext appContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+ doReturn(taskSchedulerName).when(appContext).getTaskSchedulerName(0);
+ doReturn(eventHandler).when(appContext).getEventHandler();
+ doReturn(conf).when(appContext).getAMConf();
+ InetSocketAddress address = new InetSocketAddress("host", 55000);
+
+ DAGClientServer dagClientServer = mock(DAGClientServer.class);
+ doReturn(address).when(dagClientServer).getBindAddress();
+
+ // The current DAG uses the same DAG_NAME / DAG_INDEX that the test scheduler puts into the
+ // DagInfoImplForTest it reports from getTotalResources.
+ DAG dag = mock(DAG.class);
+ TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(1, 0), DAG_INDEX);
+ doReturn(dagName).when(dag).getName();
+ doReturn(dagId).when(dag).getID();
+ doReturn(dag).when(appContext).getCurrentDAG();
+
+ NamedEntityDescriptor<TaskSchedulerDescriptor> namedEntityDescriptor =
+ new NamedEntityDescriptor<>(taskSchedulerName, TaskSchedulerForFailureTest.class.getName());
+ List<NamedEntityDescriptor> list = new LinkedList<>();
+ list.add(namedEntityDescriptor);
+
+ TaskSchedulerManager taskSchedulerManager =
+ new TaskSchedulerManager(appContext, dagClientServer, eventHandler,
+ mock(ContainerSignatureMatcher.class), mock(WebUIService.class), list, false) {
+ @Override
+ TaskSchedulerContext wrapTaskSchedulerContext(TaskSchedulerContext rawContext) {
+ // Avoid wrapping in threads
+ return rawContext;
+ }
+ };
+ try {
+ taskSchedulerManager.init(new TezConfiguration());
+ taskSchedulerManager.start();
+
+ // Non-fatal report (SERVICE_UNAVAILABLE, "ReportError") for the current DAG.
+ taskSchedulerManager.getTotalResources(0);
+ ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class);
+ verify(eventHandler, times(1)).handle(argumentCaptor.capture());
+
+ Event rawEvent = argumentCaptor.getValue();
+ assertTrue(rawEvent instanceof DAGEventTerminateDag);
+ DAGEventTerminateDag killEvent = (DAGEventTerminateDag) rawEvent;
+ assertTrue(killEvent.getDiagnosticInfo().contains("ReportError"));
+ assertTrue(killEvent.getDiagnosticInfo()
+ .contains(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE.name()));
+ assertTrue(killEvent.getDiagnosticInfo().contains(expIdentifier));
+
+
+ // Forget the first interaction so the next verify(times(1)) only sees the second event.
+ reset(eventHandler);
+ // Report with INCONSISTENT_STATE ("ReportedFatalError") and no DagInfo.
+ taskSchedulerManager.getAvailableResources(0);
+ argumentCaptor = ArgumentCaptor.forClass(Event.class);
+
+ verify(eventHandler, times(1)).handle(argumentCaptor.capture());
+ rawEvent = argumentCaptor.getValue();
+
+ assertTrue(rawEvent instanceof DAGAppMasterEventUserServiceFatalError);
+ DAGAppMasterEventUserServiceFatalError event =
+ (DAGAppMasterEventUserServiceFatalError) rawEvent;
+ assertEquals(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, event.getType());
+ assertTrue(event.getDiagnosticInfo().contains("ReportedFatalError"));
+ assertTrue(
+ event.getDiagnosticInfo().contains(ServicePluginErrorDefaults.INCONSISTENT_STATE.name()));
+ assertTrue(event.getDiagnosticInfo().contains(expIdentifier));
+
+ } finally {
+ taskSchedulerManager.stop();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
public void testTaskSchedulerUserError() {
TaskScheduler taskScheduler = mock(TaskScheduler.class, new ExceptionAnswer());
@@ -798,4 +880,83 @@ public class TestTaskSchedulerManager {
return false;
}
}
+
+ private static final String DAG_NAME = "dagName";
+ private static final int DAG_INDEX = 1;
+
+ /**
+ * Minimal TaskScheduler used by testReportFailureFromTaskScheduler. Only the two resource
+ * queries do real work: each reports an error through the scheduler context and then returns a
+ * fixed Resource. Every other operation is inert.
+ */
+ public static class TaskSchedulerForFailureTest extends TaskScheduler {
+
+ public TaskSchedulerForFailureTest(TaskSchedulerContext taskSchedulerContext) {
+ super(taskSchedulerContext);
+ }
+
+ /** Reports INCONSISTENT_STATE with no DagInfo (the test expects a fatal error event). */
+ @Override
+ public Resource getAvailableResources() throws ServicePluginException {
+ TaskSchedulerContext context = getContext();
+ context.reportError(ServicePluginErrorDefaults.INCONSISTENT_STATE, "ReportedFatalError", null);
+ return fixedResource();
+ }
+
+ /** Reports SERVICE_UNAVAILABLE for the DAG identified by (DAG_INDEX, DAG_NAME). */
+ @Override
+ public Resource getTotalResources() throws ServicePluginException {
+ DagInfoImplForTest dagInfo = new DagInfoImplForTest(DAG_INDEX, DAG_NAME);
+ getContext().reportError(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE, "ReportError", dagInfo);
+ return fixedResource();
+ }
+
+ /** A new Resource.newInstance(1024, 1) on every call, as returned by both resource queries. */
+ private static Resource fixedResource() {
+ return Resource.newInstance(1024, 1);
+ }
+
+ @Override
+ public int getClusterNodeCount() throws ServicePluginException {
+ return 0;
+ }
+
+ // The remaining operations are deliberately inert; the test only drives the two
+ // resource queries above.
+
+ @Override
+ public void blacklistNode(NodeId nodeId) throws ServicePluginException {
+ // no-op
+ }
+
+ @Override
+ public void unblacklistNode(NodeId nodeId) throws ServicePluginException {
+ // no-op
+ }
+
+ @Override
+ public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks,
+ Priority priority, Object containerSignature, Object clientCookie)
+ throws ServicePluginException {
+ // no-op
+ }
+
+ @Override
+ public void allocateTask(Object task, Resource capability, ContainerId containerId,
+ Priority priority, Object containerSignature, Object clientCookie)
+ throws ServicePluginException {
+ // no-op
+ }
+
+ @Override
+ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason,
+ @Nullable String diagnostics) throws ServicePluginException {
+ return false;
+ }
+
+ @Override
+ public Object deallocateContainer(ContainerId containerId) throws ServicePluginException {
+ return null;
+ }
+
+ @Override
+ public void setShouldUnregister() throws ServicePluginException {
+ // no-op
+ }
+
+ @Override
+ public boolean hasUnregistered() throws ServicePluginException {
+ return false;
+ }
+
+ @Override
+ public void dagComplete() throws ServicePluginException {
+ // no-op
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java b/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java
new file mode 100644
index 0000000..f92513f
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/helpers/DagInfoImplForTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.helpers;
+
+import org.apache.tez.serviceplugins.api.DagInfo;
+
+/**
+ * Immutable {@link DagInfo} holder for tests: reports exactly the index and name passed to the
+ * constructor.
+ */
+public class DagInfoImplForTest implements DagInfo {
+
+ private final String name;
+ private final int index;
+
+ public DagInfoImplForTest(int index, String name) {
+ this.name = name;
+ this.index = index;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public int getIndex() {
+ return index;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/ErrorPluginConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/ErrorPluginConfiguration.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/ErrorPluginConfiguration.java
new file mode 100644
index 0000000..32d1fb6
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/ErrorPluginConfiguration.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.serviceplugins.api.ServicePluginContextBase;
+import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
+
+/**
+ * Test-only settings telling the "WithErrors" service plugins how to misbehave when they call
+ * {@link #processError}: throw a RuntimeException, or report a fatal / non-fatal error through
+ * their plugin context. The settings travel as a Java-serialized {@code HashMap<String, String>}
+ * inside a {@link UserPayload}.
+ */
+public class ErrorPluginConfiguration {
+
+ public static final String REPORT_FATAL_ERROR_MESSAGE = "ReportedFatalError";
+ public static final String REPORT_NONFATAL_ERROR_MESSAGE = "ReportedError";
+ public static final String THROW_ERROR_EXCEPTION_STRING = "Simulated Error";
+
+ private static final String CONF_THROW_ERROR = "throw.error";
+ private static final String CONF_REPORT_ERROR = "report.error";
+ private static final String CONF_REPORT_ERROR_FATAL = "report.error.fatal";
+ private static final String CONF_REPORT_ERROR_DAG_NAME = "report.error.dag.name";
+ /** Configured DAG name that matches every DAG. */
+ private static final String ANY_DAG = "*";
+
+ private final HashMap<String, String> kv;
+
+ private ErrorPluginConfiguration() {
+ this.kv = new HashMap<>();
+ }
+
+ private ErrorPluginConfiguration(HashMap<String, String> map) {
+ this.kv = map;
+ }
+
+ /** Creates a configuration under which {@link #processError} throws a RuntimeException. */
+ public static ErrorPluginConfiguration createThrowErrorConf() {
+ ErrorPluginConfiguration conf = new ErrorPluginConfiguration();
+ conf.kv.put(CONF_THROW_ERROR, String.valueOf(true));
+ return conf;
+ }
+
+ /**
+ * Creates a configuration under which {@link #processError} reports a fatal error.
+ *
+ * @param dagName DAG name the error is restricted to, or "*" for any DAG
+ */
+ public static ErrorPluginConfiguration createReportFatalErrorConf(String dagName) {
+ return createReportErrorConf(true, dagName);
+ }
+
+ /**
+ * Creates a configuration under which {@link #processError} reports a non-fatal error.
+ *
+ * @param dagName DAG name the error is restricted to, or "*" for any DAG
+ */
+ public static ErrorPluginConfiguration createReportNonFatalErrorConf(String dagName) {
+ return createReportErrorConf(false, dagName);
+ }
+
+ /** Shared builder for the two report-error factories. */
+ private static ErrorPluginConfiguration createReportErrorConf(boolean fatal, String dagName) {
+ ErrorPluginConfiguration conf = new ErrorPluginConfiguration();
+ conf.kv.put(CONF_REPORT_ERROR, String.valueOf(true));
+ conf.kv.put(CONF_REPORT_ERROR_FATAL, String.valueOf(fatal));
+ conf.kv.put(CONF_REPORT_ERROR_DAG_NAME, dagName);
+ return conf;
+ }
+
+ /** Serializes {@code conf} into a UserPayload readable by {@link #toErrorPluginConfiguration}. */
+ public static UserPayload toUserPayload(ErrorPluginConfiguration conf) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ // Close (and therefore flush) the object stream before reading the buffered bytes.
+ try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(conf.kv);
+ }
+ return UserPayload.create(ByteBuffer.wrap(baos.toByteArray()));
+ }
+
+ /**
+ * Reads a configuration written by {@link #toUserPayload}.
+ * NOTE(review): uses Java native deserialization, so only feed it payloads built by this test
+ * code, never untrusted input.
+ */
+ @SuppressWarnings("unchecked")
+ public static ErrorPluginConfiguration toErrorPluginConfiguration(UserPayload userPayload) throws
+ IOException, ClassNotFoundException {
+ ByteBuffer payload = userPayload.getPayload();
+ byte[] b = new byte[payload.remaining()];
+ payload.get(b);
+ try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(b))) {
+ // Safe cast: toUserPayload only ever writes a HashMap<String, String>.
+ return new ErrorPluginConfiguration((HashMap<String, String>) ois.readObject());
+ }
+ }
+
+ /** @return true if {@link #processError} should throw a RuntimeException */
+ public boolean shouldThrowError() {
+ // Boolean.parseBoolean(null) is false, so a missing key means "don't throw".
+ return Boolean.parseBoolean(kv.get(CONF_THROW_ERROR));
+ }
+
+ /** @return true if a fatal error should be reported for {@code dagName} */
+ public boolean shouldReportFatalError(String dagName) {
+ return isReportErrorEnabled() && isReportErrorFatal() && appliesToDag(dagName);
+ }
+
+ /** @return true if a non-fatal error should be reported for {@code dagName} */
+ public boolean shouldReportNonFatalError(String dagName) {
+ return isReportErrorEnabled() && !isReportErrorFatal() && appliesToDag(dagName);
+ }
+
+ private boolean isReportErrorEnabled() {
+ return Boolean.parseBoolean(kv.get(CONF_REPORT_ERROR));
+ }
+
+ private boolean isReportErrorFatal() {
+ return Boolean.parseBoolean(kv.get(CONF_REPORT_ERROR_FATAL));
+ }
+
+ /**
+ * A null or empty {@code dagName} skips the DAG filter and always matches. Otherwise the
+ * configured DAG name must be "*" or equal to {@code dagName}; an absent configured name
+ * matches nothing instead of failing.
+ */
+ private boolean appliesToDag(String dagName) {
+ if (dagName == null || dagName.isEmpty()) {
+ return true;
+ }
+ String configured = kv.get(CONF_REPORT_ERROR_DAG_NAME);
+ return ANY_DAG.equals(configured) || dagName.equals(configured);
+ }
+
+ /**
+ * Applies {@code conf} on behalf of a plugin. It either throws, or reports INCONSISTENT_STATE
+ * (fatal) or SERVICE_UNAVAILABLE (non-fatal) for the context's current DAG, and does nothing
+ * if no error behavior is configured. The DAG-name filter is not applied here (null is passed).
+ */
+ public static void processError(ErrorPluginConfiguration conf, ServicePluginContextBase context) {
+ if (conf.shouldThrowError()) {
+ throw new RuntimeException(ErrorPluginConfiguration.THROW_ERROR_EXCEPTION_STRING);
+ } else if (conf.shouldReportFatalError(null)) {
+ context.reportError(ServicePluginErrorDefaults.INCONSISTENT_STATE,
+ ErrorPluginConfiguration.REPORT_FATAL_ERROR_MESSAGE,
+ context.getCurrentDagInfo());
+ } else if (conf.shouldReportNonFatalError(null)) {
+ context.reportError(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE,
+ ErrorPluginConfiguration.REPORT_NONFATAL_ERROR_MESSAGE,
+ context.getCurrentDagInfo());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java
index d489cca..b4ea176 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java
@@ -14,24 +14,33 @@
package org.apache.tez.dag.app.launcher;
+import java.io.IOException;
+
+import org.apache.tez.dag.app.ErrorPluginConfiguration;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
import org.apache.tez.serviceplugins.api.ContainerLauncher;
import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
import org.apache.tez.serviceplugins.api.ContainerStopRequest;
+import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
public class TezTestServiceContainerLauncherWithErrors extends ContainerLauncher {
+
+ private final ErrorPluginConfiguration conf;
+
public TezTestServiceContainerLauncherWithErrors(
- ContainerLauncherContext containerLauncherContext) {
+ ContainerLauncherContext containerLauncherContext) throws IOException,
+ ClassNotFoundException {
super(containerLauncherContext);
+ conf = ErrorPluginConfiguration.toErrorPluginConfiguration(containerLauncherContext.getInitialUserPayload());
}
@Override
public void launchContainer(ContainerLaunchRequest launchRequest) {
- throw new RuntimeException("Simulated Error");
+ ErrorPluginConfiguration.processError(conf, getContext());
}
@Override
public void stopContainer(ContainerStopRequest stopRequest) {
- throw new RuntimeException("Simulated Error");
+ ErrorPluginConfiguration.processError(conf, getContext());
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java
index 1705eac..13d4815 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java
@@ -16,18 +16,25 @@ package org.apache.tez.dag.app.rm;
import javax.annotation.Nullable;
+import java.io.IOException;
+
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.app.ErrorPluginConfiguration;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
public class TezTestServiceTaskSchedulerServiceWithErrors extends TaskScheduler {
+
+ private final ErrorPluginConfiguration conf;
+
public TezTestServiceTaskSchedulerServiceWithErrors(
- TaskSchedulerContext taskSchedulerContext) {
+ TaskSchedulerContext taskSchedulerContext) throws IOException, ClassNotFoundException {
super(taskSchedulerContext);
+ conf = ErrorPluginConfiguration.toErrorPluginConfiguration(taskSchedulerContext.getInitialUserPayload());
}
@Override
@@ -47,35 +54,37 @@ public class TezTestServiceTaskSchedulerServiceWithErrors extends TaskScheduler
@Override
public void blacklistNode(NodeId nodeId) {
- throw new RuntimeException("Simulated Error");
+ ErrorPluginConfiguration.processError(conf, getContext());
}
@Override
public void unblacklistNode(NodeId nodeId) {
- throw new RuntimeException("Simulated Error");
+ ErrorPluginConfiguration.processError(conf, getContext());
}
@Override
public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks,
Priority priority, Object containerSignature, Object clientCookie) {
- throw new RuntimeException("Simulated Error");
+ ErrorPluginConfiguration.processError(conf, getContext());
}
@Override
public void allocateTask(Object task, Resource capability, ContainerId containerId,
Priority priority, Object containerSignature, Object clientCookie) {
- throw new RuntimeException("Simulated Error");
+ ErrorPluginConfiguration.processError(conf, getContext());
}
@Override
public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason,
@Nullable String diagnostics) {
- throw new RuntimeException("Simulated Error");
+ ErrorPluginConfiguration.processError(conf, getContext());
+ return true;
}
@Override
public Object deallocateContainer(ContainerId containerId) {
- throw new RuntimeException("Simulated Error");
+ ErrorPluginConfiguration.processError(conf, getContext());
+ return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java
index 90313d4..8221957 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java
@@ -15,6 +15,7 @@
package org.apache.tez.dag.app.taskcomm;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
@@ -22,6 +23,8 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.app.ErrorPluginConfiguration;
+import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
import org.apache.tez.serviceplugins.api.TaskCommunicator;
import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
import org.apache.tez.dag.api.event.VertexStateUpdate;
@@ -31,20 +34,24 @@ import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
public class TezTestServiceTaskCommunicatorWithErrors extends TaskCommunicator {
+
+ private final ErrorPluginConfiguration conf;
+
public TezTestServiceTaskCommunicatorWithErrors(
- TaskCommunicatorContext taskCommunicatorContext) {
+ TaskCommunicatorContext taskCommunicatorContext) throws IOException, ClassNotFoundException {
super(taskCommunicatorContext);
+ conf = ErrorPluginConfiguration.toErrorPluginConfiguration(taskCommunicatorContext.getInitialUserPayload());
}
@Override
public void registerRunningContainer(ContainerId containerId, String hostname, int port) {
- throw new RuntimeException("Simulated Error");
+ ErrorPluginConfiguration.processError(conf, getContext());
}
@Override
public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason,
@Nullable String diagnostics) {
- throw new RuntimeException("Simulated Error");
+ ErrorPluginConfiguration.processError(conf, getContext());
}
@Override
@@ -52,14 +59,14 @@ public class TezTestServiceTaskCommunicatorWithErrors extends TaskCommunicator {
Map<String, LocalResource> additionalResources,
Credentials credentials, boolean credentialsChanged,
int priority) {
- throw new RuntimeException("Simulated Error");
+ ErrorPluginConfiguration.processError(conf, getContext());
}
@Override
public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID,
TaskAttemptEndReason endReason,
@Nullable String diagnostics) {
- throw new RuntimeException("Simulated Error");
+ ErrorPluginConfiguration.processError(conf, getContext());
}
@Override
@@ -69,7 +76,7 @@ public class TezTestServiceTaskCommunicatorWithErrors extends TaskCommunicator {
@Override
public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
- throw new RuntimeException("Simulated Error");
+ ErrorPluginConfiguration.processError(conf, getContext());
}
@Override
@@ -78,6 +85,7 @@ public class TezTestServiceTaskCommunicatorWithErrors extends TaskCommunicator {
@Override
public Object getMetaInfo() {
- throw new RuntimeException("Simulated Error");
+ ErrorPluginConfiguration.processError(conf, getContext());
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java
index bfd3ed2..ac6ebde 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java
@@ -19,7 +19,9 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.EnumSet;
+import java.util.List;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -40,6 +42,7 @@ import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.app.ErrorPluginConfiguration;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.launcher.TezTestServiceContainerLauncherWithErrors;
import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
@@ -49,6 +52,7 @@ import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorWithErrors;
import org.apache.tez.examples.JoinValidateConfigured;
import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
+import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
@@ -63,7 +67,13 @@ public class TestExternalTezServicesErrors {
private static final Logger LOG = LoggerFactory.getLogger(TestExternalTezServicesErrors.class);
private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush";
- private static final String EXT_FAIL_ENTITY_NAME = "ExtServiceTestFail";
+ private static final String EXT_THROW_ERROR_ENTITY_NAME = "ExtServiceTestThrowErrors";
+ private static final String EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME = "ExtServiceTestReportNonFatalErrors";
+ private static final String EXT_REPORT_FATAL_ERROR_ENTITY_NAME = "ExtServiceTestReportFatalErrors";
+
+ private static final String SUFFIX_LAUNCHER = "ContainerLauncher";
+ private static final String SUFFIX_TASKCOMM = "TaskCommunicator";
+ private static final String SUFFIX_SCHEDULER = "TaskScheduler";
private static ExternalTezServiceTestHelper extServiceTestHelper;
@@ -76,12 +86,32 @@ public class TestExternalTezServicesErrors {
private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_EXT_SERVICE_PUSH =
Vertex.VertexExecutionContext.create(
EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
- private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_FAIL =
- Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_FAIL_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
- private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_FAIL =
- Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_FAIL_ENTITY_NAME);
- private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_FAIL =
- Vertex.VertexExecutionContext.create(EXT_FAIL_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
+ // Throw error contexts
+ private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_THROW =
+ Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_THROW_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
+ private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_THROW =
+ Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME,
+ EXT_THROW_ERROR_ENTITY_NAME);
+ private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_THROW =
+ Vertex.VertexExecutionContext.create(EXT_THROW_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
+
+ // Report-non-fatal contexts
+ private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_REPORT_NON_FATAL =
+ Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
+ private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_REPORT_NON_FATAL =
+ Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME,
+ EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME);
+ private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_REPORT_NON_FATAL =
+ Vertex.VertexExecutionContext.create(EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
+
+ // Report fatal contexts
+ private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_REPORT_FATAL =
+ Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_REPORT_FATAL_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
+ private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_REPORT_FATAL =
+ Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME,
+ EXT_REPORT_FATAL_ERROR_ENTITY_NAME);
+ private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_REPORT_FATAL =
+ Vertex.VertexExecutionContext.create(EXT_REPORT_FATAL_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_DEFAULT = EXECUTION_CONTEXT_EXT_SERVICE_PUSH;
@@ -93,29 +123,63 @@ public class TestExternalTezServicesErrors {
public static void setup() throws Exception {
extServiceTestHelper = new ExternalTezServiceTestHelper(TEST_ROOT_DIR);
- UserPayload userPayload = TezUtils.createUserPayloadFromConf(extServiceTestHelper.getConfForJobs());
+ UserPayload userPayload =
+ TezUtils.createUserPayloadFromConf(extServiceTestHelper.getConfForJobs());
+ UserPayload userPayloadThrowError =
+ ErrorPluginConfiguration.toUserPayload(ErrorPluginConfiguration.createThrowErrorConf());
+
+ UserPayload userPayloadReportFatalErrorLauncher = ErrorPluginConfiguration
+ .toUserPayload(ErrorPluginConfiguration.createReportFatalErrorConf(SUFFIX_LAUNCHER));
+ UserPayload userPayloadReportFatalErrorTaskComm = ErrorPluginConfiguration
+ .toUserPayload(ErrorPluginConfiguration.createReportFatalErrorConf(SUFFIX_TASKCOMM));
+ UserPayload userPayloadReportFatalErrorScheduler = ErrorPluginConfiguration
+ .toUserPayload(ErrorPluginConfiguration.createReportFatalErrorConf(SUFFIX_SCHEDULER));
+
+ UserPayload userPayloadReportNonFatalErrorLauncher = ErrorPluginConfiguration
+ .toUserPayload(ErrorPluginConfiguration.createReportNonFatalErrorConf(SUFFIX_LAUNCHER));
+ UserPayload userPayloadReportNonFatalErrorTaskComm = ErrorPluginConfiguration
+ .toUserPayload(ErrorPluginConfiguration.createReportNonFatalErrorConf(SUFFIX_TASKCOMM));
+ UserPayload userPayloadReportNonFatalErrorScheduler = ErrorPluginConfiguration
+ .toUserPayload(ErrorPluginConfiguration.createReportNonFatalErrorConf(SUFFIX_SCHEDULER));
TaskSchedulerDescriptor[] taskSchedulerDescriptors = new TaskSchedulerDescriptor[]{
TaskSchedulerDescriptor
.create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName())
.setUserPayload(userPayload),
- TaskSchedulerDescriptor.create(EXT_FAIL_ENTITY_NAME,
+ TaskSchedulerDescriptor.create(EXT_THROW_ERROR_ENTITY_NAME,
+ TezTestServiceTaskSchedulerServiceWithErrors.class.getName()).setUserPayload(
+ userPayloadThrowError),
+ TaskSchedulerDescriptor.create(EXT_REPORT_FATAL_ERROR_ENTITY_NAME,
+ TezTestServiceTaskSchedulerServiceWithErrors.class.getName()).setUserPayload(
+ userPayloadReportFatalErrorScheduler),
+ TaskSchedulerDescriptor.create(EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME,
TezTestServiceTaskSchedulerServiceWithErrors.class.getName()).setUserPayload(
- userPayload)};
+ userPayloadReportNonFatalErrorScheduler),
+ };
ContainerLauncherDescriptor[] containerLauncherDescriptors = new ContainerLauncherDescriptor[]{
ContainerLauncherDescriptor
.create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName())
.setUserPayload(userPayload),
- ContainerLauncherDescriptor.create(EXT_FAIL_ENTITY_NAME,
- TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(userPayload)};
+ ContainerLauncherDescriptor.create(EXT_THROW_ERROR_ENTITY_NAME,
+ TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(userPayloadThrowError),
+ ContainerLauncherDescriptor.create(EXT_REPORT_FATAL_ERROR_ENTITY_NAME,
+ TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(userPayloadReportFatalErrorLauncher),
+ ContainerLauncherDescriptor.create(EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME,
+ TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(userPayloadReportNonFatalErrorLauncher)
+ };
TaskCommunicatorDescriptor[] taskCommunicatorDescriptors = new TaskCommunicatorDescriptor[]{
TaskCommunicatorDescriptor
.create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName())
.setUserPayload(userPayload),
- TaskCommunicatorDescriptor.create(EXT_FAIL_ENTITY_NAME,
- TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(userPayload)};
+ TaskCommunicatorDescriptor.create(EXT_THROW_ERROR_ENTITY_NAME,
+ TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(userPayloadThrowError),
+ TaskCommunicatorDescriptor.create(EXT_REPORT_FATAL_ERROR_ENTITY_NAME,
+ TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(userPayloadReportFatalErrorTaskComm),
+ TaskCommunicatorDescriptor.create(EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME,
+ TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(userPayloadReportNonFatalErrorTaskComm)
+ };
servicePluginsDescriptor = ServicePluginsDescriptor.create(true, true,
taskSchedulerDescriptors, containerLauncherDescriptors, taskCommunicatorDescriptors);
@@ -137,35 +201,86 @@ public class TestExternalTezServicesErrors {
extServiceTestHelper.tearDownAll();
}
- @Test (timeout = 90000)
- public void testContainerLauncherError() throws Exception {
- testServiceError("_testContainerLauncherError_", EXECUTION_CONTEXT_LAUNCHER_FAIL,
- DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR);
+ @Test(timeout = 90000)
+ public void testContainerLauncherThrowError() throws Exception {
+ // Launcher slot uses the createThrowErrorConf() plugin: expect a launcher fatal-error diagnostic.
+ testFatalError("_testContainerLauncherThrowError_", EXECUTION_CONTEXT_LAUNCHER_THROW, SUFFIX_LAUNCHER,
+ Lists.newArrayList("Service Error", DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR.name()));
+ }
+
+ @Test(timeout = 90000)
+ public void testTaskCommunicatorThrowError() throws Exception {
+ // Task-communicator slot uses the createThrowErrorConf() plugin: expect a task-communicator fatal error.
+ testFatalError("_testTaskCommunicatorThrowError_", EXECUTION_CONTEXT_TASKCOMM_THROW, SUFFIX_TASKCOMM,
+ Lists.newArrayList("Service Error", DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR.name()));
+ }
+
+ @Test(timeout = 90000)
+ public void testTaskSchedulerThrowError() throws Exception {
+ // Scheduler slot uses the createThrowErrorConf() plugin: expect a task-scheduler fatal error.
+ testFatalError("_testTaskSchedulerThrowError_", EXECUTION_CONTEXT_SCHEDULER_THROW, SUFFIX_SCHEDULER,
+ Lists.newArrayList("Service Error", DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR.name()));
+ }
+
+ @Test(timeout = 150000)
+ public void testNonFatalErrors() throws IOException, TezException, InterruptedException {
+ // One session is reused for three runs; each run swaps in a component that reports a
+ // non-fatal error, and each resulting DAG is expected to FAIL.
+ final String testName = "testNonFatalErrors";
+ TezConfiguration clientConf = new TezConfiguration(extServiceTestHelper.getConfForJobs());
+ String sessionName = TestExternalTezServicesErrors.class.getSimpleName() + testName + "_session";
+ TezClient tezClient = TezClient.newBuilder(sessionName, clientConf)
+ .setIsSession(true)
+ .setServicePluginDescriptor(servicePluginsDescriptor)
+ .build();
+ try {
+ tezClient.start();
+ LOG.info("TezSessionStarted for " + testName);
+ tezClient.waitTillReady();
+ LOG.info("TezSession ready for submission for " + testName);
+ runAndVerifyForNonFatalErrors(tezClient, SUFFIX_LAUNCHER, EXECUTION_CONTEXT_LAUNCHER_REPORT_NON_FATAL);
+ runAndVerifyForNonFatalErrors(tezClient, SUFFIX_TASKCOMM, EXECUTION_CONTEXT_TASKCOMM_REPORT_NON_FATAL);
+ runAndVerifyForNonFatalErrors(tezClient, SUFFIX_SCHEDULER, EXECUTION_CONTEXT_SCHEDULER_REPORT_NON_FATAL);
+ } finally {
+ tezClient.stop();
+ }
+ }
+
+ @Test(timeout = 90000)
+ public void testContainerLauncherReportFatalError() throws Exception {
+ // Launcher reports (rather than throws) a fatal error; expect the report text plus INCONSISTENT_STATE.
+ testFatalError("_testContainerLauncherReportFatalError_", EXECUTION_CONTEXT_LAUNCHER_REPORT_FATAL,
+ SUFFIX_LAUNCHER,
+ Lists.newArrayList(ErrorPluginConfiguration.REPORT_FATAL_ERROR_MESSAGE, ServicePluginErrorDefaults.INCONSISTENT_STATE.name()));
}
- @Test (timeout = 90000)
- public void testTaskCommunicatorError() throws Exception {
- testServiceError("_testTaskCommunicatorError_", EXECUTION_CONTEXT_TASKCOMM_FAIL,
- DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR);
+ @Test(timeout = 90000)
+ public void testTaskCommReportFatalError() throws Exception {
+ // Task communicator reports (rather than throws) a fatal error; expect the report text plus INCONSISTENT_STATE.
+ testFatalError("_testTaskCommReportFatalError_", EXECUTION_CONTEXT_TASKCOMM_REPORT_FATAL, SUFFIX_TASKCOMM,
+ Lists.newArrayList(ErrorPluginConfiguration.REPORT_FATAL_ERROR_MESSAGE, ServicePluginErrorDefaults.INCONSISTENT_STATE.name()));
}
- @Test (timeout = 90000)
- public void testTaskSchedulerError() throws Exception {
- testServiceError("_testTaskSchedulerError_", EXECUTION_CONTEXT_SCHEDULER_FAIL,
- DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR);
+ @Test(timeout = 90000)
+ public void testTaskSchedulerReportFatalError() throws Exception {
+ // Scheduler reports (rather than throws) a fatal error; expect the report text plus INCONSISTENT_STATE.
+ testFatalError("_testTaskSchedulerReportFatalError_", EXECUTION_CONTEXT_SCHEDULER_REPORT_FATAL,
+ SUFFIX_SCHEDULER,
+ Lists.newArrayList(ErrorPluginConfiguration.REPORT_FATAL_ERROR_MESSAGE, ServicePluginErrorDefaults.INCONSISTENT_STATE.name()));
}
- private void testServiceError(String methodName,
- Vertex.VertexExecutionContext lhsExecutionContext,
- DAGAppMasterEventType expectedEventType) throws
- IOException, TezException, InterruptedException, YarnException {
+
+ /** Runs a join DAG using {@code lhsExecutionContext} and asserts it ends in ERROR with all {@code expectedDiagMessages} in one diagnostic. */
+ private void testFatalError(String methodName, Vertex.VertexExecutionContext lhsExecutionContext,
+ String dagNameSuffix, List<String> expectedDiagMessages)
+ throws IOException, TezException, YarnException, InterruptedException {
TezConfiguration tezClientConf = new TezConfiguration(extServiceTestHelper.getConfForJobs());
TezClient tezClient = TezClient
.newBuilder(TestExternalTezServicesErrors.class.getSimpleName() + methodName + "_session",
tezClientConf)
.setIsSession(true).setServicePluginDescriptor(servicePluginsDescriptor).build();
- ApplicationId appId;
+ ApplicationId appId = null;
try {
tezClient.start();
LOG.info("TezSessionStarted for " + methodName);
@@ -175,10 +290,11 @@ public class TestExternalTezServicesErrors {
JoinValidateConfigured joinValidate =
new JoinValidateConfigured(EXECUTION_CONTEXT_DEFAULT, lhsExecutionContext,
EXECUTION_CONTEXT_EXT_SERVICE_PUSH,
- EXECUTION_CONTEXT_EXT_SERVICE_PUSH, "LauncherFailTest");
+ EXECUTION_CONTEXT_EXT_SERVICE_PUSH, dagNameSuffix); // callers pass SUFFIX_LAUNCHER / SUFFIX_TASKCOMM / SUFFIX_SCHEDULER
DAG dag = joinValidate
- .createDag(new TezConfiguration(extServiceTestHelper.getConfForJobs()), HASH_JOIN_EXPECTED_RESULT_PATH,
+ .createDag(new TezConfiguration(extServiceTestHelper.getConfForJobs()),
+ HASH_JOIN_EXPECTED_RESULT_PATH,
HASH_JOIN_OUTPUT_PATH, 3);
DAGClient dagClient = tezClient.submitDAG(dag);
@@ -188,14 +304,15 @@ public class TestExternalTezServicesErrors {
assertEquals(DAGStatus.State.ERROR, dagStatus.getState());
boolean foundDiag = false;
for (String diag : dagStatus.getDiagnostics()) {
- if (diag.contains("Service Error") && diag.contains(
- expectedEventType.toString()) &&
- diag.contains("Simulated Error")) {
- foundDiag = true;
+ foundDiag = checkDiag(diag, expectedDiagMessages);
+ if (foundDiag) {
+ break;
}
}
appId = tezClient.getAppMasterApplicationId();
assertTrue(foundDiag);
+ // InterruptedException is deliberately not caught: it propagates (this method declares it).
+ // Catching it here would leave appId null and skip the foundDiag assertion above.
} finally {
tezClient.stop();
}
@@ -222,14 +339,58 @@ public class TestExternalTezServicesErrors {
String diag = appAttemptReport.getDiagnostics();
assertEquals(FinalApplicationStatus.FAILED, appReport.getFinalApplicationStatus());
assertEquals(YarnApplicationState.FINISHED, appReport.getYarnApplicationState());
- assertTrue(diag.contains("Service Error") && diag.contains(
- expectedEventType.toString()) &&
- diag.contains("Simulated Error"));
-
+ assertTrue("Unexpected app attempt diagnostics: " + diag, checkDiag(diag, expectedDiagMessages));
} finally {
yarnClient.stop();
}
}
}
+
+ /**
+ * Returns true iff {@code diag} contains every entry of {@code expected}
+ * (vacuously true when {@code expected} is empty).
+ */
+ private boolean checkDiag(String diag, List<String> expected) {
+ for (String fragment : expected) {
+ if (!diag.contains(fragment)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+
+ /**
+ * Runs a join DAG using {@code lhsContext}; asserts it FAILED with the non-fatal message and SERVICE_UNAVAILABLE.
+ */
+ private void runAndVerifyForNonFatalErrors(TezClient tezClient, String componentName,
+ Vertex.VertexExecutionContext lhsContext) throws TezException, InterruptedException, IOException {
+ LOG.info("Running JoinValidate with componentName " + componentName + " reportNonFatalException");
+ JoinValidateConfigured joinValidate =
+ new JoinValidateConfigured(EXECUTION_CONTEXT_DEFAULT, lhsContext,
+ EXECUTION_CONTEXT_EXT_SERVICE_PUSH,
+ EXECUTION_CONTEXT_EXT_SERVICE_PUSH, componentName);
+ DAG dag = joinValidate
+ .createDag(new TezConfiguration(extServiceTestHelper.getConfForJobs()),
+ HASH_JOIN_EXPECTED_RESULT_PATH,
+ HASH_JOIN_OUTPUT_PATH, 3);
+ DAGClient dagClient = tezClient.submitDAG(dag);
+ DAGStatus dagStatus =
+ dagClient.waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+ // Non-fatal reports leave the DAG FAILED, whereas the fatal paths expect ERROR.
+ assertEquals(DAGStatus.State.FAILED, dagStatus.getState());
+ List<String> expectedDiagMessages = Lists.newArrayList(
+ ErrorPluginConfiguration.REPORT_NONFATAL_ERROR_MESSAGE,
+ ServicePluginErrorDefaults.SERVICE_UNAVAILABLE.name());
+ boolean foundDiag = false;
+ for (String diag : dagStatus.getDiagnostics()) {
+ if (checkDiag(diag, expectedDiagMessages)) {
+ foundDiag = true;
+ break;
+ }
+ }
+ assertTrue(foundDiag);
+ }
+
}