You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/14 22:58:47 UTC
[29/50] [abbrv] tez git commit: TEZ-2004. Define basic interface for
pluggable ContainerLaunchers. (sseth)
TEZ-2004. Define basic interface for pluggable ContainerLaunchers. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/af1cc723
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/af1cc723
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/af1cc723
Branch: refs/heads/TEZ-2003
Commit: af1cc7236f100bcc7efcf9b48aae24357fa851bd
Parents: a5dfca2
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Jul 20 15:52:24 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 14 13:46:45 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../serviceplugins/api/ContainerEndReason.java | 31 +++++
.../api/ContainerLaunchRequest.java | 81 +++++++++++
.../serviceplugins/api/ContainerLauncher.java | 46 +++++++
.../api/ContainerLauncherContext.java | 54 ++++++++
.../api/ContainerLauncherOperationBase.java | 58 ++++++++
.../api/ContainerStopRequest.java | 47 +++++++
.../api/TaskAttemptEndReason.java | 32 +++++
.../org/apache/tez/common/TezUtilsInternal.java | 9 +-
.../apache/tez/dag/api/ContainerEndReason.java | 27 ----
.../tez/dag/api/TaskAttemptEndReason.java | 27 ----
.../apache/tez/dag/api/TaskCommunicator.java | 9 ++
.../tez/dag/api/TaskCommunicatorContext.java | 2 +-
.../tez/dag/api/TaskCommunicatorInterface.java | 18 +++
.../java/org/apache/tez/dag/app/AppContext.java | 5 +
.../dag/app/ContainerLauncherContextImpl.java | 101 ++++++++++++++
.../org/apache/tez/dag/app/DAGAppMaster.java | 15 +++
.../apache/tez/dag/app/TaskAttemptListener.java | 6 +-
.../dag/app/TaskAttemptListenerImpTezDag.java | 4 +-
.../dag/app/TaskCommunicatorContextImpl.java | 6 +-
.../tez/dag/app/TezTaskCommunicatorImpl.java | 12 +-
.../tez/dag/app/launcher/ContainerLauncher.java | 29 ----
.../dag/app/launcher/ContainerLauncherImpl.java | 128 ++++++++----------
.../app/launcher/ContainerLauncherRouter.java | 52 +++++++-
.../tez/dag/app/launcher/ContainerOp.java | 62 +++++++++
.../app/launcher/LocalContainerLauncher.java | 123 ++++++++---------
.../tez/dag/app/rm/AMSchedulerEventTAEnded.java | 2 +-
.../dag/app/rm/LocalTaskSchedulerService.java | 2 +-
.../tez/dag/app/rm/NMCommunicatorEvent.java | 18 ++-
.../rm/NMCommunicatorLaunchRequestEvent.java | 11 +-
.../app/rm/NMCommunicatorStopRequestEvent.java | 4 +-
.../tez/dag/app/rm/TaskSchedulerService.java | 2 +-
.../dag/app/rm/YarnTaskSchedulerService.java | 2 +-
.../rm/container/AMContainerEventCompleted.java | 2 +-
.../dag/app/rm/container/AMContainerImpl.java | 9 +-
.../apache/tez/dag/app/MockDAGAppMaster.java | 63 ++++-----
.../app/TestTaskAttemptListenerImplTezDag.java | 4 +-
.../app/TestTaskAttemptListenerImplTezDag2.java | 2 +-
.../tez/dag/app/rm/TestContainerReuse.java | 4 +-
.../app/rm/TestLocalTaskSchedulerService.java | 1 -
.../dag/app/rm/container/TestAMContainer.java | 4 +-
.../TezTestServiceContainerLauncher.java | 133 +++++++++----------
.../TezTestServiceNoOpContainerLauncher.java | 53 +++-----
.../rm/TezTestServiceTaskSchedulerService.java | 2 +-
.../TezTestServiceTaskCommunicatorImpl.java | 4 +-
45 files changed, 887 insertions(+), 420 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 604947c..88dd0c7 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -34,5 +34,6 @@ ALL CHANGES:
TEZ-2526. Fix version for tez-history-parser.
TEZ-2621. rebase 07/14
TEZ-2124. Change Node tracking to work per external container source.
+ TEZ-2004. Define basic interface for pluggable ContainerLaunchers.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerEndReason.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerEndReason.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerEndReason.java
new file mode 100644
index 0000000..ab8619f
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerEndReason.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public enum ContainerEndReason {
+ NODE_FAILED, // Completed because the node running the container was marked as dead
+ INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision
+ EXTERNAL_PREEMPTION, // Preempted due to cluster contention
+ APPLICATION_ERROR, // An error in the AM caused by user code
+ FRAMEWORK_ERROR, // An error in the AM - likely a bug.
+ LAUNCH_FAILED, // Failure to launch the container
+ COMPLETED, // Completed via normal flow
+ OTHER // Any reason not covered by the values above
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLaunchRequest.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLaunchRequest.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLaunchRequest.java
new file mode 100644
index 0000000..cfd7ca7
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLaunchRequest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ContainerLaunchRequest extends ContainerLauncherOperationBase {
+
+ private final ContainerLaunchContext clc;
+ private final Container container; // NOTE(review): no getter in this class; only read by toString()
+ private final String schedulerName;
+ private final String taskCommName;
+
+ public ContainerLaunchRequest(NodeId nodeId,
+ ContainerId containerId,
+ Token containerToken,
+ ContainerLaunchContext clc,
+ Container container, String schedulerName, String taskCommName) {
+ super(nodeId, containerId, containerToken);
+ this.clc = clc;
+ this.container = container;
+ this.schedulerName = schedulerName;
+ this.taskCommName = taskCommName;
+ }
+
+
+ // TODO Post TEZ-2003. TEZ-2625. ContainerLaunchContext needs to be built here instead of being passed in.
+ // Basic specifications need to be provided here
+ public ContainerLaunchContext getContainerLaunchContext() {
+ return clc;
+ }
+
+ /**
+ * Get the name of the task communicator which will be used to communicate
+ * with the task that will run in this container.
+ * @return the task communicator name that was passed to the constructor
+ */
+ public String getTaskCommunicatorName() {
+ return taskCommName;
+ }
+
+ /**
+ * Get the name of the scheduler which allocated this container.
+ * @return the scheduler name that was passed to the constructor
+ */
+ public String getSchedulerName() {
+ return schedulerName;
+ }
+
+ @Override
+ public String toString() {
+ return "ContainerLaunchRequest{" +
+ "nodeId=" + getNodeId() +
+ ", containerId=" + getContainerId() +
+ ", clc=" + clc +
+ ", container=" + container +
+ ", schedulerName='" + schedulerName + '\'' +
+ ", taskCommName='" + taskCommName + '\'' +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
new file mode 100644
index 0000000..218edb6
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.AbstractService;
+
+/**
+ * Plugin to allow custom container launchers to be written to launch containers on different types
+ * of executors. Subclasses implement launchContainer and stopContainer.
+ */
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public abstract class ContainerLauncher extends AbstractService {
+
+ private final ContainerLauncherContext containerLauncherContext;
+
+ // TODO TEZ-2003 Simplify this by moving away from AbstractService. Potentially Guava AbstractService.
+ // A serviceInit(Configuration) is not likely to be very useful, and will expose unnecessary internal
+ // configuration to the services if populated with the AM Configuration
+ public ContainerLauncher(String name, ContainerLauncherContext containerLauncherContext) {
+ super(name);
+ this.containerLauncherContext = containerLauncherContext;
+ }
+
+ public final ContainerLauncherContext getContext() { // Returns the context passed to the constructor
+ return this.containerLauncherContext;
+ }
+
+ public abstract void launchContainer(ContainerLaunchRequest launchRequest);
+ public abstract void stopContainer(ContainerStopRequest stopRequest);
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
new file mode 100644
index 0000000..836dc4a
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface ContainerLauncherContext {
+
+ // TODO Post TEZ-2003. Tez abstraction for ContainerId, NodeId, other YARN constructs
+
+ // Reporting APIs
+ void containerLaunched(ContainerId containerId);
+
+ void containerLaunchFailed(ContainerId containerId, String diagnostics);
+
+ void containerStopRequested(ContainerId containerId);
+
+ void containerStopFailed(ContainerId containerId, String diagnostics);
+
+ // TODO Post TEZ-2003. TaskAttemptEndReason does not belong here, and is an unnecessary leak.
+ // ContainerCompleted is normally generated by the scheduler in case of YARN since the RM informs about completion.
+ // For other sources, there may not be a central entity making this information available. The ContainerLauncher
+ // on the stop request will likely be the best place to generate it.
+ void containerCompleted(ContainerId containerId, int exitStatus, String diagnostics, TaskAttemptEndReason endReason);
+
+ // Lookup APIs
+
+ // TODO TEZ-2003. To be replaced by getInitialPayload once the DAG API is changed.
+ Configuration getInitialConfiguration();
+
+ int getNumNodes(String sourceName);
+
+ ApplicationAttemptId getApplicationAttemptId();
+
+ Object getTaskCommunicatorMetaInfo(String taskCommName); // NOTE(review): presumably the value of TaskCommunicator.getMetaInfo() for the named communicator - confirm
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java
new file mode 100644
index 0000000..29e0420
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ContainerLauncherOperationBase { // Base of ContainerLaunchRequest and ContainerStopRequest
+
+ private final NodeId nodeId;
+ private final ContainerId containerId;
+ private final Token containerToken;
+
+ public ContainerLauncherOperationBase(NodeId nodeId,
+ ContainerId containerId,
+ Token containerToken) {
+ this.nodeId = nodeId;
+ this.containerId = containerId;
+ this.containerToken = containerToken;
+ }
+
+ public NodeId getNodeId() {
+ return nodeId;
+ }
+
+ public ContainerId getContainerId() {
+ return containerId;
+ }
+
+ public Token getContainerToken() {
+ return containerToken;
+ }
+
+ @Override
+ public String toString() { // Note: containerToken is not part of the string form
+ return "ContainerLauncherOperationBase{" +
+ "nodeId=" + nodeId +
+ ", containerId=" + containerId +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerStopRequest.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerStopRequest.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerStopRequest.java
new file mode 100644
index 0000000..cb0af31
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerStopRequest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ContainerStopRequest extends ContainerLauncherOperationBase {
+
+ private final String schedulerName; // NOTE(review): no getter in this class; only read by toString()
+ private final String taskCommName; // NOTE(review): no getter in this class; only read by toString()
+
+ public ContainerStopRequest(NodeId nodeId,
+ ContainerId containerId,
+ Token containerToken, String schedulerName, String taskCommName) {
+ super(nodeId, containerId, containerToken);
+ this.schedulerName = schedulerName;
+ this.taskCommName = taskCommName;
+ }
+
+ @Override
+ public String toString() {
+ return "ContainerStopRequest{" +
+ "nodeId=" + getNodeId() +
+ ", containerId=" + getContainerId() +
+ ", schedulerName='" + schedulerName + '\'' +
+ ", taskCommName='" + taskCommName + '\'' +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java
new file mode 100644
index 0000000..4255c28
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskAttemptEndReason.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public enum TaskAttemptEndReason {
+ NODE_FAILED, // Completed because the node running the container was marked as dead
+ COMMUNICATION_ERROR, // Communication error with the task
+ SERVICE_BUSY, // External service busy
+ INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision
+ EXTERNAL_PREEMPTION, // Preempted due to cluster contention
+ APPLICATION_ERROR, // An error in the AM caused by user code
+ FRAMEWORK_ERROR, // An error in the AM - likely a bug.
+ CONTAINER_EXITED, // Mapped to and from TaskAttemptTerminationCause.CONTAINER_EXITED in TezUtilsInternal
+ OTHER // Unknown reason
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 0bdeb79..4c8c227 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -35,13 +35,12 @@ import java.util.zip.Inflater;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.TextFormat;
-import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.log4j.Appender;
import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
@@ -256,6 +255,8 @@ public class TezUtilsInternal {
return TaskAttemptTerminationCause.FRAMEWORK_ERROR;
case NODE_FAILED:
return TaskAttemptTerminationCause.NODE_FAILED;
+ case CONTAINER_EXITED:
+ return TaskAttemptTerminationCause.CONTAINER_EXITED;
case OTHER:
return TaskAttemptTerminationCause.UNKNOWN_ERROR;
default:
@@ -283,6 +284,8 @@ public class TezUtilsInternal {
return TaskAttemptEndReason.FRAMEWORK_ERROR;
case NODE_FAILED:
return TaskAttemptEndReason.NODE_FAILED;
+ case CONTAINER_EXITED:
+ return TaskAttemptEndReason.CONTAINER_EXITED;
case INTERRUPTED_BY_SYSTEM:
case INTERRUPTED_BY_USER:
case UNKNOWN_ERROR:
@@ -296,7 +299,7 @@ public class TezUtilsInternal {
case OUTPUT_LOST:
case TASK_HEARTBEAT_ERROR:
case CONTAINER_LAUNCH_FAILED:
- case CONTAINER_EXITED:
+
case CONTAINER_STOPPED:
case NODE_DISK_ERROR:
default:
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java b/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java
deleted file mode 100644
index e13e886..0000000
--- a/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-// TODO TEZ-2003 Expose as a public API
-public enum ContainerEndReason {
- NODE_FAILED, // Completed because the node running the container was marked as dead
- INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision
- EXTERNAL_PREEMPTION, // Preempted due to cluster contention
- APPLICATION_ERROR, // An error in the AM caused by user code
- FRAMEWORK_ERROR, // An error in the AM - likely a bug.
- LAUNCH_FAILED, // Failure to launch the container
- COMPLETED, // Completed via normal flow
- OTHER
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java b/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
deleted file mode 100644
index de78d21..0000000
--- a/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-// TODO TEZ-2003 Expose as a public API
-public enum TaskAttemptEndReason {
- NODE_FAILED, // Completed because the node running the container was marked as dead
- COMMUNICATION_ERROR, // Communication error with the task
- SERVICE_BUSY, // External service busy
- INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision
- EXTERNAL_PREEMPTION, // Preempted due to cluster contention
- APPLICATION_ERROR, // An error in the AM caused by user code
- FRAMEWORK_ERROR, // An error in the AM - likely a bug.
- OTHER // Unknown reason
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index d0a006b..05e437c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -87,4 +89,11 @@ public abstract class TaskCommunicator extends AbstractService {
// TODO TEZ-2003 This is extremely difficult to use. Add the dagStarted notification, and potentially
// throw exceptions between a dagComplete and dagStart invocation.
public abstract void dagComplete(String dagName);
+
+ /**
+ * Share meta-information such as host:port information where the Task Communicator may be listening.
+ * Primarily for use by compatible launchers to learn this information.
+ * @return the meta-information; declared only as Object, so the concrete type is not fixed here
+ */
+ public abstract Object getMetaInfo();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index 56345ab..b6e63f7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -16,13 +16,13 @@ package org.apache.tez.dag.api;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.Collection;
import java.util.Set;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.records.TezTaskAttemptID;
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java
new file mode 100644
index 0000000..022cd7b
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java
@@ -0,0 +1,18 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+public interface TaskCommunicatorInterface { // Declares no members in this revision
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 1ccb10b..516fcef 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -118,4 +118,9 @@ public interface AppContext {
public Integer getTaskCommunicatorIdentifier(String name);
public Integer getTaskScheduerIdentifier(String name);
public Integer getContainerLauncherIdentifier(String name);
+
+ public String getTaskCommunicatorName(int taskCommId);
+ public String getTaskSchedulerName(int schedulerId);
+ public String getContainerLauncherName(int launcherId);
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
new file mode 100644
index 0000000..997775a
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
+import org.apache.tez.dag.app.rm.container.AMContainerEventStopFailed;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+
+/**
+ * {@link ContainerLauncherContext} implementation backed by the AM's {@link AppContext}.
+ *
+ * Each callback is translated into the corresponding AM container event and handed to the
+ * event handler obtained from the {@link AppContext}; the query methods delegate to the
+ * {@link AppContext} and the {@link TaskAttemptListener}.
+ */
+public class ContainerLauncherContextImpl implements ContainerLauncherContext {
+
+  private final AppContext context;
+  private final TaskAttemptListener tal;
+
+  /**
+   * @param appContext source of the event handler, history handler, clock, configuration
+   *                   and plugin-name lookups used by this context
+   * @param tal        listener used to resolve task communicators by identifier
+   */
+  public ContainerLauncherContextImpl(AppContext appContext, TaskAttemptListener tal) {
+    this.context = appContext;
+    this.tal = tal;
+  }
+
+  /**
+   * Sends an {@link AMContainerEventLaunched} for the container and then records a
+   * {@link ContainerLaunchedEvent}, stamped with the current clock time and the application
+   * attempt id, through the history handler.
+   */
+  @Override
+  public void containerLaunched(ContainerId containerId) {
+    context.getEventHandler().handle(
+        new AMContainerEventLaunched(containerId));
+    ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(
+        containerId, context.getClock().getTime(), context.getApplicationAttemptId());
+    // NOTE(review): the first DAGHistoryEvent argument is passed as null - presumably the
+    // DAG id, since a container launch is not tied to a single DAG; confirm.
+    context.getHistoryHandler().handle(new DAGHistoryEvent(
+        null, lEvt));
+  }
+
+  /** Sends an {@link AMContainerEventLaunchFailed} carrying the supplied diagnostics. */
+  @Override
+  public void containerLaunchFailed(ContainerId containerId, String diagnostics) {
+    context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, diagnostics));
+  }
+
+  /** Sends an {@link AMContainerEvent} of type {@code C_NM_STOP_SENT} for the container. */
+  @Override
+  public void containerStopRequested(ContainerId containerId) {
+    context.getEventHandler().handle(
+        new AMContainerEvent(containerId, AMContainerEventType.C_NM_STOP_SENT));
+  }
+
+  /** Sends an {@link AMContainerEventStopFailed} carrying the supplied diagnostics. */
+  @Override
+  public void containerStopFailed(ContainerId containerId, String diagnostics) {
+    context.getEventHandler().handle(
+        new AMContainerEventStopFailed(containerId, diagnostics));
+  }
+
+  /**
+   * Sends an {@link AMContainerEventCompleted}; the end reason is converted with
+   * {@link TezUtilsInternal#fromTaskAttemptEndReason} before being attached to the event.
+   */
+  @Override
+  public void containerCompleted(ContainerId containerId, int exitStatus, String diagnostics,
+                                 TaskAttemptEndReason endReason) {
+    context.getEventHandler().handle(
+        new AMContainerEventCompleted(containerId, exitStatus, diagnostics,
+            TezUtilsInternal.fromTaskAttemptEndReason(endReason)));
+  }
+
+  /** Returns the AM configuration as exposed by {@link AppContext#getAMConf()}. */
+  @Override
+  public Configuration getInitialConfiguration() {
+    return context.getAMConf();
+  }
+
+  /**
+   * Returns the node count tracked for the task scheduler registered under
+   * {@code sourceName}.
+   *
+   * @throws NullPointerException if no scheduler identifier is known for {@code sourceName}
+   */
+  @Override
+  public int getNumNodes(String sourceName) {
+    // The lookup returns a boxed Integer; fail with a descriptive message instead of a bare
+    // NullPointerException from implicit unboxing when the name is unknown.
+    int sourceIndex = java.util.Objects.requireNonNull(
+        context.getTaskScheduerIdentifier(sourceName),
+        "No task scheduler registered with name: " + sourceName);
+    return context.getNodeTracker().getNumNodes(sourceIndex);
+  }
+
+  /** Returns the application attempt id as exposed by the {@link AppContext}. */
+  @Override
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return context.getApplicationAttemptId();
+  }
+
+  /**
+   * Returns the meta-info object published by the task communicator registered under
+   * {@code taskCommName}.
+   *
+   * @throws NullPointerException if no communicator identifier is known for
+   *                              {@code taskCommName}
+   */
+  @Override
+  public Object getTaskCommunicatorMetaInfo(String taskCommName) {
+    // Same boxed-Integer lookup as in getNumNodes: make the failure self-explanatory.
+    int taskCommId = java.util.Objects.requireNonNull(
+        context.getTaskCommunicatorIdentifier(taskCommName),
+        "No task communicator registered with name: " + taskCommName);
+    return tal.getTaskCommunicator(taskCommId).getMetaInfo();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 04e72db..d56fb95 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1551,6 +1551,21 @@ public class DAGAppMaster extends AbstractService {
}
@Override
+ public String getTaskCommunicatorName(int taskCommId) {
+ // Resolves the id through the inverse view of the taskCommunicators mapping.
+ // NOTE(review): presumably yields null for an unregistered id - confirm.
+ return taskCommunicators.inverse().get(taskCommId);
+ }
+
+ @Override
+ public String getTaskSchedulerName(int schedulerId) {
+ // Resolves the id through the inverse view of the taskSchedulers mapping.
+ // NOTE(review): presumably yields null for an unregistered id - confirm.
+ return taskSchedulers.inverse().get(schedulerId);
+ }
+
+ @Override
+ public String getContainerLauncherName(int launcherId) {
+ // Resolves the id through the inverse view of the containerLaunchers mapping.
+ // NOTE(review): presumably yields null for an unregistered id - confirm.
+ return containerLaunchers.inverse().get(launcherId);
+ }
+
+ @Override
public Map<ApplicationAccessType, String> getApplicationACLs() {
if (getServiceState() != STATE.STARTED) {
throw new TezUncheckedException(
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
index 92e38ae..2eec2fb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
@@ -18,11 +18,9 @@
package org.apache.tez.dag.app;
-import java.net.InetSocketAddress;
-
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.dag.api.ContainerEndReason;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.app.rm.container.AMContainerTask;
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index e2d44e2..47b63dd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -28,7 +28,7 @@ import java.util.concurrent.ConcurrentMap;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.collections4.ListUtils;
-import org.apache.tez.dag.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
@@ -43,7 +43,7 @@ import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.api.TaskCommunicatorContext;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TaskHeartbeatResponse;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 790066f..50e006d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -17,10 +17,6 @@ package org.apache.tez.dag.app;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.base.Function;
@@ -30,7 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TaskCommunicatorContext;
import org.apache.tez.dag.api.TaskHeartbeatRequest;
import org.apache.tez.dag.api.TaskHeartbeatResponse;
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 83322f2..0374022 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -41,8 +41,8 @@ import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.common.security.TokenCache;
-import org.apache.tez.dag.api.ContainerEndReason;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.api.TaskCommunicatorContext;
import org.apache.tez.dag.api.TaskHeartbeatRequest;
@@ -180,7 +180,8 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
@Override
public void registerRunningContainer(ContainerId containerId, String host, int port) {
- ContainerInfo oldInfo = registeredContainers.putIfAbsent(containerId, new ContainerInfo(containerId, host, port));
+ ContainerInfo oldInfo = registeredContainers.putIfAbsent(containerId,
+ new ContainerInfo(containerId, host, port));
if (oldInfo != null) {
throw new TezUncheckedException("Multiple registrations for containerId: " + containerId);
}
@@ -267,6 +268,11 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
// Nothing to do at the moment. Some of the TODOs from TaskAttemptListener apply here.
}
+ /**
+ * Returns this communicator's {@code address} field as its opaque meta-info object.
+ * NOTE(review): the declared type of {@code address} is not visible in this hunk - callers
+ * presumably cast the result; confirm the expected type.
+ */
+ @Override
+ public Object getMetaInfo() {
+ return address;
+ }
+
protected String getTokenIdentifier() {
// Plain accessor for the tokenIdentifier field.
return tokenIdentifier;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
deleted file mode 100644
index ea07a1d..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncher.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.launcher;
-
-
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
-
-public interface ContainerLauncher
- extends EventHandler<NMCommunicatorEvent> {
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index a12fb04..fe0178c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -30,12 +30,15 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
import org.apache.tez.dag.api.TezConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -45,57 +48,43 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
-import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.Records;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
-import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
-import org.apache.tez.dag.app.rm.container.AMContainerEvent;
-import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed;
-import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
-import org.apache.tez.dag.app.rm.container.AMContainerEventStopFailed;
-import org.apache.tez.dag.app.rm.container.AMContainerEventType;
-import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-// TODO XXX: See what part of this lifecycle and state management can be simplified.
+// TODO See what part of this lifecycle and state management can be simplified.
// Ideally, no state - only sendStart / sendStop.
-// TODO XXX: Review this entire code and clean it up.
+// TODO Review this entire code and clean it up.
/**
* This class is responsible for launching of containers.
*/
-public class ContainerLauncherImpl extends AbstractService implements
- ContainerLauncher {
+public class ContainerLauncherImpl extends ContainerLauncher {
- // TODO XXX Ensure the same thread is used to launch / stop the same container. Or - ensure event ordering.
+ // TODO Ensure the same thread is used to launch / stop the same container. Or - ensure event ordering.
static final Logger LOG = LoggerFactory.getLogger(ContainerLauncherImpl.class);
- private ConcurrentHashMap<ContainerId, Container> containers =
- new ConcurrentHashMap<ContainerId, Container>();
- private AppContext context;
+ private final ConcurrentHashMap<ContainerId, Container> containers =
+ new ConcurrentHashMap<>();
protected ThreadPoolExecutor launcherPool;
protected static final int INITIAL_POOL_SIZE = 10;
- private int limitOnPoolSize;
+ private final int limitOnPoolSize;
+ private final Configuration conf;
private Thread eventHandlingThread;
- protected BlockingQueue<NMCommunicatorEvent> eventQueue =
- new LinkedBlockingQueue<NMCommunicatorEvent>();
- private Clock clock;
+ protected BlockingQueue<ContainerOp> eventQueue = new LinkedBlockingQueue<>();
private ContainerManagementProtocolProxy cmProxy;
private AtomicBoolean serviceStopped = new AtomicBoolean(false);
- private Container getContainer(NMCommunicatorEvent event) {
- ContainerId id = event.getContainerId();
+ private Container getContainer(ContainerOp event) {
+ ContainerId id = event.getBaseOperation().getContainerId();
Container c = containers.get(id);
if(c == null) {
- c = new Container(event.getContainerId(),
- event.getNodeId().toString(), event.getContainerToken());
+ c = new Container(id,
+ event.getBaseOperation().getNodeId().toString(), event.getBaseOperation().getContainerToken());
Container old = containers.putIfAbsent(id, c);
if(old != null) {
c = old;
@@ -111,6 +100,7 @@ public class ContainerLauncherImpl extends AbstractService implements
}
}
+
private static enum ContainerState {
PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
}
@@ -135,7 +125,7 @@ public class ContainerLauncherImpl extends AbstractService implements
}
@SuppressWarnings("unchecked")
- public synchronized void launch(NMCommunicatorLaunchRequestEvent event) {
+ public synchronized void launch(ContainerLaunchRequest event) {
LOG.info("Launching Container with Id: " + event.getContainerId());
if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
state = ContainerState.DONE;
@@ -171,13 +161,7 @@ public class ContainerLauncherImpl extends AbstractService implements
// after launching, send launched event to task attempt to move
// it from ASSIGNED to RUNNING state
- context.getEventHandler().handle(
- new AMContainerEventLaunched(containerID));
- ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(
- containerID, clock.getTime(), context.getApplicationAttemptId());
- context.getHistoryHandler().handle(new DAGHistoryEvent(
- null, lEvt));
-
+ getContext().containerLaunched(containerID);
this.state = ContainerState.RUNNING;
} catch (Throwable t) {
String message = "Container launch failed for " + containerID + " : "
@@ -217,16 +201,14 @@ public class ContainerLauncherImpl extends AbstractService implements
// If stopContainer returns without an error, assuming the stop made
// it over to the NodeManager.
- context.getEventHandler().handle(
- new AMContainerEvent(containerID, AMContainerEventType.C_NM_STOP_SENT));
+ getContext().containerStopRequested(containerID);
} catch (Throwable t) {
// ignore the cleanup failure
String message = "cleanup failed for container "
+ this.containerID + " : "
+ ExceptionUtils.getStackTrace(t);
- context.getEventHandler().handle(
- new AMContainerEventStopFailed(containerID, message));
+ getContext().containerStopFailed(containerID, message);
LOG.warn(message);
this.state = ContainerState.DONE;
return;
@@ -240,15 +222,9 @@ public class ContainerLauncherImpl extends AbstractService implements
}
}
- public ContainerLauncherImpl(AppContext context) {
- super(ContainerLauncherImpl.class.getName());
- this.context = context;
- this.clock = context.getClock();
- }
-
- @Override
- public synchronized void serviceInit(Configuration config) {
- Configuration conf = new Configuration(config);
+ public ContainerLauncherImpl(ContainerLauncherContext containerLauncherContext) {
+ super(ContainerLauncherImpl.class.getName(), containerLauncherContext);
+ this.conf = new Configuration(containerLauncherContext.getInitialConfiguration());
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
0);
@@ -262,7 +238,7 @@ public class ContainerLauncherImpl extends AbstractService implements
public void serviceStart() {
// pass a copy of config to ContainerManagementProtocolProxy until YARN-3497 is fixed
cmProxy =
- new ContainerManagementProtocolProxy(new Configuration(getConfig()));
+ new ContainerManagementProtocolProxy(conf);
ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
"ContainerLauncher #%d").setDaemon(true).build();
@@ -275,7 +251,7 @@ public class ContainerLauncherImpl extends AbstractService implements
eventHandlingThread = new Thread() {
@Override
public void run() {
- NMCommunicatorEvent event = null;
+ ContainerOp event = null;
while (!Thread.currentThread().isInterrupted()) {
try {
event = eventQueue.take();
@@ -293,9 +269,8 @@ public class ContainerLauncherImpl extends AbstractService implements
// nodes where containers will run at *this* point of time. This is
// *not* the cluster size and doesn't need to be.
- int yarnSourceIndex =
- context.getTaskScheduerIdentifier(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
- int numNodes = context.getNodeTracker().getNumNodes(yarnSourceIndex);
+ int numNodes = getContext().getNumNodes(
+ TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
if (poolSize < idealPoolSize) {
@@ -347,7 +322,7 @@ public class ContainerLauncherImpl extends AbstractService implements
}
}
- protected EventProcessor createEventProcessor(NMCommunicatorEvent event) {
+ protected EventProcessor createEventProcessor(ContainerOp event) {
return new EventProcessor(event);
}
@@ -361,32 +336,29 @@ public class ContainerLauncherImpl extends AbstractService implements
* Setup and start the container on remote nodemanager.
*/
class EventProcessor implements Runnable {
- private NMCommunicatorEvent event;
+ private ContainerOp event;
- EventProcessor(NMCommunicatorEvent event) {
+ EventProcessor(ContainerOp event) {
this.event = event;
}
@Override
public void run() {
- LOG.info("Processing the event " + event.toString());
+ LOG.info("Processing operation {}", event.toString());
// Load ContainerManager tokens before creating a connection.
// TODO: Do it only once per NodeManager.
- ContainerId containerID = event.getContainerId();
+ ContainerId containerID = event.getBaseOperation().getContainerId();
Container c = getContainer(event);
- switch(event.getType()) {
-
- case CONTAINER_LAUNCH_REQUEST:
- NMCommunicatorLaunchRequestEvent launchEvent
- = (NMCommunicatorLaunchRequestEvent) event;
- c.launch(launchEvent);
- break;
-
- case CONTAINER_STOP_REQUEST:
- c.kill();
- break;
+ switch(event.getOpType()) {
+ case LAUNCH_REQUEST:
+ ContainerLaunchRequest launchRequest = event.getLaunchRequest();
+ c.launch(launchRequest);
+ break;
+ case STOP_REQUEST:
+ c.kill();
+ break;
}
removeContainerIfDone(containerID);
}
@@ -408,13 +380,23 @@ public class ContainerLauncherImpl extends AbstractService implements
void sendContainerLaunchFailedMsg(ContainerId containerId,
String message) {
LOG.error(message);
- context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, message));
+ getContext().containerLaunchFailed(containerId, message);
+ }
+
+
+ /**
+ * Queues a launch operation for the given request on the internal event queue.
+ *
+ * @throws TezUncheckedException if the calling thread is interrupted while waiting to
+ * enqueue; the thread's interrupt status is restored before the exception is thrown
+ */
+ @Override
+ public void launchContainer(ContainerLaunchRequest launchRequest) {
+ try {
+ eventQueue.put(new ContainerOp(ContainerOp.OPType.LAUNCH_REQUEST, launchRequest));
+ } catch (InterruptedException e) {
+ // Catching InterruptedException clears the interrupt flag; set it again so code further
+ // up the stack can still observe that the thread was interrupted.
+ Thread.currentThread().interrupt();
+ throw new TezUncheckedException(e);
+ }
}
@Override
- public void handle(NMCommunicatorEvent event) {
+ public void stopContainer(ContainerStopRequest stopRequest) {
try {
- eventQueue.put(event);
+ eventQueue.put(new ContainerOp(ContainerOp.OPType.STOP_REQUEST, stopRequest));
} catch (InterruptedException e) {
throw new TezUncheckedException(e);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
index db145f4..9f741cf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -23,12 +23,18 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ContainerLauncherContextImpl;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
+import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,11 +44,15 @@ public class ContainerLauncherRouter extends AbstractService
// Logger bound to this class so that router log lines are attributed to the router itself.
static final Logger LOG = LoggerFactory.getLogger(ContainerLauncherRouter.class);
private final ContainerLauncher containerLaunchers[];
+ private final ContainerLauncherContext containerLauncherContexts[];
+ private final AppContext appContext;
@VisibleForTesting
- public ContainerLauncherRouter(ContainerLauncher containerLauncher) {
+ public ContainerLauncherRouter(ContainerLauncher containerLauncher, AppContext context) {
+ // Test-only constructor: routes everything to the single, already constructed launcher.
super(ContainerLauncherRouter.class.getName());
+ this.appContext = context;
containerLaunchers = new ContainerLauncher[] {containerLauncher};
+ // Reuse the context the supplied launcher already holds instead of creating a new one.
+ containerLauncherContexts = new ContainerLauncherContext[] {containerLauncher.getContext()};
}
// Accepting conf to setup final parameters, if required.
@@ -53,6 +63,7 @@ public class ContainerLauncherRouter extends AbstractService
boolean isPureLocalMode) throws UnknownHostException {
super(ContainerLauncherRouter.class.getName());
+ this.appContext = context;
if (containerLauncherClassIdentifiers == null || containerLauncherClassIdentifiers.length == 0) {
if (isPureLocalMode) {
containerLauncherClassIdentifiers =
@@ -62,16 +73,21 @@ public class ContainerLauncherRouter extends AbstractService
new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
}
}
+ containerLauncherContexts = new ContainerLauncherContext[containerLauncherClassIdentifiers.length];
containerLaunchers = new ContainerLauncher[containerLauncherClassIdentifiers.length];
+
for (int i = 0; i < containerLauncherClassIdentifiers.length; i++) {
+ ContainerLauncherContext containerLauncherContext = new ContainerLauncherContextImpl(context, taskAttemptListener);
+ containerLauncherContexts[i] = containerLauncherContext;
containerLaunchers[i] = createContainerLauncher(containerLauncherClassIdentifiers[i], context,
- taskAttemptListener, workingDirectory, isPureLocalMode, conf);
+ containerLauncherContext, taskAttemptListener, workingDirectory, isPureLocalMode, conf);
}
}
private ContainerLauncher createContainerLauncher(String containerLauncherClassIdentifier,
AppContext context,
+ ContainerLauncherContext containerLauncherContext,
TaskAttemptListener taskAttemptListener,
String workingDirectory,
boolean isPureLocalMode,
@@ -79,12 +95,15 @@ public class ContainerLauncherRouter extends AbstractService
UnknownHostException {
if (containerLauncherClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
LOG.info("Creating DefaultContainerLauncher");
- return new ContainerLauncherImpl(context);
+ return new ContainerLauncherImpl(containerLauncherContext);
} else if (containerLauncherClassIdentifier
.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
LOG.info("Creating LocalContainerLauncher");
+ // TODO Post TEZ-2003. LocalContainerLauncher is special cased, since it makes use of
+ // extensive internals which are only available at runtime. Will likely require
+ // some kind of runtime binding of parameters in the payload to work correctly.
return
- new LocalContainerLauncher(context, taskAttemptListener, workingDirectory, isPureLocalMode);
+ new LocalContainerLauncher(containerLauncherContext, context, taskAttemptListener, workingDirectory, isPureLocalMode);
} else {
LOG.info("Creating container launcher : " + containerLauncherClassIdentifier);
Class<? extends ContainerLauncher> containerLauncherClazz =
@@ -92,9 +111,9 @@ public class ContainerLauncherRouter extends AbstractService
containerLauncherClassIdentifier);
try {
Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
- .getConstructor(AppContext.class, Configuration.class, TaskAttemptListener.class);
+ .getConstructor(ContainerLauncherContext.class);
ctor.setAccessible(true);
- return ctor.newInstance(context, conf, taskAttemptListener);
+ return ctor.newInstance(containerLauncherContext);
} catch (NoSuchMethodException e) {
throw new TezUncheckedException(e);
} catch (InvocationTargetException e) {
@@ -141,6 +160,25 @@ public class ContainerLauncherRouter extends AbstractService
@Override
public void handle(NMCommunicatorEvent event) {
- containerLaunchers[event.getLauncherId()].handle(event);
+ // Index into containerLaunchers selecting the launcher that receives this event.
+ int launcherId = event.getLauncherId();
+ // Resolve the numeric ids carried by the event into the names passed on in the requests.
+ String schedulerName = appContext.getTaskSchedulerName(event.getSchedulerId());
+ String taskCommName = appContext.getTaskCommunicatorName(event.getTaskCommId());
+ // Convert the internal event into the matching request object and dispatch it.
+ // NOTE(review): event types other than the two cases below fall through without any
+ // action - confirm that no other NMCommunicatorEvent types exist.
+ switch (event.getType()) {
+ case CONTAINER_LAUNCH_REQUEST:
+ NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event;
+ ContainerLaunchRequest launchRequest =
+ new ContainerLaunchRequest(launchEvent.getNodeId(), launchEvent.getContainerId(),
+ launchEvent.getContainerToken(), launchEvent.getContainerLaunchContext(),
+ launchEvent.getContainer(), schedulerName,
+ taskCommName);
+ containerLaunchers[launcherId].launchContainer(launchRequest);
+ break;
+ case CONTAINER_STOP_REQUEST:
+ ContainerStopRequest stopRequest =
+ new ContainerStopRequest(event.getNodeId(), event.getContainerId(),
+ event.getContainerToken(), schedulerName, taskCommName);
+ containerLaunchers[launcherId].stopContainer(stopRequest);
+ break;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerOp.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerOp.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerOp.java
new file mode 100644
index 0000000..c62de66
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerOp.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncherOperationBase;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
+/** A launcher request tagged with its kind (launch or stop), letting both kinds share one element type. */
+@InterfaceAudience.Private
+public class ContainerOp {
+ enum OPType {
+ LAUNCH_REQUEST, STOP_REQUEST
+ }
+ // Both fields are set once by the constructor. Nothing here checks that command's runtime type matches opType.
+ final ContainerLauncherOperationBase command;
+ final OPType opType;
+ /** Stores the kind tag and the request as given; neither argument is validated. */
+ public ContainerOp(OPType opType, ContainerLauncherOperationBase command) {
+ this.opType = opType;
+ this.command = command;
+ }
+ /** Returns the kind tag supplied at construction. */
+ public OPType getOpType() {
+ return opType;
+ }
+ /** Returns the request as its base type, whatever the kind. */
+ public ContainerLauncherOperationBase getBaseOperation() {
+ return command;
+ }
+ /** Returns the request as a ContainerLaunchRequest; the checkState guard fails unless this is a LAUNCH_REQUEST. */
+ public ContainerLaunchRequest getLaunchRequest() {
+ Preconditions.checkState(opType == OPType.LAUNCH_REQUEST);
+ return (ContainerLaunchRequest) command;
+ }
+ /** Returns the request as a ContainerStopRequest; the checkState guard fails unless this is a STOP_REQUEST. */
+ public ContainerStopRequest getStopRequest() {
+ Preconditions.checkState(opType == OPType.STOP_REQUEST);
+ return (ContainerStopRequest) command;
+ }
+ /** Renders the kind tag followed by the request's own string form. */
+ @Override
+ public String toString() {
+ return "ContainerOp{" +
+ "opType=" + opType +
+ ", command=" + command +
+ '}';
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index fe23409..a1b8e29 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -44,11 +44,15 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
@@ -60,17 +64,6 @@ import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
-import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
-import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
-import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
-import org.apache.tez.dag.app.rm.container.AMContainerEvent;
-import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
-import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed;
-import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
-import org.apache.tez.dag.app.rm.container.AMContainerEventType;
-import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
-import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
@@ -82,17 +75,17 @@ import org.apache.tez.runtime.task.TezChild;
* Since all (sub)tasks share the same local directory, they must be executed
* sequentially in order to avoid creating/deleting the same files/dirs.
*/
-public class LocalContainerLauncher extends AbstractService implements
- ContainerLauncher {
+public class LocalContainerLauncher extends ContainerLauncher {
private static final Logger LOG = LoggerFactory.getLogger(LocalContainerLauncher.class);
+
private final AppContext context;
private final AtomicBoolean serviceStopped = new AtomicBoolean(false);
private final String workingDirectory;
private final TaskAttemptListener tal;
private final Map<String, String> localEnv;
private final ExecutionContext executionContext;
- private int numExecutors;
+ private final int numExecutors;
private final boolean isPureLocalMode;
private final ConcurrentHashMap<ContainerId, RunningTaskCallback>
@@ -102,23 +95,25 @@ public class LocalContainerLauncher extends AbstractService implements
private final ExecutorService callbackExecutor = Executors.newFixedThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("CallbackExecutor").build());
- private BlockingQueue<NMCommunicatorEvent> eventQueue =
- new LinkedBlockingQueue<NMCommunicatorEvent>();
+ private BlockingQueue<ContainerOp> eventQueue = new LinkedBlockingQueue<>();
private Thread eventHandlingThread;
private ListeningExecutorService taskExecutorService;
-
- public LocalContainerLauncher(AppContext context,
+ public LocalContainerLauncher(ContainerLauncherContext containerLauncherContext,
+ AppContext context,
TaskAttemptListener taskAttemptListener,
String workingDirectory,
boolean isPureLocalMode) throws UnknownHostException {
- super(LocalContainerLauncher.class.getName());
+ // TODO Post TEZ-2003. Most of this information is dynamic and only available after the AM
+ // starts up. It's not possible to set these up via a static payload.
+ // Will need some kind of mechanism to dynamically create payloads / bind to parameters
+ // after the AM starts up.
+ super(LocalContainerLauncher.class.getName(), containerLauncherContext);
this.context = context;
this.tal = taskAttemptListener;
-
this.workingDirectory = workingDirectory;
this.isPureLocalMode = isPureLocalMode;
if (isPureLocalMode) {
@@ -133,11 +128,8 @@ public class LocalContainerLauncher extends AbstractService implements
String host = isPureLocalMode ? InetAddress.getLocalHost().getHostName() :
System.getenv(Environment.NM_HOST.name());
executionContext = new ExecutionContextImpl(host);
- }
- @Override
- public synchronized void serviceInit(Configuration conf) {
- numExecutors = conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS,
+ numExecutors = getContext().getInitialConfiguration().getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS,
TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT);
Preconditions.checkState(numExecutors >=1, "Must have at least 1 executor");
ExecutorService rawExecutor = Executors.newFixedThreadPool(numExecutors,
@@ -169,20 +161,22 @@ public class LocalContainerLauncher extends AbstractService implements
callbackExecutor.shutdownNow();
}
+
+
// Thread to monitor the queue of incoming NMCommunicator events
private class TezSubTaskRunner implements Runnable {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted() && !serviceStopped.get()) {
- NMCommunicatorEvent event;
+ ContainerOp event;
try {
event = eventQueue.take();
- switch (event.getType()) {
- case CONTAINER_LAUNCH_REQUEST:
- launch((NMCommunicatorLaunchRequestEvent) event);
+ switch (event.getOpType()) {
+ case LAUNCH_REQUEST:
+ launch(event.getLaunchRequest());
break;
- case CONTAINER_STOP_REQUEST:
- stop((NMCommunicatorStopRequestEvent)event);
+ case STOP_REQUEST:
+ stop(event.getStopRequest());
break;
}
} catch (InterruptedException e) {
@@ -200,7 +194,7 @@ public class LocalContainerLauncher extends AbstractService implements
@SuppressWarnings("unchecked")
void sendContainerLaunchFailedMsg(ContainerId containerId, String message) {
- context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, message));
+ getContext().containerLaunchFailed(containerId, message);
}
private void handleLaunchFailed(Throwable t, ContainerId containerId) {
@@ -215,16 +209,17 @@ public class LocalContainerLauncher extends AbstractService implements
}
//launch tasks
- private void launch(NMCommunicatorLaunchRequestEvent event) {
+ private void launch(ContainerLaunchRequest event) {
String tokenIdentifier = context.getApplicationID().toString();
try {
TezChild tezChild;
try {
+ int taskCommId = context.getTaskCommunicatorIdentifier(event.getTaskCommunicatorName());
tezChild =
createTezChild(context.getAMConf(), event.getContainerId(), tokenIdentifier,
context.getApplicationAttemptId().getAttemptId(), context.getLocalDirs(),
- ((TezTaskCommunicatorImpl)tal.getTaskCommunicator(event.getTaskCommId())).getUmbilical(),
+ ((TezTaskCommunicatorImpl)tal.getTaskCommunicator(taskCommId)).getUmbilical(),
TezCommonUtils.parseCredentialsBytes(event.getContainerLaunchContext().getTokens().array()));
} catch (InterruptedException e) {
handleLaunchFailed(e, event.getContainerId());
@@ -238,7 +233,7 @@ public class LocalContainerLauncher extends AbstractService implements
}
ListenableFuture<TezChild.ContainerExecutionResult> runningTaskFuture =
taskExecutorService.submit(createSubTask(tezChild, event.getContainerId()));
- RunningTaskCallback callback = new RunningTaskCallback(context, event.getContainerId());
+ RunningTaskCallback callback = new RunningTaskCallback(event.getContainerId());
runningContainers.put(event.getContainerId(), callback);
Futures.addCallback(runningTaskFuture, callback, callbackExecutor);
} catch (RejectedExecutionException e) {
@@ -246,7 +241,7 @@ public class LocalContainerLauncher extends AbstractService implements
}
}
- private void stop(NMCommunicatorStopRequestEvent event) {
+ private void stop(ContainerStopRequest event) {
// A stop_request will come in when a task completes and reports back or a preemption decision
// is made. Currently the LocalTaskScheduler does not support preemption. Also preemption
// will not work in local mode till Tez supports task preemption instead of container preemption.
@@ -263,18 +258,15 @@ public class LocalContainerLauncher extends AbstractService implements
// This will need to be fixed once interrupting tasks is supported.
}
// Send this event to maintain regular control flow. This isn't of much use though.
- context.getEventHandler().handle(
- new AMContainerEvent(event.getContainerId(), AMContainerEventType.C_NM_STOP_SENT));
+ getContext().containerStopRequested(event.getContainerId());
}
private class RunningTaskCallback
implements FutureCallback<TezChild.ContainerExecutionResult> {
- private final AppContext appContext;
private final ContainerId containerId;
- RunningTaskCallback(AppContext appContext, ContainerId containerId) {
- this.appContext = appContext;
+ RunningTaskCallback(ContainerId containerId) {
this.containerId = containerId;
}
@@ -286,16 +278,16 @@ public class LocalContainerLauncher extends AbstractService implements
result.getExitStatus() ==
TezChild.ContainerExecutionResult.ExitStatus.ASKED_TO_DIE) {
LOG.info("Container: " + containerId + " completed successfully");
- appContext.getEventHandler().handle(
- new AMContainerEventCompleted(containerId, result.getExitStatus().getExitCode(),
- null, TaskAttemptTerminationCause.CONTAINER_EXITED));
+ getContext()
+ .containerCompleted(containerId, result.getExitStatus().getExitCode(), null,
+ TaskAttemptEndReason.CONTAINER_EXITED);
} else {
LOG.info("Container: " + containerId + " completed but with errors");
- appContext.getEventHandler().handle(
- new AMContainerEventCompleted(containerId, result.getExitStatus().getExitCode(),
- result.getErrorMessage() == null ?
- (result.getThrowable() == null ? null : result.getThrowable().getMessage()) :
- result.getErrorMessage(), TaskAttemptTerminationCause.APPLICATION_ERROR));
+ getContext().containerCompleted(
+ containerId, result.getExitStatus().getExitCode(),
+ result.getErrorMessage() == null ?
+ (result.getThrowable() == null ? null : result.getThrowable().getMessage()) :
+ result.getErrorMessage(), TaskAttemptEndReason.APPLICATION_ERROR);
}
}
@@ -307,16 +299,14 @@ public class LocalContainerLauncher extends AbstractService implements
if (!(t instanceof CancellationException)) {
LOG.info("Container: " + containerId + ": Execution Failed: ", t);
// Inform of failure with exit code 1.
- appContext.getEventHandler()
- .handle(new AMContainerEventCompleted(containerId,
- TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE.getExitCode(),
- t.getMessage(), TaskAttemptTerminationCause.APPLICATION_ERROR));
+ getContext().containerCompleted(containerId,
+ TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE.getExitCode(),
+ t.getMessage(), TaskAttemptEndReason.APPLICATION_ERROR);
} else {
LOG.info("Ignoring CancellationException - triggered by LocalContainerLauncher");
- appContext.getEventHandler()
- .handle(new AMContainerEventCompleted(containerId,
- TezChild.ContainerExecutionResult.ExitStatus.SUCCESS.getExitCode(),
- "CancellationException", TaskAttemptTerminationCause.CONTAINER_EXITED));
+ getContext().containerCompleted(containerId,
+ TezChild.ContainerExecutionResult.ExitStatus.SUCCESS.getExitCode(), // a cancelled task is reported as a clean exit
+ "CancellationException", TaskAttemptEndReason.CONTAINER_EXITED);
}
}
}
@@ -334,12 +324,7 @@ public class LocalContainerLauncher extends AbstractService implements
// TezTaskRunner needs to be fixed to ensure this.
Thread.interrupted();
// Inform about the launch request now that the container has been allocated a thread to execute in.
- context.getEventHandler().handle(new AMContainerEventLaunched(containerId));
- ContainerLaunchedEvent lEvt =
- new ContainerLaunchedEvent(containerId, context.getClock().getTime(),
- context.getApplicationAttemptId());
-
- context.getHistoryHandler().handle(new DAGHistoryEvent(context.getCurrentDAGID(), lEvt));
+ getContext().containerLaunched(containerId);
return tezChild.run();
}
};
@@ -368,11 +353,19 @@ public class LocalContainerLauncher extends AbstractService implements
}
+ @Override
+ public void launchContainer(ContainerLaunchRequest launchRequest) {
+ try {
+ eventQueue.put(new ContainerOp(ContainerOp.OPType.LAUNCH_REQUEST, launchRequest));
+ } catch (InterruptedException e) {
+ throw new TezUncheckedException(e);
+ }
+ }
@Override
- public void handle(NMCommunicatorEvent event) {
+ public void stopContainer(ContainerStopRequest stopRequest) {
try {
- eventQueue.put(event);
+ eventQueue.put(new ContainerOp(ContainerOp.OPType.STOP_REQUEST, stopRequest));
} catch (InterruptedException e) {
throw new TezUncheckedException(e);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
index a775948..33763e7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
@@ -18,7 +18,7 @@
package org.apache.tez.dag.app.rm;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.records.TezTaskAttemptID;
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index a234e07..ef789c5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
index f86894f..dc50c37 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
@@ -29,14 +29,19 @@ public class NMCommunicatorEvent extends AbstractEvent<NMCommunicatorEventType>
private final NodeId nodeId;
private final Token containerToken;
private final int launcherId;
+ private final int schedulerId;
+ private final int taskCommId;
public NMCommunicatorEvent(ContainerId containerId, NodeId nodeId,
- Token containerToken, NMCommunicatorEventType type, int launcherId) {
+ Token containerToken, NMCommunicatorEventType type, int launcherId,
+ int schedulerId, int taskCommId) {
super(type);
this.containerId = containerId;
this.nodeId = nodeId;
this.containerToken = containerToken;
this.launcherId = launcherId;
+ this.schedulerId = schedulerId;
+ this.taskCommId = taskCommId;
}
public ContainerId getContainerId() {
@@ -55,9 +60,18 @@ public class NMCommunicatorEvent extends AbstractEvent<NMCommunicatorEventType>
return launcherId;
}
+ public int getSchedulerId() {
+ return schedulerId;
+ }
+
+ public int getTaskCommId() {
+ return taskCommId;
+ }
+
public String toSrting() {
return super.toString() + " for container " + containerId + ", nodeId: "
- + nodeId + ", launcherId: " + launcherId;
+ + nodeId + ", launcherId: " + launcherId + ", schedulerId=" + schedulerId +
+ ", taskCommId=" + taskCommId;
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
index a38345c..c57b6be 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
@@ -26,15 +26,14 @@ public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent {
private final ContainerLaunchContext clc;
private final Container container;
// The task communicator index for the specific container being launched.
- private final int taskCommId;
public NMCommunicatorLaunchRequestEvent(ContainerLaunchContext clc,
- Container container, int launcherId, int taskCommId) {
+ Container container, int launcherId, int schedulerId, int taskCommId) {
super(container.getId(), container.getNodeId(), container
- .getContainerToken(), NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST, launcherId);
+ .getContainerToken(), NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST,
+ launcherId, schedulerId, taskCommId);
this.clc = clc;
this.container = container;
- this.taskCommId = taskCommId;
}
public ContainerLaunchContext getContainerLaunchContext() {
@@ -45,10 +44,6 @@ public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent {
return container;
}
- public int getTaskCommId() {
- return taskCommId;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
http://git-wip-us.apache.org/repos/asf/tez/blob/af1cc723/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
index c9b5c44..352f450 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
@@ -25,9 +25,9 @@ import org.apache.hadoop.yarn.api.records.Token;
public class NMCommunicatorStopRequestEvent extends NMCommunicatorEvent {
public NMCommunicatorStopRequestEvent(ContainerId containerId, NodeId nodeId,
- Token containerToken, int launcherId) {
+ Token containerToken, int launcherId, int schedulerId, int taskCommId) {
super(containerId, nodeId, containerToken,
- NMCommunicatorEventType.CONTAINER_STOP_REQUEST, launcherId);
+ NMCommunicatorEventType.CONTAINER_STOP_REQUEST, launcherId, schedulerId, taskCommId);
}
}