You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/26 01:48:54 UTC
[7/7] tez git commit: TEZ-2708. Rename classes and variables post TEZ-2003 changes. (sseth)
TEZ-2708. Rename classes and variables post TEZ-2003 changes. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8b278ea8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8b278ea8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8b278ea8
Branch: refs/heads/master
Commit: 8b278ea84f4c64e7144c07fa79b38a6a719541d2
Parents: dc0ee01
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Aug 25 16:48:00 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Aug 25 16:48:00 2015 -0700
----------------------------------------------------------------------
tez-dag/findbugs-exclude.xml | 15 +-
.../java/org/apache/tez/dag/app/AppContext.java | 4 +-
.../dag/app/ContainerLauncherContextImpl.java | 4 +-
.../org/apache/tez/dag/app/DAGAppMaster.java | 113 +--
.../apache/tez/dag/app/TaskAttemptListener.java | 46 --
.../dag/app/TaskAttemptListenerImpTezDag.java | 449 -----------
.../dag/app/TaskCommunicatorContextImpl.java | 22 +-
.../tez/dag/app/TaskCommunicatorManager.java | 449 +++++++++++
.../app/TaskCommunicatorManagerInterface.java | 46 ++
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 10 +-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 8 +-
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 10 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 10 +-
.../dag/app/launcher/ContainerLauncherImpl.java | 410 ----------
.../app/launcher/ContainerLauncherManager.java | 217 +++++
.../app/launcher/ContainerLauncherRouter.java | 215 -----
.../app/launcher/LocalContainerLauncher.java | 8 +-
.../app/launcher/TezContainerLauncherImpl.java | 410 ++++++++++
.../tez/dag/app/rm/ContainerLauncherEvent.java | 116 +++
.../dag/app/rm/ContainerLauncherEventType.java | 25 +
.../rm/ContainerLauncherLaunchRequestEvent.java | 79 ++
.../rm/ContainerLauncherStopRequestEvent.java | 34 +
.../tez/dag/app/rm/NMCommunicatorEvent.java | 115 ---
.../tez/dag/app/rm/NMCommunicatorEventType.java | 25 -
.../rm/NMCommunicatorLaunchRequestEvent.java | 78 --
.../app/rm/NMCommunicatorStopRequestEvent.java | 33 -
.../dag/app/rm/TaskSchedulerContextImpl.java | 29 +-
.../dag/app/rm/TaskSchedulerEventHandler.java | 784 ------------------
.../tez/dag/app/rm/TaskSchedulerManager.java | 786 +++++++++++++++++++
.../dag/app/rm/container/AMContainerImpl.java | 26 +-
.../dag/app/rm/container/AMContainerMap.java | 6 +-
.../apache/tez/dag/app/MockDAGAppMaster.java | 23 +-
.../tez/dag/app/TestMockDAGAppMaster.java | 2 +-
.../app/TestTaskAttemptListenerImplTezDag.java | 426 ----------
.../app/TestTaskAttemptListenerImplTezDag2.java | 137 ----
.../app/TestTaskCommunicatorContextImpl.java | 2 +-
.../dag/app/TestTaskCommunicatorManager.java | 2 +-
.../dag/app/TestTaskCommunicatorManager1.java | 425 ++++++++++
.../dag/app/TestTaskCommunicatorManager2.java | 136 ++++
.../apache/tez/dag/app/dag/impl/TestCommit.java | 6 +-
.../tez/dag/app/dag/impl/TestDAGImpl.java | 12 +-
.../tez/dag/app/dag/impl/TestDAGRecovery.java | 4 +-
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 58 +-
.../app/dag/impl/TestTaskAttemptRecovery.java | 4 +-
.../tez/dag/app/dag/impl/TestTaskImpl.java | 16 +-
.../tez/dag/app/dag/impl/TestTaskRecovery.java | 4 +-
.../tez/dag/app/dag/impl/TestVertexImpl.java | 31 +-
.../tez/dag/app/dag/impl/TestVertexImpl2.java | 4 +-
.../dag/app/dag/impl/TestVertexRecovery.java | 10 +-
.../launcher/TestContainerLauncherManager.java | 359 +++++++++
.../launcher/TestContainerLauncherRouter.java | 359 ---------
.../tez/dag/app/rm/TestContainerReuse.java | 310 ++++----
.../app/rm/TestTaskSchedulerEventHandler.java | 707 -----------------
.../dag/app/rm/TestTaskSchedulerHelpers.java | 14 +-
.../dag/app/rm/TestTaskSchedulerManager.java | 708 +++++++++++++++++
.../dag/app/rm/container/TestAMContainer.java | 24 +-
.../app/rm/container/TestAMContainerMap.java | 6 +-
.../tez/dag/app/rm/node/TestAMNodeTracker.java | 38 +-
.../tez/service/impl/ContainerRunnerImpl.java | 23 +-
.../apache/tez/runtime/task/TezTaskRunner.java | 451 -----------
.../tez/runtime/task/TestTaskExecution.java | 362 ---------
.../tez/runtime/task/TestTaskExecution2.java | 2 +-
62 files changed, 4220 insertions(+), 5027 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index 9d15035..6db3b7c 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -87,13 +87,13 @@
</Match>
<Match>
- <Class name="~org\.apache\.tez\.dag\.app\.rm\.TaskSchedulerEventHandler"/>
+ <Class name="~org\.apache\.tez\.dag\.app\.rm\.TaskSchedulerManager"/>
<Bug pattern="BC_UNCONFIRMED_CAST"/>
</Match>
<Match>
- <Class name="org.apache.tez.dag.app.launcher.ContainerLauncherRouter" />
- <Method name="handle" params="org.apache.tez.dag.app.rm.NMCommunicatorEvent" returns="void" />
+ <Class name="org.apache.tez.dag.app.launcher.ContainerLauncherManager" />
+ <Method name="handle" params="org.apache.tez.dag.app.rm.ContainerLauncherEvent" returns="void" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>
@@ -151,7 +151,7 @@
<Field name="context"/>
<Field name="currentDAG"/>
<Field name="state"/>
- <Field name="taskSchedulerEventHandler"/>
+ <Field name="taskSchedulerManager"/>
<Field name="versionMismatch"/>
<Field name="versionMismatchDiagnostics"/>
<Field name="containers"/>
@@ -165,13 +165,6 @@
<Bug pattern="IS2_INCONSISTENT_SYNC"/>
</Match>
- <!-- TEZ-1955 -->
- <Match>
- <Class name="org.apache.tez.dag.app.rm.TaskSchedulerEventHandler"/>
- <Field name="taskScheduler"/>
- <Bug pattern="IS2_INCONSISTENT_SYNC"/>
- </Match>
-
<!-- TEZ-1956 -->
<Match>
<Class name="org.apache.tez.dag.app.rm.YarnTaskSchedulerService"/>
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 516fcef..657267b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
+import org.apache.tez.dag.app.rm.TaskSchedulerManager;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.node.AMNodeTracker;
import org.apache.tez.common.security.ACLManager;
@@ -88,7 +88,7 @@ public interface AppContext {
AMNodeTracker getNodeTracker();
- TaskSchedulerEventHandler getTaskScheduler();
+ TaskSchedulerManager getTaskScheduler();
boolean isSession();
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
index 3a2efc5..20bfd13 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
@@ -33,10 +33,10 @@ import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
public class ContainerLauncherContextImpl implements ContainerLauncherContext {
private final AppContext context;
- private final TaskAttemptListener tal;
+ private final TaskCommunicatorManagerInterface tal;
private final UserPayload initialUserPayload;
- public ContainerLauncherContextImpl(AppContext appContext, TaskAttemptListener tal, UserPayload initialUserPayload) {
+ public ContainerLauncherContextImpl(AppContext appContext, TaskCommunicatorManagerInterface tal, UserPayload initialUserPayload) {
this.context = appContext;
this.tal = tal;
this.initialUserPayload = initialUserPayload;
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 84b3095..34e7c2a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -40,7 +40,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -149,10 +148,10 @@ import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
-import org.apache.tez.dag.app.launcher.ContainerLauncherRouter;
+import org.apache.tez.dag.app.launcher.ContainerLauncherManager;
import org.apache.tez.dag.app.rm.AMSchedulerEventType;
-import org.apache.tez.dag.app.rm.NMCommunicatorEventType;
-import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
+import org.apache.tez.dag.app.rm.ContainerLauncherEventType;
+import org.apache.tez.dag.app.rm.TaskSchedulerManager;
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
@@ -236,16 +235,16 @@ public class DAGAppMaster extends AbstractService {
private AppContext context;
private Configuration amConf;
private AsyncDispatcher dispatcher;
- private ContainerLauncherRouter containerLauncherRouter;
+ private ContainerLauncherManager containerLauncherManager;
private ContainerHeartbeatHandler containerHeartbeatHandler;
private TaskHeartbeatHandler taskHeartbeatHandler;
- private TaskAttemptListener taskAttemptListener;
+ private TaskCommunicatorManagerInterface taskCommunicatorManager;
private JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
private Token<JobTokenIdentifier> sessionToken;
private DagEventDispatcher dagEventDispatcher;
private VertexEventDispatcher vertexEventDispatcher;
- private TaskSchedulerEventHandler taskSchedulerEventHandler;
+ private TaskSchedulerManager taskSchedulerManager;
private WebUIService webUIService;
private HistoryEventHandler historyEventHandler;
private final Map<String, LocalResource> amResources = new HashMap<String, LocalResource>();
@@ -472,13 +471,13 @@ public class DAGAppMaster extends AbstractService {
//service to handle requests to TaskUmbilicalProtocol
- taskAttemptListener = createTaskAttemptListener(context,
+ taskCommunicatorManager = createTaskCommunicatorManager(context,
taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorDescriptors);
- addIfService(taskAttemptListener, true);
+ addIfService(taskCommunicatorManager, true);
containerSignatureMatcher = createContainerSignatureMatcher();
containers = new AMContainerMap(containerHeartbeatHandler,
- taskAttemptListener, containerSignatureMatcher, context);
+ taskCommunicatorManager, containerSignatureMatcher, context);
addIfService(containers, true);
dispatcher.register(AMContainerEventType.class, containers);
@@ -521,29 +520,30 @@ public class DAGAppMaster extends AbstractService {
- this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
+ this.taskSchedulerManager = new TaskSchedulerManager(context,
clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService,
taskSchedulerDescriptors, isLocal);
- addIfService(taskSchedulerEventHandler, true);
+ addIfService(taskSchedulerManager, true);
if (enableWebUIService()) {
- addIfServiceDependency(taskSchedulerEventHandler, webUIService);
+ addIfServiceDependency(taskSchedulerManager, webUIService);
}
if (isLastAMRetry) {
LOG.info("AM will unregister as this is the last attempt"
+ ", currentAttempt=" + appAttemptID.getAttemptId()
+ ", maxAttempts=" + maxAppAttempts);
- this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+ this.taskSchedulerManager.setShouldUnregisterFlag();
}
dispatcher.register(AMSchedulerEventType.class,
- taskSchedulerEventHandler);
- addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer);
+ taskSchedulerManager);
+ addIfServiceDependency(taskSchedulerManager, clientRpcServer);
- this.containerLauncherRouter = createContainerLauncherRouter(containerLauncherDescriptors, isLocal);
- addIfService(containerLauncherRouter, true);
- dispatcher.register(NMCommunicatorEventType.class, containerLauncherRouter);
+ this.containerLauncherManager = createContainerLauncherManager(containerLauncherDescriptors,
+ isLocal);
+ addIfService(containerLauncherManager, true);
+ dispatcher.register(ContainerLauncherEventType.class, containerLauncherManager);
historyEventHandler = createHistoryEventHandler(context);
addIfService(historyEventHandler, true);
@@ -632,8 +632,8 @@ public class DAGAppMaster extends AbstractService {
}
@VisibleForTesting
- protected TaskSchedulerEventHandler getTaskSchedulerEventHandler() {
- return taskSchedulerEventHandler;
+ protected TaskSchedulerManager getTaskSchedulerManager() {
+ return taskSchedulerManager;
}
@VisibleForTesting
@@ -661,7 +661,7 @@ public class DAGAppMaster extends AbstractService {
sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.INTERNAL_ERROR));
} else {
LOG.info("Internal Error. Finishing directly as no dag is active.");
- this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+ this.taskSchedulerManager.setShouldUnregisterFlag();
shutdownHandler.shutdown();
}
break;
@@ -673,7 +673,7 @@ public class DAGAppMaster extends AbstractService {
System.out.println(timeStamp + " Completed Dag: " + finishEvt.getDAGId().toString());
if (!isSession) {
LOG.info("Not a session, AM will unregister as DAG has completed");
- this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+ this.taskSchedulerManager.setShouldUnregisterFlag();
_updateLoggers(currentDAG, "_post");
setStateOnDAGCompletion();
LOG.info("Shutting down on completion of dag:" +
@@ -722,7 +722,7 @@ public class DAGAppMaster extends AbstractService {
+ finishEvt.getDAGState()
+ ". Error. Shutting down.");
state = DAGAppMasterState.ERROR;
- this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+ this.taskSchedulerManager.setShouldUnregisterFlag();
shutdownHandler.shutdown();
break;
}
@@ -741,11 +741,11 @@ public class DAGAppMaster extends AbstractService {
// Leaving the taskSchedulerEventHandler here for now. Doesn't generate new events.
// However, eventually it needs to be moved out.
- this.taskSchedulerEventHandler.dagCompleted();
+ this.taskSchedulerManager.dagCompleted();
state = DAGAppMasterState.IDLE;
} else {
LOG.info("Session shutting down now.");
- this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+ this.taskSchedulerManager.setShouldUnregisterFlag();
if (this.historyEventHandler.hasRecoveryFailed()) {
state = DAGAppMasterState.FAILED;
} else {
@@ -772,8 +772,8 @@ public class DAGAppMaster extends AbstractService {
DAGAppMasterEventDagCleanup cleanupEvent = (DAGAppMasterEventDagCleanup) event;
LOG.info("Cleaning up DAG: name=" + cleanupEvent.getDag().getName() + ", with id=" +
cleanupEvent.getDag().getID());
- containerLauncherRouter.dagComplete(cleanupEvent.getDag());
- taskAttemptListener.dagComplete(cleanupEvent.getDag());
+ containerLauncherManager.dagComplete(cleanupEvent.getDag());
+ taskCommunicatorManager.dagComplete(cleanupEvent.getDag());
nodes.dagComplete(cleanupEvent.getDag());
containers.dagComplete(cleanupEvent.getDag());
TezTaskAttemptID.clearCache();
@@ -785,9 +785,9 @@ public class DAGAppMaster extends AbstractService {
break;
case NEW_DAG_SUBMITTED:
// Inform sub-components that a new DAG has been submitted.
- taskSchedulerEventHandler.dagSubmitted();
- containerLauncherRouter.dagSubmitted();
- taskAttemptListener.dagSubmitted();
+ taskSchedulerManager.dagSubmitted();
+ containerLauncherManager.dagSubmitted();
+ taskCommunicatorManager.dagSubmitted();
break;
default:
throw new TezUncheckedException(
@@ -917,7 +917,7 @@ public class DAGAppMaster extends AbstractService {
// create single dag
DAGImpl newDag =
new DAGImpl(dagId, amConf, dagPB, dispatcher.getEventHandler(),
- taskAttemptListener, dagCredentials, clock,
+ taskCommunicatorManager, dagCredentials, clock,
appMasterUgi.getShortUserName(),
taskHeartbeatHandler, context);
@@ -1048,13 +1048,13 @@ public class DAGAppMaster extends AbstractService {
}
}
- protected TaskAttemptListener createTaskAttemptListener(AppContext context,
- TaskHeartbeatHandler thh,
- ContainerHeartbeatHandler chh,
- List<NamedEntityDescriptor> entityDescriptors) {
- TaskAttemptListener lis =
- new TaskAttemptListenerImpTezDag(context, thh, chh, entityDescriptors);
- return lis;
+ protected TaskCommunicatorManagerInterface createTaskCommunicatorManager(AppContext context,
+ TaskHeartbeatHandler thh,
+ ContainerHeartbeatHandler chh,
+ List<NamedEntityDescriptor> entityDescriptors) {
+ TaskCommunicatorManagerInterface tcm =
+ new TaskCommunicatorManager(context, thh, chh, entityDescriptors);
+ return tcm;
}
protected TaskHeartbeatHandler createTaskHeartbeatHandler(AppContext context,
@@ -1074,10 +1074,11 @@ public class DAGAppMaster extends AbstractService {
return chh;
}
- protected ContainerLauncherRouter createContainerLauncherRouter(List<NamedEntityDescriptor> containerLauncherDescriptors,
- boolean isLocal) throws
+ protected ContainerLauncherManager createContainerLauncherManager(
+ List<NamedEntityDescriptor> containerLauncherDescriptors,
+ boolean isLocal) throws
UnknownHostException {
- return new ContainerLauncherRouter(context, taskAttemptListener, workingDirectory,
+ return new ContainerLauncherManager(context, taskCommunicatorManager, workingDirectory,
containerLauncherDescriptors, isLocal);
}
@@ -1101,12 +1102,12 @@ public class DAGAppMaster extends AbstractService {
return dispatcher;
}
- public ContainerLauncherRouter getContainerLauncherRouter() {
- return containerLauncherRouter;
+ public ContainerLauncherManager getContainerLauncherManager() {
+ return containerLauncherManager;
}
- public TaskAttemptListener getTaskAttemptListener() {
- return taskAttemptListener;
+ public TaskCommunicatorManagerInterface getTaskCommunicatorManager() {
+ return taskCommunicatorManager;
}
public ContainerId getAppContainerId() {
@@ -1213,7 +1214,7 @@ public class DAGAppMaster extends AbstractService {
public void shutdownTezAM() throws TezException {
sessionStopped.set(true);
synchronized (this) {
- this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+ this.taskSchedulerManager.setShouldUnregisterFlag();
if (currentDAG != null
&& !currentDAG.isComplete()) {
//send a DAG_KILL message
@@ -1473,8 +1474,8 @@ public class DAGAppMaster extends AbstractService {
}
@Override
- public TaskSchedulerEventHandler getTaskScheduler() {
- return taskSchedulerEventHandler;
+ public TaskSchedulerManager getTaskScheduler() {
+ return taskSchedulerManager;
}
@Override
@@ -1574,7 +1575,7 @@ public class DAGAppMaster extends AbstractService {
throw new TezUncheckedException(
"Cannot get ApplicationACLs before all services have started");
}
- return taskSchedulerEventHandler.getApplicationAcls();
+ return taskSchedulerManager.getApplicationAcls();
}
@Override
@@ -1805,7 +1806,7 @@ public class DAGAppMaster extends AbstractService {
if (versionMismatch) {
// Short-circuit and return as no DAG should not be run
- this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+ this.taskSchedulerManager.setShouldUnregisterFlag();
shutdownHandler.shutdown();
return;
}
@@ -1825,7 +1826,7 @@ public class DAGAppMaster extends AbstractService {
LOG.error("Error occurred when trying to recover data from previous attempt."
+ " Shutting down AM", e);
this.state = DAGAppMasterState.ERROR;
- this.taskSchedulerEventHandler.setShouldUnregisterFlag();
+ this.taskSchedulerManager.setShouldUnregisterFlag();
shutdownHandler.shutdown();
return;
}
@@ -1929,7 +1930,7 @@ public class DAGAppMaster extends AbstractService {
private void initiateStop() {
- taskSchedulerEventHandler.initiateStop();
+ taskSchedulerManager.initiateStop();
}
@Override
@@ -1954,8 +1955,8 @@ public class DAGAppMaster extends AbstractService {
LOG.debug("Checking whether tez scratch data dir should be deleted, deleteTezScratchData="
+ deleteTezScratchData);
}
- if (deleteTezScratchData && this.taskSchedulerEventHandler != null
- && this.taskSchedulerEventHandler.hasUnregistered()) {
+ if (deleteTezScratchData && this.taskSchedulerManager != null
+ && this.taskSchedulerManager.hasUnregistered()) {
// Delete tez scratch data dir
if (this.tezSystemStagingDir != null) {
try {
@@ -2208,7 +2209,7 @@ public class DAGAppMaster extends AbstractService {
// Notify TaskScheduler that a SIGTERM has been received so that it
// unregisters quickly with proper status
LOG.info("DAGAppMaster received a signal. Signaling TaskScheduler");
- appMaster.taskSchedulerEventHandler.setSignalled(true);
+ appMaster.taskSchedulerManager.setSignalled(true);
}
if (EnumSet.of(DAGAppMasterState.NEW, DAGAppMasterState.INITED,
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
deleted file mode 100644
index 761bdb0..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app;
-
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.serviceplugins.api.ContainerEndReason;
-import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.api.TaskCommunicator;
-import org.apache.tez.dag.app.rm.container.AMContainerTask;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-/**
- * This class listens for changes to the state of a Task.
- */
-public interface TaskAttemptListener {
-
- void registerRunningContainer(ContainerId containerId, int taskCommId);
-
- void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId, int taskCommId);
-
- void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics);
-
- void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId, TaskAttemptEndReason endReason, String diagnostics);
-
- void dagComplete(DAG dag);
-
- void dagSubmitted();
-
- TaskCommunicator getTaskCommunicator(int taskCommIndex);
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
deleted file mode 100644
index 185193f..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ /dev/null
@@ -1,449 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.tez.dag.app;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.commons.collections4.ListUtils;
-import org.apache.tez.dag.api.NamedEntityDescriptor;
-import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
-import org.apache.tez.serviceplugins.api.ContainerEndReason;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
-import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
-import org.apache.tez.runtime.api.impl.EventType;
-import org.apache.tez.dag.api.event.VertexStateUpdate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.tez.common.ReflectionUtils;
-import org.apache.tez.common.TezUtilsInternal;
-import org.apache.tez.dag.api.TaskCommunicator;
-import org.apache.tez.dag.api.TaskCommunicatorContext;
-import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.api.TaskHeartbeatResponse;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.dag.api.TaskHeartbeatRequest;
-import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
-import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
-import org.apache.tez.dag.app.rm.container.AMContainerTask;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.api.impl.TezEvent;
-
-
-@SuppressWarnings("unchecked")
-@InterfaceAudience.Private
-public class TaskAttemptListenerImpTezDag extends AbstractService implements
- TaskAttemptListener {
-
- private static final Logger LOG = LoggerFactory
- .getLogger(TaskAttemptListenerImpTezDag.class);
-
- private final AppContext context;
- private final TaskCommunicator[] taskCommunicators;
- private final TaskCommunicatorContext[] taskCommunicatorContexts;
- protected final ServicePluginLifecycleAbstractService []taskCommunicatorServiceWrappers;
-
- protected final TaskHeartbeatHandler taskHeartbeatHandler;
- protected final ContainerHeartbeatHandler containerHeartbeatHandler;
-
- private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null, 0, 0);
-
- private final ConcurrentMap<TezTaskAttemptID, ContainerId> registeredAttempts =
- new ConcurrentHashMap<TezTaskAttemptID, ContainerId>();
- private final ConcurrentMap<ContainerId, ContainerInfo> registeredContainers =
- new ConcurrentHashMap<ContainerId, ContainerInfo>();
-
- // Defined primarily to work around ConcurrentMaps not accepting null values
- private static final class ContainerInfo {
- TezTaskAttemptID taskAttemptId;
- ContainerInfo(TezTaskAttemptID taskAttemptId) {
- this.taskAttemptId = taskAttemptId;
- }
- }
-
- private static final ContainerInfo NULL_CONTAINER_INFO = new ContainerInfo(null);
-
-
- public TaskAttemptListenerImpTezDag(AppContext context,
- TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
- List<NamedEntityDescriptor> taskCommunicatorDescriptors) {
- super(TaskAttemptListenerImpTezDag.class.getName());
- this.context = context;
- this.taskHeartbeatHandler = thh;
- this.containerHeartbeatHandler = chh;
- Preconditions.checkArgument(
- taskCommunicatorDescriptors != null && !taskCommunicatorDescriptors.isEmpty(),
- "TaskCommunicators must be specified");
- this.taskCommunicators = new TaskCommunicator[taskCommunicatorDescriptors.size()];
- this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorDescriptors.size()];
- this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorDescriptors.size()];
- for (int i = 0 ; i < taskCommunicatorDescriptors.size() ; i++) {
- UserPayload userPayload = taskCommunicatorDescriptors.get(i).getUserPayload();
- taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, userPayload, i);
- taskCommunicators[i] = createTaskCommunicator(taskCommunicatorDescriptors.get(i), i);
- taskCommunicatorServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskCommunicators[i]);
- }
- // TODO TEZ-2118 Start using taskCommunicator indices properly
- }
-
- @Override
- public void serviceStart() {
- // TODO Why is init tied to serviceStart
- for (int i = 0 ; i < taskCommunicators.length ; i++) {
- taskCommunicatorServiceWrappers[i].init(getConfig());
- taskCommunicatorServiceWrappers[i].start();
- }
- }
-
- @Override
- public void serviceStop() {
- for (int i = 0 ; i < taskCommunicators.length ; i++) {
- taskCommunicatorServiceWrappers[i].stop();
- }
- }
-
- @VisibleForTesting
- TaskCommunicator createTaskCommunicator(NamedEntityDescriptor taskCommDescriptor,
- int taskCommIndex) {
- if (taskCommDescriptor.getEntityName().equals(TezConstants.getTezYarnServicePluginName())) {
- return createDefaultTaskCommunicator(taskCommunicatorContexts[taskCommIndex]);
- } else if (taskCommDescriptor.getEntityName()
- .equals(TezConstants.getTezUberServicePluginName())) {
- return createUberTaskCommunicator(taskCommunicatorContexts[taskCommIndex]);
- } else {
- return createCustomTaskCommunicator(taskCommunicatorContexts[taskCommIndex],
- taskCommDescriptor);
- }
- }
-
- @VisibleForTesting
- TaskCommunicator createDefaultTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
- LOG.info("Using Default Task Communicator");
- return new TezTaskCommunicatorImpl(taskCommunicatorContext);
- }
-
- @VisibleForTesting
- TaskCommunicator createUberTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
- LOG.info("Using Default Local Task Communicator");
- return new TezLocalTaskCommunicatorImpl(taskCommunicatorContext);
- }
-
- @VisibleForTesting
- TaskCommunicator createCustomTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext,
- NamedEntityDescriptor taskCommDescriptor) {
- LOG.info("Using TaskCommunicator {}:{} " + taskCommDescriptor.getEntityName(),
- taskCommDescriptor.getClassName());
- Class<? extends TaskCommunicator> taskCommClazz =
- (Class<? extends TaskCommunicator>) ReflectionUtils
- .getClazz(taskCommDescriptor.getClassName());
- try {
- Constructor<? extends TaskCommunicator> ctor =
- taskCommClazz.getConstructor(TaskCommunicatorContext.class);
- return ctor.newInstance(taskCommunicatorContext);
- } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
- throw new TezUncheckedException(e);
- }
- }
-
- public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request)
- throws IOException, TezException {
- ContainerId containerId = ConverterUtils.toContainerId(request
- .getContainerIdentifier());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received heartbeat from container"
- + ", request=" + request);
- }
-
- if (!registeredContainers.containsKey(containerId)) {
- LOG.warn("Received task heartbeat from unknown container with id: " + containerId +
- ", asking it to die");
- return RESPONSE_SHOULD_DIE;
- }
-
- // A heartbeat can come in anytime. The AM may have made a decision to kill a running task/container
- // meanwhile. If the decision is processed through the pipeline before the heartbeat is processed,
- // the heartbeat will be dropped. Otherwise the heartbeat will be processed - and the system
- // know how to handle this - via FailedInputEvents for example (relevant only if the heartbeat has events).
- // So - avoiding synchronization.
-
- pingContainerHeartbeatHandler(containerId);
- TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null, 0);
- TezTaskAttemptID taskAttemptID = request.getTaskAttemptId();
- if (taskAttemptID != null) {
- ContainerId containerIdFromMap = registeredAttempts.get(taskAttemptID);
- if (containerIdFromMap == null || !containerIdFromMap.equals(containerId)) {
- // This can happen when a task heartbeats. Meanwhile the container is unregistered.
- // The information will eventually make it through to the plugin via a corresponding unregister.
- // There's a race in that case between the unregister making it through, and this method returning.
- // TODO TEZ-2003 (post) TEZ-2666. An exception back is likely a better approach than sending a shouldDie = true,
- // so that the plugin can handle the scenario. Alternately augment the response with error codes.
- // Error codes would be better than exceptions.
- LOG.info("Attempt: " + taskAttemptID + " is not recognized for heartbeats");
- return RESPONSE_SHOULD_DIE;
- }
-
- List<TezEvent> inEvents = request.getEvents();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ping from " + taskAttemptID.toString() +
- " events: " + (inEvents != null ? inEvents.size() : -1));
- }
-
- long currTime = context.getClock().getTime();
- List<TezEvent> otherEvents = new ArrayList<TezEvent>();
- // route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events
- // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT)
- // to VertexImpl to ensure the events ordering
- // 1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent
- // 2. TaskStatusEvent is handled before TaskAttemptFinishedEvent
- for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
- // for now, set the event time on the AM when it is received.
- // this avoids any time disparity between machines.
- tezEvent.setEventReceivedTime(currTime);
- final EventType eventType = tezEvent.getEventType();
- if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
- TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
- (TaskStatusUpdateEvent) tezEvent.getEvent());
- context.getEventHandler().handle(taskAttemptEvent);
- } else {
- otherEvents.add(tezEvent);
- }
- }
- if(!otherEvents.isEmpty()) {
- TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
- context.getEventHandler().handle(
- new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(otherEvents)));
- }
- taskHeartbeatHandler.pinged(taskAttemptID);
- eventInfo = context
- .getCurrentDAG()
- .getVertex(taskAttemptID.getTaskID().getVertexID())
- .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), request.getPreRoutedStartIndex(),
- request.getMaxEvents());
- }
- return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId(), eventInfo.getNextPreRoutedFromEventId());
- }
- public void taskAlive(TezTaskAttemptID taskAttemptId) {
- taskHeartbeatHandler.pinged(taskAttemptId);
- }
-
- public void containerAlive(ContainerId containerId) {
- pingContainerHeartbeatHandler(containerId);
- }
-
- public void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId) {
- context.getEventHandler()
- .handle(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null));
- pingContainerHeartbeatHandler(containerId);
- }
-
- public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
- String diagnostics) {
- // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
- // and messages from the scheduler will release the container.
- // TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore,
- // instead of waiting for the unregister to flow through the Container.
- // Fix along the same lines as TEZ-2124 by introducing an explict context.
- context.getEventHandler().handle(new TaskAttemptEventAttemptKilled(taskAttemptId,
- diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
- taskAttemptEndReason)));
- }
-
- public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
- String diagnostics) {
- // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
- // and messages from the scheduler will release the container.
- // TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore,
- // instead of waiting for the unregister to flow through the Container.
- // Fix along the same lines as TEZ-2124 by introducing an explict context.
- context.getEventHandler().handle(new TaskAttemptEventAttemptFailed(taskAttemptId,
- TaskAttemptEventType.TA_FAILED, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
- taskAttemptEndReason)));
- }
-
- public void vertexStateUpdateNotificationReceived(VertexStateUpdate event, int taskCommIndex) throws
- Exception {
- taskCommunicators[taskCommIndex].onVertexStateUpdated(event);
- }
-
-
- /**
- * Child checking whether it can commit.
- * <p/>
- * <br/>
- * Repeatedly polls the ApplicationMaster whether it
- * {@link Task#canCommit(TezTaskAttemptID)} This is * a legacy from the
- * centralized commit protocol handling by the JobTracker.
- */
-// @Override
- public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
- LOG.info("Commit go/no-go request from " + taskAttemptId.toString());
- // An attempt is asking if it can commit its output. This can be decided
- // only by the task which is managing the multiple attempts. So redirect the
- // request there.
- taskHeartbeatHandler.progressing(taskAttemptId);
- pingContainerHeartbeatHandler(taskAttemptId);
-
- DAG job = context.getCurrentDAG();
- Task task =
- job.getVertex(taskAttemptId.getTaskID().getVertexID()).
- getTask(taskAttemptId.getTaskID());
- return task.canCommit(taskAttemptId);
- }
-
- // The TaskAttemptListener register / unregister methods in this class are not thread safe.
- // The Tez framework should not invoke these methods from multiple threads.
- @Override
- public void dagComplete(DAG dag) {
- // TODO TEZ-2335. Cleanup TaskHeartbeat handler structures.
- // TODO TEZ-2345. Also cleanup attemptInfo map, so that any tasks which heartbeat are told to die.
- // Container structures remain unchanged - since they could be re-used across restarts.
- // This becomes more relevant when task kills without container kills are allowed.
-
- // TODO TEZ-2336. Send a signal to containers indicating DAG completion.
-
- // Inform all communicators of the dagCompletion.
- for (int i = 0 ; i < taskCommunicators.length ; i++) {
- ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteStart(dag);
- taskCommunicators[i].dagComplete(dag.getName());
- ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteEnd();
- }
-
- }
-
- @Override
- public void dagSubmitted() {
- // Nothing to do right now. Indicates that a new DAG has been submitted and
- // the context has updated information.
- }
-
- @Override
- public void registerRunningContainer(ContainerId containerId, int taskCommId) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("ContainerId: " + containerId + " registered with TaskAttemptListener");
- }
- ContainerInfo oldInfo = registeredContainers.put(containerId, NULL_CONTAINER_INFO);
- if (oldInfo != null) {
- throw new TezUncheckedException(
- "Multiple registrations for containerId: " + containerId);
- }
- NodeId nodeId = context.getAllContainers().get(containerId).getContainer().getNodeId();
- taskCommunicators[taskCommId].registerRunningContainer(containerId, nodeId.getHost(),
- nodeId.getPort());
- }
-
- @Override
- public void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId);
- }
- ContainerInfo containerInfo = registeredContainers.remove(containerId);
- if (containerInfo.taskAttemptId != null) {
- registeredAttempts.remove(containerInfo.taskAttemptId);
- }
- taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason, diagnostics);
- }
-
- @Override
- public void registerTaskAttempt(AMContainerTask amContainerTask,
- ContainerId containerId, int taskCommId) {
- ContainerInfo containerInfo = registeredContainers.get(containerId);
- if (containerInfo == null) {
- throw new TezUncheckedException("Registering task attempt: "
- + amContainerTask.getTask().getTaskAttemptID() + " to unknown container: " + containerId);
- }
- if (containerInfo.taskAttemptId != null) {
- throw new TezUncheckedException("Registering task attempt: "
- + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
- + " with existing assignment to: " +
- containerInfo.taskAttemptId);
- }
-
- // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
- registeredContainers.put(containerId, new ContainerInfo(amContainerTask.getTask().getTaskAttemptID()));
-
- ContainerId containerIdFromMap = registeredAttempts.put(
- amContainerTask.getTask().getTaskAttemptID(), containerId);
- if (containerIdFromMap != null) {
- throw new TezUncheckedException("Registering task attempt: "
- + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
- + " when already assigned to: " + containerIdFromMap);
- }
- taskCommunicators[taskCommId].registerRunningTaskAttempt(containerId, amContainerTask.getTask(),
- amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(),
- amContainerTask.haveCredentialsChanged(), amContainerTask.getPriority());
- }
-
- @Override
- public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId, TaskAttemptEndReason endReason, String diagnostics) {
- ContainerId containerId = registeredAttempts.remove(attemptId);
- if (containerId == null) {
- LOG.warn("Unregister task attempt: " + attemptId + " from unknown container");
- return;
- }
- ContainerInfo containerInfo = registeredContainers.get(containerId);
- if (containerInfo == null) {
- LOG.warn("Unregister task attempt: " + attemptId +
- " from non-registered container: " + containerId);
- return;
- }
- // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
- registeredContainers.put(containerId, NULL_CONTAINER_INFO);
- taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason, diagnostics);
- }
-
- @Override
- public TaskCommunicator getTaskCommunicator(int taskCommIndex) {
- return taskCommunicators[taskCommIndex];
- }
-
- private void pingContainerHeartbeatHandler(ContainerId containerId) {
- containerHeartbeatHandler.pinged(containerId);
- }
-
- private void pingContainerHeartbeatHandler(TezTaskAttemptID taskAttemptId) {
- ContainerId containerId = registeredAttempts.get(taskAttemptId);
- if (containerId != null) {
- containerHeartbeatHandler.pinged(containerId);
- } else {
- LOG.warn("Handling communication from attempt: " + taskAttemptId
- + ", ContainerId not known for this attempt");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 9d57ac3..071b008 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -47,7 +47,7 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
// TODO TEZ-2003 (post) TEZ-2669 Propagate errors baack to the AM with proper error reporting
private final AppContext context;
- private final TaskAttemptListenerImpTezDag taskAttemptListener;
+ private final TaskCommunicatorManager taskCommunicatorManager;
private final int taskCommunicatorIndex;
private final ReentrantReadWriteLock.ReadLock dagChangedReadLock;
private final ReentrantReadWriteLock.WriteLock dagChangedWriteLock;
@@ -56,11 +56,11 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
private DAG dag;
public TaskCommunicatorContextImpl(AppContext appContext,
- TaskAttemptListenerImpTezDag taskAttemptListener,
+ TaskCommunicatorManager taskCommunicatorManager,
UserPayload userPayload,
int taskCommunicatorIndex) {
this.context = appContext;
- this.taskAttemptListener = taskAttemptListener;
+ this.taskCommunicatorManager = taskCommunicatorManager;
this.userPayload = userPayload;
this.taskCommunicatorIndex = taskCommunicatorIndex;
@@ -86,13 +86,13 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
@Override
public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
- return taskAttemptListener.canCommit(taskAttemptId);
+ return taskCommunicatorManager.canCommit(taskAttemptId);
}
@Override
public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException,
TezException {
- return taskAttemptListener.heartbeat(request);
+ return taskCommunicatorManager.heartbeat(request);
}
@Override
@@ -108,31 +108,31 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
@Override
public void taskAlive(TezTaskAttemptID taskAttemptId) {
- taskAttemptListener.taskAlive(taskAttemptId);
+ taskCommunicatorManager.taskAlive(taskAttemptId);
}
@Override
public void containerAlive(ContainerId containerId) {
if (isKnownContainer(containerId)) {
- taskAttemptListener.containerAlive(containerId);
+ taskCommunicatorManager.containerAlive(containerId);
}
}
@Override
public void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId) {
- taskAttemptListener.taskStartedRemotely(taskAttemptId, containerId);
+ taskCommunicatorManager.taskStartedRemotely(taskAttemptId, containerId);
}
@Override
public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
@Nullable String diagnostics) {
- taskAttemptListener.taskKilled(taskAttemptId, taskAttemptEndReason, diagnostics);
+ taskCommunicatorManager.taskKilled(taskAttemptId, taskAttemptEndReason, diagnostics);
}
@Override
public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
@Nullable String diagnostics) {
- taskAttemptListener.taskFailed(taskAttemptId, taskAttemptEndReason, diagnostics);
+ taskCommunicatorManager.taskFailed(taskAttemptId, taskAttemptEndReason, diagnostics);
}
@@ -196,7 +196,7 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
@Override
public void onStateUpdated(VertexStateUpdate event) {
try {
- taskAttemptListener.vertexStateUpdateNotificationReceived(event, taskCommunicatorIndex);
+ taskCommunicatorManager.vertexStateUpdateNotificationReceived(event, taskCommunicatorIndex);
} catch (Exception e) {
throw new TezUncheckedException(e);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
new file mode 100644
index 0000000..42df259
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections4.ListUtils;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
+import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.impl.EventType;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.dag.api.TaskHeartbeatResponse;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.api.TaskHeartbeatRequest;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerTask;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.impl.TezEvent;
+
+
+@SuppressWarnings("unchecked")
+@InterfaceAudience.Private
+public class TaskCommunicatorManager extends AbstractService implements
+ TaskCommunicatorManagerInterface {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TaskCommunicatorManager.class);
+
+ private final AppContext context;
+ private final TaskCommunicator[] taskCommunicators;
+ private final TaskCommunicatorContext[] taskCommunicatorContexts;
+ protected final ServicePluginLifecycleAbstractService []taskCommunicatorServiceWrappers;
+
+ protected final TaskHeartbeatHandler taskHeartbeatHandler;
+ protected final ContainerHeartbeatHandler containerHeartbeatHandler;
+
+ private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null, 0, 0);
+
+ private final ConcurrentMap<TezTaskAttemptID, ContainerId> registeredAttempts =
+ new ConcurrentHashMap<TezTaskAttemptID, ContainerId>();
+ private final ConcurrentMap<ContainerId, ContainerInfo> registeredContainers =
+ new ConcurrentHashMap<ContainerId, ContainerInfo>();
+
+ /**
+ * Value type stored in {@code registeredContainers}: holds the task attempt assigned to a
+ * container, or {@code null} when no attempt is assigned.
+ * Defined primarily to work around ConcurrentMaps not accepting null values.
+ * The field is final so that an instance (including the shared NULL_CONTAINER_INFO sentinel)
+ * can never be modified after construction.
+ */
+ private static final class ContainerInfo {
+ final TezTaskAttemptID taskAttemptId;
+ ContainerInfo(TezTaskAttemptID taskAttemptId) {
+ this.taskAttemptId = taskAttemptId;
+ }
+ }
+
+ private static final ContainerInfo NULL_CONTAINER_INFO = new ContainerInfo(null);
+
+
+ /**
+ * Creates the manager and eagerly instantiates one task communicator per descriptor.
+ * For descriptor {@code i} this builds a {@link TaskCommunicatorContextImpl} (which receives
+ * a reference to this manager and the index {@code i}), then the communicator itself via
+ * {@link #createTaskCommunicator(NamedEntityDescriptor, int)}, then a lifecycle wrapper
+ * around that communicator.
+ *
+ * @param context application context used for event dispatch, clock and DAG lookup
+ * @param thh handler that is pinged when task attempts report in
+ * @param chh handler that is pinged when containers report in
+ * @param taskCommunicatorDescriptors descriptors of the communicators to create; must be
+ * non-null and non-empty
+ * @throws IllegalArgumentException if {@code taskCommunicatorDescriptors} is null or empty
+ */
+ public TaskCommunicatorManager(AppContext context,
+ TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
+ List<NamedEntityDescriptor> taskCommunicatorDescriptors) {
+ super(TaskCommunicatorManager.class.getName());
+ this.context = context;
+ this.taskHeartbeatHandler = thh;
+ this.containerHeartbeatHandler = chh;
+ Preconditions.checkArgument(
+ taskCommunicatorDescriptors != null && !taskCommunicatorDescriptors.isEmpty(),
+ "TaskCommunicators must be specified");
+ this.taskCommunicators = new TaskCommunicator[taskCommunicatorDescriptors.size()];
+ this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorDescriptors.size()];
+ this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorDescriptors.size()];
+ for (int i = 0 ; i < taskCommunicatorDescriptors.size() ; i++) {
+ UserPayload userPayload = taskCommunicatorDescriptors.get(i).getUserPayload();
+ // The context must exist before the communicator: createTaskCommunicator reads it by index.
+ taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, userPayload, i);
+ taskCommunicators[i] = createTaskCommunicator(taskCommunicatorDescriptors.get(i), i);
+ taskCommunicatorServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskCommunicators[i]);
+ }
+ // TODO TEZ-2118 Start using taskCommunicator indices properly
+ }
+
+ /**
+ * Initializes and then starts the lifecycle wrapper of every task communicator, in index order.
+ */
+ @Override
+ public void serviceStart() {
+ // TODO Why is init tied to serviceStart
+ for (ServicePluginLifecycleAbstractService wrapper : taskCommunicatorServiceWrappers) {
+ wrapper.init(getConfig());
+ wrapper.start();
+ }
+ }
+
+ /**
+ * Stops the lifecycle wrapper of every task communicator, in index order.
+ */
+ @Override
+ public void serviceStop() {
+ for (ServicePluginLifecycleAbstractService wrapper : taskCommunicatorServiceWrappers) {
+ wrapper.stop();
+ }
+ }
+
+ /**
+ * Picks the communicator implementation for a descriptor based on its entity name:
+ * the Tez YARN plugin name selects the default communicator, the Tez uber plugin name
+ * selects the local communicator, and any other name is treated as a custom plugin.
+ *
+ * @param taskCommDescriptor descriptor of the communicator to create
+ * @param taskCommIndex index of the already-created context to hand to the communicator
+ * @return the created communicator
+ */
+ @VisibleForTesting
+ TaskCommunicator createTaskCommunicator(NamedEntityDescriptor taskCommDescriptor,
+ int taskCommIndex) {
+ final String pluginName = taskCommDescriptor.getEntityName();
+ final TaskCommunicatorContext commContext = taskCommunicatorContexts[taskCommIndex];
+ if (pluginName.equals(TezConstants.getTezYarnServicePluginName())) {
+ return createDefaultTaskCommunicator(commContext);
+ }
+ if (pluginName.equals(TezConstants.getTezUberServicePluginName())) {
+ return createUberTaskCommunicator(commContext);
+ }
+ return createCustomTaskCommunicator(commContext, taskCommDescriptor);
+ }
+
+ /**
+ * Creates the communicator used for the Tez YARN service plugin name.
+ *
+ * @param taskCommunicatorContext context handed to the communicator
+ * @return a new {@link TezTaskCommunicatorImpl}
+ */
+ @VisibleForTesting
+ TaskCommunicator createDefaultTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
+ LOG.info("Using Default Task Communicator");
+ return new TezTaskCommunicatorImpl(taskCommunicatorContext);
+ }
+
+ /**
+ * Creates the communicator used for the Tez uber service plugin name.
+ *
+ * @param taskCommunicatorContext context handed to the communicator
+ * @return a new {@link TezLocalTaskCommunicatorImpl}
+ */
+ @VisibleForTesting
+ TaskCommunicator createUberTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
+ LOG.info("Using Default Local Task Communicator");
+ return new TezLocalTaskCommunicatorImpl(taskCommunicatorContext);
+ }
+
+ /**
+ * Reflectively instantiates a custom {@link TaskCommunicator} plugin.
+ * The plugin class must expose a public constructor taking a single
+ * {@link TaskCommunicatorContext}.
+ *
+ * @param taskCommunicatorContext context passed to the plugin's constructor
+ * @param taskCommDescriptor descriptor supplying the plugin's entity name and class name
+ * @return the newly constructed communicator
+ * @throws TezUncheckedException if the constructor cannot be found, accessed or invoked
+ */
+ @VisibleForTesting
+ TaskCommunicator createCustomTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext,
+ NamedEntityDescriptor taskCommDescriptor) {
+ // Both values are passed as SLF4J arguments. Concatenating the entity name onto the
+ // format string left the second "{}" unfilled and printed the name after the placeholders.
+ LOG.info("Using TaskCommunicator {}:{} ", taskCommDescriptor.getEntityName(),
+ taskCommDescriptor.getClassName());
+ Class<? extends TaskCommunicator> taskCommClazz =
+ (Class<? extends TaskCommunicator>) ReflectionUtils
+ .getClazz(taskCommDescriptor.getClassName());
+ try {
+ Constructor<? extends TaskCommunicator> ctor =
+ taskCommClazz.getConstructor(TaskCommunicatorContext.class);
+ return ctor.newInstance(taskCommunicatorContext);
+ } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
+ throw new TezUncheckedException(e);
+ }
+ }
+
+ /**
+ * Handles a heartbeat forwarded by a task communicator.
+ * <p>
+ * A heartbeat from a container that is not in {@code registeredContainers}, or for an attempt
+ * that is not registered to the reporting container, is answered with
+ * {@code RESPONSE_SHOULD_DIE}. Otherwise the container heartbeat handler is pinged and, when
+ * the request names a task attempt, the incoming events are stamped with the AM clock time
+ * and routed, the task heartbeat handler is pinged, and the events pending for the attempt
+ * are fetched from its vertex.
+ *
+ * @param request the heartbeat request; its task attempt id may be null
+ * @return {@code RESPONSE_SHOULD_DIE} for unknown containers/attempts, otherwise a response
+ * carrying the events for the attempt (built from an empty event info when the
+ * request names no attempt)
+ * @throws IOException declared by the signature
+ * @throws TezException declared by the signature
+ */
+ public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request)
+ throws IOException, TezException {
+ ContainerId containerId = ConverterUtils.toContainerId(request
+ .getContainerIdentifier());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received heartbeat from container"
+ + ", request=" + request);
+ }
+
+ if (!registeredContainers.containsKey(containerId)) {
+ LOG.warn("Received task heartbeat from unknown container with id: " + containerId +
+ ", asking it to die");
+ return RESPONSE_SHOULD_DIE;
+ }
+
+ // A heartbeat can come in anytime. The AM may have made a decision to kill a running task/container
+ // meanwhile. If the decision is processed through the pipeline before the heartbeat is processed,
+ // the heartbeat will be dropped. Otherwise the heartbeat will be processed - and the system
+ // knows how to handle this - via FailedInputEvents for example (relevant only if the heartbeat has events).
+ // So - avoiding synchronization.
+
+ pingContainerHeartbeatHandler(containerId);
+ // Default used when the request carries no task attempt id.
+ TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null, 0);
+ TezTaskAttemptID taskAttemptID = request.getTaskAttemptId();
+ if (taskAttemptID != null) {
+ ContainerId containerIdFromMap = registeredAttempts.get(taskAttemptID);
+ if (containerIdFromMap == null || !containerIdFromMap.equals(containerId)) {
+ // This can happen when a task heartbeats. Meanwhile the container is unregistered.
+ // The information will eventually make it through to the plugin via a corresponding unregister.
+ // There's a race in that case between the unregister making it through, and this method returning.
+ // TODO TEZ-2003 (post) TEZ-2666. An exception back is likely a better approach than sending a shouldDie = true,
+ // so that the plugin can handle the scenario. Alternately augment the response with error codes.
+ // Error codes would be better than exceptions.
+ LOG.info("Attempt: " + taskAttemptID + " is not recognized for heartbeats");
+ return RESPONSE_SHOULD_DIE;
+ }
+
+ List<TezEvent> inEvents = request.getEvents();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ping from " + taskAttemptID.toString() +
+ " events: " + (inEvents != null ? inEvents.size() : -1));
+ }
+
+ long currTime = context.getClock().getTime();
+ List<TezEvent> otherEvents = new ArrayList<TezEvent>();
+ // route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events
+ // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT)
+ // to VertexImpl to ensure the events ordering
+ // 1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent
+ // 2. TaskStatusEvent is handled before TaskAttemptFinishedEvent
+ for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
+ // for now, set the event time on the AM when it is received.
+ // this avoids any time disparity between machines.
+ tezEvent.setEventReceivedTime(currTime);
+ final EventType eventType = tezEvent.getEventType();
+ if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
+ TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
+ (TaskStatusUpdateEvent) tezEvent.getEvent());
+ context.getEventHandler().handle(taskAttemptEvent);
+ } else {
+ otherEvents.add(tezEvent);
+ }
+ }
+ // All non-status events are delivered to the vertex as one batch.
+ if(!otherEvents.isEmpty()) {
+ TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
+ context.getEventHandler().handle(
+ new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(otherEvents)));
+ }
+ taskHeartbeatHandler.pinged(taskAttemptID);
+ eventInfo = context
+ .getCurrentDAG()
+ .getVertex(taskAttemptID.getTaskID().getVertexID())
+ .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), request.getPreRoutedStartIndex(),
+ request.getMaxEvents());
+ }
+ return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId(), eventInfo.getNextPreRoutedFromEventId());
+ }
+ /**
+ * Records a liveness ping for a task attempt with the task heartbeat handler.
+ *
+ * @param taskAttemptId the attempt that reported in
+ */
+ public void taskAlive(TezTaskAttemptID taskAttemptId) {
+ taskHeartbeatHandler.pinged(taskAttemptId);
+ }
+
+ /**
+ * Records a liveness ping for a container with the container heartbeat handler.
+ *
+ * @param containerId the container that reported in
+ */
+ public void containerAlive(ContainerId containerId) {
+ pingContainerHeartbeatHandler(containerId);
+ }
+
+ /**
+ * Dispatches a {@link TaskAttemptEventStartedRemotely} for the attempt and then pings the
+ * container heartbeat handler for the container it started in.
+ *
+ * @param taskAttemptID the attempt that has started
+ * @param containerId the container the attempt is running in
+ */
+ public void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId) {
+ context.getEventHandler()
+ .handle(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null));
+ pingContainerHeartbeatHandler(containerId);
+ }
+
+ /**
+ * Dispatches a {@link TaskAttemptEventAttemptKilled} for the attempt, carrying the
+ * diagnostics and the end reason converted via
+ * {@code TezUtilsInternal.fromTaskAttemptEndReason}.
+ *
+ * @param taskAttemptId the attempt that was killed
+ * @param taskAttemptEndReason why the attempt ended, as reported by the communicator
+ * @param diagnostics diagnostic message forwarded with the event
+ */
+ public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+ String diagnostics) {
+ // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
+ // and messages from the scheduler will release the container.
+ // TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore,
+ // instead of waiting for the unregister to flow through the Container.
+ // Fix along the same lines as TEZ-2124 by introducing an explicit context.
+ context.getEventHandler().handle(new TaskAttemptEventAttemptKilled(taskAttemptId,
+ diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
+ taskAttemptEndReason)));
+ }
+
+ /**
+ * Dispatches a {@link TaskAttemptEventAttemptFailed} of type {@code TA_FAILED} for the
+ * attempt, carrying the diagnostics and the end reason converted via
+ * {@code TezUtilsInternal.fromTaskAttemptEndReason}.
+ *
+ * @param taskAttemptId the attempt that failed
+ * @param taskAttemptEndReason why the attempt ended, as reported by the communicator
+ * @param diagnostics diagnostic message forwarded with the event
+ */
+ public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+ String diagnostics) {
+ // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
+ // and messages from the scheduler will release the container.
+ // TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore,
+ // instead of waiting for the unregister to flow through the Container.
+ // Fix along the same lines as TEZ-2124 by introducing an explicit context.
+ context.getEventHandler().handle(new TaskAttemptEventAttemptFailed(taskAttemptId,
+ TaskAttemptEventType.TA_FAILED, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
+ taskAttemptEndReason)));
+ }
+
+ /**
+ * Forwards a vertex state update to the task communicator at the given index.
+ *
+ * @param event the vertex state update to deliver
+ * @param taskCommIndex index of the communicator that should receive the update
+ * @throws Exception whatever the communicator's {@code onVertexStateUpdated} throws
+ */
+ public void vertexStateUpdateNotificationReceived(VertexStateUpdate event, int taskCommIndex) throws
+ Exception {
+ taskCommunicators[taskCommIndex].onVertexStateUpdated(event);
+ }
+
+
+ /**
+ * Answers a child task attempt that is checking whether it can commit.
+ * <p>
+ * The child repeatedly polls the ApplicationMaster with this call; the decision is
+ * delegated to {@link Task#canCommit(TezTaskAttemptID)}. This is a legacy from the
+ * centralized commit protocol handling by the JobTracker.
+ * <p>
+ * Before delegating, the attempt is marked as progressing with the task heartbeat handler
+ * and the heartbeat handler of the container it is registered to is pinged.
+ *
+ * @param taskAttemptId the attempt asking for permission to commit
+ * @return the result of {@link Task#canCommit(TezTaskAttemptID)} on the owning task
+ * @throws IOException declared by the signature
+ */
+ public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
+ LOG.info("Commit go/no-go request from " + taskAttemptId.toString());
+ // An attempt is asking if it can commit its output. This can be decided
+ // only by the task which is managing the multiple attempts. So redirect the
+ // request there.
+ taskHeartbeatHandler.progressing(taskAttemptId);
+ pingContainerHeartbeatHandler(taskAttemptId);
+
+ DAG job = context.getCurrentDAG();
+ Task task =
+ job.getVertex(taskAttemptId.getTaskID().getVertexID()).
+ getTask(taskAttemptId.getTaskID());
+ return task.canCommit(taskAttemptId);
+ }
+
+ // The register / unregister methods in this class are not thread safe.
+ // The Tez framework should not invoke these methods from multiple threads.
+ @Override
+ public void dagComplete(DAG dag) {
+ // TODO TEZ-2335. Cleanup TaskHeartbeat handler structures.
+ // TODO TEZ-2345. Also cleanup attemptInfo map, so that any tasks which heartbeat are told to die.
+ // Container structures remain unchanged - since they could be re-used across restarts.
+ // This becomes more relevant when task kills without container kills are allowed.
+
+ // TODO TEZ-2336. Send a signal to containers indicating DAG completion.
+
+ // Inform every communicator of the DAG completion, bracketed by dagCompleteStart / dagCompleteEnd on its context.
+ for (int i = 0 ; i < taskCommunicators.length ; i++) {
+ ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteStart(dag);
+ taskCommunicators[i].dagComplete(dag.getName());
+ ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteEnd();
+ }
+
+ }
+
+ @Override
+ public void dagSubmitted() {
+ // Intentionally empty right now. The call indicates that a new DAG has been submitted and
+ // that the context has updated information.
+ }
+
+ @Override
+ public void registerRunningContainer(ContainerId containerId, int taskCommId) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ContainerId: " + containerId + " registered with TaskAttemptListener");
+ }
+ ContainerInfo oldInfo = registeredContainers.put(containerId, NULL_CONTAINER_INFO); // presumably "no attempt assigned" - confirm
+ if (oldInfo != null) { // duplicate registration; note that put() above has already replaced the previous entry
+ throw new TezUncheckedException(
+ "Multiple registrations for containerId: " + containerId);
+ }
+ NodeId nodeId = context.getAllContainers().get(containerId).getContainer().getNodeId(); // NOTE(review): lookup is not null-checked
+ taskCommunicators[taskCommId].registerRunningContainer(containerId, nodeId.getHost(),
+ nodeId.getPort());
+ }
+
+ @Override
+ public void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId);
+ }
+ ContainerInfo containerInfo = registeredContainers.remove(containerId); // NOTE(review): presumably null if never registered - confirm
+ if (containerInfo.taskAttemptId != null) { // NOTE(review): no null guard on containerInfo; also drop the attempt still mapped to it
+ registeredAttempts.remove(containerInfo.taskAttemptId);
+ }
+ taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason, diagnostics); // tell the communicator the container ended
+ }
+
+ @Override
+ public void registerTaskAttempt(AMContainerTask amContainerTask,
+ ContainerId containerId, int taskCommId) {
+ ContainerInfo containerInfo = registeredContainers.get(containerId); // the container must have been registered beforehand
+ if (containerInfo == null) {
+ throw new TezUncheckedException("Registering task attempt: "
+ + amContainerTask.getTask().getTaskAttemptID() + " to unknown container: " + containerId);
+ }
+ if (containerInfo.taskAttemptId != null) { // the container already has an attempt assigned
+ throw new TezUncheckedException("Registering task attempt: "
+ + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
+ + " with existing assignment to: " +
+ containerInfo.taskAttemptId);
+ }
+
+ // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
+ registeredContainers.put(containerId, new ContainerInfo(amContainerTask.getTask().getTaskAttemptID()));
+
+ ContainerId containerIdFromMap = registeredAttempts.put(
+ amContainerTask.getTask().getTaskAttemptID(), containerId); // reverse mapping: attempt -> container
+ if (containerIdFromMap != null) { // the attempt was already mapped; both maps have been updated by this point
+ throw new TezUncheckedException("Registering task attempt: "
+ + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
+ + " when already assigned to: " + containerIdFromMap);
+ }
+ taskCommunicators[taskCommId].registerRunningTaskAttempt(containerId, amContainerTask.getTask(),
+ amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(),
+ amContainerTask.haveCredentialsChanged(), amContainerTask.getPriority());
+ }
+
+ @Override
+ public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId, TaskAttemptEndReason endReason, String diagnostics) {
+ ContainerId containerId = registeredAttempts.remove(attemptId);
+ if (containerId == null) {
+ LOG.warn("Unregister task attempt: " + attemptId + " from unknown container");
+ return; // the communicator is not notified in this case
+ }
+ ContainerInfo containerInfo = registeredContainers.get(containerId);
+ if (containerInfo == null) {
+ LOG.warn("Unregister task attempt: " + attemptId +
+ " from non-registered container: " + containerId);
+ return; // the communicator is not notified in this case either
+ }
+ // Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
+ registeredContainers.put(containerId, NULL_CONTAINER_INFO);
+ taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason, diagnostics);
+ }
+
+ @Override
+ public TaskCommunicator getTaskCommunicator(int taskCommIndex) {
+ return taskCommunicators[taskCommIndex]; // direct array lookup; the index is not range-checked here
+ }
+
+ private void pingContainerHeartbeatHandler(ContainerId containerId) {
+ containerHeartbeatHandler.pinged(containerId); // report a ping for this container to the container heartbeat handler
+ }
+
+ private void pingContainerHeartbeatHandler(TezTaskAttemptID taskAttemptId) {
+ ContainerId containerId = registeredAttempts.get(taskAttemptId); // resolve the container the attempt is registered to
+ if (containerId != null) {
+ containerHeartbeatHandler.pinged(containerId);
+ } else { // attempt not (or no longer) in registeredAttempts: only warn, nothing is pinged
+ LOG.warn("Handling communication from attempt: " + taskAttemptId
+ + ", ContainerId not known for this attempt");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
new file mode 100644
index 0000000..8d060a2
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
@@ -0,0 +1,46 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.dag.app.rm.container.AMContainerTask;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+/**
+ * Registers / unregisters running containers and task attempts, receives DAG submitted / complete calls, and looks up a TaskCommunicator by index.
+ */
+public interface TaskCommunicatorManagerInterface {
+
+ void registerRunningContainer(ContainerId containerId, int taskCommId);
+
+ void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId, int taskCommId);
+
+ void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics);
+
+ void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId, TaskAttemptEndReason endReason, String diagnostics);
+
+ void dagComplete(DAG dag);
+
+ void dagSubmitted();
+
+ TaskCommunicator getTaskCommunicator(int taskCommIndex);
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 6b474ff..756ed28 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -82,7 +82,7 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.DAGReport;
@@ -160,7 +160,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
private final Lock readLock;
private final Lock writeLock;
private final String dagName;
- private final TaskAttemptListener taskAttemptListener;
+ private final TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
private final TaskHeartbeatHandler taskHeartbeatHandler;
private final Object tasksSyncHandle = new Object();
@@ -489,7 +489,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
Configuration amConf,
DAGPlan jobPlan,
EventHandler eventHandler,
- TaskAttemptListener taskAttemptListener,
+ TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
Credentials dagCredentials,
Clock clock,
String appUserName,
@@ -511,7 +511,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
this.clock = clock;
this.appContext = appContext;
- this.taskAttemptListener = taskAttemptListener;
+ this.taskCommunicatorManagerInterface = taskCommunicatorManagerInterface;
this.taskHeartbeatHandler = thh;
this.eventHandler = eventHandler;
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@@ -1538,7 +1538,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
VertexImpl v = new VertexImpl(
vertexId, vertexPlan, vertexName, dag.dagConf,
- dag.eventHandler, dag.taskAttemptListener,
+ dag.eventHandler, dag.taskCommunicatorManagerInterface,
dag.clock, dag.taskHeartbeatHandler,
!dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint,
dag.vertexGroups, dag.taskSpecificLaunchCmdOption, dag.entityUpdateTracker);
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 6957b1d..3f2e3a4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -61,7 +61,7 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
@@ -403,17 +403,17 @@ public class TaskAttemptImpl implements TaskAttempt,
@SuppressWarnings("rawtypes")
public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
- TaskAttemptListener taskAttemptListener, Configuration conf, Clock clock,
+ TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Configuration conf, Clock clock,
TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
boolean isRescheduled,
Resource resource, ContainerContext containerContext, boolean leafVertex,
Task task) {
- this(taskId, attemptNumber, eventHandler, taskAttemptListener, conf, clock,
+ this(taskId, attemptNumber, eventHandler, taskCommunicatorManagerInterface, conf, clock,
taskHeartbeatHandler, appContext, isRescheduled, resource, containerContext, leafVertex,
task, null);
}
public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
- TaskAttemptListener taskAttemptListener, Configuration conf, Clock clock,
+ TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Configuration conf, Clock clock,
TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
boolean isRescheduled,
Resource resource, ContainerContext containerContext, boolean leafVertex,
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 1b55295..ea14483 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -56,7 +56,7 @@ import org.apache.tez.dag.api.oldrecords.TaskReport;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.Task;
@@ -115,7 +115,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
.getProperty("line.separator");
protected final Configuration conf;
- protected final TaskAttemptListener taskAttemptListener;
+ protected final TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
protected final TaskHeartbeatHandler taskHeartbeatHandler;
protected final EventHandler eventHandler;
private final TezTaskID taskId;
@@ -341,7 +341,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
public TaskImpl(TezVertexID vertexId, int taskIndex,
EventHandler eventHandler, Configuration conf,
- TaskAttemptListener taskAttemptListener,
+ TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
boolean leafVertex, Resource resource,
ContainerContext containerContext,
@@ -357,7 +357,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
maxFailedAttempts = this.conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT);
taskId = TezTaskID.getInstance(vertexId, taskIndex);
- this.taskAttemptListener = taskAttemptListener;
+ this.taskCommunicatorManagerInterface = taskCommunicatorManagerInterface;
this.taskHeartbeatHandler = thh;
this.eventHandler = eventHandler;
this.appContext = appContext;
@@ -817,7 +817,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedulingCausalTA) {
return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
- taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext,
+ taskCommunicatorManagerInterface, conf, clock, taskHeartbeatHandler, appContext,
(failedAttempts > 0), taskResource, containerContext, leafVertex, this, schedulingCausalTA);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 3cc439f..a1dcf6c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -94,7 +94,7 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskAttemptEventInfo;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.RootInputInitializerManager;
@@ -212,7 +212,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
private final Lock readLock;
private final Lock writeLock;
- private final TaskAttemptListener taskAttemptListener;
+ private final TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
private final TaskHeartbeatHandler taskHeartbeatHandler;
private final Object tasksSyncHandle = new Object();
@@ -890,7 +890,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
String vertexName, Configuration dagConf, EventHandler eventHandler,
- TaskAttemptListener taskAttemptListener, Clock clock,
+ TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Clock clock,
TaskHeartbeatHandler thh, boolean commitVertexOutputs,
AppContext appContext, VertexLocationHint vertexLocationHint,
Map<String, VertexGroupInfo> dagVertexGroups, TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption,
@@ -911,7 +911,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
this.appContext = appContext;
this.commitVertexOutputs = commitVertexOutputs;
- this.taskAttemptListener = taskAttemptListener;
+ this.taskCommunicatorManagerInterface = taskCommunicatorManagerInterface;
this.taskHeartbeatHandler = thh;
this.eventHandler = eventHandler;
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@@ -2331,7 +2331,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
return new TaskImpl(this.getVertexId(), taskIndex,
this.eventHandler,
vertexConf,
- this.taskAttemptListener,
+ this.taskCommunicatorManagerInterface,
this.clock,
this.taskHeartbeatHandler,
this.appContext,