You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2016/01/13 00:20:06 UTC
[3/3] tez git commit: TEZ-2669. Propagation of errors from plugins to
the AM for error reporting. Contributed by Hitesh Shah and Siddharth Seth.
TEZ-2669. Propagation of errors from plugins to the AM for error
reporting. Contributed by Hitesh Shah and Siddharth Seth.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1d765431
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1d765431
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1d765431
Branch: refs/heads/master
Commit: 1d765431601fb8ab7cca248baa973684d828afaa
Parents: 0c08577
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Jan 12 15:19:22 2016 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Jan 12 15:19:22 2016 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../serviceplugins/api/ContainerLauncher.java | 12 +-
.../api/ContainerLauncherContext.java | 3 +-
.../api/ServicePluginException.java | 36 +++
.../tez/serviceplugins/api/TaskScheduler.java | 48 +++-
tez-dag/src/main/java/org/apache/tez/Utils.java | 66 +++++
.../apache/tez/dag/api/TaskCommunicator.java | 39 ++-
.../dag/app/ContainerLauncherContextImpl.java | 18 +-
.../org/apache/tez/dag/app/DAGAppMaster.java | 68 +++--
.../dag/app/TaskCommunicatorContextImpl.java | 6 +-
.../tez/dag/app/TaskCommunicatorManager.java | 151 +++++++++--
.../app/TaskCommunicatorManagerInterface.java | 3 +-
.../tez/dag/app/TaskCommunicatorWrapper.java | 83 ++++++
.../tez/dag/app/TezTaskCommunicatorImpl.java | 2 +-
.../tez/dag/app/dag/StateChangeNotifier.java | 7 +-
.../app/dag/event/DAGAppMasterEventType.java | 3 +
.../DAGAppMasterEventUserServiceFatalError.java | 46 ++++
.../app/dag/event/DAGEventInternalError.java | 32 +++
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 22 +-
.../tez/dag/app/dag/impl/VertexManager.java | 8 +-
.../app/launcher/ContainerLauncherManager.java | 49 +++-
.../app/launcher/ContainerLauncherWrapper.java | 40 +++
.../app/launcher/LocalContainerLauncher.java | 2 +-
.../tez/dag/app/rm/TaskSchedulerManager.java | 271 ++++++++++++++++---
.../tez/dag/app/rm/TaskSchedulerWrapper.java | 90 ++++++
.../dag/app/rm/container/AMContainerImpl.java | 30 +-
.../apache/tez/dag/app/MockDAGAppMaster.java | 2 +-
.../tez/dag/app/PluginWrapperTestHelpers.java | 149 ++++++++++
.../dag/app/TestTaskCommunicatorManager.java | 99 ++++++-
.../dag/app/TestTaskCommunicatorManager1.java | 6 +-
.../dag/app/TestTaskCommunicatorWrapper.java | 43 +++
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 9 +-
.../launcher/TestContainerLauncherManager.java | 83 +++++-
.../launcher/TestContainerLauncherWrapper.java | 30 ++
.../tez/dag/app/rm/TestContainerReuse.java | 8 +-
.../dag/app/rm/TestTaskSchedulerHelpers.java | 14 +-
.../dag/app/rm/TestTaskSchedulerManager.java | 106 +++++++-
.../dag/app/rm/TestTaskSchedulerWrapper.java | 29 ++
.../dag/app/rm/container/TestAMContainer.java | 10 +-
.../app/rm/container/TestAMContainerMap.java | 3 +-
.../org/apache/tez/examples/JoinValidate.java | 4 +-
...zTestServiceContainerLauncherWithErrors.java | 37 +++
...stServiceTaskSchedulerServiceWithErrors.java | 93 +++++++
...ezTestServiceTaskCommunicatorWithErrors.java | 83 ++++++
.../tez/examples/JoinValidateConfigured.java | 10 +
.../tez/tests/ExternalTezServiceTestHelper.java | 194 +++++++++++++
.../tez/tests/TestExternalTezServices.java | 125 +--------
.../tests/TestExternalTezServicesErrors.java | 235 ++++++++++++++++
48 files changed, 2231 insertions(+), 277 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6cdc037..d39cbb8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES
TEZ-2972. Avoid task rescheduling when a node turns unhealthy
ALL CHANGES:
+ TEZ-2669. Propagation of errors from plugins to the AM for error reporting.
TEZ-2978. Add an option to allow the SplitGrouper to generate node local only groups.
TEZ-2129. Task and Attempt views should contain links to the logs
TEZ-3025. InputInitializer creation should use the dag ugi.
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
index 5a77b69..8792fd7 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
@@ -74,16 +74,22 @@ public abstract class ContainerLauncher implements ServicePluginLifecycle {
}
/**
- * A request to launch the specified container
+ * A request to launch the specified container, as described by the supplied
+ * {@link ContainerLaunchRequest}
*
* @param launchRequest the actual launch request
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shutdown.
*/
- public abstract void launchContainer(ContainerLaunchRequest launchRequest);
+ public abstract void launchContainer(ContainerLaunchRequest launchRequest) throws
+ ServicePluginException;
/**
* A request to stop a specific container
*
* @param stopRequest the actual stop request
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shutdown.
*/
- public abstract void stopContainer(ContainerStopRequest stopRequest);
+ public abstract void stopContainer(ContainerStopRequest stopRequest) throws ServicePluginException;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
index dcd9e80..70a3498 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
@@ -99,12 +99,13 @@ public interface ContainerLauncherContext {
ApplicationAttemptId getApplicationAttemptId();
/**
- * Get meta info from the specified TaskCommunicator. This assumes that the launched has been
+ * Get meta info from the specified TaskCommunicator. This assumes that the launcher has been
* setup
* along with a compatible TaskCommunicator, and the launcher knows how to read this meta-info
*
* @param taskCommName the name of the task communicator
* @return meta info for the requested task communicator
+ *
*/
Object getTaskCommunicatorMetaInfo(String taskCommName);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginException.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginException.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginException.java
new file mode 100644
index 0000000..737329a
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+/**
+ * Indicates an error from pluggable Tez Services.
+ */
+public class ServicePluginException extends Exception {
+
+ public ServicePluginException() {
+ }
+
+ public ServicePluginException(String message) {
+ super(message);
+ }
+
+ public ServicePluginException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ServicePluginException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
index de76029..5875bd2 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
@@ -101,38 +101,48 @@ public abstract class TaskScheduler implements ServicePluginLifecycle {
* Get the currently available resources from this source
*
* @return the resources available at the time of invocation
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shutdown.
*/
- public abstract Resource getAvailableResources();
+ public abstract Resource getAvailableResources() throws ServicePluginException;
/**
* Get the total available resources from this source
*
* @return the total available resources from the source
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shutdown.
*/
- public abstract Resource getTotalResources();
+ public abstract Resource getTotalResources() throws ServicePluginException;
/**
* Get the number of nodes available from the source
*
* @return the number of nodes
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shutdown.
*/
- public abstract int getClusterNodeCount();
+ public abstract int getClusterNodeCount() throws ServicePluginException;
/**
* Indication to a source that a node has been blacklisted, and should not be used for subsequent
* allocations.
*
* @param nodeId the nodeId to be blacklisted
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shutdown.
*/
- public abstract void blacklistNode(NodeId nodeId);
+ public abstract void blacklistNode(NodeId nodeId) throws ServicePluginException;
/**
* Indication to a source that a node has been un-blacklisted, and can be used from subsequent
* allocations
*
* @param nodeId the nodeId to be unblacklisted
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shutdown.
*/
- public abstract void unblacklistNode(NodeId nodeId);
+ public abstract void unblacklistNode(NodeId nodeId) throws ServicePluginException;
/**
* A request to the source to allocate resources for a requesting task, with location information
@@ -150,10 +160,12 @@ public abstract class TaskScheduler implements ServicePluginLifecycle {
* @param clientCookie a cookie associated with this request. This should be returned back
* via the {@link TaskSchedulerContext#taskAllocated(Object, Object,
* Container)} method when a task is assigned to a resource
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shutdown.
*/
public abstract void allocateTask(Object task, Resource capability,
String[] hosts, String[] racks, Priority priority,
- Object containerSignature, Object clientCookie);
+ Object containerSignature, Object clientCookie) throws ServicePluginException;
/**
* A request to the source to allocate resources for a requesting task, based on a previously used
@@ -171,11 +183,13 @@ public abstract class TaskScheduler implements ServicePluginLifecycle {
* @param clientCookie a cookie associated with this request. This should be returned back
* via the {@link TaskSchedulerContext#taskAllocated(Object, Object,
* Container)} method when a task is assigned to a resource
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shutdown.
*/
public abstract void allocateTask(Object task, Resource capability,
ContainerId containerId, Priority priority,
Object containerSignature,
- Object clientCookie);
+ Object clientCookie) throws ServicePluginException;
/**
* A request to deallocate a task. This is typically a result of a task completing - with success
@@ -190,38 +204,48 @@ public abstract class TaskScheduler implements ServicePluginLifecycle {
* @param endReason the reason for the task failure
* @param diagnostics additional diagnostics information which may be relevant
* @return true if the task was associated with a container, false if the task was not associated
* with a container
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shutdown.
*/
public abstract boolean deallocateTask(Object task, boolean taskSucceeded,
TaskAttemptEndReason endReason,
- @Nullable String diagnostics);
+ @Nullable String diagnostics) throws ServicePluginException;
/**
* A request to de-allocate a previously allocated container.
*
* @param containerId the containerId to de-allocate
* @return the task which was previously associated with this container, null otherwise
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shutdown.
*/
- public abstract Object deallocateContainer(ContainerId containerId);
+ public abstract Object deallocateContainer(ContainerId containerId) throws ServicePluginException;
/**
* Inform the scheduler that it should unregister. This is primarily valid for schedulers which
* require registration (YARN a.t.m)
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shutdown.
*/
- public abstract void setShouldUnregister();
+ public abstract void setShouldUnregister() throws ServicePluginException;
/**
* Checks with the scheduler whether it has unregistered.
*
* @return true if the scheduler has unregistered. False otherwise.
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shutdown.
*/
- public abstract boolean hasUnregistered();
+ public abstract boolean hasUnregistered() throws ServicePluginException;
/**
* Indicates to the scheduler that the currently running dag has completed.
* This can be used to reset dag specific statistics, potentially release resources and prepare
* for a new DAG.
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shutdown.
*/
- public abstract void dagComplete();
+ public abstract void dagComplete() throws ServicePluginException;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/Utils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/Utils.java b/tez-dag/src/main/java/org/apache/tez/Utils.java
new file mode 100644
index 0000000..959b536
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/Utils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.dag.app.AppContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+/**
+ * Utility class within the tez-dag module
+ */
+public class Utils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+
+ public static String getContainerLauncherIdentifierString(int launcherIndex, AppContext appContext) {
+ String name;
+ try {
+ name = appContext.getContainerLauncherName(launcherIndex);
+ } catch (Exception e) {
+ LOG.error("Unable to get launcher name for index: " + launcherIndex +
+ ", falling back to reporting the index");
+ return "[" + String.valueOf(launcherIndex) + "]";
+ }
+ return "[" + launcherIndex + ":" + name + "]";
+ }
+
+ public static String getTaskCommIdentifierString(int taskCommIndex, AppContext appContext) {
+ String name;
+ try {
+ name = appContext.getTaskCommunicatorName(taskCommIndex);
+ } catch (Exception e) {
+ LOG.error("Unable to get taskcomm name for index: " + taskCommIndex +
+ ", falling back to reporting the index");
+ return "[" + String.valueOf(taskCommIndex) + "]";
+ }
+ return "[" + taskCommIndex + ":" + name + "]";
+ }
+
+ public static String getTaskSchedulerIdentifierString(int schedulerIndex, AppContext appContext) {
+ String name;
+ try {
+ name = appContext.getTaskSchedulerName(schedulerIndex);
+ } catch (Exception e) {
+ LOG.error("Unable to get scheduler name for index: " + schedulerIndex +
+ ", falling back to reporting the index");
+ return "[" + String.valueOf(schedulerIndex) + "]";
+ }
+ return "[" + schedulerIndex + ":" + name + "]";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index 38742de..1b6ad07 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.common.ServicePluginLifecycle;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.ServicePluginException;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -107,8 +108,11 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
* @param containerId the associated containerId
* @param hostname the hostname on which the container runs
* @param port the port for the service which is running the container
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shutdown.
*/
- public abstract void registerRunningContainer(ContainerId containerId, String hostname, int port);
+ public abstract void registerRunningContainer(ContainerId containerId, String hostname,
+ int port) throws ServicePluginException;
/**
* Register the end of a container. This can be caused by preemption, the container completing
@@ -117,9 +121,12 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
* @param containerId the associated containerId
* @param endReason the end reason for the container completing
* @param diagnostics diagnostics associated with the container end
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shutdown.
*/
public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason,
- @Nullable String diagnostics);
+ @Nullable String diagnostics) throws
+ ServicePluginException;
/**
* Register a task attempt to execute on a container
@@ -133,11 +140,14 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
* @param credentialsChanged whether the credentials are different from the original credentials
* associated with this container
* @param priority the priority of the task being executed
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shutdown.
*/
public abstract void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec,
Map<String, LocalResource> additionalResources,
Credentials credentials,
- boolean credentialsChanged, int priority);
+ boolean credentialsChanged, int priority) throws
+ ServicePluginException;
/**
* Register the completion of a task. This may be a result of preemption, the container dying,
@@ -146,17 +156,22 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
* @param taskAttemptID the task attempt which has completed / needs to be completed
* @param endReason the endReason for the task attempt.
* @param diagnostics diagnostics associated with the task end
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shutdown.
*/
public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID,
TaskAttemptEndReason endReason,
- @Nullable String diagnostics);
+ @Nullable String diagnostics) throws
+ ServicePluginException;
/**
* Return the address, if any, that the service listens on
*
* @return the address
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shutdown.
*/
- public abstract InetSocketAddress getAddress();
+ public abstract InetSocketAddress getAddress() throws ServicePluginException;
/**
* Receive notifications on vertex state changes.
@@ -175,9 +190,10 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
* @param stateUpdate an event indicating the name of the vertex, and its updated state.
* Additional information may be available for specific events, Look at the
* type hierarchy for {@link org.apache.tez.dag.api.event.VertexStateUpdate}
- * @throws Exception
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shutdown.
*/
- public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception;
+ public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws ServicePluginException;
/**
* Indicates the current running dag is complete. The TaskCommunicatorContext can be used to
@@ -187,9 +203,10 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
* the next dag being submitted.
*
* @param dagIdentifier the unique numerical identifier for the DAG in the specified execution context.
- *
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shutdown.
*/
- public abstract void dagComplete(int dagIdentifier);
+ public abstract void dagComplete(int dagIdentifier) throws ServicePluginException;
/**
* Share meta-information such as host:port information where the Task Communicator may be
@@ -197,6 +214,8 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
* Primarily for use by compatible launchers to learn this information.
*
* @return meta info for the task communicator
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shutdown.
*/
- public abstract Object getMetaInfo();
+ public abstract Object getMetaInfo() throws ServicePluginException;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
index a2e0dd6..9434256 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
@@ -19,6 +19,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.app.rm.container.AMContainerEvent;
@@ -29,10 +31,13 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventStopFailed;
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@SuppressWarnings("unchecked")
public class ContainerLauncherContextImpl implements ContainerLauncherContext {
+ private static final Logger LOG = LoggerFactory.getLogger(ContainerLauncherContextImpl.class);
private final AppContext context;
private final TaskCommunicatorManagerInterface tal;
private final UserPayload initialUserPayload;
@@ -101,7 +106,18 @@ public class ContainerLauncherContextImpl implements ContainerLauncherContext {
@Override
public Object getTaskCommunicatorMetaInfo(String taskCommName) {
int taskCommId = context.getTaskCommunicatorIdentifier(taskCommName);
- return tal.getTaskCommunicator(taskCommId).getMetaInfo();
+ try {
+ return tal.getTaskCommunicator(taskCommId).getMetaInfo();
+ } catch (Exception e) {
+ String msg = "Error in retrieving meta-info from TaskCommunicator"
+ + ", communicatorName=" + context.getTaskCommunicatorName(taskCommId);
+ LOG.error(msg, e);
+ context.getEventHandler().handle(
+ new DAGAppMasterEventUserServiceFatalError(
+ DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
+ msg, e));
+ }
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index c0b86a5..609a018 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -63,6 +63,7 @@ import com.google.common.collect.Lists;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.tez.client.CallerContext;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
@@ -72,6 +73,8 @@ import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDagCleanup;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
+import org.apache.tez.dag.app.dag.event.DAGEventInternalError;
import org.apache.tez.dag.history.events.DAGRecoveredEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -151,10 +154,6 @@ import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.launcher.ContainerLauncherManager;
-import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
-import org.apache.tez.dag.app.dag.impl.TaskImpl;
-import org.apache.tez.dag.app.dag.impl.VertexImpl;
-import org.apache.tez.dag.app.launcher.LocalContainerLauncher;
import org.apache.tez.dag.app.rm.AMSchedulerEventType;
import org.apache.tez.dag.app.rm.ContainerLauncherEventType;
import org.apache.tez.dag.app.rm.TaskSchedulerManager;
@@ -671,8 +670,34 @@ public class DAGAppMaster extends AbstractService {
return taskSchedulerManager;
}
+ private void handleInternalError(String errDiagnosticsPrefix, String errDiagDagEvent) {
+ state = DAGAppMasterState.ERROR;
+ if (currentDAG != null) {
+ _updateLoggers(currentDAG, "_post");
+ String errDiagnostics = errDiagnosticsPrefix + ". Aborting dag: " + currentDAG.getID();
+ LOG.info(errDiagnostics);
+ // Inform the current DAG about the error
+ sendEvent(new DAGEventInternalError(currentDAG.getID(), errDiagDagEvent));
+ } else {
+ LOG.info(errDiagnosticsPrefix + ". AppMaster will exit as no dag is active");
+ // This could be problematic if the scheduler generated the error,
+ // since un-registration may not be possible.
+ // For now - try setting this flag, but call the shutdownHandler irrespective of
+ // how the flag is handled by user code.
+ try {
+ this.taskSchedulerManager.setShouldUnregisterFlag();
+ } catch (Exception e) {
+ // Ignore exception for now
+ LOG.error("Error when trying to set unregister flag for TaskScheduler", e);
+ } finally {
+ shutdownHandler.shutdown();
+ }
+ }
+ }
+
@VisibleForTesting
protected synchronized void handle(DAGAppMasterEvent event) {
+ String errDiagnostics;
switch (event.getType()) {
case SCHEDULING_SERVICE_ERROR:
// Scheduling error - probably an issue with the communication with the RM
@@ -683,22 +708,30 @@ public class DAGAppMaster extends AbstractService {
DAGAppMasterEventSchedulingServiceError schedulingServiceErrorEvent =
(DAGAppMasterEventSchedulingServiceError) event;
state = DAGAppMasterState.ERROR;
- LOG.info("Error in the TaskScheduler. Shutting down.",
- schedulingServiceErrorEvent.getThrowable());
+ errDiagnostics = "Error in the TaskScheduler. Shutting down. ";
+ addDiagnostic(errDiagnostics
+ + "Error=" + ExceptionUtils.getStackTrace(schedulingServiceErrorEvent.getThrowable()));
+ LOG.error(errDiagnostics, schedulingServiceErrorEvent.getThrowable());
shutdownHandler.shutdown();
break;
+ case TASK_COMMUNICATOR_SERVICE_FATAL_ERROR:
+ case CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR:
+ case TASK_SCHEDULER_SERVICE_FATAL_ERROR:
+ // A fatal error from the pluggable services. The AM cannot continue operation, and should
+ // be shutdown. The AM should not be restarted for recovery.
+ DAGAppMasterEventUserServiceFatalError usfe = (DAGAppMasterEventUserServiceFatalError) event;
+ Throwable error = usfe.getError();
+ errDiagnostics = "Service Error: " + usfe.getDiagnosticInfo()
+ + ", eventType=" + event.getType()
+ + ", exception=" + ExceptionUtils.getStackTrace(usfe.getError());
+ LOG.error(errDiagnostics, error);
+ addDiagnostic(errDiagnostics);
+
+ handleInternalError("Service error: " + event.getType(), errDiagnostics);
+ break;
case INTERNAL_ERROR:
- state = DAGAppMasterState.ERROR;
- if(currentDAG != null) {
- _updateLoggers(currentDAG, "_post");
- // notify dag to finish which will send the DAG_FINISHED event
- LOG.info("Internal Error. Notifying dags to finish.");
- sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.INTERNAL_ERROR));
- } else {
- LOG.info("Internal Error. Finishing directly as no dag is active.");
- this.taskSchedulerManager.setShouldUnregisterFlag();
- shutdownHandler.shutdown();
- }
+ handleInternalError("DAGAppMaster Internal Error occurred",
+ "DAGAppMaster Internal Error occurred");
break;
case DAG_FINISHED:
DAGAppMasterEventDAGFinished finishEvt =
@@ -756,6 +789,7 @@ public class DAGAppMaster extends AbstractService {
LOG.error("Received a DAG Finished Event with state="
+ finishEvt.getDAGState()
+ ". Error. Shutting down.");
+ addDiagnostic("DAG completed with an ERROR state. Shutting down AM");
state = DAGAppMasterState.ERROR;
this.taskSchedulerManager.setShouldUnregisterFlag();
shutdownHandler.shutdown();
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 6ae6dad..2b7234c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -205,11 +205,7 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
@Override
public void onStateUpdated(VertexStateUpdate event) {
- try {
- taskCommunicatorManager.vertexStateUpdateNotificationReceived(event, taskCommunicatorIndex);
- } catch (Exception e) {
- throw new TezUncheckedException(e);
- }
+ taskCommunicatorManager.vertexStateUpdateNotificationReceived(event, taskCommunicatorIndex);
}
private DAG getDag() {
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
index 92bf3c4..64a964b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
@@ -30,9 +30,14 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.collections4.ListUtils;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.tez.Utils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
@@ -48,7 +53,6 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtilsInternal;
-import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.api.TaskCommunicatorContext;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TaskHeartbeatResponse;
@@ -81,7 +85,7 @@ public class TaskCommunicatorManager extends AbstractService implements
.getLogger(TaskCommunicatorManager.class);
private final AppContext context;
- private final TaskCommunicator[] taskCommunicators;
+ private final TaskCommunicatorWrapper[] taskCommunicators;
private final TaskCommunicatorContext[] taskCommunicatorContexts;
protected final ServicePluginLifecycleAbstractService []taskCommunicatorServiceWrappers;
@@ -106,6 +110,24 @@ public class TaskCommunicatorManager extends AbstractService implements
private static final ContainerInfo NULL_CONTAINER_INFO = new ContainerInfo(null);
+ @VisibleForTesting
+ @InterfaceAudience.Private
+ /**
+ * Only for testing.
+ */
+ public TaskCommunicatorManager(TaskCommunicator taskCommunicator, AppContext appContext,
+ TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh) {
+ super(TaskCommunicatorManager.class.getName());
+ this.context = appContext;
+ this.taskHeartbeatHandler = thh;
+ this.containerHeartbeatHandler = chh;
+ taskCommunicators =
+ new TaskCommunicatorWrapper[]{new TaskCommunicatorWrapper(taskCommunicator)};
+ taskCommunicatorContexts = new TaskCommunicatorContext[]{taskCommunicator.getContext()};
+ taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[]{
+ new ServicePluginLifecycleAbstractService(taskCommunicator)};
+ }
+
public TaskCommunicatorManager(AppContext context,
TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
List<NamedEntityDescriptor> taskCommunicatorDescriptors) throws TezException {
@@ -116,14 +138,15 @@ public class TaskCommunicatorManager extends AbstractService implements
Preconditions.checkArgument(
taskCommunicatorDescriptors != null && !taskCommunicatorDescriptors.isEmpty(),
"TaskCommunicators must be specified");
- this.taskCommunicators = new TaskCommunicator[taskCommunicatorDescriptors.size()];
+ this.taskCommunicators = new TaskCommunicatorWrapper[taskCommunicatorDescriptors.size()];
this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorDescriptors.size()];
this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorDescriptors.size()];
for (int i = 0 ; i < taskCommunicatorDescriptors.size() ; i++) {
UserPayload userPayload = taskCommunicatorDescriptors.get(i).getUserPayload();
taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, userPayload, i);
- taskCommunicators[i] = createTaskCommunicator(taskCommunicatorDescriptors.get(i), i);
- taskCommunicatorServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskCommunicators[i]);
+ taskCommunicators[i] = new TaskCommunicatorWrapper(createTaskCommunicator(taskCommunicatorDescriptors.get(i), i));
+ taskCommunicatorServiceWrappers[i] =
+ new ServicePluginLifecycleAbstractService(taskCommunicators[i].getTaskCommunicator());
}
// TODO TEZ-2118 Start using taskCommunicator indices properly
}
@@ -269,11 +292,11 @@ public class TaskCommunicatorManager extends AbstractService implements
}
if (taskAttemptEvent != null) {
taskAttemptEvent.setReadErrorReported(readErrorReported);
- context.getEventHandler().handle(taskAttemptEvent);
+ sendEvent(taskAttemptEvent);
}
// route taGeneratedEvents to TaskAttempt
if (!taGeneratedEvents.isEmpty()) {
- context.getEventHandler().handle(new TaskAttemptEventTezEventUpdate(taskAttemptID, taGeneratedEvents));
+ sendEvent(new TaskAttemptEventTezEventUpdate(taskAttemptID, taGeneratedEvents));
}
// route events to TaskAttempt
Preconditions.checkArgument(taFinishedEvents.size() <= 1, "Multiple TaskAttemptFinishedEvent");
@@ -300,14 +323,14 @@ public class TaskCommunicatorManager extends AbstractService implements
sourceMeta.getEventGenerator());
}
TaskAttemptFailedEvent taskFailedEvent =(TaskAttemptFailedEvent) e.getEvent();
- context.getEventHandler().handle(
+ sendEvent(
new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(),
TaskAttemptEventType.TA_FAILED,
"Error: " + taskFailedEvent.getDiagnostics(),
errCause));
break;
case TASK_ATTEMPT_COMPLETED_EVENT:
- context.getEventHandler().handle(
+ sendEvent(
new TaskAttemptEvent(sourceMeta.getTaskAttemptID(), TaskAttemptEventType.TA_DONE));
break;
default:
@@ -317,7 +340,7 @@ public class TaskCommunicatorManager extends AbstractService implements
}
if (!eventsForVertex.isEmpty()) {
TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
- context.getEventHandler().handle(
+ sendEvent(
new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(eventsForVertex)));
}
taskHeartbeatHandler.pinged(taskAttemptID);
@@ -339,8 +362,7 @@ public class TaskCommunicatorManager extends AbstractService implements
}
public void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId) {
- context.getEventHandler()
- .handle(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null));
+ sendEvent(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null));
pingContainerHeartbeatHandler(containerId);
}
@@ -351,7 +373,7 @@ public class TaskCommunicatorManager extends AbstractService implements
// TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore,
// instead of waiting for the unregister to flow through the Container.
// Fix along the same lines as TEZ-2124 by introducing an explict context.
- context.getEventHandler().handle(new TaskAttemptEventAttemptKilled(taskAttemptId,
+ sendEvent(new TaskAttemptEventAttemptKilled(taskAttemptId,
diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
taskAttemptEndReason)));
}
@@ -363,14 +385,25 @@ public class TaskCommunicatorManager extends AbstractService implements
// TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore,
// instead of waiting for the unregister to flow through the Container.
// Fix along the same lines as TEZ-2124 by introducing an explict context.
- context.getEventHandler().handle(new TaskAttemptEventAttemptFailed(taskAttemptId,
+ sendEvent(new TaskAttemptEventAttemptFailed(taskAttemptId,
TaskAttemptEventType.TA_FAILED, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
taskAttemptEndReason)));
}
- public void vertexStateUpdateNotificationReceived(VertexStateUpdate event, int taskCommIndex) throws
- Exception {
- taskCommunicators[taskCommIndex].onVertexStateUpdated(event);
+ public void vertexStateUpdateNotificationReceived(VertexStateUpdate event, int taskCommIndex) {
+ try {
+ taskCommunicators[taskCommIndex].onVertexStateUpdated(event);
+ } catch (Exception e) {
+ String msg = "Error in TaskCommunicator when handling vertex state update notification"
+ + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommIndex, context)
+ + ", vertexName=" + event.getVertexName()
+ + ", vertexState=" + event.getVertexState();
+ LOG.error(msg, e);
+ sendEvent(
+ new DAGAppMasterEventUserServiceFatalError(
+ DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
+ msg, e));
+ }
}
@@ -410,9 +443,19 @@ public class TaskCommunicatorManager extends AbstractService implements
// Inform all communicators of the dagCompletion.
for (int i = 0 ; i < taskCommunicators.length ; i++) {
- ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteStart(dag);
- taskCommunicators[i].dagComplete(dag.getID().getId());
- ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteEnd();
+ try {
+ ((TaskCommunicatorContextImpl) taskCommunicatorContexts[i]).dagCompleteStart(dag);
+ taskCommunicators[i].dagComplete(dag.getID().getId());
+ ((TaskCommunicatorContextImpl) taskCommunicatorContexts[i]).dagCompleteEnd();
+ } catch (Exception e) {
+ String msg = "Error in TaskCommunicator when notifying for DAG completion"
+ + ", communicator=" + Utils.getTaskCommIdentifierString(i, context);
+ LOG.error(msg, e);
+ sendEvent(
+ new DAGAppMasterEventUserServiceFatalError(
+ DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
+ msg, e));
+ }
}
}
@@ -434,8 +477,20 @@ public class TaskCommunicatorManager extends AbstractService implements
"Multiple registrations for containerId: " + containerId);
}
NodeId nodeId = context.getAllContainers().get(containerId).getContainer().getNodeId();
- taskCommunicators[taskCommId].registerRunningContainer(containerId, nodeId.getHost(),
- nodeId.getPort());
+ try {
+ taskCommunicators[taskCommId].registerRunningContainer(containerId, nodeId.getHost(),
+ nodeId.getPort());
+ } catch (Exception e) {
+ String msg = "Error in TaskCommunicator when registering running Container"
+ + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context)
+ + ", containerId=" + containerId
+ + ", nodeId=" + nodeId;
+ LOG.error(msg, e);
+ sendEvent(
+ new DAGAppMasterEventUserServiceFatalError(
+ DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
+ msg, e));
+ }
}
@Override
@@ -447,7 +502,18 @@ public class TaskCommunicatorManager extends AbstractService implements
if (containerInfo.taskAttemptId != null) {
registeredAttempts.remove(containerInfo.taskAttemptId);
}
- taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason, diagnostics);
+ try {
+ taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason, diagnostics);
+ } catch (Exception e) {
+ String msg = "Error in TaskCommunicator when unregistering Container"
+ + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context)
+ + ", containerId=" + containerId;
+ LOG.error(msg, e);
+ sendEvent(
+ new DAGAppMasterEventUserServiceFatalError(
+ DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
+ msg, e));
+ }
}
@Override
@@ -475,9 +541,21 @@ public class TaskCommunicatorManager extends AbstractService implements
+ amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
+ " when already assigned to: " + containerIdFromMap);
}
- taskCommunicators[taskCommId].registerRunningTaskAttempt(containerId, amContainerTask.getTask(),
- amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(),
- amContainerTask.haveCredentialsChanged(), amContainerTask.getPriority());
+ try {
+ taskCommunicators[taskCommId].registerRunningTaskAttempt(containerId, amContainerTask.getTask(),
+ amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(),
+ amContainerTask.haveCredentialsChanged(), amContainerTask.getPriority());
+ } catch (Exception e) {
+ String msg = "Error in TaskCommunicator when registering Task Attempt"
+ + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context)
+ + ", containerId=" + containerId
+ + ", taskId=" + amContainerTask.getTask().getTaskAttemptID();
+ LOG.error(msg, e);
+ sendEvent(
+ new DAGAppMasterEventUserServiceFatalError(
+ DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
+ msg, e));
+ }
}
@Override
@@ -495,11 +573,23 @@ public class TaskCommunicatorManager extends AbstractService implements
}
// Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
registeredContainers.put(containerId, NULL_CONTAINER_INFO);
- taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason, diagnostics);
+ try {
+ taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason, diagnostics);
+ } catch (Exception e) {
+ String msg = "Error in TaskCommunicator when unregistering Task Attempt"
+ + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context)
+ + ", containerId=" + containerId
+ + ", taskId=" + attemptId;
+ LOG.error(msg, e);
+ sendEvent(
+ new DAGAppMasterEventUserServiceFatalError(
+ DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
+ msg, e));
+ }
}
@Override
- public TaskCommunicator getTaskCommunicator(int taskCommIndex) {
+ public TaskCommunicatorWrapper getTaskCommunicator(int taskCommIndex) {
return taskCommunicators[taskCommIndex];
}
@@ -516,4 +606,9 @@ public class TaskCommunicatorManager extends AbstractService implements
+ ", ContainerId not known for this attempt");
}
}
+
+ @SuppressWarnings("unchecked")
+ private void sendEvent(Event<?> event) {
+ context.getEventHandler().handle(event);
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
index 8d060a2..e07b1a0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.app.rm.container.AMContainerTask;
import org.apache.tez.dag.records.TezTaskAttemptID;
/**
@@ -42,5 +41,5 @@ public interface TaskCommunicatorManagerInterface {
void dagSubmitted();
- TaskCommunicator getTaskCommunicator(int taskCommIndex);
+ TaskCommunicatorWrapper getTaskCommunicator(int taskCommIndex);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
new file mode 100644
index 0000000..4f9780e
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import javax.annotation.Nullable;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+
+public class TaskCommunicatorWrapper {
+
+ private final TaskCommunicator real;
+
+ public TaskCommunicatorWrapper(TaskCommunicator real) {
+ this.real = real;
+ }
+
+
+ public void registerRunningContainer(ContainerId containerId, String hostname, int port) throws
+ Exception {
+ real.registerRunningContainer(containerId, hostname, port);
+ }
+
+ public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason,
+ @Nullable String diagnostics) throws Exception {
+ real.registerContainerEnd(containerId, endReason, diagnostics);
+
+ }
+
+ public void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec,
+ Map<String, LocalResource> additionalResources,
+ Credentials credentials, boolean credentialsChanged,
+ int priority) throws Exception {
+ real.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials, credentialsChanged, priority);
+ }
+
+ public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID,
+ TaskAttemptEndReason endReason,
+ @Nullable String diagnostics) throws Exception {
+ real.unregisterRunningTaskAttempt(taskAttemptID, endReason, diagnostics);
+ }
+
+ public InetSocketAddress getAddress() throws Exception {
+ return real.getAddress();
+ }
+
+ public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
+ real.onVertexStateUpdated(stateUpdate);
+ }
+
+ public void dagComplete(int dagIdentifier) throws Exception {
+ real.dagComplete(dagIdentifier);
+ }
+
+ public Object getMetaInfo() throws Exception {
+ return real.getMetaInfo();
+ }
+
+ public TaskCommunicator getTaskCommunicator() {
+ return real;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 78e95bd..d071e0d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -273,7 +273,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
}
@Override
- public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
+ public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
// Empty. Not registering, or expecting any updates.
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
index 990bdea..bd04fd8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
@@ -34,10 +34,12 @@ import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventInternalError;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
@@ -110,7 +112,10 @@ public class StateChangeNotifier {
} catch (Exception e) {
// TODO send user code exception - TEZ-2332
LOG.error("Error in state update notification for " + event, e);
- dag.getEventHandler().handle(new DAGEvent(dag.getID(), DAGEventType.INTERNAL_ERROR));
+ dag.getEventHandler().handle(
+ new DAGEventInternalError(dag.getID(),
+ "Internal Error in State Update Notification: "
+ + ExceptionUtils.getStackTrace(e)));
return;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java
index 5a102a5..9cf2414 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventType.java
@@ -22,6 +22,9 @@ public enum DAGAppMasterEventType {
INTERNAL_ERROR,
AM_REBOOT,
DAG_FINISHED,
+ TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
+ CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR,
+ TASK_SCHEDULER_SERVICE_FATAL_ERROR,
SCHEDULING_SERVICE_ERROR,
NEW_DAG_SUBMITTED, // Indicates a new dag being submitted, to notify sub-components
DAG_CLEANUP
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventUserServiceFatalError.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventUserServiceFatalError.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventUserServiceFatalError.java
new file mode 100644
index 0000000..7bc3bd8
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventUserServiceFatalError.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import java.util.EnumSet;
+
+import com.google.common.base.Preconditions;
+
+public class DAGAppMasterEventUserServiceFatalError extends DAGAppMasterEvent implements DiagnosableEvent {
+
+ private final Throwable error;
+ private final String diagnostics;
+
+ public DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType type,
+ String diagnostics, Throwable t) {
+ super(type);
+ Preconditions.checkArgument(
+ EnumSet.of(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
+ DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR,
+ DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR).contains(type),
+ "Event created with incorrect type: " + type);
+ this.error = t;
+ this.diagnostics = diagnostics;
+ }
+
+ public Throwable getError() {
+ return error;
+ }
+
+ @Override
+ public String getDiagnosticInfo() {
+ return diagnostics;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventInternalError.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventInternalError.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventInternalError.java
new file mode 100644
index 0000000..724ecbe
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventInternalError.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TezDAGID;
+
+public class DAGEventInternalError extends DAGEvent implements DiagnosableEvent {
+
+ private final String diagnostics;
+
+ public DAGEventInternalError(TezDAGID dagId, String diagnostics) {
+ super(dagId, DAGEventType.INTERNAL_ERROR);
+ this.diagnostics = diagnostics;
+ }
+
+ @Override
+ public String getDiagnosticInfo() {
+ return diagnostics;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 60f933f..41017ea 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -43,6 +43,8 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.LimitExceededException;
+import org.apache.tez.dag.app.dag.event.DAGEventInternalError;
+import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
import org.apache.tez.state.OnStateChangedCallback;
import org.apache.tez.state.StateMachineTez;
import org.slf4j.Logger;
@@ -2252,13 +2254,21 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private static class InternalErrorTransition implements
SingleArcTransition<DAGImpl, DAGEvent> {
@Override
- public void transition(DAGImpl job, DAGEvent event) {
- LOG.info(job.getID() + " terminating due to internal error");
+ public void transition(DAGImpl dag, DAGEvent event) {
+ String diagnostics = null;
+ if (event instanceof DiagnosableEvent) {
+ DiagnosableEvent errEvent = (DiagnosableEvent) event;
+ diagnostics = errEvent.getDiagnosticInfo();
+ dag.addDiagnostic(diagnostics);
+ }
+
+ LOG.info(dag.getID() + " terminating due to internal error. "
+ + (diagnostics == null? "" : " Error=" + diagnostics));
// terminate all vertices
- job.enactKill(DAGTerminationCause.INTERNAL_ERROR, VertexTerminationCause.INTERNAL_ERROR);
- job.setFinishTime();
- job.cancelCommits();
- job.finished(DAGState.ERROR);
+ dag.enactKill(DAGTerminationCause.INTERNAL_ERROR, VertexTerminationCause.INTERNAL_ERROR);
+ dag.setFinishTime();
+ dag.cancelCommits();
+ dag.finished(DAGState.ERROR);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 379e316..388d3c7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -33,6 +33,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.dag.app.dag.event.DAGEventInternalError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.security.UserGroupInformation;
@@ -56,8 +58,6 @@ import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.CallableEvent;
-import org.apache.tez.dag.app.dag.event.DAGEvent;
-import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
@@ -530,7 +530,9 @@ public class VertexManager {
// state change must be triggered via an event transition
LOG.error("Error after vertex manager callback " + managedVertex.getLogIdentifier(), e);
appContext.getEventHandler().handle(
- (new DAGEvent(managedVertex.getVertexId().getDAGId(), DAGEventType.INTERNAL_ERROR)));
+ (new DAGEventInternalError(managedVertex.getVertexId().getDAGId(),
+ "Error in VertexManager for vertex: " + managedVertex.getLogIdentifier()
+ + ", error=" + ExceptionUtils.getStackTrace(e))));
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
index 9e56f44..98237c1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
@@ -22,13 +22,17 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.Utils;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
import org.apache.tez.serviceplugins.api.ContainerLauncher;
import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -49,7 +53,7 @@ public class ContainerLauncherManager extends AbstractService
static final Logger LOG = LoggerFactory.getLogger(TezContainerLauncherImpl.class);
@VisibleForTesting
- final ContainerLauncher containerLaunchers[];
+ final ContainerLauncherWrapper containerLaunchers[];
@VisibleForTesting
final ContainerLauncherContext containerLauncherContexts[];
protected final ServicePluginLifecycleAbstractService[] containerLauncherServiceWrappers;
@@ -59,7 +63,7 @@ public class ContainerLauncherManager extends AbstractService
public ContainerLauncherManager(ContainerLauncher containerLauncher, AppContext context) {
super(ContainerLauncherManager.class.getName());
this.appContext = context;
- containerLaunchers = new ContainerLauncher[] {containerLauncher};
+ containerLaunchers = new ContainerLauncherWrapper[] {new ContainerLauncherWrapper(containerLauncher)};
containerLauncherContexts = new ContainerLauncherContext[] {containerLauncher.getContext()};
containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[]{
new ServicePluginLifecycleAbstractService<>(containerLauncher)};
@@ -78,7 +82,7 @@ public class ContainerLauncherManager extends AbstractService
containerLauncherDescriptors != null && !containerLauncherDescriptors.isEmpty(),
"ContainerLauncherDescriptors must be specified");
containerLauncherContexts = new ContainerLauncherContext[containerLauncherDescriptors.size()];
- containerLaunchers = new ContainerLauncher[containerLauncherDescriptors.size()];
+ containerLaunchers = new ContainerLauncherWrapper[containerLauncherDescriptors.size()];
containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[containerLauncherDescriptors.size()];
@@ -87,9 +91,9 @@ public class ContainerLauncherManager extends AbstractService
ContainerLauncherContext containerLauncherContext =
new ContainerLauncherContextImpl(context, taskCommunicatorManagerInterface, userPayload);
containerLauncherContexts[i] = containerLauncherContext;
- containerLaunchers[i] = createContainerLauncher(containerLauncherDescriptors.get(i), context,
- containerLauncherContext, taskCommunicatorManagerInterface, workingDirectory, i, isPureLocalMode);
- containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(containerLaunchers[i]);
+ containerLaunchers[i] = new ContainerLauncherWrapper(createContainerLauncher(containerLauncherDescriptors.get(i), context,
+ containerLauncherContext, taskCommunicatorManagerInterface, workingDirectory, i, isPureLocalMode));
+ containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(containerLaunchers[i].getContainerLauncher());
}
}
@@ -197,14 +201,43 @@ public class ContainerLauncherManager extends AbstractService
launchEvent.getContainerToken(), launchEvent.getContainerLaunchContext(),
launchEvent.getContainer(), schedulerName,
taskCommName);
- containerLaunchers[launcherId].launchContainer(launchRequest);
+ try {
+ containerLaunchers[launcherId].launchContainer(launchRequest);
+ } catch (Exception e) {
+ String msg = "Error when launching container"
+ + ", containerLauncher=" + Utils.getContainerLauncherIdentifierString(launcherId, appContext)
+ + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext)
+ + ", taskCommunicator=" + Utils.getTaskCommIdentifierString(event.getTaskCommId(), appContext);
+ LOG.error(msg, e);
+ sendEvent(
+ new DAGAppMasterEventUserServiceFatalError(
+ DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR,
+ msg, e));
+ }
break;
case CONTAINER_STOP_REQUEST:
ContainerStopRequest stopRequest =
new ContainerStopRequest(event.getNodeId(), event.getContainerId(),
event.getContainerToken(), schedulerName, taskCommName);
- containerLaunchers[launcherId].stopContainer(stopRequest);
+ try {
+ containerLaunchers[launcherId].stopContainer(stopRequest);
+ } catch (Exception e) {
+ String msg = "Error when stopping container"
+ + ", containerLauncher=" + Utils.getContainerLauncherIdentifierString(launcherId, appContext)
+ + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext)
+ + ", taskCommunicator=" + Utils.getTaskCommIdentifierString(event.getTaskCommId(), appContext);
+ LOG.error(msg, e);
+ sendEvent(
+ new DAGAppMasterEventUserServiceFatalError(
+ DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR,
+ msg, e));
+ }
break;
}
}
+
+ @SuppressWarnings("unchecked")
+ private void sendEvent(Event<?> event) {
+ appContext.getEventHandler().handle(event);
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java
new file mode 100644
index 0000000..08e287e
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherWrapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
+
+public class ContainerLauncherWrapper {
+
+ private final ContainerLauncher real;
+
+ public ContainerLauncherWrapper(ContainerLauncher containerLauncher) {
+ this.real = containerLauncher;
+ }
+
+ public void launchContainer(ContainerLaunchRequest launchRequest) throws Exception {
+ real.launchContainer(launchRequest);
+ }
+
+ public void stopContainer(ContainerStopRequest stopRequest) throws Exception {
+ real.stopContainer(stopRequest);
+ }
+
+ public ContainerLauncher getContainerLauncher() {
+ return real;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index c4ab6e3..b737fda 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -228,7 +228,7 @@ public class LocalContainerLauncher extends ContainerLauncher {
tezChild =
createTezChild(context.getAMConf(), event.getContainerId(), tokenIdentifier,
context.getApplicationAttemptId().getAttemptId(), context.getLocalDirs(),
- ((TezTaskCommunicatorImpl)tal.getTaskCommunicator(taskCommId)).getUmbilical(),
+ ((TezTaskCommunicatorImpl)tal.getTaskCommunicator(taskCommId).getTaskCommunicator()).getUmbilical(),
TezCommonUtils.parseCredentialsBytes(event.getContainerLaunchContext().getTokens().array()));
} catch (InterruptedException e) {
handleLaunchFailed(e, event.getContainerId());