You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this rendering.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/14 22:58:59 UTC
[41/50] [abbrv] tez git commit: TEZ-2678. Fix comments from reviews -
part 1. (sseth)
TEZ-2678. Fix comments from reviews - part 1. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fee059b9
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fee059b9
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fee059b9
Branch: refs/heads/TEZ-2003
Commit: fee059b9030f76432b308105ded559c5195f922a
Parents: a960c64
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Aug 11 11:19:09 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 14 13:47:10 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../java/org/apache/tez/client/TezClient.java | 2 +-
.../org/apache/tez/client/TezClientUtils.java | 14 +-
.../main/java/org/apache/tez/dag/api/DAG.java | 76 ++++++++++-
.../apache/tez/dag/api/DagTypeConverters.java | 2 +-
.../api/ContainerLauncherOperationBase.java | 8 +-
.../api/ServicePluginsDescriptor.java | 13 ++
.../api/TaskAttemptEndReason.java | 2 +-
.../tez/serviceplugins/api/TaskScheduler.java | 6 +-
.../java/org/apache/tez/dag/api/TestDAG.java | 2 +-
.../org/apache/tez/dag/api/TestDAGPlan.java | 113 ++++++++++++++-
.../tez/dag/api/TestDagTypeConverters.java | 11 +-
.../org/apache/tez/common/TezUtilsInternal.java | 5 +-
.../apache/tez/dag/api/TaskCommunicator.java | 12 +-
.../tez/dag/api/TaskCommunicatorContext.java | 2 +-
.../apache/tez/dag/app/TaskAttemptListener.java | 4 +-
.../dag/app/TaskAttemptListenerImpTezDag.java | 9 +-
.../dag/app/TaskCommunicatorContextImpl.java | 15 +-
.../tez/dag/app/TezTaskCommunicatorImpl.java | 10 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 13 +-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 9 +-
.../app/launcher/ContainerLauncherRouter.java | 19 +--
.../tez/dag/app/rm/AMSchedulerEventTAEnded.java | 8 +-
.../dag/app/rm/LocalTaskSchedulerService.java | 2 +-
.../dag/app/rm/TaskSchedulerEventHandler.java | 5 +-
.../dag/app/rm/YarnTaskSchedulerService.java | 5 +-
.../dag/app/rm/container/AMContainerImpl.java | 62 +++++----
.../tez/dag/app/rm/node/AMNodeTracker.java | 2 +-
.../app/TestTaskAttemptListenerImplTezDag.java | 6 +-
.../app/TestTaskAttemptListenerImplTezDag2.java | 3 +-
.../app/TestTaskCommunicatorContextImpl.java | 85 ++++++++++++
.../dag/app/TestTaskCommunicatorManager.java | 4 +-
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 12 +-
.../launcher/TestContainerLauncherRouter.java | 6 +-
.../tez/dag/app/rm/TestContainerReuse.java | 136 +++++++++++++------
.../app/rm/TestLocalTaskSchedulerService.java | 4 +-
.../tez/dag/app/rm/TestTaskScheduler.java | 18 +--
.../app/rm/TestTaskSchedulerEventHandler.java | 13 +-
.../dag/app/rm/container/TestAMContainer.java | 127 +++++++++++------
.../org/apache/tez/examples/JoinValidate.java | 8 ++
tez-ext-service-tests/pom.xml | 5 -
.../rm/TezTestServiceTaskSchedulerService.java | 5 +-
.../TezTestServiceTaskCommunicatorImpl.java | 10 +-
.../tez/service/impl/ContainerRunnerImpl.java | 2 +-
.../apache/tez/runtime/task/TezTaskRunner2.java | 16 +--
.../runtime/task/TaskExecutionTestHelpers.java | 1 +
.../runtime/task/TestContainerExecution.java | 1 +
47 files changed, 663 insertions(+), 231 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 75fac88..fd3374e 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -45,5 +45,6 @@ ALL CHANGES:
TEZ-2126. Add unit tests for verifying multiple schedulers, launchers, communicators.
TEZ-2698. rebase 08/05
TEZ-2675. Add javadocs for new pluggable components, fix problems reported by jenkins
+ TEZ-2678. Fix comments from reviews - part 1.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 373be81..036b5e8 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -471,7 +471,7 @@ public class TezClient {
Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials);
DAGPlan dagPlan = TezClientUtils.prepareAndCreateDAGPlan(dag, amConfig, tezJarResources,
- usingTezArchiveDeploy, sessionCredentials, aclConfigs);
+ usingTezArchiveDeploy, sessionCredentials, aclConfigs, servicePluginsDescriptor);
SubmitDAGRequestProto.Builder requestBuilder = SubmitDAGRequestProto.newBuilder();
requestBuilder.setDAGPlan(dagPlan).build();
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 6086fa1..ecf5c07 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -609,7 +609,7 @@ public class TezClientUtils {
if(dag != null) {
DAGPlan dagPB = prepareAndCreateDAGPlan(dag, amConfig, tezJarResources, tezLrsAsArchive,
- sessionCreds);
+ sessionCreds, servicePluginsDescriptor);
// emit protobuf DAG file style
Path binaryPath = TezCommonUtils.getTezBinPlanStagingPath(tezSysStagingPath);
@@ -685,18 +685,19 @@ public class TezClientUtils {
static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig,
Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
- Credentials credentials) throws IOException {
+ Credentials credentials, ServicePluginsDescriptor servicePluginsDescriptor) throws IOException {
return prepareAndCreateDAGPlan(dag, amConfig, tezJarResources, tezLrsAsArchive, credentials,
- null);
+ null, servicePluginsDescriptor);
}
static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig,
Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
- Credentials credentials, Map<String, String> additionalDAGConfigs) throws IOException {
+ Credentials credentials, Map<String, String> additionalDAGConfigs,
+ ServicePluginsDescriptor servicePluginsDescriptor) throws IOException {
Credentials dagCredentials = setupDAGCredentials(dag, credentials,
amConfig.getTezConfiguration());
return dag.createDag(amConfig.getTezConfiguration(), dagCredentials, tezJarResources,
- amConfig.getBinaryConfLR(), tezLrsAsArchive, additionalDAGConfigs);
+ amConfig.getBinaryConfLR(), tezLrsAsArchive, additionalDAGConfigs, servicePluginsDescriptor);
}
static void maybeAddDefaultLoggingJavaOpts(String logLevel, List<String> vargs) {
@@ -776,7 +777,7 @@ public class TezClientUtils {
}
AMPluginDescriptorProto pluginDescriptorProto =
- DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor);
+ DagTypeConverters.convertServicePluginDescriptorToProto(servicePluginsDescriptor);
builder.setAmPluginDescriptor(pluginDescriptorProto);
return builder.build();
@@ -1035,4 +1036,5 @@ public class TezClientUtils {
}
}
}
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 927039a..78bb660 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -35,6 +35,8 @@ import org.apache.commons.collections4.bidimap.DualLinkedHashBidiMap;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
+import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -714,14 +716,15 @@ public class DAG {
Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
boolean tezLrsAsArchive) {
return createDag(tezConf, extraCredentials, tezJarResources, binaryConfig, tezLrsAsArchive,
- null);
+ null, null);
}
// create protobuf message describing DAG
@Private
public synchronized DAGPlan createDag(Configuration tezConf, Credentials extraCredentials,
Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
- boolean tezLrsAsArchive, Map<String, String> additionalConfigs) {
+ boolean tezLrsAsArchive, Map<String, String> additionalConfigs,
+ ServicePluginsDescriptor servicePluginsDescriptor) {
verify(true);
DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
@@ -732,6 +735,7 @@ public class DAG {
// Setup default execution context.
VertexExecutionContext defaultContext = getDefaultExecutionContext();
+ verifyExecutionContext(defaultContext, servicePluginsDescriptor, "DAGDefault");
if (defaultContext != null) {
DAGProtos.VertexExecutionContextProto contextProto = DagTypeConverters.convertToProto(
defaultContext);
@@ -834,6 +838,7 @@ public class DAG {
// Vertex ExecutionContext setup
VertexExecutionContext execContext = vertex.getVertexExecutionContext();
+ verifyExecutionContext(execContext, servicePluginsDescriptor, vertex.getName());
if (execContext != null) {
DAGProtos.VertexExecutionContextProto contextProto =
DagTypeConverters.convertToProto(execContext);
@@ -986,4 +991,71 @@ public class DAG {
return dagBuilder.build();
}
+
+ private void verifyExecutionContext(VertexExecutionContext executionContext,
+ ServicePluginsDescriptor servicePluginsDescriptor,
+ String context) {
+ if (executionContext != null) {
+ if (executionContext.shouldExecuteInContainers()) {
+ if (servicePluginsDescriptor == null || !servicePluginsDescriptor.areContainersEnabled()) {
+ throw new IllegalStateException("Invalid configuration. ExecutionContext for " + context +
+ " specifies container execution but this is disabled in the ServicePluginDescriptor");
+ }
+ }
+ if (executionContext.shouldExecuteInAm()) {
+ if (servicePluginsDescriptor == null || !servicePluginsDescriptor.isUberEnabled()) {
+ throw new IllegalStateException("Invalid configuration. ExecutionContext for " + context +
+ " specifies AM execution but this is disabled in the ServicePluginDescriptor");
+ }
+ }
+ if (executionContext.getTaskSchedulerName() != null) {
+ boolean found = false;
+ if (servicePluginsDescriptor != null) {
+ found = checkNamedEntityExists(executionContext.getTaskSchedulerName(),
+ servicePluginsDescriptor.getTaskSchedulerDescriptors());
+ }
+ if (!found) {
+ throw new IllegalStateException("Invalid configuration. ExecutionContext for " + context +
+ " specifies task scheduler as " + executionContext.getTaskSchedulerName() +
+ " which is not part of the ServicePluginDescriptor");
+ }
+ }
+ if (executionContext.getContainerLauncherName() != null) {
+ boolean found = false;
+ if (servicePluginsDescriptor != null) {
+ found = checkNamedEntityExists(executionContext.getContainerLauncherName(),
+ servicePluginsDescriptor.getContainerLauncherDescriptors());
+ }
+ if (!found) {
+ throw new IllegalStateException("Invalid configuration. ExecutionContext for " + context +
+ " specifies container launcher as " + executionContext.getContainerLauncherName() +
+ " which is not part of the ServicePluginDescriptor");
+ }
+ }
+ if (executionContext.getTaskCommName() != null) {
+ boolean found = false;
+ if (servicePluginsDescriptor != null) {
+ found = checkNamedEntityExists(executionContext.getTaskCommName(),
+ servicePluginsDescriptor.getTaskCommunicatorDescriptors());
+ }
+ if (!found) {
+ throw new IllegalStateException("Invalid configuration. ExecutionContext for " + context +
+ " specifies task communicator as " + executionContext.getTaskCommName() +
+ " which is not part of the ServicePluginDescriptor");
+ }
+ }
+ }
+ }
+
+ private boolean checkNamedEntityExists(String expected, NamedEntityDescriptor[] namedEntities) {
+ if (namedEntities == null) {
+ return false;
+ }
+ for (NamedEntityDescriptor named : namedEntities) {
+ if (named.getEntityName().equals(expected)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 61e4d33..2823a86 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -801,7 +801,7 @@ public class DagTypeConverters {
return builder.build();
}
- public static AMPluginDescriptorProto convertServicePluginDescriptoToProto(
+ public static AMPluginDescriptorProto convertServicePluginDescriptorToProto(
ServicePluginsDescriptor servicePluginsDescriptor) {
AMPluginDescriptorProto.Builder pluginDescriptorBuilder =
AMPluginDescriptorProto.newBuilder();
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java
index 260b681..98806fa 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java
@@ -42,8 +42,8 @@ public class ContainerLauncherOperationBase {
}
/**
- * Get the node on whcih this container is to be launched
- * @return
+ * Get the node on which this container is to be launched
+ * @return the node id for the container
*/
public NodeId getNodeId() {
return nodeId;
@@ -51,7 +51,7 @@ public class ContainerLauncherOperationBase {
/**
* Get the containerId for the container
- * @return
+ * @return the container id for the container operation
*/
public ContainerId getContainerId() {
return containerId;
@@ -59,7 +59,7 @@ public class ContainerLauncherOperationBase {
/**
* Get the security token for the container. Primarily for YARN
- * @return
+ * @return the token for the container launch.
*/
public Token getContainerToken() {
return containerToken;
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
index ce35350..113b7db 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
@@ -14,6 +14,8 @@
package org.apache.tez.serviceplugins.api;
+import java.util.Arrays;
+
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -138,4 +140,15 @@ public class ServicePluginsDescriptor {
public TaskCommunicatorDescriptor[] getTaskCommunicatorDescriptors() {
return taskCommunicatorDescriptors;
}
+
+ @Override
+ public String toString() {
+ return "ServicePluginsDescriptor{" +
+ "enableContainers=" + enableContainers +
+ ", enableUber=" + enableUber +
+ ", taskSchedulerDescriptors=" + Arrays.toString(taskSchedulerDescriptors) +
+ ", containerLauncherDescriptors=" + Arrays.toString(containerLauncherDescriptors) +
+ ", taskCommunicatorDescriptors=" + Arrays.toString(taskCommunicatorDescriptors) +
+ '}';
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java
index 4255c28..bff36cd 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.classification.InterfaceStability;
public enum TaskAttemptEndReason {
NODE_FAILED, // Completed because the node running the container was marked as dead
COMMUNICATION_ERROR, // Communication error with the task
- SERVICE_BUSY, // External service busy
+ EXECUTOR_BUSY, // External service busy
INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision
EXTERNAL_PREEMPTION, // Preempted due to cluster contention
APPLICATION_ERROR, // An error in the AM caused by user code
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
index f05bddc..9a864c5 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
@@ -14,6 +14,8 @@
package org.apache.tez.serviceplugins.api;
+import javax.annotation.Nullable;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.Container;
@@ -178,11 +180,13 @@ public abstract class TaskScheduler implements ServicePluginLifecycle {
* @param task the task being de-allocated.
* @param taskSucceeded whether the task succeeded or not
* @param endReason the reason for the task failure
+ * @param diagnostics additional diagnostics information which may be relevant
* @return true if the task was associated with a container, false if the task was not associated
* with a container
*/
public abstract boolean deallocateTask(Object task, boolean taskSucceeded,
- TaskAttemptEndReason endReason);
+ TaskAttemptEndReason endReason,
+ @Nullable String diagnostics);
/**
* A request to de-allocate a previously allocated container.
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
index 3fe17df..268267b 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
@@ -86,7 +86,7 @@ public class TestDAG {
dummyTaskCount, dummyTaskResource);
DAG dag = DAG.create("testDAG");
- dag.createVertexGroup("group_1", v1,v2);
+ dag.createVertexGroup("group_1", v1, v2);
try {
dag.createVertexGroup("group_1", v2, v3);
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
index cd42109..7edea2f 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -38,7 +38,6 @@ import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
-import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
@@ -46,6 +45,10 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
import org.apache.tez.dag.api.records.DAGProtos.VertexExecutionContextProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
+import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
+import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
+import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -317,6 +320,108 @@ public class TestDAGPlan {
}
@Test(timeout = 5000)
+ public void testInvalidExecContext_1() {
+ DAG dag = DAG.create("dag1");
+ dag.setExecutionContext(VertexExecutionContext.createExecuteInAm(true));
+ Vertex v1 = Vertex.create("testvertex", ProcessorDescriptor.create("processor1"), 1);
+ dag.addVertex(v1);
+
+ try {
+ dag.createDag(new TezConfiguration(false), null, null, null, true, null, null);
+ fail("Expecting dag create to fail due to invalid ServicePluginDescriptor");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("AM execution"));
+ }
+
+ dag.setExecutionContext(VertexExecutionContext.createExecuteInContainers(true));
+
+ try {
+ dag.createDag(new TezConfiguration(false), null, null, null, true, null, null);
+ fail("Expecting dag create to fail due to invalid ServicePluginDescriptor");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("container execution"));
+ }
+
+ }
+
+ @Test(timeout = 5000)
+ public void testInvalidExecContext_2() {
+
+ ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor
+ .create(false,
+ new TaskSchedulerDescriptor[]{TaskSchedulerDescriptor.create("plugin", null)},
+ new ContainerLauncherDescriptor[]{ContainerLauncherDescriptor.create("plugin", null)},
+ new TaskCommunicatorDescriptor[]{TaskCommunicatorDescriptor.create("plugin", null)});
+
+ VertexExecutionContext validExecContext = VertexExecutionContext.create("plugin", "plugin",
+ "plugin");
+ VertexExecutionContext invalidExecContext1 =
+ VertexExecutionContext.create("invalidplugin", "plugin", "plugin");
+ VertexExecutionContext invalidExecContext2 =
+ VertexExecutionContext.create("plugin", "invalidplugin", "plugin");
+ VertexExecutionContext invalidExecContext3 =
+ VertexExecutionContext.create("plugin", "plugin", "invalidplugin");
+
+
+ DAG dag = DAG.create("dag1");
+ dag.setExecutionContext(VertexExecutionContext.createExecuteInContainers(true));
+ Vertex v1 = Vertex.create("testvertex", ProcessorDescriptor.create("processor1"), 1);
+ dag.addVertex(v1);
+
+ // Should succeed. Default context is containers.
+ dag.createDag(new TezConfiguration(false), null, null, null, true, null,
+ servicePluginsDescriptor);
+
+
+ // Set execute in AM should fail
+ v1.setExecutionContext(VertexExecutionContext.createExecuteInAm(true));
+ try {
+ dag.createDag(new TezConfiguration(false), null, null, null, true, null, servicePluginsDescriptor);
+ fail("Expecting dag create to fail due to invalid ServicePluginDescriptor");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("AM execution"));
+ }
+
+ // Valid context
+ v1.setExecutionContext(validExecContext);
+ dag.createDag(new TezConfiguration(false), null, null, null, true, null, servicePluginsDescriptor);
+
+ // Invalid task scheduler
+ v1.setExecutionContext(invalidExecContext1);
+ try {
+ dag.createDag(new TezConfiguration(false), null, null, null, true, null, servicePluginsDescriptor);
+ fail("Expecting dag create to fail due to invalid ServicePluginDescriptor");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("testvertex"));
+ assertTrue(e.getMessage().contains("task scheduler"));
+ assertTrue(e.getMessage().contains("invalidplugin"));
+ }
+
+ // Invalid ContainerLauncher
+ v1.setExecutionContext(invalidExecContext2);
+ try {
+ dag.createDag(new TezConfiguration(false), null, null, null, true, null, servicePluginsDescriptor);
+ fail("Expecting dag create to fail due to invalid ServicePluginDescriptor");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("testvertex"));
+ assertTrue(e.getMessage().contains("container launcher"));
+ assertTrue(e.getMessage().contains("invalidplugin"));
+ }
+
+ // Invalid task comm
+ v1.setExecutionContext(invalidExecContext3);
+ try {
+ dag.createDag(new TezConfiguration(false), null, null, null, true, null, servicePluginsDescriptor);
+ fail("Expecting dag create to fail due to invalid ServicePluginDescriptor");
+ } catch (IllegalStateException e) {
+ assertTrue(e.getMessage().contains("testvertex"));
+ assertTrue(e.getMessage().contains("task communicator"));
+ assertTrue(e.getMessage().contains("invalidplugin"));
+ }
+
+ }
+
+ @Test(timeout = 5000)
public void testServiceDescriptorPropagation() {
DAG dag = DAG.create("testDag");
ProcessorDescriptor pd1 = ProcessorDescriptor.create("processor1").
@@ -328,6 +433,10 @@ public class TestDAGPlan {
VertexExecutionContext.create("plugin", "plugin", "plugin");
VertexExecutionContext v1Context = VertexExecutionContext.createExecuteInAm(true);
+ ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor
+ .create(true, new TaskSchedulerDescriptor[]{TaskSchedulerDescriptor.create("plugin", null)},
+ new ContainerLauncherDescriptor[]{ContainerLauncherDescriptor.create("plugin", null)},
+ new TaskCommunicatorDescriptor[]{TaskCommunicatorDescriptor.create("plugin", null)});
Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 1)).setExecutionContext(v1Context);
Vertex v2 = Vertex.create("v2", pd2, 1, Resource.newInstance(1024, 1));
@@ -347,7 +456,7 @@ public class TestDAGPlan {
dag.addVertex(v1).addVertex(v2).addEdge(edge);
dag.setExecutionContext(defaultExecutionContext);
- DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, true);
+ DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, true, null, servicePluginsDescriptor);
assertEquals(2, dagProto.getVertexCount());
assertEquals(1, dagProto.getEdgeCount());
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
index e37f849..6f795fc 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDagTypeConverters.java
@@ -33,16 +33,13 @@ import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
-import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexExecutionContextProto;
-import org.apache.tez.serviceplugins.api.ContainerLauncher;
import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
-import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
import org.junit.Assert;
import org.junit.Test;
@@ -152,7 +149,7 @@ public class TestDagTypeConverters {
// Uber-execution
servicePluginsDescriptor = ServicePluginsDescriptor.create(true);
- proto = DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor);
+ proto = DagTypeConverters.convertServicePluginDescriptorToProto(servicePluginsDescriptor);
assertTrue(proto.hasUberEnabled());
assertTrue(proto.hasContainersEnabled());
assertTrue(proto.getUberEnabled());
@@ -168,7 +165,7 @@ public class TestDagTypeConverters {
servicePluginsDescriptor = ServicePluginsDescriptor.create(taskSchedulers, containerLaunchers,
taskComms);
- proto = DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor);
+ proto = DagTypeConverters.convertServicePluginDescriptorToProto(servicePluginsDescriptor);
assertTrue(proto.hasUberEnabled());
assertTrue(proto.hasContainersEnabled());
assertFalse(proto.getUberEnabled());
@@ -185,7 +182,7 @@ public class TestDagTypeConverters {
servicePluginsDescriptor = ServicePluginsDescriptor.create(taskSchedulers, containerLaunchers,
taskComms);
- proto = DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor);
+ proto = DagTypeConverters.convertServicePluginDescriptorToProto(servicePluginsDescriptor);
assertTrue(proto.hasUberEnabled());
assertTrue(proto.hasContainersEnabled());
assertFalse(proto.getUberEnabled());
@@ -201,7 +198,7 @@ public class TestDagTypeConverters {
servicePluginsDescriptor = ServicePluginsDescriptor.create(false, true, taskSchedulers, containerLaunchers,
taskComms);
- proto = DagTypeConverters.convertServicePluginDescriptoToProto(servicePluginsDescriptor);
+ proto = DagTypeConverters.convertServicePluginDescriptorToProto(servicePluginsDescriptor);
assertTrue(proto.hasUberEnabled());
assertTrue(proto.hasContainersEnabled());
assertTrue(proto.getUberEnabled());
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 1fb7ff9..d6ef901 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -45,7 +45,6 @@ import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
-import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -271,7 +270,7 @@ public class TezUtilsInternal {
switch (taskAttemptEndReason) {
case COMMUNICATION_ERROR:
return TaskAttemptTerminationCause.COMMUNICATION_ERROR;
- case SERVICE_BUSY:
+ case EXECUTOR_BUSY:
return TaskAttemptTerminationCause.SERVICE_BUSY;
case INTERNAL_PREEMPTION:
return TaskAttemptTerminationCause.INTERNAL_PREEMPTION;
@@ -301,7 +300,7 @@ public class TezUtilsInternal {
case COMMUNICATION_ERROR:
return TaskAttemptEndReason.COMMUNICATION_ERROR;
case SERVICE_BUSY:
- return TaskAttemptEndReason.SERVICE_BUSY;
+ return TaskAttemptEndReason.EXECUTOR_BUSY;
case INTERNAL_PREEMPTION:
return TaskAttemptEndReason.INTERNAL_PREEMPTION;
case EXTERNAL_PREEMPTION:
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index 4fc541c..f1f683b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -14,6 +14,7 @@
package org.apache.tez.dag.api;
+import javax.annotation.Nullable;
import java.net.InetSocketAddress;
import java.util.Map;
@@ -115,8 +116,10 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
*
* @param containerId the associated containerId
* @param endReason the end reason for the container completing
+ * @param diagnostics diagnostics associated with the container end
*/
- public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason);
+ public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason,
+ @Nullable String diagnostics);
/**
* Register a task attempt to execute on a container
@@ -138,14 +141,15 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
/**
* Register the completion of a task. This may be a result of preemption, the container dying,
- * the
- * node dying, the task completing to success
+ * the node dying, the task completing to success
*
* @param taskAttemptID the task attempt which has completed / needs to be completed
* @param endReason the endReason for the task attempt.
+ * @param diagnostics diagnostics associated with the task end
*/
public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID,
- TaskAttemptEndReason endReason);
+ TaskAttemptEndReason endReason,
+ @Nullable String diagnostics);
/**
* Return the address, if any, that the service listens on
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index 0a684e7..e81ba2b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -160,7 +160,7 @@ public interface TaskCommunicatorContext {
*
* @return the name of the currently executing dag
*/
- String getCurretnDagName();
+ String getCurrentDagName();
/**
* Get the name of the Input vertices for the specified vertex.
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
index 2eec2fb..761bdb0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
@@ -34,9 +34,9 @@ public interface TaskAttemptListener {
void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId, int taskCommId);
- void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason);
+ void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics);
- void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId, TaskAttemptEndReason endReason);
+ void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId, TaskAttemptEndReason endReason, String diagnostics);
void dagComplete(DAG dag);
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index ad6f2c4..2f6e93c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -176,7 +176,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
try {
Constructor<? extends TaskCommunicator> ctor =
taskCommClazz.getConstructor(TaskCommunicatorContext.class);
- ctor.setAccessible(true);
return ctor.newInstance(taskCommunicatorContext);
} catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
throw new TezUncheckedException(e);
@@ -366,7 +365,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
@Override
- public void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason) {
+ public void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics) {
if (LOG.isDebugEnabled()) {
LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId);
}
@@ -374,7 +373,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
if (containerInfo.taskAttemptId != null) {
registeredAttempts.remove(containerInfo.taskAttemptId);
}
- taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason);
+ taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason, diagnostics);
}
@Override
@@ -408,7 +407,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
@Override
- public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId, TaskAttemptEndReason endReason) {
+ public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId, TaskAttemptEndReason endReason, String diagnostics) {
ContainerId containerId = registeredAttempts.remove(attemptId);
if (containerId == null) {
LOG.warn("Unregister task attempt: " + attemptId + " from unknown container");
@@ -422,7 +421,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
// Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
registeredContainers.put(containerId, NULL_CONTAINER_INFO);
- taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason);
+ taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason, diagnostics);
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 0f10305..c56311c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TaskCommunicatorContext;
import org.apache.tez.dag.api.TaskHeartbeatRequest;
@@ -96,7 +97,13 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
@Override
public boolean isKnownContainer(ContainerId containerId) {
- return context.getAllContainers().get(containerId) != null;
+ AMContainer amContainer = context.getAllContainers().get(containerId);
+ if (amContainer == null ||
+ amContainer.getTaskCommunicatorIdentifier() != taskCommunicatorIndex) {
+ return false;
+ } else {
+ return true;
+ }
}
@Override
@@ -106,7 +113,9 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
@Override
public void containerAlive(ContainerId containerId) {
- taskAttemptListener.containerAlive(containerId);
+ if (isKnownContainer(containerId)) {
+ taskAttemptListener.containerAlive(containerId);
+ }
}
@Override
@@ -136,7 +145,7 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
}
@Override
- public String getCurretnDagName() {
+ public String getCurrentDagName() {
return getDag().getName();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index d3f1c44..9ecee5b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -36,8 +36,12 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.common.*;
import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezConverterUtils;
+import org.apache.tez.common.TezLocalResource;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.common.security.TokenCache;
@@ -199,7 +203,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
}
@Override
- public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason) {
+ public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, String diagnostics) {
ContainerInfo containerInfo = registeredContainers.remove(containerId);
if (containerInfo != null) {
synchronized(containerInfo) {
@@ -245,7 +249,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
@Override
- public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason) {
+ public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason, String diagnostics) {
TaskAttempt taskAttempt = new TaskAttempt(taskAttemptID);
ContainerId containerId = attemptToContainerMap.remove(taskAttempt);
if(containerId == null) {
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 17f5675..6b474ff 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -82,7 +82,6 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.DAGAppMasterState;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
@@ -180,6 +179,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private final AppContext appContext;
private final UserGroupInformation dagUGI;
private final ACLManager aclManager;
+ private final org.apache.tez.dag.api.Vertex.VertexExecutionContext defaultExecutionContext;
@VisibleForTesting
StateChangeNotifier entityUpdateTracker;
@@ -538,6 +538,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
// this is only for recovery in case it does not call the init transition
this.startDAGCpuTime = appContext.getCumulativeCPUTime();
this.startDAGGCTime = appContext.getCumulativeGCTime();
+ if (jobPlan.hasDefaultExecutionContext()) {
+ defaultExecutionContext = DagTypeConverters.convertFromProto(jobPlan.getDefaultExecutionContext());
+ } else {
+ defaultExecutionContext = null;
+ }
this.taskSpecificLaunchCmdOption = new TaskSpecificLaunchCmdOption(dagConf);
// This "this leak" is okay because the retained pointer is in an
@@ -718,11 +723,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
@Override
public org.apache.tez.dag.api.Vertex.VertexExecutionContext getDefaultExecutionContext() {
- if (jobPlan.hasDefaultExecutionContext()) {
- return DagTypeConverters.convertFromProto(jobPlan.getDefaultExecutionContext());
- } else {
- return null;
- }
+ return defaultExecutionContext;
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 65ea3fb..c6d8a7e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -227,7 +227,7 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptEventType.TA_KILL_REQUEST,
new TerminatedBeforeRunningTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.START_WAIT,
- TaskAttemptStateInternal.KILL_IN_PROGRESS,
+ TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_KILLED,
new TerminatedBeforeRunningTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.START_WAIT,
@@ -267,7 +267,7 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptEventType.TA_KILL_REQUEST,
new TerminatedWhileRunningTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.RUNNING,
- TaskAttemptStateInternal.KILL_IN_PROGRESS,
+ TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_KILLED,
new TerminatedWhileRunningTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.RUNNING,
@@ -1095,7 +1095,7 @@ public class TaskAttemptImpl implements TaskAttempt,
// Compute node/rack location request even if re-scheduled.
Set<String> racks = new HashSet<String>();
- // TODO Post TEZ-2003. Allow for a policy in the VMPlugin to define localicty for different attempts.
+ // TODO Post TEZ-2003. Allow for a policy in the VMPlugin to define locality for different attempts.
TaskLocationHint locationHint = ta.getTaskLocationHint();
if (locationHint != null) {
if (locationHint.getRacks() != null) {
@@ -1266,6 +1266,7 @@ public class TaskAttemptImpl implements TaskAttempt,
if (sendSchedulerEvent()) {
ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, helper
.getTaskAttemptState(), TezUtilsInternal.toTaskAttemptEndReason(ta.terminationCause),
+ event instanceof DiagnosableEvent ? ((DiagnosableEvent) event).getDiagnosticInfo() : null, // diagnostics live on the triggering event, not on the attempt
ta.getVertex().getTaskSchedulerIdentifier()));
}
}
@@ -1348,7 +1349,7 @@ public class TaskAttemptImpl implements TaskAttempt,
// Inform the Scheduler.
ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId,
- TaskAttemptState.SUCCEEDED, null, ta.getVertex().getTaskSchedulerIdentifier()));
+ TaskAttemptState.SUCCEEDED, null, null, ta.getVertex().getTaskSchedulerIdentifier()));
// Inform the task.
ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
index d0cee21..b56bd5b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -70,7 +70,7 @@ public class ContainerLauncherRouter extends AbstractService
TaskAttemptListener taskAttemptListener,
String workingDirectory,
List<NamedEntityDescriptor> containerLauncherDescriptors,
- boolean isPureLocalMode) throws UnknownHostException {
+ boolean isPureLocalMode) {
super(ContainerLauncherRouter.class.getName());
this.appContext = context;
@@ -101,8 +101,7 @@ public class ContainerLauncherRouter extends AbstractService
TaskAttemptListener taskAttemptListener,
String workingDirectory,
int containerLauncherIndex,
- boolean isPureLocalMode) throws
- UnknownHostException {
+ boolean isPureLocalMode) {
if (containerLauncherDescriptor.getEntityName().equals(
TezConstants.getTezYarnServicePluginName())) {
return createYarnContainerLauncher(containerLauncherContext);
@@ -126,15 +125,18 @@ public class ContainerLauncherRouter extends AbstractService
AppContext context,
TaskAttemptListener taskAttemptListener,
String workingDirectory,
- boolean isPureLocalMode) throws
- UnknownHostException {
+ boolean isPureLocalMode) {
LOG.info("Creating LocalContainerLauncher");
// TODO Post TEZ-2003. LocalContainerLauncher is special cased, since it makes use of
// extensive internals which are only available at runtime. Will likely require
// some kind of runtime binding of parameters in the payload to work correctly.
- return
- new LocalContainerLauncher(containerLauncherContext, context, taskAttemptListener,
- workingDirectory, isPureLocalMode);
+ try {
+ return
+ new LocalContainerLauncher(containerLauncherContext, context, taskAttemptListener,
+ workingDirectory, isPureLocalMode);
+ } catch (UnknownHostException e) {
+ throw new TezUncheckedException(e);
+ }
}
@VisibleForTesting
@@ -149,7 +151,6 @@ public class ContainerLauncherRouter extends AbstractService
try {
Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
.getConstructor(ContainerLauncherContext.class);
- ctor.setAccessible(true);
return ctor.newInstance(containerLauncherContext);
} catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
throw new TezUncheckedException(e);
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
index 33763e7..ccc5465 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
@@ -29,15 +29,17 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent {
private final ContainerId containerId;
private final TaskAttemptState state;
private final TaskAttemptEndReason taskAttemptEndReason;
+ private final String diagnostics;
private final int schedulerId;
public AMSchedulerEventTAEnded(TaskAttempt attempt, ContainerId containerId,
- TaskAttemptState state, TaskAttemptEndReason taskAttemptEndReason, int schedulerId) {
+ TaskAttemptState state, TaskAttemptEndReason taskAttemptEndReason, String diagnostics, int schedulerId) {
super(AMSchedulerEventType.S_TA_ENDED);
this.attempt = attempt;
this.containerId = containerId;
this.state = state;
this.taskAttemptEndReason = taskAttemptEndReason;
+ this.diagnostics = diagnostics;
this.schedulerId = schedulerId;
}
@@ -64,4 +66,8 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent {
public TaskAttemptEndReason getTaskAttemptEndReason() {
return taskAttemptEndReason;
}
+
+ public String getDiagnostics() {
+ return diagnostics;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index befde94..f77a9a9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -131,7 +131,7 @@ public class LocalTaskSchedulerService extends TaskScheduler {
}
@Override
- public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason) {
+ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason, String diagnostics) {
return taskRequestHandler.addDeallocateTaskRequest(task);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index f001909..7d2e768 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -286,7 +286,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements
TaskAttempt attempt = event.getAttempt();
// Propagate state and failure cause (if any) when informing the scheduler about the de-allocation.
boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()]
- .deallocateTask(attempt, false, event.getTaskAttemptEndReason());
+ .deallocateTask(attempt, false, event.getTaskAttemptEndReason(), event.getDiagnostics());
// use stored value of container id in case the scheduler has removed this
// assignment because the task has been deallocated earlier.
// retroactive case
@@ -331,7 +331,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements
}
boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt,
- true, null);
+ true, null, event.getDiagnostics());
if (!wasContainerAllocated) {
LOG.error("De-allocated successful task: " + attempt.getID()
+ ", but TaskScheduler reported no container assigned to task");
@@ -436,7 +436,6 @@ public class TaskSchedulerEventHandler extends AbstractService implements
try {
Constructor<? extends TaskScheduler> ctor = taskSchedulerClazz
.getConstructor(TaskSchedulerContext.class);
- ctor.setAccessible(true);
return ctor.newInstance(taskSchedulerContext);
} catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
throw new TezUncheckedException(e);
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 940c5b0..64d0fd2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -980,7 +980,8 @@ public class YarnTaskSchedulerService extends TaskScheduler
*/
@Override
public boolean deallocateTask(Object task, boolean taskSucceeded,
- TaskAttemptEndReason endReason) {
+ TaskAttemptEndReason endReason,
+ String diagnostics) {
Map<CookieContainerRequest, Container> assignedContainers = null;
synchronized (this) {
@@ -1170,7 +1171,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
CookieContainerRequest request = entry.getValue();
if (request.getPriority().equals(lowestPriNewContainer.getPriority())) {
LOG.info("Resending request for task again: " + task);
- deallocateTask(task, true, null);
+ deallocateTask(task, true, null, null);
allocateTask(task, request.getCapability(),
(request.getNodes() == null ? null :
request.getNodes().toArray(new String[request.getNodes().size()])),
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index aeacf84..99cec2b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -631,14 +631,14 @@ public class AMContainerImpl implements AMContainer {
SingleArcTransition<AMContainerImpl, AMContainerEvent> {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
+ AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) cEvent;
if (container.currentAttempt != null) {
- AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) cEvent;
// for a properly setup cluster this should almost always be an app error
// need to differentiate between launch failed due to framework/cluster or app
container.sendTerminatingToTaskAttempt(container.currentAttempt,
event.getMessage(), TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED);
}
- container.unregisterFromTAListener(ContainerEndReason.LAUNCH_FAILED);
+ container.unregisterFromTAListener(ContainerEndReason.LAUNCH_FAILED, event.getMessage());
container.deAllocate();
}
}
@@ -668,7 +668,7 @@ public class AMContainerImpl implements AMContainer {
}
container.containerLocalResources = null;
container.additionalLocalResources = null;
- container.unregisterFromTAListener(event.getContainerEndReason());
+ container.unregisterFromTAListener(event.getContainerEndReason(), event.getDiagnostics());
String diag = event.getDiagnostics();
if (!(diag == null || diag.equals(""))) {
LOG.info("Container " + container.getContainerId()
@@ -694,7 +694,7 @@ public class AMContainerImpl implements AMContainer {
container.sendTerminatingToTaskAttempt(container.currentAttempt,
getMessage(container, cEvent), TaskAttemptTerminationCause.CONTAINER_STOPPED);
}
- container.unregisterFromTAListener(ContainerEndReason.OTHER);
+ container.unregisterFromTAListener(ContainerEndReason.OTHER, getMessage(container, cEvent));
container.logStopped(container.currentAttempt == null ?
ContainerExitStatus.SUCCESS
: ContainerExitStatus.INVALID);
@@ -746,7 +746,11 @@ public class AMContainerImpl implements AMContainer {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
- container.unregisterFromTAListener(ContainerEndReason.NODE_FAILED);
+ String errorMessage = "Node " + container.getContainer().getNodeId() + " failed. ";
+ if (cEvent instanceof DiagnosableEvent) {
+ errorMessage += ((DiagnosableEvent) cEvent).getDiagnosticInfo();
+ }
+ container.unregisterFromTAListener(ContainerEndReason.NODE_FAILED, errorMessage);
container.deAllocate();
}
}
@@ -756,14 +760,15 @@ public class AMContainerImpl implements AMContainer {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
+ String errorMessage = "Container " + container.getContainerId() +
+ " hit an invalid transition - " + cEvent.getType() + " at " +
+ container.getState();
if (container.currentAttempt != null) {
container.sendTerminatingToTaskAttempt(container.currentAttempt,
- "Container " + container.getContainerId() +
- " hit an invalid transition - " + cEvent.getType() + " at " +
- container.getState(), TaskAttemptTerminationCause.FRAMEWORK_ERROR);
+ errorMessage, TaskAttemptTerminationCause.FRAMEWORK_ERROR);
}
container.logStopped(ContainerExitStatus.ABORTED);
- container.unregisterFromTAListener(ContainerEndReason.FRAMEWORK_ERROR);
+ container.unregisterFromTAListener(ContainerEndReason.FRAMEWORK_ERROR, errorMessage);
container.sendStopRequestToNM();
}
}
@@ -835,7 +840,12 @@ public class AMContainerImpl implements AMContainer {
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent;
- container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.FRAMEWORK_ERROR);
+ String errorMessage = "AMScheduler Error: Multiple simultaneous " +
+ "taskAttempt allocations to: " + container.getContainerId() +
+ ". Attempts: " + container.getCurrentTaskAttempt() + ", " + event.getTaskAttemptId() +
+ ". Current state: " + container.getState();
+ container.unregisterAttemptFromListener(container.currentAttempt,
+ TaskAttemptEndReason.FRAMEWORK_ERROR, errorMessage);
container.handleExtraTAAssign(event, container.currentAttempt);
}
}
@@ -846,7 +856,7 @@ public class AMContainerImpl implements AMContainer {
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
container.lastTaskFinishTime = System.currentTimeMillis();
container.completedAttempts.add(container.currentAttempt);
- container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.OTHER);
+ container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.OTHER, null);
container.currentAttempt = null;
}
}
@@ -863,7 +873,9 @@ public class AMContainerImpl implements AMContainer {
container.sendTerminatedToTaskAttempt(container.currentAttempt,
getMessage(container, event), event.getTerminationCause());
}
- container.unregisterAttemptFromListener(container.currentAttempt, TezUtilsInternal.toTaskAttemptEndReason(event.getTerminationCause()));
+ container.unregisterAttemptFromListener(container.currentAttempt,
+ TezUtilsInternal.toTaskAttemptEndReason(event.getTerminationCause()),
+ getMessage(container, event));
container.registerFailedAttempt(container.currentAttempt);
container.currentAttempt= null;
super.transition(container, cEvent);
@@ -873,7 +885,8 @@ public class AMContainerImpl implements AMContainer {
protected static class StopRequestAtRunningTransition
extends StopRequestAtIdleTransition {
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
- container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.OTHER);
+ container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.OTHER,
+ getMessage(container, cEvent));
super.transition(container, cEvent);
}
}
@@ -894,7 +907,8 @@ public class AMContainerImpl implements AMContainer {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
- container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.NODE_FAILED);
+ String errorMessage = "Node " + container.getContainer().getNodeId() + " failed. ";
+ container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.NODE_FAILED, errorMessage);
}
}
@@ -903,11 +917,13 @@ public class AMContainerImpl implements AMContainer {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
- container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.FRAMEWORK_ERROR);
+ String errorMessage = "Container " + container.getContainerId() +
+ " hit an invalid transition - " + cEvent.getType() + " at " +
+ container.getState();
+ container.unregisterAttemptFromListener(container.currentAttempt,
+ TaskAttemptEndReason.FRAMEWORK_ERROR, errorMessage);
container.sendTerminatingToTaskAttempt(container.currentAttempt,
- "Container " + container.getContainerId() +
- " hit an invalid transition - " + cEvent.getType() + " at " +
- container.getState(), TaskAttemptTerminationCause.FRAMEWORK_ERROR);
+ errorMessage, TaskAttemptTerminationCause.FRAMEWORK_ERROR);
}
}
@@ -1029,7 +1045,7 @@ public class AMContainerImpl implements AMContainer {
LOG.warn(errorMessage);
this.logStopped(ContainerExitStatus.INVALID);
this.sendStopRequestToNM();
- this.unregisterFromTAListener(ContainerEndReason.FRAMEWORK_ERROR);
+ this.unregisterFromTAListener(ContainerEndReason.FRAMEWORK_ERROR, errorMessage);
this.unregisterFromContainerListener();
}
@@ -1087,8 +1103,8 @@ public class AMContainerImpl implements AMContainer {
container.getNodeId(), container.getContainerToken(), launcherId, schedulerId, taskCommId));
}
- protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId, TaskAttemptEndReason endReason) {
- taskAttemptListener.unregisterTaskAttempt(attemptId, taskCommId, endReason);
+ protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId, TaskAttemptEndReason endReason, String diagnostics) {
+ taskAttemptListener.unregisterTaskAttempt(attemptId, taskCommId, endReason, diagnostics);
}
protected void registerAttemptWithListener(AMContainerTask amContainerTask) {
@@ -1099,8 +1115,8 @@ public class AMContainerImpl implements AMContainer {
taskAttemptListener.registerRunningContainer(containerId, taskCommId);
}
- protected void unregisterFromTAListener(ContainerEndReason endReason) {
- this.taskAttemptListener.unregisterRunningContainer(containerId, taskCommId, endReason);
+ protected void unregisterFromTAListener(ContainerEndReason endReason, String diagnostics) {
+ this.taskAttemptListener.unregisterRunningContainer(containerId, taskCommId, endReason, diagnostics);
}
protected void registerWithContainerListener() {
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
index 0668ff2..32e515b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
@@ -53,7 +53,7 @@ public class AMNodeTracker extends AbstractService implements
@SuppressWarnings("rawtypes")
public AMNodeTracker(EventHandler eventHandler, AppContext appContext) {
- super("AMNodeMap");
+ super("AMNodeTracker");
this.perSourceNodeTrackers = new ConcurrentHashMap<>();
this.eventHandler = eventHandler;
this.appContext = appContext;
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 4d404b9..5159aff 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -184,12 +184,12 @@ public class TestTaskAttemptListenerImplTezDag {
assertEquals(taskSpec, containerTask.getTaskSpec());
// Task unregistered. Should respond to heartbeats
- taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0, TaskAttemptEndReason.OTHER);
+ taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0, TaskAttemptEndReason.OTHER, null);
containerTask = tezUmbilical.getTask(containerContext2);
assertNull(containerTask);
// Container unregistered. Should send a shouldDie = true
- taskAttemptListener.unregisterRunningContainer(containerId2, 0, ContainerEndReason.OTHER);
+ taskAttemptListener.unregisterRunningContainer(containerId2, 0, ContainerEndReason.OTHER, null);
containerTask = tezUmbilical.getTask(containerContext2);
assertTrue(containerTask.shouldDie());
@@ -203,7 +203,7 @@ public class TestTaskAttemptListenerImplTezDag {
doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID();
AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec, null, null, false, 0);
taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3, 0);
- taskAttemptListener.unregisterRunningContainer(containerId3, 0, ContainerEndReason.OTHER);
+ taskAttemptListener.unregisterRunningContainer(containerId3, 0, ContainerEndReason.OTHER, null);
containerTask = tezUmbilical.getTask(containerContext3);
assertTrue(containerTask.shouldDie());
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
index abb5e42..74468f2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
@@ -41,7 +41,6 @@ import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.app.dag.DAG;
@@ -110,7 +109,7 @@ public class TestTaskAttemptListenerImplTezDag2 {
taskAttemptListener
.taskFailed(taskAttemptId1, TaskAttemptEndReason.COMMUNICATION_ERROR, "Diagnostics1");
taskAttemptListener
- .taskKilled(taskAttemptId2, TaskAttemptEndReason.SERVICE_BUSY, "Diagnostics2");
+ .taskKilled(taskAttemptId2, TaskAttemptEndReason.EXECUTOR_BUSY, "Diagnostics2");
ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class);
verify(eventHandler, times(2)).handle(argumentCaptor.capture());
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
new file mode 100644
index 0000000..1545eb4
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.common.ContainerSignatureMatcher;
+import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.dag.app.rm.container.AMContainerMap;
+import org.junit.Test;
+
+public class TestTaskCommunicatorContextImpl {
+  // A TaskCommunicatorContextImpl must only act on containers registered under its own index.
+  @Test(timeout = 5000)
+  public void testIsKnownContainer() {
+    AppContext appContext = mock(AppContext.class);
+    TaskAttemptListenerImpTezDag tal = mock(TaskAttemptListenerImpTezDag.class);
+    // Real AMContainerMap (collaborators mocked) so lookups hit actually-registered containers.
+    AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), tal, mock(
+        ContainerSignatureMatcher.class), appContext);
+    // Expose the map via the AppContext mock; presumably the context looks containers up here.
+    doReturn(amContainerMap).when(appContext).getAllContainers();
+    // Naming: containerId01 is registered under index 0, containerId11 under index 1 (see below).
+    ContainerId containerId01 = mock(ContainerId.class);
+    Container container01 = mock(Container.class);
+    doReturn(containerId01).when(container01).getId();
+
+    ContainerId containerId11 = mock(ContainerId.class);
+    Container container11 = mock(Container.class);
+    doReturn(containerId11).when(container11).getId();
+    // Same value for all three index args; presumably scheduler/launcher/taskComm ids - confirm.
+    amContainerMap.addContainerIfNew(container01, 0, 0, 0);
+    amContainerMap.addContainerIfNew(container11, 1, 1, 1);
+    // One context per index (0 and 1); the third constructor argument is passed as null here.
+    TaskCommunicatorContext taskCommContext0 = new TaskCommunicatorContextImpl(appContext, tal, null, 0);
+    TaskCommunicatorContext taskCommContext1 = new TaskCommunicatorContextImpl(appContext, tal, null, 1);
+    // isKnownContainer: true only for the container registered under the context's own index.
+    assertTrue(taskCommContext0.isKnownContainer(containerId01));
+    assertFalse(taskCommContext0.isKnownContainer(containerId11));
+
+    assertFalse(taskCommContext1.isKnownContainer(containerId01));
+    assertTrue(taskCommContext1.isKnownContainer(containerId11));
+    // containerAlive: forwarded to the listener only for the context's own containers.
+    taskCommContext0.containerAlive(containerId01);
+    verify(tal).containerAlive(containerId01);
+    reset(tal);
+    // Context 0, foreign container: nothing forwarded.
+    taskCommContext0.containerAlive(containerId11);
+    verify(tal, never()).containerAlive(containerId11);
+    reset(tal);
+    // Context 1: foreign container ignored, own container forwarded.
+    taskCommContext1.containerAlive(containerId01);
+    verify(tal, never()).containerAlive(containerId01);
+    reset(tal);
+
+    taskCommContext1.containerAlive(containerId11);
+    verify(tal).containerAlive(containerId11);
+    reset(tal);
+    // NOTE(review): duplicates the context 1 / containerId01 check above; drop or replace it.
+    taskCommContext1.containerAlive(containerId01);
+    verify(tal, never()).containerAlive(containerId01);
+    reset(tal);
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
index c76aa50..4f68fab 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
@@ -328,7 +328,7 @@ public class TestTaskCommunicatorManager {
}
@Override
- public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason) {
+ public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, String diagnostics) {
}
@@ -342,7 +342,7 @@ public class TestTaskCommunicatorManager {
@Override
public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID,
- TaskAttemptEndReason endReason) {
+ TaskAttemptEndReason endReason, String diagnostics) {
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 2bf1c85..947ea93 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -308,6 +308,7 @@ public class TestTaskAttempt {
resource, createFakeContainerContext(), false);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ @SuppressWarnings("deprecation")
ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
@@ -351,6 +352,7 @@ public class TestTaskAttempt {
Resource resource = Resource.newInstance(1024, 1);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ @SuppressWarnings("deprecation")
ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
@@ -453,6 +455,7 @@ public class TestTaskAttempt {
Resource resource = Resource.newInstance(1024, 1);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ @SuppressWarnings("deprecation")
ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
@@ -519,6 +522,7 @@ public class TestTaskAttempt {
Resource resource = Resource.newInstance(1024, 1);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ @SuppressWarnings("deprecation")
ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
@@ -613,6 +617,7 @@ public class TestTaskAttempt {
Resource resource = Resource.newInstance(1024, 1);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ @SuppressWarnings("deprecation")
ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
@@ -745,6 +750,7 @@ public class TestTaskAttempt {
Resource resource = Resource.newInstance(1024, 1);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ @SuppressWarnings("deprecation")
ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
@@ -837,6 +843,7 @@ public class TestTaskAttempt {
Resource resource = Resource.newInstance(1024, 1);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ @SuppressWarnings("deprecation")
ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
@@ -933,6 +940,7 @@ public class TestTaskAttempt {
Resource resource = Resource.newInstance(1024, 1);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ @SuppressWarnings("deprecation")
ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
@@ -1037,6 +1045,7 @@ public class TestTaskAttempt {
Resource resource = Resource.newInstance(1024, 1);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ @SuppressWarnings("deprecation")
ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
@@ -1138,6 +1147,7 @@ public class TestTaskAttempt {
Resource resource = Resource.newInstance(1024, 1);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ @SuppressWarnings("deprecation")
ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
@@ -1342,7 +1352,7 @@ public class TestTaskAttempt {
}
}
}
- };
+ }
private class MockTaskAttemptImpl extends TaskAttemptImpl {
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java
index 62a5f19..d0caf8c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java
@@ -273,8 +273,7 @@ public class TestContainerLauncherRouter {
TaskAttemptListener taskAttemptListener,
String workingDirectory,
int containerLauncherIndex,
- boolean isPureLocalMode) throws
- UnknownHostException {
+ boolean isPureLocalMode) {
numContainerLaunchers.incrementAndGet();
boolean added = containerLauncherIndices.add(containerLauncherIndex);
assertTrue("Cannot add multiple launchers with the same index", added);
@@ -298,8 +297,7 @@ public class TestContainerLauncherRouter {
AppContext context,
TaskAttemptListener taskAttemptListener,
String workingDirectory,
- boolean isPureLocalMode) throws
- UnknownHostException {
+ boolean isPureLocalMode) {
uberContainerLauncherCreated.set(true);
testContainerLaunchers.add(uberContainerlauncher);
return uberContainerlauncher;