You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/06 09:41:39 UTC
[37/50] [abbrv] tez git commit: TEZ-2123. Fix component managers to
use pluggable components. Enable hybrid mode. (sseth)
TEZ-2123. Fix component managers to use pluggable components. Enable
hybrid mode. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/25980c1a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/25980c1a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/25980c1a
Branch: refs/heads/TEZ-2003
Commit: 25980c1a70a05fb915e89d246643dc8549e793cc
Parents: adab48f
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Feb 20 11:59:03 2015 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed May 6 00:13:29 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../org/apache/tez/dag/app/DAGAppMaster.java | 4 +-
.../apache/tez/dag/app/TaskAttemptListener.java | 12 +-
.../dag/app/TaskAttemptListenerImpTezDag.java | 30 ++--
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 4 +-
.../TezRootInputInitializerContextImpl.java | 2 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 2 +-
.../tez/dag/app/dag/impl/VertexManager.java | 2 +-
.../app/launcher/ContainerLauncherRouter.java | 2 +-
.../app/launcher/LocalContainerLauncher.java | 10 +-
.../rm/AMSchedulerEventDeallocateContainer.java | 7 +-
.../rm/AMSchedulerEventNodeBlacklistUpdate.java | 8 +-
.../tez/dag/app/rm/AMSchedulerEventTAEnded.java | 10 +-
.../dag/app/rm/LocalTaskSchedulerService.java | 19 ++-
.../tez/dag/app/rm/NMCommunicatorEvent.java | 12 +-
.../rm/NMCommunicatorLaunchRequestEvent.java | 11 +-
.../app/rm/NMCommunicatorStopRequestEvent.java | 4 +-
.../dag/app/rm/TaskSchedulerEventHandler.java | 151 ++++++++++++-----
.../tez/dag/app/rm/container/AMContainer.java | 3 +
.../AMContainerEventLaunchRequest.java | 15 +-
.../dag/app/rm/container/AMContainerImpl.java | 39 +++--
.../dag/app/rm/container/AMContainerMap.java | 4 +-
.../apache/tez/dag/app/rm/node/AMNodeImpl.java | 6 +-
.../apache/tez/dag/app/MockDAGAppMaster.java | 2 +-
.../app/TestTaskAttemptListenerImplTezDag.java | 31 ++--
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 69 ++++----
.../tez/dag/app/dag/impl/TestVertexImpl.java | 8 +-
.../tez/dag/app/rm/TestContainerReuse.java | 34 ++--
.../tez/dag/app/rm/TestLocalTaskScheduler.java | 2 +-
.../app/rm/TestLocalTaskSchedulerService.java | 18 ++-
.../app/rm/TestTaskSchedulerEventHandler.java | 11 +-
.../dag/app/rm/TestTaskSchedulerHelpers.java | 2 +-
.../dag/app/rm/container/TestAMContainer.java | 108 +++++++------
.../app/rm/container/TestAMContainerMap.java | 6 +-
.../org/apache/tez/examples/JoinValidate.java | 30 +++-
.../TezTestServiceContainerLauncher.java | 5 +-
.../rm/TezTestServiceTaskSchedulerService.java | 100 ++----------
.../tez/examples/JoinValidateConfigured.java | 53 ++++++
.../tez/tests/TestExternalTezServices.java | 160 ++++++++++++++-----
39 files changed, 638 insertions(+), 359 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 4bfe08f..1a2264c 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -4,5 +4,6 @@ ALL CHANGES:
TEZ-2090. Add tests for jobs running in external services.
TEZ-2117. Add a manager for ContainerLaunchers running in the AM.
TEZ-2122. Setup pluggable components at AM/Vertex level.
+ TEZ-2123. Fix component managers to use pluggable components. (Enable hybrid mode)
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 6814cda..89b6506 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -513,7 +513,7 @@ public class DAGAppMaster extends AbstractService {
this.taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService,
- taskSchedulerClassIdentifiers);
+ taskSchedulerClassIdentifiers, isLocal);
addIfService(taskSchedulerEventHandler, true);
if (enableWebUIService()) {
@@ -2283,6 +2283,7 @@ public class DAGAppMaster extends AbstractService {
// Tez default classnames are populated as TezConfiguration.TEZ_AM_SERVICE_PLUGINS_DEFAULT
private String[] parsePlugins(BiMap<String, Integer> pluginMap, String[] pluginStrings,
String context) {
+ // TODO TEZ-2003 Duplicate error checking - ideally in the client itself. Depends on the final API.
Preconditions.checkState(pluginStrings != null && pluginStrings.length > 0,
"Plugin strings should not be null or empty: " + context);
@@ -2320,6 +2321,7 @@ public class DAGAppMaster extends AbstractService {
}
pluginMap.put(identifierString, index);
classNames[index] = className;
+ index++;
}
return classNames;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
index 9caa7cf..e4dad27 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.app.rm.container.AMContainerTask;
import org.apache.tez.dag.records.TezTaskAttemptID;
/**
@@ -29,18 +30,17 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
*/
public interface TaskAttemptListener {
- InetSocketAddress getAddress();
+ void registerRunningContainer(ContainerId containerId, int taskCommId);
- void registerRunningContainer(ContainerId containerId);
-
- void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId);
+ void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId, int taskCommId);
- void unregisterRunningContainer(ContainerId containerId);
+ void unregisterRunningContainer(ContainerId containerId, int taskCommId);
- void unregisterTaskAttempt(TezTaskAttemptID attemptID);
+ void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId);
void dagComplete(DAG dag);
void dagSubmitted();
+ TaskCommunicator getTaskCommunicator(int taskCommIndex);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index fc4d787..71b0d2a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -273,11 +273,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
return task.canCommit(taskAttemptId);
}
- @Override
- public InetSocketAddress getAddress() {
- return taskCommunicators[0].getAddress();
- }
-
// The TaskAttemptListener register / unregister methods in this class are not thread safe.
// The Tez framework should not invoke these methods from multiple threads.
@Override
@@ -297,7 +292,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
@Override
- public void registerRunningContainer(ContainerId containerId) {
+ public void registerRunningContainer(ContainerId containerId, int taskCommId) {
if (LOG.isDebugEnabled()) {
LOG.debug("ContainerId: " + containerId + " registered with TaskAttemptListener");
}
@@ -307,11 +302,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
"Multiple registrations for containerId: " + containerId);
}
NodeId nodeId = context.getAllContainers().get(containerId).getContainer().getNodeId();
- taskCommunicators[0].registerRunningContainer(containerId, nodeId.getHost(), nodeId.getPort());
+ taskCommunicators[taskCommId].registerRunningContainer(containerId, nodeId.getHost(),
+ nodeId.getPort());
}
@Override
- public void unregisterRunningContainer(ContainerId containerId) {
+ public void unregisterRunningContainer(ContainerId containerId, int taskCommId) {
if (LOG.isDebugEnabled()) {
LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId);
}
@@ -319,12 +315,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
if (containerInfo.taskAttemptId != null) {
registeredAttempts.remove(containerInfo.taskAttemptId);
}
- taskCommunicators[0].registerContainerEnd(containerId);
+ taskCommunicators[taskCommId].registerContainerEnd(containerId);
}
@Override
public void registerTaskAttempt(AMContainerTask amContainerTask,
- ContainerId containerId) {
+ ContainerId containerId, int taskCommId) {
ContainerInfo containerInfo = registeredContainers.get(containerId);
if (containerInfo == null) {
throw new TezUncheckedException("Registering task attempt: "
@@ -354,13 +350,13 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
+ amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
+ " when already assigned to: " + containerIdFromMap);
}
- taskCommunicators[0].registerRunningTaskAttempt(containerId, amContainerTask.getTask(),
+ taskCommunicators[taskCommId].registerRunningTaskAttempt(containerId, amContainerTask.getTask(),
amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(),
amContainerTask.haveCredentialsChanged());
}
@Override
- public void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
+ public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId) {
ContainerId containerId = registeredAttempts.remove(attemptId);
if (containerId == null) {
LOG.warn("Unregister task attempt: " + attemptId + " from unknown container");
@@ -374,7 +370,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
// Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
registeredContainers.put(containerId, NULL_CONTAINER_INFO);
- taskCommunicators[0].unregisterRunningTaskAttempt(attemptId);
+ taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId);
+ }
+
+ @Override
+ public TaskCommunicator getTaskCommunicator(int taskCommIndex) {
+ return taskCommunicators[taskCommIndex];
}
private void pingContainerHeartbeatHandler(ContainerId containerId) {
@@ -413,7 +414,4 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
return taskAttemptEvent;
}
- public TaskCommunicator getTaskCommunicator() {
- return taskCommunicators[0];
- }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index c18dc00..c80571d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1218,7 +1218,7 @@ public class TaskAttemptImpl implements TaskAttempt,
// Inform the scheduler
if (sendSchedulerEvent()) {
ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, helper
- .getTaskAttemptState()));
+ .getTaskAttemptState(), ta.getVertex().getTaskSchedulerIdentifier()));
}
}
}
@@ -1300,7 +1300,7 @@ public class TaskAttemptImpl implements TaskAttempt,
// Inform the Scheduler.
ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId,
- TaskAttemptState.SUCCEEDED));
+ TaskAttemptState.SUCCEEDED, ta.getVertex().getTaskSchedulerIdentifier()));
// Inform the task.
ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
index d4ef4d5..4ca4024 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
@@ -96,7 +96,7 @@ public class TezRootInputInitializerContextImpl implements
@Override
public Resource getTotalAvailableResource() {
- return appContext.getTaskScheduler().getTotalResources();
+ return appContext.getTaskScheduler().getTotalResources(vertex.getTaskSchedulerIdentifier());
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index ddf670f..81e1732 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -4207,7 +4207,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
eventHandler, getTotalTasks(),
appContext.getTaskScheduler().getNumClusterNodes(),
getTaskResource(),
- appContext.getTaskScheduler().getTotalResources());
+ appContext.getTaskScheduler().getTotalResources(taskSchedulerIdentifier));
List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
inputList = Lists.newArrayListWithCapacity(inputsWithInitializers.size());
for (String inputName : inputsWithInitializers) {
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 945d9ba..1300fc0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -286,7 +286,7 @@ public class VertexManager {
@Override
public synchronized Resource getTotalAvailableResource() {
checkAndThrowIfDone();
- return appContext.getTaskScheduler().getTotalResources();
+ return appContext.getTaskScheduler().getTotalResources(managedVertex.getTaskSchedulerIdentifier());
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
index 621e4a8..4f9b5bf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -124,6 +124,6 @@ public class ContainerLauncherRouter extends AbstractService
@Override
public void handle(NMCommunicatorEvent event) {
- containerLaunchers[0].handle(event);
+ containerLaunchers[event.getLauncherId()].handle(event);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index e9ba9d7..9a38732 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -59,7 +59,6 @@ import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
-import org.apache.tez.dag.app.TaskAttemptListenerImpTezDag;
import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
@@ -88,9 +87,9 @@ public class LocalContainerLauncher extends AbstractService implements
private static final Logger LOG = LoggerFactory.getLogger(LocalContainerLauncher.class);
private final AppContext context;
- private final TezTaskUmbilicalProtocol taskUmbilicalProtocol;
private final AtomicBoolean serviceStopped = new AtomicBoolean(false);
private final String workingDirectory;
+ private final TaskAttemptListener tal;
private final Map<String, String> localEnv = new HashMap<String, String>();
private final ExecutionContext executionContext;
private int numExecutors;
@@ -116,9 +115,8 @@ public class LocalContainerLauncher extends AbstractService implements
String workingDirectory) throws UnknownHostException {
super(LocalContainerLauncher.class.getName());
this.context = context;
- TaskAttemptListenerImpTezDag taListener = (TaskAttemptListenerImpTezDag)taskAttemptListener;
- TezTaskCommunicatorImpl taskComm = (TezTaskCommunicatorImpl) taListener.getTaskCommunicator();
- this.taskUmbilicalProtocol = taskComm.getUmbilical();
+ this.tal = taskAttemptListener;
+
this.workingDirectory = workingDirectory;
AuxiliaryServiceHelper.setServiceDataIntoEnv(
ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, ByteBuffer.allocate(4).putInt(0), localEnv);
@@ -219,7 +217,7 @@ public class LocalContainerLauncher extends AbstractService implements
tezChild =
createTezChild(context.getAMConf(), event.getContainerId(), tokenIdentifier,
context.getApplicationAttemptId().getAttemptId(), context.getLocalDirs(),
- taskUmbilicalProtocol,
+ ((TezTaskCommunicatorImpl)tal.getTaskCommunicator(event.getTaskCommId())).getUmbilical(),
TezCommonUtils.parseCredentialsBytes(event.getContainerLaunchContext().getTokens().array()));
} catch (InterruptedException e) {
handleLaunchFailed(e, event.getContainerId());
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java
index 1b51920..5270aa2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java
@@ -23,15 +23,20 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
public class AMSchedulerEventDeallocateContainer extends AMSchedulerEvent {
private final ContainerId containerId;
+ private final int schedulerId;
- public AMSchedulerEventDeallocateContainer(ContainerId containerId) {
+ public AMSchedulerEventDeallocateContainer(ContainerId containerId, int schedulerId) {
super(AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
this.containerId = containerId;
+ this.schedulerId = schedulerId;
}
public ContainerId getContainerId() {
return this.containerId;
}
+ public int getSchedulerId() {
+ return schedulerId;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
index ed7ebc3..679705a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java
@@ -23,14 +23,20 @@ import org.apache.hadoop.yarn.api.records.NodeId;
public class AMSchedulerEventNodeBlacklistUpdate extends AMSchedulerEvent {
private final NodeId nodeId;
+ private final int schedulerId;
- public AMSchedulerEventNodeBlacklistUpdate(NodeId nodeId, boolean add) {
+ public AMSchedulerEventNodeBlacklistUpdate(NodeId nodeId, boolean add, int schedulerId) {
super((add ? AMSchedulerEventType.S_NODE_BLACKLISTED
: AMSchedulerEventType.S_NODE_UNBLACKLISTED));
this.nodeId = nodeId;
+ this.schedulerId = schedulerId;
}
public NodeId getNodeId() {
return this.nodeId;
}
+
+ public int getSchedulerId() {
+ return schedulerId;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
index 90e76b7..2ace642 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java
@@ -26,14 +26,16 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent {
private final TaskAttempt attempt;
private final ContainerId containerId;
- private TaskAttemptState state;
+ private final TaskAttemptState state;
+ private final int schedulerId;
public AMSchedulerEventTAEnded(TaskAttempt attempt, ContainerId containerId,
- TaskAttemptState state) {
+ TaskAttemptState state, int schedulerId) {
super(AMSchedulerEventType.S_TA_ENDED);
this.attempt = attempt;
this.containerId = containerId;
this.state = state;
+ this.schedulerId = schedulerId;
}
public TezTaskAttemptID getAttemptID() {
@@ -51,4 +53,8 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent {
public ContainerId getUsedContainerId() {
return this.containerId;
}
+
+ public int getSchedulerId() {
+ return schedulerId;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index 51d8b9d..72a074f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -34,6 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -63,10 +64,11 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
final int appHostPort;
final String appTrackingUrl;
final AppContext appContext;
+ final long customContainerAppId;
public LocalTaskSchedulerService(TaskSchedulerAppCallback appClient,
ContainerSignatureMatcher containerSignatureMatcher, String appHostName,
- int appHostPort, String appTrackingUrl, AppContext appContext) {
+ int appHostPort, String appTrackingUrl, long customContainerAppId, AppContext appContext) {
super(LocalTaskSchedulerService.class.getName());
this.realAppClient = appClient;
this.appCallbackExecutor = createAppCallbackExecutorService();
@@ -78,6 +80,7 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
this.appContext = appContext;
taskRequestQueue = new PriorityBlockingQueue<TaskRequest>();
taskAllocations = new LinkedHashMap<Object, Container>();
+ this.customContainerAppId = customContainerAppId;
}
private ExecutorService createAppCallbackExecutorService() {
@@ -164,7 +167,7 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
protected AsyncDelegateRequestHandler createRequestHandler(Configuration conf) {
return new AsyncDelegateRequestHandler(taskRequestQueue,
- new LocalContainerFactory(appContext),
+ new LocalContainerFactory(appContext, customContainerAppId),
taskAllocations,
appClientDelegate,
conf);
@@ -195,17 +198,19 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
}
static class LocalContainerFactory {
- final AppContext appContext;
AtomicInteger nextId;
+ final ApplicationAttemptId customAppAttemptId;
- public LocalContainerFactory(AppContext appContext) {
- this.appContext = appContext;
+ public LocalContainerFactory(AppContext appContext, long appIdLong) {
this.nextId = new AtomicInteger(1);
+ ApplicationId appId = ApplicationId
+ .newInstance(appIdLong, appContext.getApplicationAttemptId().getApplicationId().getId());
+ this.customAppAttemptId = ApplicationAttemptId
+ .newInstance(appId, appContext.getApplicationAttemptId().getAttemptId());
}
public Container createContainer(Resource capability, Priority priority) {
- ApplicationAttemptId appAttemptId = appContext.getApplicationAttemptId();
- ContainerId containerId = ContainerId.newInstance(appAttemptId, nextId.getAndIncrement());
+ ContainerId containerId = ContainerId.newInstance(customAppAttemptId, nextId.getAndIncrement());
NodeId nodeId = NodeId.newInstance("127.0.0.1", 0);
String nodeHttpAddress = "127.0.0.1:0";
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
index 8bdeb28..f86894f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorEvent.java
@@ -28,13 +28,15 @@ public class NMCommunicatorEvent extends AbstractEvent<NMCommunicatorEventType>
private final ContainerId containerId;
private final NodeId nodeId;
private final Token containerToken;
+ private final int launcherId;
public NMCommunicatorEvent(ContainerId containerId, NodeId nodeId,
- Token containerToken, NMCommunicatorEventType type) {
+ Token containerToken, NMCommunicatorEventType type, int launcherId) {
super(type);
this.containerId = containerId;
this.nodeId = nodeId;
this.containerToken = containerToken;
+ this.launcherId = launcherId;
}
public ContainerId getContainerId() {
@@ -48,10 +50,14 @@ public class NMCommunicatorEvent extends AbstractEvent<NMCommunicatorEventType>
public Token getContainerToken() {
return this.containerToken;
}
-
+
+ public int getLauncherId() {
+ return launcherId;
+ }
+
public String toSrting() {
return super.toString() + " for container " + containerId + ", nodeId: "
- + nodeId;
+ + nodeId + ", launcherId: " + launcherId;
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
index c3b12c0..a38345c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
@@ -25,13 +25,16 @@ public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent {
private final ContainerLaunchContext clc;
private final Container container;
+ // The task communicator index for the specific container being launched.
+ private final int taskCommId;
public NMCommunicatorLaunchRequestEvent(ContainerLaunchContext clc,
- Container container) {
+ Container container, int launcherId, int taskCommId) {
super(container.getId(), container.getNodeId(), container
- .getContainerToken(), NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST);
+ .getContainerToken(), NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST, launcherId);
this.clc = clc;
this.container = container;
+ this.taskCommId = taskCommId;
}
public ContainerLaunchContext getContainerLaunchContext() {
@@ -42,6 +45,10 @@ public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent {
return container;
}
+ public int getTaskCommId() {
+ return taskCommId;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
index 277d1e7..c9b5c44 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorStopRequestEvent.java
@@ -25,9 +25,9 @@ import org.apache.hadoop.yarn.api.records.Token;
public class NMCommunicatorStopRequestEvent extends NMCommunicatorEvent {
public NMCommunicatorStopRequestEvent(ContainerId containerId, NodeId nodeId,
- Token containerToken) {
+ Token containerToken, int launcherId) {
super(containerId, nodeId, containerToken,
- NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
+ NMCommunicatorEventType.CONTAINER_STOP_REQUEST, launcherId);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 72389e7..5a0ace8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -108,9 +108,22 @@ public class TaskSchedulerEventHandler extends AbstractService
private final String[] taskSchedulerClasses;
protected final TaskSchedulerService []taskSchedulers;
+ private final boolean isPureLocalMode;
+ // If running in non local-only mode, the YARN task scheduler will always run to take care of
+ // registration with YARN and heartbeats to YARN.
+ // Splitting registration and heartbeats is not straightforward due to the taskScheduler being
+ // tied to a ContainerRequestType.
+ private final int yarnTaskSchedulerIndex;
+ // Custom AppIds to avoid container conflicts if there's multiple sources
+ private final long SCHEDULER_APP_ID_BASE = 111101111;
+ private final long SCHEDULER_APP_ID_INCREMENT = 111111111;
+
BlockingQueue<AMSchedulerEvent> eventQueue
= new LinkedBlockingQueue<AMSchedulerEvent>();
+ // Not tracking container / task to schedulerId. Instead relying on everything flowing through
+ // the system and being propagated back via events.
+
/**
*
* @param appContext
@@ -125,7 +138,7 @@ public class TaskSchedulerEventHandler extends AbstractService
public TaskSchedulerEventHandler(AppContext appContext,
DAGClientServer clientService, EventHandler eventHandler,
ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI,
- String [] schedulerClasses) {
+ String [] schedulerClasses, boolean isPureLocalMode) {
super(TaskSchedulerEventHandler.class.getName());
this.appContext = appContext;
this.eventHandler = eventHandler;
@@ -133,13 +146,39 @@ public class TaskSchedulerEventHandler extends AbstractService
this.containerSignatureMatcher = containerSignatureMatcher;
this.webUI = webUI;
this.historyUrl = getHistoryUrl();
+ this.isPureLocalMode = isPureLocalMode;
if (this.webUI != null) {
this.webUI.setHistoryUrl(this.historyUrl);
}
- if (schedulerClasses == null || schedulerClasses.length == 0) {
- this.taskSchedulerClasses = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+
+ // Override everything for pure local mode
+ if (isPureLocalMode) {
+ this.taskSchedulerClasses = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT};
+ this.yarnTaskSchedulerIndex = -1;
} else {
- this.taskSchedulerClasses = schedulerClasses;
+ if (schedulerClasses == null || schedulerClasses.length ==0) {
+ this.taskSchedulerClasses = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT};
+ this.yarnTaskSchedulerIndex = 0;
+ } else {
+ // Ensure the YarnScheduler will be setup and note its index. This will be responsible for heartbeats and YARN registration.
+ int foundYarnTaskSchedulerIndex = -1;
+ for (int i = 0 ; i < schedulerClasses.length ; i++) {
+ if (schedulerClasses[i].equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
+ foundYarnTaskSchedulerIndex = i;
+ break;
+ }
+ }
+ if (foundYarnTaskSchedulerIndex == -1) { // Not found. Add at the end.
+ this.taskSchedulerClasses = new String[schedulerClasses.length+1];
+ foundYarnTaskSchedulerIndex = this.taskSchedulerClasses.length -1;
+ // Copy over the configured schedulers, then put the YARN scheduler in the extra last slot so it is never null.
+ System.arraycopy(schedulerClasses, 0, this.taskSchedulerClasses, 0, schedulerClasses.length);
+ this.taskSchedulerClasses[foundYarnTaskSchedulerIndex] = TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT;
+ } else {
+ this.taskSchedulerClasses = schedulerClasses;
+ }
+ this.yarnTaskSchedulerIndex = foundYarnTaskSchedulerIndex;
+ }
}
taskSchedulers = new TaskSchedulerService[this.taskSchedulerClasses.length];
}
@@ -157,12 +196,12 @@ public class TaskSchedulerEventHandler extends AbstractService
return cachedNodeCount;
}
- public Resource getAvailableResources() {
- return taskSchedulers[0].getAvailableResources();
+ public Resource getAvailableResources(int schedulerId) {
+ return taskSchedulers[schedulerId].getAvailableResources();
}
- public Resource getTotalResources() {
- return taskSchedulers[0].getTotalResources();
+ public Resource getTotalResources(int schedulerId) {
+ return taskSchedulers[schedulerId].getTotalResources();
}
public synchronized void handleEvent(AMSchedulerEvent sEvent) {
@@ -176,7 +215,7 @@ public class TaskSchedulerEventHandler extends AbstractService
switch(event.getState()) {
case FAILED:
case KILLED:
- handleTAUnsuccessfulEnd((AMSchedulerEventTAEnded) sEvent);
+ handleTAUnsuccessfulEnd(event);
break;
case SUCCEEDED:
handleTASucceeded(event);
@@ -228,9 +267,9 @@ public class TaskSchedulerEventHandler extends AbstractService
private void handleNodeBlacklistUpdate(AMSchedulerEventNodeBlacklistUpdate event) {
if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) {
- taskSchedulers[0].blacklistNode(event.getNodeId());
+ taskSchedulers[event.getSchedulerId()].blacklistNode(event.getNodeId());
} else if (event.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) {
- taskSchedulers[0].unblacklistNode(event.getNodeId());
+ taskSchedulers[event.getSchedulerId()].unblacklistNode(event.getNodeId());
} else {
throw new TezUncheckedException("Invalid event type: " + event.getType());
}
@@ -242,14 +281,14 @@ public class TaskSchedulerEventHandler extends AbstractService
// TODO what happens to the task that was connected to this container?
// current assumption is that it will eventually call handleTaStopRequest
//TaskAttempt taskAttempt = (TaskAttempt)
- taskSchedulers[0].deallocateContainer(containerId);
+ taskSchedulers[event.getSchedulerId()].deallocateContainer(containerId);
// TODO does this container need to be stopped via C_STOP_REQUEST
sendEvent(new AMContainerEventStopRequest(containerId));
}
private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) {
TaskAttempt attempt = event.getAttempt();
- boolean wasContainerAllocated = taskSchedulers[0].deallocateTask(attempt, false);
+ boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt, false);
// use stored value of container id in case the scheduler has removed this
// assignment because the task has been deallocated earlier.
// retroactive case
@@ -291,7 +330,8 @@ public class TaskSchedulerEventHandler extends AbstractService
event.getAttemptID()));
}
- boolean wasContainerAllocated = taskSchedulers[0].deallocateTask(attempt, true);
+ boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt,
+ true);
if (!wasContainerAllocated) {
LOG.error("De-allocated successful task: " + attempt.getID()
+ ", but TaskScheduler reported no container assigned to task");
@@ -316,7 +356,7 @@ public class TaskSchedulerEventHandler extends AbstractService
TaskAttempt affinityAttempt = vertex.getTask(taskIndex).getSuccessfulAttempt();
if (affinityAttempt != null) {
Preconditions.checkNotNull(affinityAttempt.getAssignedContainerID(), affinityAttempt.getID());
- taskSchedulers[0].allocateTask(taskAttempt,
+ taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt,
event.getCapability(),
affinityAttempt.getAssignedContainerID(),
Priority.newInstance(event.getPriority()),
@@ -336,7 +376,7 @@ public class TaskSchedulerEventHandler extends AbstractService
}
}
- taskSchedulers[0].allocateTask(taskAttempt,
+ taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt,
event.getCapability(),
hosts,
racks,
@@ -347,7 +387,8 @@ public class TaskSchedulerEventHandler extends AbstractService
private TaskSchedulerService createTaskScheduler(String host, int port, String trackingUrl,
AppContext appContext,
- String schedulerClassName) {
+ String schedulerClassName,
+ long customAppIdIdentifier) {
if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
LOG.info("Creating TaskScheduler: YarnTaskSchedulerService");
return new YarnTaskSchedulerService(this, this.containerSignatureMatcher,
@@ -355,7 +396,7 @@ public class TaskSchedulerEventHandler extends AbstractService
} else if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
LOG.info("Creating TaskScheduler: Local TaskScheduler");
return new LocalTaskSchedulerService(this, this.containerSignatureMatcher,
- host, port, trackingUrl, appContext);
+ host, port, trackingUrl, customAppIdIdentifier, appContext);
} else {
LOG.info("Creating custom TaskScheduler: " + schedulerClassName);
// TODO TEZ-2003 Temporary reflection with specific parameters. Remove once there is a clean interface.
@@ -364,9 +405,10 @@ public class TaskSchedulerEventHandler extends AbstractService
try {
Constructor<? extends TaskSchedulerService> ctor = taskSchedulerClazz
.getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class,
- int.class, String.class, Configuration.class);
+ int.class, String.class, long.class, Configuration.class);
ctor.setAccessible(true);
- return ctor.newInstance(this, appContext, host, port, trackingUrl, getConfig());
+ return ctor.newInstance(this, appContext, host, port, trackingUrl, customAppIdIdentifier,
+ getConfig());
} catch (NoSuchMethodException e) {
throw new TezUncheckedException(e);
} catch (InvocationTargetException e) {
@@ -381,10 +423,19 @@ public class TaskSchedulerEventHandler extends AbstractService
@VisibleForTesting
protected void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) {
+ // TODO Add error checking for components being used in the Vertex when running in pure local mode.
// Iterate over the list and create all the taskSchedulers
+ int j = 0;
for (int i = 0; i < taskSchedulerClasses.length; i++) {
+ long customAppIdIdentifier;
+ if (isPureLocalMode || taskSchedulerClasses[i].equals(
+ TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { // Use the app identifier from the appId.
+ customAppIdIdentifier = appContext.getApplicationID().getClusterTimestamp();
+ } else {
+ customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT);
+ }
taskSchedulers[i] = createTaskScheduler(host, port,
- trackingUrl, appContext, taskSchedulerClasses[i]);
+ trackingUrl, appContext, taskSchedulerClasses[i], customAppIdIdentifier);
}
}
@@ -403,12 +454,12 @@ public class TaskSchedulerEventHandler extends AbstractService
for (int i = 0 ; i < taskSchedulers.length ; i++) {
taskSchedulers[i].init(getConfig());
taskSchedulers[i].start();
- }
-
- // TODO TEZ-2118 Start using multiple task schedulers
- if (shouldUnregisterFlag.get()) {
- // Flag may have been set earlier when task scheduler was not initialized
- taskSchedulers[0].setShouldUnregister();
+ if (shouldUnregisterFlag.get()) {
+ // Flag may have been set earlier when task scheduler was not initialized
+ // TODO TEZ-2003 Should setRegister / unregister be part of APIs when not YARN specific ?
+ // External services could need to talk to some other entity.
+ taskSchedulers[i].setShouldUnregister();
+ }
}
this.eventHandlingThread = new Thread("TaskSchedulerEventHandlerThread") {
@@ -457,8 +508,10 @@ public class TaskSchedulerEventHandler extends AbstractService
if (eventHandlingThread != null)
eventHandlingThread.interrupt();
}
- if (taskSchedulers[0] != null) {
- ((AbstractService)taskSchedulers[0]).stop();
+ for (int i = 0 ; i < taskSchedulers.length ; i++) {
+ if (taskSchedulers[i] != null) {
+ taskSchedulers[i].stop();
+ }
}
}
@@ -467,15 +520,18 @@ public class TaskSchedulerEventHandler extends AbstractService
public synchronized void taskAllocated(Object task,
Object appCookie,
Container container) {
+ AMSchedulerEventTALaunchRequest event =
+ (AMSchedulerEventTALaunchRequest) appCookie;
ContainerId containerId = container.getId();
- if (appContext.getAllContainers().addContainerIfNew(container)) {
+ if (appContext.getAllContainers()
+ .addContainerIfNew(container, event.getSchedulerId(), event.getLauncherId(),
+ event.getTaskCommId())) {
appContext.getNodeTracker().nodeSeen(container.getNodeId());
sendEvent(new AMNodeEventContainerAllocated(container
.getNodeId(), container.getId()));
}
- AMSchedulerEventTALaunchRequest event =
- (AMSchedulerEventTALaunchRequest) appCookie;
+
TaskAttempt taskAttempt = event.getTaskAttempt();
// TODO - perhaps check if the task still needs this container
// because the deallocateTask downcall may have raced with the
@@ -484,7 +540,7 @@ public class TaskSchedulerEventHandler extends AbstractService
if (appContext.getAllContainers().get(containerId).getState() == AMContainerState.ALLOCATED) {
sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(),
- event.getContainerContext()));
+ event.getContainerContext(), event.getLauncherId(), event.getTaskCommId()));
}
sendEvent(new DAGEventSchedulerUpdateTAAssigned(taskAttempt, container));
sendEvent(new AMContainerEventAssignTA(containerId, taskAttempt.getID(),
@@ -603,6 +659,9 @@ public class TaskSchedulerEventHandler extends AbstractService
public float getProgress() {
// at this point allocate has been called and so node count must be available
// may change after YARN-1722
+ // This is a heartbeat in from the scheduler into the APP, and is being used to piggy-back any
+ // node updates from the cluster.
+ // TODO Handle this in TEZ-2124. Need a way to know which scheduler is calling in.
int nodeCount = taskSchedulers[0].getClusterNodeCount();
if (nodeCount != cachedNodeCount) {
cachedNodeCount = nodeCount;
@@ -618,7 +677,9 @@ public class TaskSchedulerEventHandler extends AbstractService
}
public void dagCompleted() {
- taskSchedulers[0].dagComplete();
+ for (int i = 0 ; i < taskSchedulers.length ; i++) {
+ taskSchedulers[i].dagComplete();
+ }
}
public void dagSubmitted() {
@@ -628,7 +689,10 @@ public class TaskSchedulerEventHandler extends AbstractService
@Override
public void preemptContainer(ContainerId containerId) {
- taskSchedulers[0].deallocateContainer(containerId);
+ // TODO Why is this making a call back into the scheduler, when the call is originating from there.
+ // An AMContainer instance should already exist if an attempt is being made to preempt it
+ AMContainer amContainer = appContext.getAllContainers().get(containerId);
+ taskSchedulers[amContainer.getTaskSchedulerIdentifier()].deallocateContainer(containerId);
// Inform the Containers about completion.
sendEvent(new AMContainerEventCompleted(containerId, ContainerExitStatus.INVALID,
"Container preempted internally", TaskAttemptTerminationCause.INTERNAL_PREEMPTION));
@@ -637,13 +701,24 @@ public class TaskSchedulerEventHandler extends AbstractService
public void setShouldUnregisterFlag() {
LOG.info("TaskScheduler notified that it should unregister from RM");
this.shouldUnregisterFlag.set(true);
- if (this.taskSchedulers[0] != null) {
- this.taskSchedulers[0].setShouldUnregister();
+ for (int i = 0 ; i < taskSchedulers.length ; i++) {
+ if (this.taskSchedulers[i] != null) {
+ // TODO TEZ-2003 registration required for all schedulers ?
+ this.taskSchedulers[i].setShouldUnregister();
+ }
}
}
public boolean hasUnregistered() {
- return this.taskSchedulers[0].hasUnregistered();
+ // Report unregistered only once every scheduler has done so; the first scheduler that is
+ // still registered makes the overall answer false.
+ for (int i = 0 ; i < taskSchedulers.length ; i++) {
+ // TODO TEZ-2003 registration required for all schedulers ?
+ if (!this.taskSchedulers[i].hasUnregistered()) {
+ return false;
+ }
+ }
+ return true;
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
index 0fc2e12..6616896 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainer.java
@@ -34,4 +34,7 @@ public interface AMContainer extends EventHandler<AMContainerEvent>{
public List<TezTaskAttemptID> getAllTaskAttempts();
public TezTaskAttemptID getCurrentTaskAttempt();
+ public int getTaskSchedulerIdentifier();
+ public int getContainerLauncherIdentifier();
+ public int getTaskCommunicatorIdentifier();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
index d973264..92e5817 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
@@ -27,12 +27,17 @@ public class AMContainerEventLaunchRequest extends AMContainerEvent {
private final TezVertexID vertexId;
private final ContainerContext containerContext;
+ private final int launcherId;
+ private final int taskCommId;
public AMContainerEventLaunchRequest(ContainerId containerId,
- TezVertexID vertexId, ContainerContext containerContext) {
+ TezVertexID vertexId, ContainerContext containerContext,
+ int launcherId, int taskCommId) {
super(containerId, AMContainerEventType.C_LAUNCH_REQUEST);
this.vertexId = vertexId;
this.containerContext = containerContext;
+ this.launcherId = launcherId;
+ this.taskCommId = taskCommId;
}
public TezDAGID getDAGId() {
@@ -46,4 +51,12 @@ public class AMContainerEventLaunchRequest extends AMContainerEvent {
public ContainerContext getContainerContext() {
return this.containerContext;
}
+
+ public int getLauncherId() {
+ return launcherId;
+ }
+
+ public int getTaskCommId() {
+ return taskCommId;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 1acec9c..39df2e8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -81,6 +81,9 @@ public class AMContainerImpl implements AMContainer {
private final TaskAttemptListener taskAttemptListener;
protected final EventHandler eventHandler;
private final ContainerSignatureMatcher signatureMatcher;
+ private final int schedulerId;
+ private final int launcherId;
+ private final int taskCommId;
private final List<TezTaskAttemptID> completedAttempts =
new LinkedList<TezTaskAttemptID>();
@@ -302,7 +305,7 @@ public class AMContainerImpl implements AMContainer {
// additional change - JvmID, YarnChild, etc depend on TaskType.
public AMContainerImpl(Container container, ContainerHeartbeatHandler chh,
TaskAttemptListener tal, ContainerSignatureMatcher signatureMatcher,
- AppContext appContext) {
+ AppContext appContext, int schedulerId, int launcherId, int taskCommId) {
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
this.writeLock = rwLock.writeLock();
@@ -314,6 +317,9 @@ public class AMContainerImpl implements AMContainer {
this.containerHeartbeatHandler = chh;
this.taskAttemptListener = tal;
this.failedAssignments = new LinkedList<TezTaskAttemptID>();
+ this.schedulerId = schedulerId;
+ this.launcherId = launcherId;
+ this.taskCommId = taskCommId;
this.stateMachine = stateMachineFactory.make(this);
}
@@ -363,6 +369,21 @@ public class AMContainerImpl implements AMContainer {
}
}
+ @Override
+ public int getTaskSchedulerIdentifier() {
+ return this.schedulerId;
+ }
+
+ @Override
+ public int getContainerLauncherIdentifier() {
+ return this.launcherId;
+ }
+
+ @Override
+ public int getTaskCommunicatorIdentifier() {
+ return this.taskCommId;
+ }
+
public boolean isInErrorState() {
return inError;
}
@@ -432,7 +453,7 @@ public class AMContainerImpl implements AMContainer {
containerContext.getLocalResources(),
containerContext.getEnvironment(),
containerContext.getJavaOpts(),
- container.taskAttemptListener.getAddress(), containerContext.getCredentials(),
+ container.taskAttemptListener.getTaskCommunicator(container.taskCommId).getAddress(), containerContext.getCredentials(),
container.appContext, container.container.getResource(),
container.appContext.getAMConf());
@@ -1014,7 +1035,7 @@ public class AMContainerImpl implements AMContainer {
}
protected void deAllocate() {
- sendEvent(new AMSchedulerEventDeallocateContainer(containerId));
+ sendEvent(new AMSchedulerEventDeallocateContainer(containerId, schedulerId));
}
protected void sendTerminatedToTaskAttempt(
@@ -1044,28 +1065,28 @@ public class AMContainerImpl implements AMContainer {
}
protected void sendStartRequestToNM(ContainerLaunchContext clc) {
- sendEvent(new NMCommunicatorLaunchRequestEvent(clc, container));
+ sendEvent(new NMCommunicatorLaunchRequestEvent(clc, container, launcherId, taskCommId));
}
protected void sendStopRequestToNM() {
sendEvent(new NMCommunicatorStopRequestEvent(containerId,
- container.getNodeId(), container.getContainerToken()));
+ container.getNodeId(), container.getContainerToken(), launcherId));
}
protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId) {
- taskAttemptListener.unregisterTaskAttempt(attemptId);
+ taskAttemptListener.unregisterTaskAttempt(attemptId, taskCommId);
}
protected void registerAttemptWithListener(AMContainerTask amContainerTask) {
- taskAttemptListener.registerTaskAttempt(amContainerTask, this.containerId);
+ taskAttemptListener.registerTaskAttempt(amContainerTask, this.containerId, taskCommId);
}
protected void registerWithTAListener() {
- taskAttemptListener.registerRunningContainer(containerId);
+ taskAttemptListener.registerRunningContainer(containerId, taskCommId);
}
protected void unregisterFromTAListener() {
- this.taskAttemptListener.unregisterRunningContainer(containerId);
+ this.taskAttemptListener.unregisterRunningContainer(containerId, taskCommId);
}
protected void registerWithContainerListener() {
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
index 574c38e..938096d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
@@ -62,9 +62,9 @@ public class AMContainerMap extends AbstractService implements EventHandler<AMCo
}
}
- public boolean addContainerIfNew(Container container) {
+ public boolean addContainerIfNew(Container container, int schedulerId, int launcherId, int taskCommId) {
AMContainer amc = new AMContainerImpl(container, chh, tal,
- containerSignatureMatcher, context);
+ containerSignatureMatcher, context, schedulerId, launcherId, taskCommId);
return (containerMap.putIfAbsent(container.getId(), amc) == null);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
index b93cab3..0d8e4cd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java
@@ -257,7 +257,8 @@ public class AMNodeImpl implements AMNode {
// these containers are not useful anymore
pastContainers.addAll(containers);
containers.clear();
- sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true));
+ // TODO TEZ-2124 node tracking per ext source
+ sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true, 0));
}
@SuppressWarnings("unchecked")
@@ -363,7 +364,8 @@ public class AMNodeImpl implements AMNode {
public void transition(AMNodeImpl node, AMNodeEvent nEvent) {
node.ignoreBlacklisting = ignore;
if (node.getState() == AMNodeState.BLACKLISTED) {
- node.sendEvent(new AMSchedulerEventNodeBlacklistUpdate(node.getNodeId(), false));
+ // TODO TEZ-2124 node tracking per ext source
+ node.sendEvent(new AMSchedulerEventNodeBlacklistUpdate(node.getNodeId(), false, 0));
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 26fc1ab..a466bc6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -192,7 +192,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
@Override
public void serviceStart() throws Exception {
taListener = (TaskAttemptListenerImpTezDag) getTaskAttemptListener();
- taskCommunicator = (TezTaskCommunicatorImpl) taListener.getTaskCommunicator();
+ taskCommunicator = (TezTaskCommunicatorImpl) taListener.getTaskCommunicator(0);
eventHandlingThread = new Thread(this);
eventHandlingThread.start();
ExecutorService rawExecutor = Executors.newFixedThreadPool(handlerConcurrency,
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 16bd1d3..bffb5b9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -108,9 +108,16 @@ public class TestTaskAttemptListenerImplTezDag {
doReturn(dag).when(appContext).getCurrentDAG();
doReturn(appAcls).when(appContext).getApplicationACLs();
doReturn(amContainerMap).when(appContext).getAllContainers();
+ NodeId nodeId = NodeId.newInstance("localhost", 0);
+
+ AMContainer amContainer = mock(AMContainer.class);
+ Container container = mock(Container.class);
+ doReturn(nodeId).when(container).getNodeId();
+ doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class));
+ doReturn(container).when(amContainer).getContainer();
taskAttemptListener = new TaskAttemptListenerImplForTest(appContext,
- mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null);
+ mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, null);
taskSpec = mock(TaskSpec.class);
doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID();
@@ -121,7 +128,7 @@ public class TestTaskAttemptListenerImplTezDag {
@Test(timeout = 5000)
public void testGetTask() throws IOException {
- TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator();
+ TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(0);
TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
ContainerId containerId1 = createContainerId(appId, 1);
@@ -131,55 +138,55 @@ public class TestTaskAttemptListenerImplTezDag {
ContainerId containerId2 = createContainerId(appId, 2);
ContainerContext containerContext2 = new ContainerContext(containerId2.toString());
- taskAttemptListener.registerRunningContainer(containerId2);
+ taskAttemptListener.registerRunningContainer(containerId2, 0);
containerTask = tezUmbilical.getTask(containerContext2);
assertNull(containerTask);
// Valid task registered
- taskAttemptListener.registerTaskAttempt(amContainerTask, containerId2);
+ taskAttemptListener.registerTaskAttempt(amContainerTask, containerId2, 0);
containerTask = tezUmbilical.getTask(containerContext2);
assertFalse(containerTask.shouldDie());
assertEquals(taskSpec, containerTask.getTaskSpec());
// Task unregistered. Should respond to heartbeats
- taskAttemptListener.unregisterTaskAttempt(taskAttemptId);
+ taskAttemptListener.unregisterTaskAttempt(taskAttemptId, 0);
containerTask = tezUmbilical.getTask(containerContext2);
assertNull(containerTask);
// Container unregistered. Should send a shouldDie = true
- taskAttemptListener.unregisterRunningContainer(containerId2);
+ taskAttemptListener.unregisterRunningContainer(containerId2, 0);
containerTask = tezUmbilical.getTask(containerContext2);
assertTrue(containerTask.shouldDie());
ContainerId containerId3 = createContainerId(appId, 3);
ContainerContext containerContext3 = new ContainerContext(containerId3.toString());
- taskAttemptListener.registerRunningContainer(containerId3);
+ taskAttemptListener.registerRunningContainer(containerId3, 0);
// Register task to container3, followed by unregistering container 3 all together
TaskSpec taskSpec2 = mock(TaskSpec.class);
TezTaskAttemptID taskAttemptId2 = mock(TezTaskAttemptID.class);
doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID();
AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec, null, null, false, 0);
- taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3);
- taskAttemptListener.unregisterRunningContainer(containerId3);
+ taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3, 0);
+ taskAttemptListener.unregisterRunningContainer(containerId3, 0);
containerTask = tezUmbilical.getTask(containerContext3);
assertTrue(containerTask.shouldDie());
}
@Test(timeout = 5000)
public void testGetTaskMultiplePulls() throws IOException {
- TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator();
+ TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(0);
TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
ContainerId containerId1 = createContainerId(appId, 1);
doReturn(mock(AMContainer.class)).when(amContainerMap).get(containerId1);
ContainerContext containerContext1 = new ContainerContext(containerId1.toString());
- taskAttemptListener.registerRunningContainer(containerId1);
+ taskAttemptListener.registerRunningContainer(containerId1, 0);
containerTask = tezUmbilical.getTask(containerContext1);
assertNull(containerTask);
// Register task
- taskAttemptListener.registerTaskAttempt(amContainerTask, containerId1);
+ taskAttemptListener.registerTaskAttempt(amContainerTask, containerId1, 0);
containerTask = tezUmbilical.getTask(containerContext1);
assertFalse(containerTask.shouldDie());
assertEquals(taskSpec, containerTask.getTaskSpec());
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 60c4c88..9df225c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.MockDNSToSwitchMapping;
+import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -273,8 +274,9 @@ public class TestTaskAttempt {
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- when(taListener.getAddress()).thenReturn(
- new InetSocketAddress("localhost", 0));
+ TaskCommunicator taskComm = mock(TaskCommunicator.class);
+ doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+ doReturn(taskComm).when(taListener).getTaskCommunicator(0);
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -323,8 +325,9 @@ public class TestTaskAttempt {
MockEventHandler eventHandler = spy(new MockEventHandler());
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- when(taListener.getAddress()).thenReturn(
- new InetSocketAddress("localhost", 0));
+ TaskCommunicator taskComm = mock(TaskCommunicator.class);
+ doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+ doReturn(taskComm).when(taListener).getTaskCommunicator(0);
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -345,7 +348,7 @@ public class TestTaskAttempt {
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appCtx);
- containers.addContainerIfNew(container);
+ containers.addContainerIfNew(container, 0, 0, 0);
doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
doReturn(containers).when(appCtx).getAllContainers();
@@ -424,8 +427,9 @@ public class TestTaskAttempt {
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- when(taListener.getAddress()).thenReturn(
- new InetSocketAddress("localhost", 0));
+ TaskCommunicator taskComm = mock(TaskCommunicator.class);
+ doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+ doReturn(taskComm).when(taListener).getTaskCommunicator(0);
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -446,7 +450,7 @@ public class TestTaskAttempt {
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appCtx);
- containers.addContainerIfNew(container);
+ containers.addContainerIfNew(container, 0, 0, 0);
doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
doReturn(containers).when(appCtx).getAllContainers();
@@ -489,8 +493,9 @@ public class TestTaskAttempt {
MockEventHandler eventHandler = spy(new MockEventHandler());
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- when(taListener.getAddress()).thenReturn(
- new InetSocketAddress("localhost", 0));
+ TaskCommunicator taskComm = mock(TaskCommunicator.class);
+ doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+ doReturn(taskComm).when(taListener).getTaskCommunicator(0);
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -511,7 +516,7 @@ public class TestTaskAttempt {
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appCtx);
- containers.addContainerIfNew(container);
+ containers.addContainerIfNew(container, 0, 0, 0);
doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
doReturn(containers).when(appCtx).getAllContainers();
@@ -581,8 +586,9 @@ public class TestTaskAttempt {
MockEventHandler eventHandler = spy(new MockEventHandler());
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- when(taListener.getAddress()).thenReturn(
- new InetSocketAddress("localhost", 0));
+ TaskCommunicator taskComm = mock(TaskCommunicator.class);
+ doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+ doReturn(taskComm).when(taListener).getTaskCommunicator(0);
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -604,7 +610,7 @@ public class TestTaskAttempt {
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appCtx);
- containers.addContainerIfNew(container);
+ containers.addContainerIfNew(container, 0, 0, 0);
doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
doReturn(containers).when(appCtx).getAllContainers();
@@ -712,8 +718,9 @@ public class TestTaskAttempt {
MockEventHandler eventHandler = spy(new MockEventHandler());
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- when(taListener.getAddress()).thenReturn(
- new InetSocketAddress("localhost", 0));
+ TaskCommunicator taskComm = mock(TaskCommunicator.class);
+ doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+ doReturn(taskComm).when(taListener).getTaskCommunicator(0);
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -735,7 +742,7 @@ public class TestTaskAttempt {
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appCtx);
- containers.addContainerIfNew(container);
+ containers.addContainerIfNew(container, 0, 0, 0);
doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
doReturn(containers).when(appCtx).getAllContainers();
@@ -804,8 +811,9 @@ public class TestTaskAttempt {
MockEventHandler eventHandler = spy(new MockEventHandler());
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- when(taListener.getAddress()).thenReturn(
- new InetSocketAddress("localhost", 0));
+ TaskCommunicator taskComm = mock(TaskCommunicator.class);
+ doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+ doReturn(taskComm).when(taListener).getTaskCommunicator(0);
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -826,7 +834,7 @@ public class TestTaskAttempt {
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appCtx);
- containers.addContainerIfNew(container);
+ containers.addContainerIfNew(container, 0, 0, 0);
doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
doReturn(containers).when(appCtx).getAllContainers();
@@ -899,8 +907,9 @@ public class TestTaskAttempt {
MockEventHandler eventHandler = spy(new MockEventHandler());
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- when(taListener.getAddress()).thenReturn(
- new InetSocketAddress("localhost", 0));
+ TaskCommunicator taskComm = mock(TaskCommunicator.class);
+ doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+ doReturn(taskComm).when(taListener).getTaskCommunicator(0);
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -921,7 +930,7 @@ public class TestTaskAttempt {
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appCtx);
- containers.addContainerIfNew(container);
+ containers.addContainerIfNew(container, 0, 0, 0);
doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
doReturn(containers).when(appCtx).getAllContainers();
@@ -1002,8 +1011,9 @@ public class TestTaskAttempt {
MockEventHandler eventHandler = spy(new MockEventHandler());
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- when(taListener.getAddress()).thenReturn(
- new InetSocketAddress("localhost", 0));
+ TaskCommunicator taskComm = mock(TaskCommunicator.class);
+ doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+ doReturn(taskComm).when(taListener).getTaskCommunicator(0);
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -1024,7 +1034,7 @@ public class TestTaskAttempt {
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appCtx);
- containers.addContainerIfNew(container);
+ containers.addContainerIfNew(container, 0, 0, 0);
doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
doReturn(containers).when(appCtx).getAllContainers();
@@ -1102,8 +1112,9 @@ public class TestTaskAttempt {
MockEventHandler mockEh = new MockEventHandler();
MockEventHandler eventHandler = spy(mockEh);
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- when(taListener.getAddress()).thenReturn(
- new InetSocketAddress("localhost", 0));
+ TaskCommunicator taskComm = mock(TaskCommunicator.class);
+ doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+ doReturn(taskComm).when(taListener).getTaskCommunicator(0);
Configuration taskConf = new Configuration();
taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -1124,7 +1135,7 @@ public class TestTaskAttempt {
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appCtx);
- containers.addContainerIfNew(container);
+ containers.addContainerIfNew(container, 0, 0, 0);
doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
doReturn(containers).when(appCtx).getAllContainers();
http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 99ec6cf..df29eaa 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -2176,7 +2176,7 @@ public class TestVertexImpl {
doReturn(dagId).when(appContext).getCurrentDAGID();
doReturn(dagId).when(dag).getID();
doReturn(taskScheduler).when(appContext).getTaskScheduler();
- doReturn(Resource.newInstance(102400, 60)).when(taskScheduler).getTotalResources();
+ doReturn(Resource.newInstance(102400, 60)).when(taskScheduler).getTotalResources(0);
doReturn(historyEventHandler).when(appContext).getHistoryHandler();
doReturn(dispatcher.getEventHandler()).when(appContext).getEventHandler();
@@ -2942,7 +2942,7 @@ public class TestVertexImpl {
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appContext);
- containers.addContainerIfNew(container);
+ containers.addContainerIfNew(container, 0, 0, 0);
doReturn(containers).when(appContext).getAllContainers();
ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
@@ -2977,7 +2977,7 @@ public class TestVertexImpl {
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appContext);
- containers.addContainerIfNew(container);
+ containers.addContainerIfNew(container, 0, 0, 0);
doReturn(containers).when(appContext).getAllContainers();
ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
@@ -3013,7 +3013,7 @@ public class TestVertexImpl {
AMContainerMap containers = new AMContainerMap(
mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
new ContainerContextMatcher(), appContext);
- containers.addContainerIfNew(container);
+ containers.addContainerIfNew(container, 0, 0, 0);
doReturn(containers).when(appContext).getAllContainers();
ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));