You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2016/02/17 22:39:44 UTC
[2/2] tez git commit: TEZ-3029. Add an onError method to service
plugin contexts. (sseth)
TEZ-3029. Add an onError method to service plugin contexts. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a812c346
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a812c346
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a812c346
Branch: refs/heads/master
Commit: a812c3462808e73b8a59e1852ff2547dcbafbf84
Parents: fec46aa
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Feb 17 13:39:11 2016 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Feb 17 13:39:11 2016 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../api/ContainerLauncherContext.java | 12 +-
.../apache/tez/serviceplugins/api/DagInfo.java | 30 +++
.../api/ServicePluginContextBase.java | 49 ++++
.../serviceplugins/api/ServicePluginError.java | 48 ++++
.../api/ServicePluginErrorDefaults.java | 76 ++++++
.../api/TaskSchedulerContext.java | 19 +-
tez-dag/src/main/java/org/apache/tez/Utils.java | 33 +++
.../tez/dag/api/client/DAGClientHandler.java | 5 +-
.../dag/app/ContainerLauncherContextImpl.java | 27 ++-
.../org/apache/tez/dag/app/DAGAppMaster.java | 24 +-
.../dag/app/TaskCommunicatorContextImpl.java | 17 ++
.../tez/dag/app/TaskCommunicatorManager.java | 26 ++
.../app/TaskCommunicatorManagerInterface.java | 4 +
.../java/org/apache/tez/dag/app/dag/DAG.java | 3 +-
.../tez/dag/app/dag/DAGTerminationCause.java | 3 +
.../tez/dag/app/dag/VertexTerminationCause.java | 2 +-
...DAGAppMasterEventSchedulingServiceError.java | 15 +-
.../dag/app/dag/event/DAGEventTerminateDag.java | 38 +++
.../tez/dag/app/dag/event/DAGEventType.java | 4 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 82 ++++---
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 2 +-
.../app/launcher/ContainerLauncherManager.java | 59 ++++-
.../dag/app/rm/TaskSchedulerContextImpl.java | 22 +-
.../app/rm/TaskSchedulerContextImplWrapper.java | 33 ++-
.../tez/dag/app/rm/TaskSchedulerManager.java | 36 ++-
.../dag/app/rm/YarnTaskSchedulerService.java | 6 +-
.../app/rm/YarnTaskSchedulerServiceError.java | 33 +++
.../api/TaskCommunicatorContext.java | 16 +-
.../dag/api/client/TestDAGClientHandler.java | 4 +-
.../apache/tez/dag/app/MockDAGAppMaster.java | 6 +-
.../tez/dag/app/TestMockDAGAppMaster.java | 3 +-
.../dag/app/TestTaskCommunicatorManager.java | 136 ++++++++++-
.../apache/tez/dag/app/dag/impl/TestCommit.java | 87 +++++--
.../tez/dag/app/dag/impl/TestDAGImpl.java | 84 +++++--
.../tez/dag/app/dag/impl/TestVertexImpl.java | 13 +-
.../launcher/TestContainerLauncherManager.java | 101 +++++++-
.../tez/dag/app/rm/TestTaskScheduler.java | 19 +-
.../dag/app/rm/TestTaskSchedulerHelpers.java | 15 +-
.../dag/app/rm/TestTaskSchedulerManager.java | 161 ++++++++++++
.../tez/dag/helpers/DagInfoImplForTest.java | 38 +++
.../tez/dag/app/ErrorPluginConfiguration.java | 134 ++++++++++
...zTestServiceContainerLauncherWithErrors.java | 17 +-
...stServiceTaskSchedulerServiceWithErrors.java | 23 +-
...ezTestServiceTaskCommunicatorWithErrors.java | 22 +-
.../tests/TestExternalTezServicesErrors.java | 243 +++++++++++++++----
46 files changed, 1584 insertions(+), 247 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e2f77f6..af643dd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
Release 0.8.3: Unreleased
INCOMPATIBLE CHANGES
+ TEZ-3029. Add an onError method to service plugin contexts.
ALL CHANGES:
TEZ-3103. Shuffle can hang when memory to memory merging enabled
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
index 70a3498..ed1d58f 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
@@ -14,15 +14,15 @@
package org.apache.tez.serviceplugins.api;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.dag.api.UserPayload;
@InterfaceAudience.Public
@InterfaceStability.Unstable
-public interface ContainerLauncherContext {
+public interface ContainerLauncherContext extends ServicePluginContextBase {
// TODO TEZ-2003 (post) TEZ-2664 Tez abstraction for ContainerId, NodeId, other YARN constructs
@@ -77,13 +77,6 @@ public interface ContainerLauncherContext {
// Lookup APIs
/**
- * Get the UserPayload that was configured while setting up the launcher
- *
- * @return the initially configured user payload
- */
- UserPayload getInitialUserPayload();
-
- /**
* Get the number of nodes being handled by the specified source
*
* @param sourceName the relevant source name
@@ -108,4 +101,5 @@ public interface ContainerLauncherContext {
*
*/
Object getTaskCommunicatorMetaInfo(String taskCommName);
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-api/src/main/java/org/apache/tez/serviceplugins/api/DagInfo.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/DagInfo.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/DagInfo.java
new file mode 100644
index 0000000..ef73343
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/DagInfo.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Identifying information about a DAG, as exposed to service plugins.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface DagInfo {
+
+  /**
+   * Get the index of this dag
+   * @return a numerical identifier for the DAG. This is unique within the currently running application.
+   */
+  int getIndex();
+
+  /**
+   * Get the name of this dag
+   * @return the name of the dag
+   */
+  String getName();
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginContextBase.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginContextBase.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginContextBase.java
new file mode 100644
index 0000000..90a51b2
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginContextBase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tez.dag.api.UserPayload;
+
+/**
+ * Base interface for ServicePluginContexts
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface ServicePluginContextBase {
+
+  /**
+   * Get the UserPayload that was configured while setting up the plugin
+   *
+   * @return the initially configured user payload
+   */
+  UserPayload getInitialUserPayload();
+
+  /**
+   * Get information on the currently executing dag
+   * @return info on the currently running dag, or null if no dag is executing
+   */
+  @Nullable
+  DagInfo getCurrentDagInfo();
+
+  /**
+   * Report an error from the service.
+   *
+   * <p>Errors whose {@link ServicePluginError#getErrorType()} is
+   * {@link ServicePluginError.ErrorType#PERMANENT} are treated as fatal for the application.
+   * Other errors result in the specified DAG being terminated, provided it is the DAG that is
+   * currently running.
+   *
+   * @param servicePluginError the error category
+   * @param message A diagnostic message associated with this error; may be null
+   * @param dagInfo the affected dag; may be null
+   */
+  void reportError(@Nonnull ServicePluginError servicePluginError, @Nullable String message,
+                   @Nullable DagInfo dagInfo);
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginError.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginError.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginError.java
new file mode 100644
index 0000000..932c0fa
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginError.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Represents errors from a ServicePlugin. The default implementation {@link ServicePluginErrorDefaults}
+ * lists a basic set of errors.
+ * This can be extended by implementing this interface, if the default set is not adequate.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface ServicePluginError {
+ /** Severity of an error. PERMANENT errors are treated as fatal for the application; TEMPORARY errors are not. */
+ enum ErrorType {
+ TEMPORARY, PERMANENT,
+ }
+
+ /**
+ * Get the enum representation
+ *
+ * @return an enum representation of the ServicePluginError
+ */
+ Enum getEnum();
+
+ /**
+ * The type of the error
+ *
+ * @return the type of the error
+ */
+ ErrorType getErrorType();
+
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginErrorDefaults.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginErrorDefaults.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginErrorDefaults.java
new file mode 100644
index 0000000..83a85b5
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginErrorDefaults.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A default set of errors from ServicePlugins.
+ *
+ * Each error is either {@link ServicePluginError.ErrorType#PERMANENT} (fatal for the application)
+ * or {@link ServicePluginError.ErrorType#TEMPORARY} (non-fatal). Fatal errors cause the AM to go down.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public enum ServicePluginErrorDefaults implements ServicePluginError {
+  /**
+   * Indicates that the service is currently unavailable.
+   * This is a temporary error.
+   */
+  SERVICE_UNAVAILABLE(ErrorType.TEMPORARY),
+
+  /**
+   * Indicates that the service is in an inconsistent state.
+   * This is a fatal error.
+   */
+  INCONSISTENT_STATE(ErrorType.PERMANENT),
+
+  /**
+   * Other temporary error.
+   */
+  OTHER(ErrorType.TEMPORARY),
+
+  /**
+   * Other fatal error.
+   */
+  OTHER_FATAL(ErrorType.PERMANENT);
+
+  private final ErrorType errorType;
+
+  ServicePluginErrorDefaults(ErrorType errorType) {
+    this.errorType = errorType;
+  }
+
+  @Override
+  public Enum getEnum() {
+    return this;
+  }
+
+  @Override
+  public ErrorType getErrorType() {
+    return errorType;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
index a24061f..d30ada3 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.ContainerSignatureMatcher;
-import org.apache.tez.dag.api.UserPayload;
/**
* Context for a {@link TaskScheduler}
@@ -42,7 +41,7 @@ import org.apache.tez.dag.api.UserPayload;
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
-public interface TaskSchedulerContext {
+public interface TaskSchedulerContext extends ServicePluginContextBase {
class AppFinalStatus {
public final FinalApplicationStatus exitStatus;
@@ -136,14 +135,6 @@ public interface TaskSchedulerContext {
);
/**
- * Indicate to the framework that the scheduler has run into an error. This will cause
- * the DAG and application to be killed.
- *
- * @param t the relevant error
- */
- void onError(Throwable t);
-
- /**
* Inform the framework that the scheduler has determined that a previously allocated container
* needs to be preempted
*
@@ -164,13 +155,6 @@ public interface TaskSchedulerContext {
// Getters
/**
- * Get the UserPayload that was configured while setting up the scheduler
- *
- * @return the initially configured user payload
- */
- UserPayload getInitialUserPayload();
-
- /**
* Get the tracking URL for the application. Primarily relevant to YARN
*
* @return the trackingUrl for the app
@@ -234,4 +218,5 @@ public interface TaskSchedulerContext {
* @return the app master state
*/
AMState getAMState();
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/Utils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/Utils.java b/tez-dag/src/main/java/org/apache/tez/Utils.java
index 959b536..6f03a67 100644
--- a/tez-dag/src/main/java/org/apache/tez/Utils.java
+++ b/tez-dag/src/main/java/org/apache/tez/Utils.java
@@ -15,7 +15,14 @@
package org.apache.tez;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.yarn.event.Event;
import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGTerminationCause;
+import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.serviceplugins.api.DagInfo;
+import org.apache.tez.serviceplugins.api.ServicePluginError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,4 +70,48 @@ public class Utils {
return "[" + schedulerIndex + ":" + name + "]";
}
+  /**
+   * Handles a non-fatal error reported by a service plugin.
+   *
+   * <p>If {@code dagInfo} identifies the DAG that is currently running, that DAG is terminated with
+   * {@link DAGTerminationCause#SERVICE_PLUGIN_ERROR}. Otherwise (no dag info, no running DAG, or a
+   * different DAG) the error is only logged.
+   *
+   * @param entityString       identifier of the reporting plugin, included in the message
+   * @param servicePluginError the reported error
+   * @param diagnostics        diagnostic message; may be null
+   * @param dagInfo            the DAG the error applies to; may be null
+   * @param appContext         used to find the current DAG and to dispatch the terminate event
+   * @param componentName      the kind of plugin reporting the error, included in the message
+   */
+  public static void processNonFatalServiceErrorReport(String entityString,
+                                                       ServicePluginError servicePluginError,
+                                                       String diagnostics,
+                                                       DagInfo dagInfo, AppContext appContext,
+                                                       String componentName) {
+    String message = "Error reported by " + componentName + " [" +
+        entityString + "][" +
+        servicePluginError +
+        "] " + (diagnostics == null ? "" : diagnostics);
+    if (dagInfo == null) {
+      LOG.warn("No dag info provided. Not acting on " + message);
+      return;
+    }
+    DAG dag = appContext.getCurrentDAG();
+    if (dag != null && dag.getID().getId() == dagInfo.getIndex()) {
+      // Send a terminate event only if the error is for the dag that is currently running.
+      TezDAGID dagId = dag.getID();
+      LOG.warn(message + ", Failing dag: [" + dagInfo.getName() + ", " + dagId + "]");
+      sendEvent(appContext,
+          new DAGEventTerminateDag(dagId, DAGTerminationCause.SERVICE_PLUGIN_ERROR, message));
+    } else {
+      LOG.warn("Dag [" + dagInfo.getName() + ", index=" + dagInfo.getIndex()
+          + "] is not the currently running dag. Not acting on " + message);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static void sendEvent(AppContext appContext, Event<?> event) {
+    appContext.getEventHandler().handle(event);
+  }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
index 0f674f3..79b9acd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
@@ -112,7 +112,7 @@ public class DAGClientHandler {
public void tryKillDAG(String dagIdStr) throws TezException {
DAG dag = getDAG(dagIdStr);
LOG.info("Sending client kill to dag: " + dagIdStr);
- dagAppMaster.tryKillDAG(dag);
+ dagAppMaster.tryKillDAG(dag, "Kill Dag request received from client"); // the reason is carried on the DAG terminate event
}
public synchronized String submitDAG(DAGPlan dagPlan,
@@ -120,10 +120,11 @@ public class DAGClientHandler {
return dagAppMaster.submitDAGToAppMaster(dagPlan, additionalAmResources);
}
+ // Only to be invoked by the DAGClient; the message below is also used to kill any incomplete running DAG.
public synchronized void shutdownAM() throws TezException {
LOG.info("Received message to shutdown AM");
if (dagAppMaster != null) {
- dagAppMaster.shutdownTezAM();
+ dagAppMaster.shutdownTezAM("AM Shutdown request received from client");
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
index 9434256..7e68675 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
@@ -14,6 +14,8 @@
package org.apache.tez.dag.app;
+import javax.annotation.Nullable;
+
import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -21,7 +23,10 @@ import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
+import org.apache.tez.dag.app.launcher.ContainerLauncherManager;
import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.DagInfo;
+import org.apache.tez.serviceplugins.api.ServicePluginError;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.app.rm.container.AMContainerEvent;
import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
@@ -39,15 +44,31 @@ public class ContainerLauncherContextImpl implements ContainerLauncherContext {
private static final Logger LOG = LoggerFactory.getLogger(ContainerLauncherContextImpl.class);
private final AppContext context;
+  private final ContainerLauncherManager containerLauncherManager;
private final TaskCommunicatorManagerInterface tal;
private final UserPayload initialUserPayload;
+  private final int containerLauncherIndex;
- public ContainerLauncherContextImpl(AppContext appContext, TaskCommunicatorManagerInterface tal, UserPayload initialUserPayload) {
+  /**
+   * Creates the context for the container launcher at {@code containerLauncherIndex}.
+   *
+   * @param appContext               the AM context; must not be null
+   * @param containerLauncherManager receives errors reported through this context; must not be null
+   * @param tal                      the task communicator manager; must not be null
+   * @param initialUserPayload       the user payload the launcher was configured with
+   * @param containerLauncherIndex   index of the owning launcher, passed along with reported errors
+   */
+  public ContainerLauncherContextImpl(AppContext appContext, ContainerLauncherManager containerLauncherManager,
+                                      TaskCommunicatorManagerInterface tal,
+                                      UserPayload initialUserPayload, int containerLauncherIndex) {
Preconditions.checkNotNull(appContext, "AppContext cannot be null");
+    Preconditions.checkNotNull(containerLauncherManager, "ContainerLauncherManager cannot be null");
Preconditions.checkNotNull(tal, "TaskCommunicator cannot be null");
this.context = appContext;
+    this.containerLauncherManager = containerLauncherManager;
this.tal = tal;
this.initialUserPayload = initialUserPayload;
+    this.containerLauncherIndex = containerLauncherIndex;
}
@Override
@@ -103,6 +115,12 @@ public class ContainerLauncherContextImpl implements ContainerLauncherContext {
return context.getApplicationAttemptId();
}
+ @Nullable
+ @Override
+ public DagInfo getCurrentDagInfo() {
+ return context.getCurrentDAG(); // the current DAG, exposed as its DagInfo; null if no DAG is executing
+ }
+
@Override
public Object getTaskCommunicatorMetaInfo(String taskCommName) {
int taskCommId = context.getTaskCommunicatorIdentifier(taskCommName);
@@ -120,4 +138,11 @@ public class ContainerLauncherContextImpl implements ContainerLauncherContext {
return null;
}
+ // Forwards the error, tagged with this launcher's index, to the ContainerLauncherManager.
+ @Override
+ public void reportError(ServicePluginError servicePluginError, String message, DagInfo dagInfo) {
+ Preconditions.checkNotNull(servicePluginError, "ServiceError must be specified");
+ containerLauncherManager.reportError(containerLauncherIndex, servicePluginError, message, dagInfo);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 579d23f..5ac3800 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -72,9 +72,11 @@ import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
+import org.apache.tez.dag.app.dag.DAGTerminationCause;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDagCleanup;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
import org.apache.tez.dag.app.dag.event.DAGEventInternalError;
+import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
import org.apache.tez.dag.history.events.DAGRecoveredEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -711,8 +713,8 @@ public class DAGAppMaster extends AbstractService {
state = DAGAppMasterState.ERROR;
errDiagnostics = "Error in the TaskScheduler. Shutting down. ";
addDiagnostic(errDiagnostics
- + "Error=" + ExceptionUtils.getStackTrace(schedulingServiceErrorEvent.getThrowable()));
- LOG.error(errDiagnostics, schedulingServiceErrorEvent.getThrowable());
+ + "Error=" + schedulingServiceErrorEvent.getDiagnosticInfo());
+ LOG.error(errDiagnostics);
shutdownHandler.shutdown();
break;
case TASK_COMMUNICATOR_SERVICE_FATAL_ERROR:
@@ -724,7 +726,7 @@ public class DAGAppMaster extends AbstractService {
Throwable error = usfe.getError();
errDiagnostics = "Service Error: " + usfe.getDiagnosticInfo()
+ ", eventType=" + event.getType()
- + ", exception=" + ExceptionUtils.getStackTrace(usfe.getError());
+ + ", exception=" + (usfe.getError() == null ? "None" : ExceptionUtils.getStackTrace(usfe.getError()));
LOG.error(errDiagnostics, error);
addDiagnostic(errDiagnostics);
@@ -1291,16 +1293,16 @@ public class DAGAppMaster extends AbstractService {
+ oldState + " new state: " + state);
}
- public void shutdownTezAM() throws TezException {
+ public void shutdownTezAM(String dagKillmessage) throws TezException {
sessionStopped.set(true);
synchronized (this) {
this.taskSchedulerManager.setShouldUnregisterFlag();
if (currentDAG != null
&& !currentDAG.isComplete()) {
- //send a DAG_KILL message
+ //send a DAG_TERMINATE message
LOG.info("Sending a kill event to the current DAG"
+ ", dagId=" + currentDAG.getID());
- tryKillDAG(currentDAG);
+ tryKillDAG(currentDAG, dagKillmessage);
} else {
LOG.info("No current running DAG, shutting down the AM");
if (isSession && !state.equals(DAGAppMasterState.ERROR)) {
@@ -1376,13 +1378,13 @@ public class DAGAppMaster extends AbstractService {
}
@SuppressWarnings("unchecked")
- public void tryKillDAG(DAG dag) throws TezException {
+ public void tryKillDAG(DAG dag, String message) throws TezException {
try {
logDAGKillRequestEvent(dag.getID(), false);
} catch (IOException e) {
throw new TezException(e);
}
- dispatcher.getEventHandler().handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
+ dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dag.getID(), DAGTerminationCause.DAG_KILL, message)); // caller-supplied kill reason travels with the DAG_KILL termination
}
private Map<String, LocalResource> getAdditionalLocalResourceDiff(
@@ -2235,10 +2237,10 @@ public class DAGAppMaster extends AbstractService {
if (currentTime < (lastDAGCompletionTime + sessionTimeoutInterval)) {
return;
}
- LOG.info("Session timed out"
+ String message = "Session timed out"
+ ", lastDAGCompletionTime=" + lastDAGCompletionTime + " ms"
- + ", sessionTimeoutInterval=" + sessionTimeoutInterval + " ms");
- shutdownTezAM();
+ + ", sessionTimeoutInterval=" + sessionTimeoutInterval + " ms";
+ shutdownTezAM(message);
}
public boolean isSession() {
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 7f88be2..a922f38 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -14,6 +14,7 @@
package org.apache.tez.dag.app;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Set;
@@ -28,6 +29,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.rm.container.AMContainer;
+import org.apache.tez.serviceplugins.api.DagInfo;
+import org.apache.tez.serviceplugins.api.ServicePluginError;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
import org.apache.tez.serviceplugins.api.TaskHeartbeatRequest;
@@ -143,6 +146,7 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
this);
}
+ @SuppressWarnings("deprecation")
@Override
public String getCurrentDagName() {
return getDag().getName();
@@ -153,11 +157,18 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
return context.getApplicationID().toString();
}
+ @SuppressWarnings("deprecation") // NOTE(review): presumably deprecated in TaskCommunicatorContext in favour of getCurrentDagInfo() - confirm
@Override
public int getCurrentDagIdenitifer() {
return getDag().getID().getId();
}
+ @Nullable
+ @Override
+ public DagInfo getCurrentDagInfo() {
+ return getDag(); // assumes getDag() returns the current DAG (a DagInfo), possibly null - confirm
+ }
+
@Override
public Iterable<String> getInputVertexNames(String vertexName) {
Preconditions.checkNotNull(vertexName, "VertexName cannot be null: " + vertexName);
@@ -203,6 +214,12 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
}
@Override
+ public void reportError(@Nonnull ServicePluginError servicePluginError, String message, DagInfo dagInfo) {
+ Preconditions.checkNotNull(servicePluginError, "ServicePluginError must be set");
+ taskCommunicatorManager.reportError(taskCommunicatorIndex, servicePluginError, message, dagInfo); // tagged with this communicator's index
+ }
+
+ @Override
public void onStateUpdated(VertexStateUpdate event) {
taskCommunicatorManager.vertexStateUpdateNotificationReceived(event, taskCommunicatorIndex);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
index a196114..403e1a1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
@@ -33,6 +33,8 @@ import org.apache.commons.collections4.ListUtils;
import org.apache.hadoop.yarn.event.Event;
import org.apache.tez.Utils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.serviceplugins.api.DagInfo;
+import org.apache.tez.serviceplugins.api.ServicePluginError;
import org.apache.tez.serviceplugins.api.TaskCommunicator;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.UserPayload;
@@ -593,6 +595,30 @@ public class TaskCommunicatorManager extends AbstractService implements
return taskCommunicators[taskCommIndex];
}
+  /**
+   * Handles an error reported by the TaskCommunicator at {@code taskCommIndex}.
+   * PERMANENT errors are escalated to the AM as a fatal error; others are passed to
+   * {@link Utils#processNonFatalServiceErrorReport}, which may terminate the affected DAG.
+   */
+  @Override
+  public void reportError(int taskCommIndex, ServicePluginError servicePluginError,
+                          String diagnostics, DagInfo dagInfo) {
+    String taskCommIdentifier = Utils.getTaskCommIdentifierString(taskCommIndex, context);
+    if (servicePluginError.getErrorType() == ServicePluginError.ErrorType.PERMANENT) {
+      String msg = "Fatal Error reported by TaskCommunicator"
+          + ", communicator=" + taskCommIdentifier
+          + ", servicePluginError=" + servicePluginError
+          + ", diagnostics= " + (diagnostics == null ? "" : diagnostics);
+      // msg already contains the diagnostics; don't append them a second time.
+      LOG.error(msg);
+      sendEvent(new DAGAppMasterEventUserServiceFatalError(
+          DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, msg, null));
+    } else {
+      Utils.processNonFatalServiceErrorReport(taskCommIdentifier, servicePluginError,
+          diagnostics, dagInfo, context, "TaskCommunicator");
+    }
+  }
+
private void pingContainerHeartbeatHandler(ContainerId containerId) {
containerHeartbeatHandler.pinged(containerId);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
index e07b1a0..e0f9852 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
@@ -20,6 +20,8 @@ package org.apache.tez.dag.app;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.DagInfo;
+import org.apache.tez.serviceplugins.api.ServicePluginError;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.rm.container.AMContainerTask;
@@ -42,4 +44,14 @@ public interface TaskCommunicatorManagerInterface {
void dagSubmitted();
TaskCommunicatorWrapper getTaskCommunicator(int taskCommIndex);
+
+ /**
+ * Reports an error raised by the task communicator at {@code taskCommIndex}.
+ *
+ * @param taskCommIndex index of the task communicator reporting the error
+ * @param servicePluginError the error being reported
+ * @param diagnostics additional diagnostic information; may be null
+ * @param dagInfo the DAG the error is associated with, as supplied by the reporter
+ */
+ void reportError(int taskCommIndex, ServicePluginError servicePluginError, String diagnostics, DagInfo dagInfo);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index a01c623..dd96ab2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -36,11 +36,12 @@ import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.common.security.ACLManager;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.serviceplugins.api.DagInfo;
/**
* Main interface to interact with the job.
*/
-public interface DAG {
+public interface DAG extends DagInfo {
TezDAGID getID();
Map<String, LocalResource> getLocalResources();
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
index b6be395..b73cbe6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
@@ -26,6 +26,9 @@ public enum DAGTerminationCause {
/** DAG was directly killed. */
DAG_KILL(DAGState.KILLED),
+
+ /** A service plugin reported an error, causing the DAG to fail. */
+ SERVICE_PLUGIN_ERROR(DAGState.FAILED),
/** A vertex failed. */
VERTEX_FAILURE(DAGState.FAILED),
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
index 816f85a..49be74d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexTerminationCause.java
@@ -24,7 +24,7 @@ package org.apache.tez.dag.app.dag;
public enum VertexTerminationCause {
- /** DAG was killed */
- DAG_KILL(VertexState.KILLED),
+ /** DAG was terminated (e.g. killed by the client, or failed due to a service plugin error) */
+ DAG_TERMINATED(VertexState.KILLED),
/** Other vertex failed causing DAG to fail thus killing this vertex */
OTHER_VERTEX_FAILURE(VertexState.KILLED),
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventSchedulingServiceError.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventSchedulingServiceError.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventSchedulingServiceError.java
index 16625df..cf49d20 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventSchedulingServiceError.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGAppMasterEventSchedulingServiceError.java
@@ -18,17 +18,18 @@
package org.apache.tez.dag.app.dag.event;
-public class DAGAppMasterEventSchedulingServiceError extends DAGAppMasterEvent {
+public class DAGAppMasterEventSchedulingServiceError extends DAGAppMasterEvent
+ implements DiagnosableEvent {
- private final Throwable throwable;
+ private final String diagnostics;
- public DAGAppMasterEventSchedulingServiceError(Throwable t) {
+ public DAGAppMasterEventSchedulingServiceError(String diagnostics) {
super(DAGAppMasterEventType.SCHEDULING_SERVICE_ERROR);
- this.throwable = t;
+ this.diagnostics = diagnostics;
}
- public Throwable getThrowable() {
- return throwable;
+ @Override
+ public String getDiagnosticInfo() {
+ return diagnostics;
}
-
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventTerminateDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventTerminateDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventTerminateDag.java
new file mode 100644
index 0000000..1286e11
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventTerminateDag.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+import java.util.Objects;
+
+import org.apache.tez.dag.app.dag.DAGTerminationCause;
+import org.apache.tez.dag.records.TezDAGID;
+
+/**
+ * A {@link DAGEventType#DAG_TERMINATE} event: asks a DAG to terminate for the given
+ * {@link DAGTerminationCause}, optionally carrying a diagnostic message.
+ */
+public class DAGEventTerminateDag extends DAGEvent implements DiagnosableEvent {
+ private final String diagMessage;
+ private final DAGTerminationCause terminationCause;
+
+ /**
+ * Creates a termination request for the DAG identified by {@code dagId}.
+ *
+ * @param dagId the DAG to terminate
+ * @param terminationCause why the DAG is being terminated; must not be null
+ * @param message diagnostic message to record for the DAG; may be null or empty
+ * @throws NullPointerException if {@code terminationCause} is null
+ */
+ public DAGEventTerminateDag(TezDAGID dagId, DAGTerminationCause terminationCause, String message) {
+ super(dagId, DAGEventType.DAG_TERMINATE);
+ this.diagMessage = message;
+ // Fail at construction: the DAG transitions dereference the cause when handling the event.
+ this.terminationCause = Objects.requireNonNull(terminationCause, "terminationCause");
+ }
+
+ @Override
+ public String getDiagnosticInfo() {
+ return diagMessage;
+ }
+
+ /** Returns the cause of the termination; never null. */
+ public DAGTerminationCause getTerminationCause() {
+ return terminationCause;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
index ea6a3cc..bf3b30a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventType.java
@@ -23,8 +23,8 @@ package org.apache.tez.dag.app.dag.event;
*/
public enum DAGEventType {
- //Producer:Client
- DAG_KILL,
+ //Producer: ServicePluginManagers, Client (KILL)
+ DAG_TERMINATE,
//Producer:AM
DAG_INIT,
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 88dfe27..a6c6c02 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -43,7 +43,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.LimitExceededException;
-import org.apache.tez.dag.app.dag.event.DAGEventInternalError;
+import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
import org.apache.tez.state.OnStateChangedCallback;
import org.apache.tez.state.StateMachineTez;
@@ -253,8 +253,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
EnumSet.of(DAGState.INITED, DAGState.FAILED),
DAGEventType.DAG_INIT,
new InitTransition())
- .addTransition(DAGState.NEW, DAGState.KILLED,
- DAGEventType.DAG_KILL,
+ .addTransition(DAGState.NEW, EnumSet.of(DAGState.KILLED, DAGState.FAILED),
+ DAGEventType.DAG_TERMINATE,
new KillNewJobTransition())
.addTransition(DAGState.NEW, DAGState.ERROR,
DAGEventType.INTERNAL_ERROR,
@@ -269,8 +269,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
.addTransition(DAGState.INITED, DAGState.RUNNING,
DAGEventType.DAG_START,
new StartTransition())
- .addTransition(DAGState.INITED, DAGState.KILLED,
- DAGEventType.DAG_KILL,
+ .addTransition(DAGState.INITED, EnumSet.of(DAGState.KILLED, DAGState.FAILED),
+ DAGEventType.DAG_TERMINATE,
new KillInitedJobTransition())
.addTransition(DAGState.INITED, DAGState.ERROR,
DAGEventType.INTERNAL_ERROR,
@@ -287,7 +287,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
DAGEventType.DAG_VERTEX_RERUNNING,
new VertexReRunningTransition())
.addTransition(DAGState.RUNNING, DAGState.TERMINATING,
- DAGEventType.DAG_KILL, new DAGKilledTransition())
+ DAGEventType.DAG_TERMINATE, new DAGKilledTransition())
.addTransition(DAGState.RUNNING, DAGState.RUNNING,
DAGEventType.DAG_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
@@ -311,7 +311,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
DAGEventType.DAG_COMMIT_COMPLETED,
COMMIT_COMPLETED_TRANSITION)
.addTransition(DAGState.COMMITTING, DAGState.TERMINATING,
- DAGEventType.DAG_KILL,
+ DAGEventType.DAG_TERMINATE,
new DAGKilledWhileCommittingTransition())
.addTransition(
DAGState.COMMITTING,
@@ -354,7 +354,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
// Ignore-able events
.addTransition(DAGState.TERMINATING, DAGState.TERMINATING,
- EnumSet.of(DAGEventType.DAG_KILL,
+ EnumSet.of(DAGEventType.DAG_TERMINATE,
DAGEventType.DAG_VERTEX_RERUNNING,
DAGEventType.DAG_SCHEDULER_UPDATE))
@@ -370,7 +370,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(DAGState.SUCCEEDED, DAGState.SUCCEEDED,
- EnumSet.of(DAGEventType.DAG_KILL,
+ EnumSet.of(DAGEventType.DAG_TERMINATE,
DAGEventType.DAG_SCHEDULER_UPDATE,
DAGEventType.DAG_VERTEX_COMPLETED))
@@ -386,7 +386,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(DAGState.FAILED, DAGState.FAILED,
- EnumSet.of(DAGEventType.DAG_KILL,
+ EnumSet.of(DAGEventType.DAG_TERMINATE,
DAGEventType.DAG_START,
DAGEventType.DAG_VERTEX_RERUNNING,
DAGEventType.DAG_SCHEDULER_UPDATE,
@@ -404,7 +404,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(DAGState.KILLED, DAGState.KILLED,
- EnumSet.of(DAGEventType.DAG_KILL,
+ EnumSet.of(DAGEventType.DAG_TERMINATE,
DAGEventType.DAG_START,
DAGEventType.DAG_VERTEX_RERUNNING,
DAGEventType.DAG_SCHEDULER_UPDATE,
@@ -415,7 +415,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
DAGState.ERROR,
DAGState.ERROR,
EnumSet.of(
- DAGEventType.DAG_KILL,
+ DAGEventType.DAG_TERMINATE,
DAGEventType.DAG_INIT,
DAGEventType.DAG_START,
DAGEventType.DAG_VERTEX_COMPLETED,
@@ -1424,6 +1424,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
@Override
+ public int getIndex() {
+ return dagId.getId();
+ }
+
+ @Override
public String getName() {
return dagName;
}
@@ -1836,28 +1841,41 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
}
+ private void addDiagnostics(DiagnosableEvent event) {
+ if (event.getDiagnosticInfo() != null && !event.getDiagnosticInfo().isEmpty()) {
+ addDiagnostic(event.getDiagnosticInfo());
+ }
+ }
+
// Task-start has been moved out of InitTransition, so this arc simply
// hardcodes 0 for both map and reduce finished tasks.
- private static class KillNewJobTransition
- implements SingleArcTransition<DAGImpl, DAGEvent> {
+ private static class KillNewJobTransition implements
+ MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
@Override
- public void transition(DAGImpl dag, DAGEvent dagEvent) {
+ public DAGState transition(DAGImpl dag, DAGEvent dagEvent) {
+ DAGEventTerminateDag event = (DAGEventTerminateDag) dagEvent;
dag.setFinishTime();
- dag.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
- dag.finished(DAGState.KILLED);
+ dag.trySetTerminationCause(event.getTerminationCause());
+ dag.addDiagnostic("Dag received [" + event.getType() + ", " + event.getTerminationCause() +
+ "] in NEW state.");
+ dag.addDiagnostics(event);
+ return dag.finished(event.getTerminationCause().getFinishedState());
}
}
- private static class KillInitedJobTransition
- implements SingleArcTransition<DAGImpl, DAGEvent> {
+ private static class KillInitedJobTransition implements
+ MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
@Override
- public void transition(DAGImpl dag, DAGEvent dagEvent) {
- dag.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
- dag.addDiagnostic("Job received Kill in INITED state.");
- dag.finished(DAGState.KILLED);
+ public DAGState transition(DAGImpl dag, DAGEvent dagEvent) {
+ DAGEventTerminateDag event = (DAGEventTerminateDag) dagEvent;
+ dag.trySetTerminationCause(event.getTerminationCause());
+ dag.addDiagnostic("Dag received [" + event.getType() + ", " + event.getTerminationCause() +
+ "] in INITED state.");
+ dag.addDiagnostics(event);
+ return dag.finished(event.getTerminationCause().getFinishedState());
}
}
@@ -1865,11 +1883,14 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private static class DAGKilledTransition
implements SingleArcTransition<DAGImpl, DAGEvent> {
@Override
- public void transition(DAGImpl job, DAGEvent event) {
- String msg = "Job received Kill while in RUNNING state.";
+ public void transition(DAGImpl job, DAGEvent dagEvent) {
+ DAGEventTerminateDag event = (DAGEventTerminateDag) dagEvent;
+ String msg = "Dag received [" + event.getType() + ", " + event.getTerminationCause() +
+ "] in RUNNING state.";
LOG.info(msg);
job.addDiagnostic(msg);
- job.enactKill(DAGTerminationCause.DAG_KILL, VertexTerminationCause.DAG_KILL);
+ job.addDiagnostics(event);
+ job.enactKill(event.getTerminationCause(), VertexTerminationCause.DAG_TERMINATED);
// Commit may happen when dag is still in RUNNING (vertex group commit)
job.cancelCommits();
// TODO Metrics
@@ -1883,12 +1904,15 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
implements SingleArcTransition<DAGImpl, DAGEvent> {
@Override
- public void transition(DAGImpl dag, DAGEvent event) {
- String diag = "DAG received Kill while in COMMITTING state.";
+ public void transition(DAGImpl dag, DAGEvent dagEvent) {
+ DAGEventTerminateDag event = (DAGEventTerminateDag) dagEvent;
+ String diag = "Dag received [" + event.getType() + ", " + event.getTerminationCause() +
+ "] in COMMITTING state.";
LOG.info(diag);
dag.addDiagnostic(diag);
+ dag.addDiagnostics(event);
dag.cancelCommits();
- dag.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
+ dag.trySetTerminationCause(event.getTerminationCause());
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 065974e..c8f217b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -3206,7 +3206,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
VertexEventTermination vet = (VertexEventTermination) event;
VertexTerminationCause trigger = vet.getTerminationCause();
switch(trigger){
- case DAG_KILL : vertex.tryEnactKill(trigger, TaskTerminationCause.DAG_KILL); break;
+ case DAG_TERMINATED: vertex.tryEnactKill(trigger, TaskTerminationCause.DAG_KILL); break;
case OWN_TASK_FAILURE: vertex.tryEnactKill(trigger, TaskTerminationCause.OTHER_TASK_FAILURE); break;
case ROOT_INPUT_INIT_FAILURE:
case COMMIT_FAILURE:
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
index 98237c1..250afd8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
@@ -44,6 +44,8 @@ import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.rm.ContainerLauncherEvent;
import org.apache.tez.dag.app.rm.ContainerLauncherLaunchRequestEvent;
+import org.apache.tez.serviceplugins.api.DagInfo;
+import org.apache.tez.serviceplugins.api.ServicePluginError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,16 +60,9 @@ public class ContainerLauncherManager extends AbstractService
final ContainerLauncherContext containerLauncherContexts[];
protected final ServicePluginLifecycleAbstractService[] containerLauncherServiceWrappers;
private final AppContext appContext;
+ private final boolean isIncompleteCtor;
+
- @VisibleForTesting
- public ContainerLauncherManager(ContainerLauncher containerLauncher, AppContext context) {
- super(ContainerLauncherManager.class.getName());
- this.appContext = context;
- containerLaunchers = new ContainerLauncherWrapper[] {new ContainerLauncherWrapper(containerLauncher)};
- containerLauncherContexts = new ContainerLauncherContext[] {containerLauncher.getContext()};
- containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[]{
- new ServicePluginLifecycleAbstractService<>(containerLauncher)};
- }
// Accepting conf to setup final parameters, if required.
public ContainerLauncherManager(AppContext context,
@@ -77,6 +72,7 @@ public class ContainerLauncherManager extends AbstractService
boolean isPureLocalMode) throws TezException {
super(ContainerLauncherManager.class.getName());
+ this.isIncompleteCtor = false;
this.appContext = context;
Preconditions.checkArgument(
containerLauncherDescriptors != null && !containerLauncherDescriptors.isEmpty(),
@@ -89,7 +85,7 @@ public class ContainerLauncherManager extends AbstractService
for (int i = 0; i < containerLauncherDescriptors.size(); i++) {
UserPayload userPayload = containerLauncherDescriptors.get(i).getUserPayload();
ContainerLauncherContext containerLauncherContext =
- new ContainerLauncherContextImpl(context, taskCommunicatorManagerInterface, userPayload);
+ new ContainerLauncherContextImpl(context, this, taskCommunicatorManagerInterface, userPayload, i);
containerLauncherContexts[i] = containerLauncherContext;
containerLaunchers[i] = new ContainerLauncherWrapper(createContainerLauncher(containerLauncherDescriptors.get(i), context,
containerLauncherContext, taskCommunicatorManagerInterface, workingDirectory, i, isPureLocalMode));
@@ -98,6 +94,25 @@ public class ContainerLauncherManager extends AbstractService
}
@VisibleForTesting
+ public ContainerLauncherManager(AppContext context) {
+ super(ContainerLauncherManager.class.getName());
+ this.isIncompleteCtor = true;
+ this.appContext = context;
+ containerLaunchers = new ContainerLauncherWrapper[1];
+ containerLauncherContexts = new ContainerLauncherContext[1];
+ containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[1];
+ }
+
+ // To be used with the constructor which accepts the AppContext only, and is for testing.
+ @VisibleForTesting
+ public void setContainerLauncher(ContainerLauncher containerLauncher) {
+ Preconditions.checkState(isIncompleteCtor, "Can only be used with the Test constructor");
+ containerLaunchers[0] = new ContainerLauncherWrapper(containerLauncher);
+ containerLauncherContexts[0] = containerLauncher.getContext();
+ containerLauncherServiceWrappers[0] = new ServicePluginLifecycleAbstractService<>(containerLauncher);
+ }
+
+ @VisibleForTesting
ContainerLauncher createContainerLauncher(
NamedEntityDescriptor containerLauncherDescriptor,
AppContext context,
@@ -236,6 +251,36 @@ public class ContainerLauncherManager extends AbstractService
}
}
+ /**
+ * Handles an error reported by the container launcher at {@code containerLauncherIndex}.
+ * PERMANENT errors are logged and escalated to the DAGAppMaster as a
+ * CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR; all other errors are handed to
+ * {@link Utils#processNonFatalServiceErrorReport}.
+ */
+ public void reportError(int containerLauncherIndex, ServicePluginError servicePluginError,
+ String diagnostics,
+ DagInfo dagInfo) {
+ if (servicePluginError.getErrorType() == ServicePluginError.ErrorType.PERMANENT) {
+ String msg = "Fatal Error reported by ContainerLauncher"
+ + ", containerLauncher=" +
+ Utils.getContainerLauncherIdentifierString(containerLauncherIndex, appContext)
+ + ", servicePluginError=" + servicePluginError
+ + ", diagnostics= " + (diagnostics == null ? "" : diagnostics);
+ LOG.error(msg);
+ sendEvent(
+ new DAGAppMasterEventUserServiceFatalError(
+ DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR,
+ msg, null));
+ } else {
+ Utils
+ .processNonFatalServiceErrorReport(
+ Utils.getContainerLauncherIdentifierString(containerLauncherIndex, appContext),
+ servicePluginError,
+ diagnostics, dagInfo,
+ appContext, "ContainerLauncher");
+ }
+ }
+
@SuppressWarnings("unchecked")
private void sendEvent(Event<?> event) {
appContext.getEventHandler().handle(event);
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
index 37aa96b..fb4198b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
@@ -14,10 +14,12 @@
package org.apache.tez.dag.app.rm;
+import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -29,6 +31,8 @@ import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.serviceplugins.api.DagInfo;
+import org.apache.tez.serviceplugins.api.ServicePluginError;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
public class TaskSchedulerContextImpl implements TaskSchedulerContext {
@@ -94,11 +98,6 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext {
}
@Override
- public void onError(Throwable t) {
- taskSchedulerManager.onError(schedulerId, t);
- }
-
- @Override
public float getProgress() {
return taskSchedulerManager.getProgress(schedulerId);
}
@@ -139,6 +138,12 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext {
return appContext.getApplicationAttemptId();
}
+ @Nullable
+ @Override
+ public DagInfo getCurrentDagInfo() {
+ return appContext.getCurrentDAG();
+ }
+
@Override
public String getAppHostName() {
return appHostName;
@@ -175,4 +180,11 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext {
throw new TezUncheckedException("Unexpected state " + appContext.getAMState());
}
}
+
+ @Override
+ public void reportError(ServicePluginError servicePluginError, String diagnostics,
+ DagInfo dagInfo) {
+ Preconditions.checkNotNull(servicePluginError, "ServicePluginError must be specified");
+ taskSchedulerManager.reportError(schedulerId, servicePluginError, diagnostics, dagInfo);
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
index 9e4c8e0..7e1988b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
@@ -18,6 +18,9 @@
package org.apache.tez.dag.app.rm;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
@@ -37,6 +40,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.serviceplugins.api.DagInfo;
+import org.apache.tez.serviceplugins.api.ServicePluginError;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
/**
@@ -97,8 +102,12 @@ class TaskSchedulerContextImplWrapper implements TaskSchedulerContext {
}
@Override
- public void onError(Throwable t) {
- executorService.submit(new OnErrorCallable(real, t));
+ public void reportError(@Nonnull ServicePluginError servicePluginError, String message,
+ DagInfo dagInfo) {
+ // Fail fast on the caller's thread: the Future returned by submit() is discarded, so an
+ // exception thrown inside the callable would never reach the reporting plugin.
+ Objects.requireNonNull(servicePluginError, "ServicePluginError must be specified");
+ executorService.submit(new ReportErrorCallable(real, servicePluginError, message, dagInfo));
}
@Override
@@ -156,6 +165,12 @@ class TaskSchedulerContextImplWrapper implements TaskSchedulerContext {
return real.getApplicationAttemptId();
}
+ @Nullable
+ @Override
+ public DagInfo getCurrentDagInfo() {
+ return real.getCurrentDagInfo();
+ }
+
@Override
public String getAppHostName() {
return real.getAppHostName();
@@ -175,6 +190,7 @@ class TaskSchedulerContextImplWrapper implements TaskSchedulerContext {
public AMState getAMState() {
return real.getAMState();
}
+
// End of getters which do not need to go through a thread. Underlying implementation
// does not use locks.
@@ -301,19 +317,24 @@ class TaskSchedulerContextImplWrapper implements TaskSchedulerContext {
}
}
- static class OnErrorCallable extends TaskSchedulerContextCallbackBase implements
- Callable<Void> {
+ static class ReportErrorCallable extends TaskSchedulerContextCallbackBase implements Callable<Void> {
- private final Throwable throwable;
+ private final ServicePluginError servicePluginError;
+ private final String message;
+ private final DagInfo dagInfo;
- public OnErrorCallable(TaskSchedulerContext app, Throwable throwable) {
+ public ReportErrorCallable(TaskSchedulerContext app,
+ ServicePluginError servicePluginError, String message,
+ DagInfo dagInfo) {
super(app);
- this.throwable = throwable;
+ this.servicePluginError = servicePluginError;
+ this.message = message;
+ this.dagInfo = dagInfo;
}
@Override
public Void call() throws Exception {
- app.onError(throwable);
+ app.reportError(servicePluginError, message, dagInfo);
return null;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
index fa9fb81..5317440 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -38,6 +38,8 @@ import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
+import org.apache.tez.serviceplugins.api.DagInfo;
+import org.apache.tez.serviceplugins.api.ServicePluginError;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus;
@@ -844,9 +846,43 @@ public class TaskSchedulerManager extends AbstractService implements
return dagAppMaster.getProgress();
}
- public void onError(int schedulerId, Throwable t) {
- LOG.info("Error reported by scheduler {} - {}", schedulerId, t);
- sendEvent(new DAGAppMasterEventSchedulingServiceError(t));
+ /**
+ * Handles an error reported by the task scheduler at {@code taskSchedulerIndex}.
+ * A RESOURCEMANAGER_ERROR is escalated to the DAGAppMaster as a scheduling service error
+ * only when it comes from the default {@link YarnTaskSchedulerService}; otherwise it is
+ * only logged. PERMANENT errors are escalated as a TASK_SCHEDULER_SERVICE_FATAL_ERROR, and
+ * all remaining errors are handed to {@link Utils#processNonFatalServiceErrorReport}.
+ */
+ public void reportError(int taskSchedulerIndex, ServicePluginError servicePluginError,
+ String diagnostics,
+ DagInfo dagInfo) {
+ if (servicePluginError == YarnTaskSchedulerServiceError.RESOURCEMANAGER_ERROR) {
+ LOG.info("Error reported by scheduler {} - {}",
+ Utils.getTaskSchedulerIdentifierString(taskSchedulerIndex, appContext),
+ diagnostics);
+ if (taskSchedulerDescriptors[taskSchedulerIndex].getClassName()
+ .equals(YarnTaskSchedulerService.class.getName())) {
+ LOG.warn(
+ "Reporting a SchedulerServiceError to the DAGAppMaster since the error" +
+ " was reported by the default YARN Task Scheduler");
+ sendEvent(new DAGAppMasterEventSchedulingServiceError(diagnostics));
+ }
+ } else if (servicePluginError.getErrorType() == ServicePluginError.ErrorType.PERMANENT) {
+ String msg = "Fatal error reported by TaskScheduler"
+ + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(taskSchedulerIndex, appContext)
+ + ", servicePluginError=" + servicePluginError
+ + ", diagnostics= " + (diagnostics == null ? "" : diagnostics);
+ LOG.error(msg);
+ sendEvent(
+ new DAGAppMasterEventUserServiceFatalError(
+ DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
+ msg, null));
+ } else {
+ Utils.processNonFatalServiceErrorReport(
+ Utils.getTaskSchedulerIdentifierString(taskSchedulerIndex, appContext),
+ servicePluginError, diagnostics, dagInfo,
+ appContext, "TaskScheduler");
+ }
}
public void dagCompleted() {
@@ -964,5 +1000,4 @@ public class TaskSchedulerManager extends AbstractService implements
return historyUrl;
}
-
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 1f05064..c1c363b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -35,7 +35,7 @@ import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.util.StringUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
@@ -916,7 +916,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
LOG.error("Got TaskSchedulerError, " + ExceptionUtils.getStackTrace(t));
return;
}
- getContext().onError(t);
+ LOG.error("Got Error from RMClient", t);
+ getContext().reportError(YarnTaskSchedulerServiceError.RESOURCEMANAGER_ERROR, StringUtils.stringifyException(t),
+ null);
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerServiceError.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerServiceError.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerServiceError.java
new file mode 100644
index 0000000..e8017dd
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerServiceError.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import org.apache.tez.serviceplugins.api.ServicePluginError;
+
+public enum YarnTaskSchedulerServiceError implements ServicePluginError {
+ // Reported by YarnTaskSchedulerService when its RM client surfaces an error ("Got Error from RMClient").
+ RESOURCEMANAGER_ERROR;
+ // Note: TaskSchedulerManager matches RESOURCEMANAGER_ERROR before checking getErrorType(), and forwards it to the DAGAppMaster only when reported by YarnTaskSchedulerService.
+ @Override
+ public Enum getEnum() {
+ return this;
+ }
+ // Every constant in this enum is classified as ErrorType.PERMANENT.
+ @Override
+ public ErrorType getErrorType() {
+ return ErrorType.PERMANENT;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
index c55bdbd..c551b09 100644
--- a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -44,7 +43,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
// Do not make calls into this from within a held lock.
// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
-public interface TaskCommunicatorContext {
+public interface TaskCommunicatorContext extends ServicePluginContextBase {
// TODO TEZ-2003 (post) TEZ-2666 Enhancements to API
// - Consolidate usage of IDs
@@ -57,12 +56,6 @@ public interface TaskCommunicatorContext {
// - Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc.
// - Handling of containres / tasks which no longer exist in the system (formalized interface instead of a shouldDie notification)
- /**
- * Get the UserPayload that was configured while setting up the task communicator
- *
- * @return the initially configured user payload
- */
- UserPayload getInitialUserPayload();
/**
* Get the application attempt id for the running application. Relevant when running under YARN
@@ -170,11 +163,14 @@ public interface TaskCommunicatorContext {
*/
void registerForVertexStateUpdates(String vertexName, @Nullable Set<VertexState> stateSet);
+ // TODO TEZ-3120 Remove deprecated methods
/**
* Get the name of the currently executing dag
*
* @return the name of the currently executing dag
+ * @deprecated replaced by {@link TaskCommunicatorContext#getCurrentDagInfo}
*/
+ @Deprecated
String getCurrentDagName();
/**
@@ -183,10 +179,13 @@ public interface TaskCommunicatorContext {
*/
String getCurrentAppIdentifier();
+ // TODO TEZ-3120 Remove deprecated methods
/**
* Get the identifier for the currently executing dag.
* @return a numerical identifier for the currently running DAG. This is unique within the currently running application.
+ * @deprecated replaced by {@link TaskCommunicatorContext#getCurrentDagInfo}
*/
+ @Deprecated
int getCurrentDagIdenitifer();
/**
@@ -237,4 +236,5 @@ public interface TaskCommunicatorContext {
* @return time when the current dag started executing
*/
long getDagStartTime();
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
index 80414ba..23a5191 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java
@@ -112,7 +112,7 @@ public class TestDAGClientHandler {
}
dagClientHandler.tryKillDAG("dag_9999_0001_1");
ArgumentCaptor<DAG> eventCaptor = ArgumentCaptor.forClass(DAG.class);
- verify(mockDagAM, times(1)).tryKillDAG(eventCaptor.capture());
+ verify(mockDagAM, times(1)).tryKillDAG(eventCaptor.capture(), eq("Kill Dag request received from client"));
assertEquals(1, eventCaptor.getAllValues().size());
assertTrue(eventCaptor.getAllValues().get(0) instanceof DAG);
assertEquals("dag_9999_0001_1", ((DAG)eventCaptor.getAllValues().get(0)).getID().toString());
@@ -125,7 +125,7 @@ public class TestDAGClientHandler {
// shutdown
dagClientHandler.shutdownAM();
- verify(mockDagAM).shutdownTezAM();
+ verify(mockDagAM).shutdownTezAM(eq("AM Shutdown request received from client"));
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 08f81fb..b021a36 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -520,10 +520,12 @@ public class MockDAGAppMaster extends DAGAppMaster {
} catch (IOException e) {
throw new TezUncheckedException(e);
}
+ ContainerLauncherManager clManager = new ContainerLauncherManager(getContext());
ContainerLauncherContext containerLauncherContext =
- new ContainerLauncherContextImpl(getContext(), getTaskCommunicatorManager(), userPayload);
+ new ContainerLauncherContextImpl(getContext(), clManager, getTaskCommunicatorManager(), userPayload, 0);
containerLauncher = new MockContainerLauncher(launcherGoFlag, containerLauncherContext);
- return new ContainerLauncherManager(containerLauncher, getContext());
+ clManager.setContainerLauncher(containerLauncher);
+ return clManager;
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index d5ee67d..74ac51e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -854,7 +854,8 @@ public class TestMockDAGAppMaster {
tezClient.submitDAG(dag);
mockLauncher.waitTillContainersLaunched();
- mockApp.handle(new DAGAppMasterEventSchedulingServiceError(new RuntimeException("Mock error")));
+ mockApp.handle(new DAGAppMasterEventSchedulingServiceError(
+ org.apache.hadoop.util.StringUtils.stringifyException(new RuntimeException("Mock error"))));
while(!mockApp.getShutdownHandler().wasShutdownInvoked()) {
Thread.sleep(100);
http://git-wip-us.apache.org/repos/asf/tez/blob/a812c346/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
index 5323928..c7f97d3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
@@ -23,11 +23,13 @@ import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
@@ -42,6 +44,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -49,6 +52,11 @@ import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
+import org.apache.tez.dag.helpers.DagInfoImplForTest;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
+import org.apache.tez.serviceplugins.api.ServicePluginException;
import org.apache.tez.serviceplugins.api.TaskCommunicator;
import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
import org.apache.tez.dag.api.TezConstants;
@@ -62,6 +70,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -73,7 +82,7 @@ public class TestTaskCommunicatorManager {
@Before
@After
- public void reset() {
+ public void resetForNextTest() { // Runs TaskCommManagerForMultipleCommTest.reset() around each test; deliberately not named reset(), which would shadow the static import Mockito.reset used as reset(mock)
TaskCommManagerForMultipleCommTest.reset();
}
@@ -233,6 +242,71 @@ public class TestTaskCommunicatorManager {
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
+ public void testReportFailureFromTaskCommunicator() throws TezException {
+ String dagName = DAG_NAME;
+ EventHandler eventHandler = mock(EventHandler.class);
+ AppContext appContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+ doReturn("testTaskCommunicator").when(appContext).getTaskCommunicatorName(0);
+ doReturn(eventHandler).when(appContext).getEventHandler();
+ // Expects reportError(SERVICE_UNAVAILABLE, DagInfo) to terminate the DAG and reportError(INCONSISTENT_STATE) to raise an AM fatal error.
+ DAG dag = mock(DAG.class);
+ TezDAGID dagId = TezDAGID.getInstance(ApplicationId.newInstance(1, 0), DAG_INDEX);
+ doReturn(dagName).when(dag).getName();
+ doReturn(dagId).when(dag).getID();
+ doReturn(dag).when(appContext).getCurrentDAG();
+ // Register TaskCommForFailureTest as the only task communicator (index 0, name "testTaskCommunicator").
+ NamedEntityDescriptor<TaskCommunicatorDescriptor> namedEntityDescriptor =
+ new NamedEntityDescriptor<>("testTaskCommunicator", TaskCommForFailureTest.class.getName());
+ List<NamedEntityDescriptor> list = new LinkedList<>();
+ list.add(namedEntityDescriptor);
+
+ // Heartbeat handlers are plain mocks; only eventHandler interactions are verified.
+ TaskCommunicatorManager taskCommManager =
+ new TaskCommunicatorManager(appContext, mock(TaskHeartbeatHandler.class),
+ mock(ContainerHeartbeatHandler.class), list);
+ try {
+ taskCommManager.init(new Configuration());
+ taskCommManager.start();
+ // Phase 1: the stub's registerRunningContainer calls reportError(SERVICE_UNAVAILABLE, "ReportError", DagInfo).
+ taskCommManager.registerRunningContainer(mock(ContainerId.class), 0);
+ ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class);
+ verify(eventHandler, times(1)).handle(argumentCaptor.capture());
+ // Exactly one event: a DAGEventTerminateDag whose diagnostics carry the message, the error name and "[index:name]".
+ Event rawEvent = argumentCaptor.getValue();
+ assertTrue(rawEvent instanceof DAGEventTerminateDag);
+ DAGEventTerminateDag killEvent = (DAGEventTerminateDag) rawEvent;
+ assertTrue(killEvent.getDiagnosticInfo().contains("ReportError"));
+ assertTrue(killEvent.getDiagnosticInfo()
+ .contains(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE.name()));
+ assertTrue(killEvent.getDiagnosticInfo().contains("[0:testTaskCommunicator]"));
+
+ // Phase 2: forget Phase 1 interactions so the next verify only sees what dagComplete produces.
+ reset(eventHandler);
+ // The stub's dagComplete calls reportError(INCONSISTENT_STATE, "ReportedFatalError", null).
+ taskCommManager.dagComplete(dag);
+
+ argumentCaptor = ArgumentCaptor.forClass(Event.class);
+
+ verify(eventHandler, times(1)).handle(argumentCaptor.capture());
+ rawEvent = argumentCaptor.getValue();
+ // Expect an AM-level DAGAppMasterEventUserServiceFatalError of type TASK_COMMUNICATOR_SERVICE_FATAL_ERROR.
+ assertTrue(rawEvent instanceof DAGAppMasterEventUserServiceFatalError);
+ DAGAppMasterEventUserServiceFatalError event =
+ (DAGAppMasterEventUserServiceFatalError) rawEvent;
+ assertEquals(DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, event.getType());
+ assertTrue(event.getDiagnosticInfo().contains("ReportedFatalError"));
+ assertTrue(
+ event.getDiagnosticInfo().contains(ServicePluginErrorDefaults.INCONSISTENT_STATE.name()));
+ assertTrue(event.getDiagnosticInfo().contains("[0:testTaskCommunicator]"));
+
+ } finally {
+ taskCommManager.stop();
+ }
+
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
public void testTaskCommunicatorUserError() {
TaskCommunicatorContextImpl taskCommContext = mock(TaskCommunicatorContextImpl.class);
TaskCommunicator taskCommunicator = mock(TaskCommunicator.class, new ExceptionAnswer());
@@ -313,7 +387,6 @@ public class TestTaskCommunicatorManager {
}
}
-
static class TaskCommManagerForMultipleCommTest extends TaskCommunicatorManager {
// All variables setup as static since methods being overridden are invoked by the ContainerLauncherRouter ctor,
@@ -460,4 +533,63 @@ public class TestTaskCommunicatorManager {
return null;
}
}
+ // DAG name/index placed in TaskCommForFailureTest's DagInfo; testReportFailureFromTaskCommunicator mocks its current DAG with the same values.
+ private static final String DAG_NAME = "dagName";
+ private static final int DAG_INDEX = 1;
+ public static class TaskCommForFailureTest extends TaskCommunicator {
+ // Stub communicator: registerRunningContainer and dagComplete each call reportError; the other callbacks do nothing or return null.
+ public TaskCommForFailureTest(
+ TaskCommunicatorContext taskCommunicatorContext) {
+ super(taskCommunicatorContext);
+ }
+ // Reports SERVICE_UNAVAILABLE scoped to the DAG (DAG_INDEX, DAG_NAME) via DagInfoImplForTest.
+ @Override
+ public void registerRunningContainer(ContainerId containerId, String hostname, int port) throws
+ ServicePluginException {
+ getContext()
+ .reportError(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE, "ReportError", new DagInfoImplForTest(DAG_INDEX, DAG_NAME));
+ }
+
+ @Override
+ public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason,
+ @Nullable String diagnostics) throws ServicePluginException {
+ // Intentionally empty.
+ }
+
+ @Override
+ public void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec,
+ Map<String, LocalResource> additionalResources,
+ Credentials credentials, boolean credentialsChanged,
+ int priority) throws ServicePluginException {
+ // Intentionally empty.
+ }
+
+ @Override
+ public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID,
+ TaskAttemptEndReason endReason,
+ @Nullable String diagnostics) throws
+ ServicePluginException {
+ // Intentionally empty.
+ }
+
+ @Override
+ public InetSocketAddress getAddress() throws ServicePluginException {
+ return null;
+ }
+
+ @Override
+ public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws ServicePluginException {
+ // Intentionally empty.
+ }
+ // Reports INCONSISTENT_STATE with no DagInfo (null).
+ @Override
+ public void dagComplete(int dagIdentifier) throws ServicePluginException {
+ getContext().reportError(ServicePluginErrorDefaults.INCONSISTENT_STATE, "ReportedFatalError", null);
+ }
+
+ @Override
+ public Object getMetaInfo() throws ServicePluginException {
+ return null;
+ }
+ }
}