You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/26 01:48:52 UTC
[5/7] tez git commit: TEZ-2708. Rename classes and variables post TEZ-2003 changes. (sseth)
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
deleted file mode 100644
index 76d870c..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ /dev/null
@@ -1,784 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.tez.dag.api.NamedEntityDescriptor;
-import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
-import org.apache.tez.serviceplugins.api.TaskScheduler;
-import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
-import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.ReflectionUtils;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.TaskLocationHint;
-import org.apache.tez.dag.api.TaskLocationHint.TaskBasedLocationAffinity;
-import org.apache.tez.dag.api.client.DAGClientServer;
-import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.DAGAppMaster;
-import org.apache.tez.dag.app.DAGAppMasterState;
-import org.apache.tez.dag.app.dag.TaskAttempt;
-import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
-import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
-import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
-import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
-import org.apache.tez.dag.app.rm.container.AMContainer;
-import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
-import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
-import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchRequest;
-import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest;
-import org.apache.tez.dag.app.rm.container.AMContainerEventTASucceeded;
-import org.apache.tez.dag.app.rm.container.AMContainerState;
-import org.apache.tez.common.ContainerSignatureMatcher;
-import org.apache.tez.dag.app.rm.node.AMNodeEventContainerAllocated;
-import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
-import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
-import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
-import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;
-import org.apache.tez.dag.app.web.WebUIService;
-import org.apache.tez.dag.records.TaskAttemptTerminationCause;
-
-import com.google.common.base.Preconditions;
-
-
-public class TaskSchedulerEventHandler extends AbstractService implements
- EventHandler<AMSchedulerEvent> {
- static final Logger LOG = LoggerFactory.getLogger(TaskSchedulerEventHandler.class);
-
- static final String APPLICATION_ID_PLACEHOLDER = "__APPLICATION_ID__";
- static final String HISTORY_URL_BASE = "__HISTORY_URL_BASE__";
-
- protected final AppContext appContext;
- @SuppressWarnings("rawtypes")
- private final EventHandler eventHandler;
- private final String historyUrl;
- private DAGAppMaster dagAppMaster;
- private Map<ApplicationAccessType, String> appAcls = null;
- private Thread eventHandlingThread;
- private volatile boolean stopEventHandling;
- // Has a signal (SIGTERM etc) been issued?
- protected volatile boolean isSignalled = false;
- final DAGClientServer clientService;
- private final ContainerSignatureMatcher containerSignatureMatcher;
- private int cachedNodeCount = -1;
- private AtomicBoolean shouldUnregisterFlag =
- new AtomicBoolean(false);
- private final WebUIService webUI;
- private final NamedEntityDescriptor[] taskSchedulerDescriptors;
- protected final TaskScheduler[]taskSchedulers;
- protected final ServicePluginLifecycleAbstractService []taskSchedulerServiceWrappers;
-
- // Single executor service shared by all Schedulers for context callbacks
- @VisibleForTesting
- final ExecutorService appCallbackExecutor;
-
- private final boolean isPureLocalMode;
- // If running in non local-only mode, the YARN task scheduler will always run to take care of
- // registration with YARN and heartbeats to YARN.
- // Splitting registration and heartbeats is not straight-forward due to the taskScheduler being
- // tied to a ContainerRequestType.
- // Custom AppIds to avoid container conflicts if there's multiple sources
- private final long SCHEDULER_APP_ID_BASE = 111101111;
- private final long SCHEDULER_APP_ID_INCREMENT = 111111111;
-
- BlockingQueue<AMSchedulerEvent> eventQueue
- = new LinkedBlockingQueue<AMSchedulerEvent>();
-
- // Not tracking container / task to schedulerId. Instead relying on everything flowing through
- // the system and being propagated back via events.
-
- /**
- *
- * @param appContext
- * @param clientService
- * @param eventHandler
- * @param containerSignatureMatcher
- * @param webUI
- * @param schedulerDescriptors the list of scheduler descriptors. Tez internal classes will not have the class names populated.
- * An empty list defaults to using the YarnTaskScheduler as the only source.
- * @param isPureLocalMode whether the AM is running in local mode
- */
- @SuppressWarnings("rawtypes")
- public TaskSchedulerEventHandler(AppContext appContext,
- DAGClientServer clientService, EventHandler eventHandler,
- ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI,
- List<NamedEntityDescriptor> schedulerDescriptors, boolean isPureLocalMode) {
- super(TaskSchedulerEventHandler.class.getName());
- Preconditions.checkArgument(schedulerDescriptors != null && !schedulerDescriptors.isEmpty(),
- "TaskSchedulerDescriptors must be specified");
- this.appContext = appContext;
- this.eventHandler = eventHandler;
- this.clientService = clientService;
- this.containerSignatureMatcher = containerSignatureMatcher;
- this.webUI = webUI;
- this.historyUrl = getHistoryUrl();
- this.isPureLocalMode = isPureLocalMode;
- this.appCallbackExecutor = createAppCallbackExecutorService();
- if (this.webUI != null) {
- this.webUI.setHistoryUrl(this.historyUrl);
- }
-
- this.taskSchedulerDescriptors = schedulerDescriptors.toArray(new NamedEntityDescriptor[schedulerDescriptors.size()]);
-
- taskSchedulers = new TaskScheduler[this.taskSchedulerDescriptors.length];
- taskSchedulerServiceWrappers = new ServicePluginLifecycleAbstractService[this.taskSchedulerDescriptors.length];
- }
-
- public Map<ApplicationAccessType, String> getApplicationAcls() {
- return appAcls;
- }
-
- public void setSignalled(boolean isSignalled) {
- this.isSignalled = isSignalled;
- LOG.info("TaskScheduler notified that iSignalled was : " + isSignalled);
- }
-
- public int getNumClusterNodes() {
- return cachedNodeCount;
- }
-
- public Resource getAvailableResources(int schedulerId) {
- return taskSchedulers[schedulerId].getAvailableResources();
- }
-
- public Resource getTotalResources(int schedulerId) {
- return taskSchedulers[schedulerId].getTotalResources();
- }
-
- private ExecutorService createAppCallbackExecutorService() {
- return Executors.newSingleThreadExecutor(
- new ThreadFactoryBuilder().setNameFormat("TaskSchedulerAppCallbackExecutor #%d")
- .setDaemon(true)
- .build());
- }
-
- public synchronized void handleEvent(AMSchedulerEvent sEvent) {
- LOG.info("Processing the event " + sEvent.toString());
- switch (sEvent.getType()) {
- case S_TA_LAUNCH_REQUEST:
- handleTaLaunchRequest((AMSchedulerEventTALaunchRequest) sEvent);
- break;
- case S_TA_ENDED: // TaskAttempt considered complete.
- AMSchedulerEventTAEnded event = (AMSchedulerEventTAEnded)sEvent;
- switch(event.getState()) {
- case FAILED:
- case KILLED:
- handleTAUnsuccessfulEnd(event);
- break;
- case SUCCEEDED:
- handleTASucceeded(event);
- break;
- default:
- throw new TezUncheckedException("Unexecpted TA_ENDED state: " + event.getState());
- }
- break;
- case S_CONTAINER_DEALLOCATE:
- handleContainerDeallocate((AMSchedulerEventDeallocateContainer)sEvent);
- break;
- case S_NODE_UNBLACKLISTED:
- // fall through
- case S_NODE_BLACKLISTED:
- handleNodeBlacklistUpdate((AMSchedulerEventNodeBlacklistUpdate)sEvent);
- break;
- case S_NODE_UNHEALTHY:
- break;
- case S_NODE_HEALTHY:
- // Consider changing this to work like BLACKLISTING.
- break;
- default:
- break;
- }
- }
-
- @Override
- public void handle(AMSchedulerEvent event) {
- int qSize = eventQueue.size();
- if (qSize != 0 && qSize % 1000 == 0) {
- LOG.info("Size of event-queue in RMContainerAllocator is " + qSize);
- }
- int remCapacity = eventQueue.remainingCapacity();
- if (remCapacity < 1000) {
- LOG.warn("Very low remaining capacity in the event-queue "
- + "of RMContainerAllocator: " + remCapacity);
- }
- try {
- eventQueue.put(event);
- } catch (InterruptedException e) {
- throw new TezUncheckedException(e);
- }
- }
-
- @SuppressWarnings("unchecked")
- private void sendEvent(Event<?> event) {
- eventHandler.handle(event);
- }
-
- private void handleNodeBlacklistUpdate(AMSchedulerEventNodeBlacklistUpdate event) {
- if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) {
- taskSchedulers[event.getSchedulerId()].blacklistNode(event.getNodeId());
- } else if (event.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) {
- taskSchedulers[event.getSchedulerId()].unblacklistNode(event.getNodeId());
- } else {
- throw new TezUncheckedException("Invalid event type: " + event.getType());
- }
- }
-
- private void handleContainerDeallocate(
- AMSchedulerEventDeallocateContainer event) {
- ContainerId containerId = event.getContainerId();
- // TODO what happens to the task that was connected to this container?
- // current assumption is that it will eventually call handleTaStopRequest
- //TaskAttempt taskAttempt = (TaskAttempt)
- taskSchedulers[event.getSchedulerId()].deallocateContainer(containerId);
- // TODO does this container need to be stopped via C_STOP_REQUEST
- sendEvent(new AMContainerEventStopRequest(containerId));
- }
-
- private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) {
- TaskAttempt attempt = event.getAttempt();
- // Propagate state and failure cause (if any) when informing the scheduler about the de-allocation.
- boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()]
- .deallocateTask(attempt, false, event.getTaskAttemptEndReason(), event.getDiagnostics());
- // use stored value of container id in case the scheduler has removed this
- // assignment because the task has been deallocated earlier.
- // retroactive case
- ContainerId attemptContainerId = attempt.getAssignedContainerID();
-
- if(!wasContainerAllocated) {
- LOG.info("Task: " + attempt.getID() +
- " has no container assignment in the scheduler");
- if (attemptContainerId != null) {
- LOG.error("No container allocated to task: " + attempt.getID()
- + " according to scheduler. Task reported container id: "
- + attemptContainerId);
- }
- }
-
- if (attemptContainerId != null) {
- // TODO either ways send the necessary events
- // Ask the container to stop.
- sendEvent(new AMContainerEventStopRequest(attemptContainerId));
- // Inform the Node - the task has asked to be STOPPED / has already
- // stopped.
- // AMNodeImpl blacklisting logic does not account for KILLED attempts.
- sendEvent(new AMNodeEventTaskAttemptEnded(appContext.getAllContainers().
- get(attemptContainerId).getContainer().getNodeId(), event.getSchedulerId(),
- attemptContainerId,
- attempt.getID(), event.getState() == TaskAttemptState.FAILED));
- }
- }
-
- private void handleTASucceeded(AMSchedulerEventTAEnded event) {
- TaskAttempt attempt = event.getAttempt();
- ContainerId usedContainerId = event.getUsedContainerId();
-
- // This could be null if a task fails / is killed before a container is
- // assigned to it.
- if (event.getUsedContainerId() != null) {
- sendEvent(new AMContainerEventTASucceeded(usedContainerId,
- event.getAttemptID()));
- sendEvent(new AMNodeEventTaskAttemptSucceeded(appContext.getAllContainers().
- get(usedContainerId).getContainer().getNodeId(), event.getSchedulerId(), usedContainerId,
- event.getAttemptID()));
- }
-
- boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt,
- true, null, event.getDiagnostics());
- if (!wasContainerAllocated) {
- LOG.error("De-allocated successful task: " + attempt.getID()
- + ", but TaskScheduler reported no container assigned to task");
- }
- }
-
- private void handleTaLaunchRequest(AMSchedulerEventTALaunchRequest event) {
- TaskAttempt taskAttempt = event.getTaskAttempt();
- TaskLocationHint locationHint = event.getLocationHint();
- String hosts[] = null;
- String racks[] = null;
- if (locationHint != null) {
- TaskBasedLocationAffinity taskAffinity = locationHint.getAffinitizedTask();
- if (taskAffinity != null) {
- Vertex vertex = appContext.getCurrentDAG().getVertex(taskAffinity.getVertexName());
- Preconditions.checkNotNull(vertex, "Invalid vertex in task based affinity " + taskAffinity
- + " for attempt: " + taskAttempt.getID());
- int taskIndex = taskAffinity.getTaskIndex();
- Preconditions.checkState(taskIndex >=0 && taskIndex < vertex.getTotalTasks(),
- "Invalid taskIndex in task based affinity " + taskAffinity
- + " for attempt: " + taskAttempt.getID());
- TaskAttempt affinityAttempt = vertex.getTask(taskIndex).getSuccessfulAttempt();
- if (affinityAttempt != null) {
- Preconditions.checkNotNull(affinityAttempt.getAssignedContainerID(), affinityAttempt.getID());
- taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt,
- event.getCapability(),
- affinityAttempt.getAssignedContainerID(),
- Priority.newInstance(event.getPriority()),
- event.getContainerContext(),
- event);
- return;
- }
- LOG.info("Attempt: " + taskAttempt.getID() + " has task based affinity to " + taskAffinity
- + " but no locality information exists for it. Ignoring hint.");
- // fall through with null hosts/racks
- } else {
- hosts = (locationHint.getHosts() != null) ? locationHint
- .getHosts().toArray(
- new String[locationHint.getHosts().size()]) : null;
- racks = (locationHint.getRacks() != null) ? locationHint.getRacks()
- .toArray(new String[locationHint.getRacks().size()]) : null;
- }
- }
-
- taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt,
- event.getCapability(),
- hosts,
- racks,
- Priority.newInstance(event.getPriority()),
- event.getContainerContext(),
- event);
- }
-
- @VisibleForTesting
- TaskScheduler createTaskScheduler(String host, int port, String trackingUrl,
- AppContext appContext,
- NamedEntityDescriptor taskSchedulerDescriptor,
- long customAppIdIdentifier,
- int schedulerId) {
- TaskSchedulerContext rawContext =
- new TaskSchedulerContextImpl(this, appContext, schedulerId, trackingUrl,
- customAppIdIdentifier, host, port, taskSchedulerDescriptor.getUserPayload());
- TaskSchedulerContext wrappedContext = wrapTaskSchedulerContext(rawContext);
- String schedulerName = taskSchedulerDescriptor.getEntityName();
- if (schedulerName.equals(TezConstants.getTezYarnServicePluginName())) {
- return createYarnTaskScheduler(wrappedContext, schedulerId);
- } else if (schedulerName.equals(TezConstants.getTezUberServicePluginName())) {
- return createUberTaskScheduler(wrappedContext, schedulerId);
- } else {
- return createCustomTaskScheduler(wrappedContext, taskSchedulerDescriptor, schedulerId);
- }
- }
-
- @VisibleForTesting
- TaskSchedulerContext wrapTaskSchedulerContext(TaskSchedulerContext rawContext) {
- return new TaskSchedulerContextImplWrapper(rawContext, appCallbackExecutor);
- }
-
- @VisibleForTesting
- TaskScheduler createYarnTaskScheduler(TaskSchedulerContext taskSchedulerContext,
- int schedulerId) {
- LOG.info("Creating TaskScheduler: YarnTaskSchedulerService");
- return new YarnTaskSchedulerService(taskSchedulerContext);
- }
-
- @VisibleForTesting
- TaskScheduler createUberTaskScheduler(TaskSchedulerContext taskSchedulerContext,
- int schedulerId) {
- LOG.info("Creating TaskScheduler: Local TaskScheduler");
- return new LocalTaskSchedulerService(taskSchedulerContext);
- }
-
- @SuppressWarnings("unchecked")
- TaskScheduler createCustomTaskScheduler(TaskSchedulerContext taskSchedulerContext,
- NamedEntityDescriptor taskSchedulerDescriptor,
- int schedulerId) {
- LOG.info("Creating custom TaskScheduler {}:{}", taskSchedulerDescriptor.getEntityName(),
- taskSchedulerDescriptor.getClassName());
- Class<? extends TaskScheduler> taskSchedulerClazz =
- (Class<? extends TaskScheduler>) ReflectionUtils
- .getClazz(taskSchedulerDescriptor.getClassName());
- try {
- Constructor<? extends TaskScheduler> ctor = taskSchedulerClazz
- .getConstructor(TaskSchedulerContext.class);
- return ctor.newInstance(taskSchedulerContext);
- } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
- throw new TezUncheckedException(e);
- }
- }
-
- @VisibleForTesting
- protected void instantiateSchedulers(String host, int port, String trackingUrl,
- AppContext appContext) {
- // Iterate over the list and create all the taskSchedulers
- int j = 0;
- for (int i = 0; i < taskSchedulerDescriptors.length; i++) {
- long customAppIdIdentifier;
- if (isPureLocalMode || taskSchedulerDescriptors[i].getEntityName().equals(
- TezConstants.getTezYarnServicePluginName())) { // Use the app identifier from the appId.
- customAppIdIdentifier = appContext.getApplicationID().getClusterTimestamp();
- } else {
- customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT);
- }
- LOG.info("ClusterIdentifier for TaskScheduler [" + i + ":" + taskSchedulerDescriptors[i].getEntityName() + "]=" +
- customAppIdIdentifier);
- taskSchedulers[i] = createTaskScheduler(host, port,
- trackingUrl, appContext, taskSchedulerDescriptors[i], customAppIdIdentifier, i);
- taskSchedulerServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(taskSchedulers[i]);
- }
- }
-
-
- @Override
- public synchronized void serviceStart() {
- InetSocketAddress serviceAddr = clientService.getBindAddress();
- dagAppMaster = appContext.getAppMaster();
- // if web service is enabled then set tracking url. else disable it (value = "").
- // the actual url set on the rm web ui will be the proxy url set by WebAppProxyServlet, which
- // always try to connect to AM and proxy the response. hence it wont work if the webUIService
- // is not enabled.
- String trackingUrl = (webUI != null) ? webUI.getTrackingURL() : "";
- instantiateSchedulers(serviceAddr.getHostName(), serviceAddr.getPort(), trackingUrl, appContext);
-
- for (int i = 0 ; i < taskSchedulers.length ; i++) {
- taskSchedulerServiceWrappers[i].init(getConfig());
- taskSchedulerServiceWrappers[i].start();
- if (shouldUnregisterFlag.get()) {
- // Flag may have been set earlier when task scheduler was not initialized
- // External services could need to talk to some other entity.
- taskSchedulers[i].setShouldUnregister();
- }
- }
-
- this.eventHandlingThread = new Thread("TaskSchedulerEventHandlerThread") {
- @Override
- public void run() {
-
- AMSchedulerEvent event;
-
- while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
- try {
- if (TaskSchedulerEventHandler.this.eventQueue.peek() == null) {
- notifyForTest();
- }
- event = TaskSchedulerEventHandler.this.eventQueue.take();
- } catch (InterruptedException e) {
- if(!stopEventHandling) {
- LOG.warn("Continuing after interrupt : ", e);
- }
- continue;
- }
-
- try {
- handleEvent(event);
- } catch (Throwable t) {
- LOG.error("Error in handling event type " + event.getType()
- + " to the TaskScheduler", t);
- // Kill the AM.
- sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.INTERNAL_ERROR));
- return;
- } finally {
- notifyForTest();
- }
- }
- }
- };
- this.eventHandlingThread.start();
- }
-
- protected void notifyForTest() {
- }
-
- public void initiateStop() {
- for (int i = 0 ; i < taskSchedulers.length ; i++) {
- taskSchedulers[i].initiateStop();
- }
- }
-
- @Override
- public void serviceStop() throws InterruptedException {
- synchronized(this) {
- this.stopEventHandling = true;
- if (eventHandlingThread != null)
- eventHandlingThread.interrupt();
- }
- for (int i = 0 ; i < taskSchedulers.length ; i++) {
- if (taskSchedulers[i] != null) {
- taskSchedulerServiceWrappers[i].stop();
- }
- }
- LOG.info("Shutting down AppCallbackExecutor");
- appCallbackExecutor.shutdownNow();
- appCallbackExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS);
- }
-
- // TaskSchedulerAppCallback methods with schedulerId, where relevant
- public synchronized void taskAllocated(int schedulerId, Object task,
- Object appCookie,
- Container container) {
- AMSchedulerEventTALaunchRequest event =
- (AMSchedulerEventTALaunchRequest) appCookie;
- ContainerId containerId = container.getId();
- if (appContext.getAllContainers()
- .addContainerIfNew(container, schedulerId, event.getLauncherId(),
- event.getTaskCommId())) {
- appContext.getNodeTracker().nodeSeen(container.getNodeId(), schedulerId);
- sendEvent(new AMNodeEventContainerAllocated(container
- .getNodeId(), schedulerId, container.getId()));
- }
-
-
- TaskAttempt taskAttempt = event.getTaskAttempt();
- // TODO - perhaps check if the task still needs this container
- // because the deallocateTask downcall may have raced with the
- // taskAllocated() upcall
- assert task.equals(taskAttempt);
-
- if (appContext.getAllContainers().get(containerId).getState() == AMContainerState.ALLOCATED) {
- sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(),
- event.getContainerContext(), event.getLauncherId(), event.getTaskCommId()));
- }
- sendEvent(new DAGEventSchedulerUpdateTAAssigned(taskAttempt, container));
- sendEvent(new AMContainerEventAssignTA(containerId, taskAttempt.getID(),
- event.getRemoteTaskSpec(), event.getContainerContext().getLocalResources(), event
- .getContainerContext().getCredentials(), event.getPriority()));
- }
-
- public synchronized void containerCompleted(int schedulerId, Object task, ContainerStatus containerStatus) {
- // SchedulerId isn't used here since no node updates are sent out
- // Inform the Containers about completion.
- AMContainer amContainer = appContext.getAllContainers().get(containerStatus.getContainerId());
- if (amContainer != null) {
- String message = "Container completed. ";
- TaskAttemptTerminationCause errCause = TaskAttemptTerminationCause.CONTAINER_EXITED;
- int exitStatus = containerStatus.getExitStatus();
- if (exitStatus == ContainerExitStatus.PREEMPTED) {
- message = "Container preempted externally. ";
- errCause = TaskAttemptTerminationCause.EXTERNAL_PREEMPTION;
- } else if (exitStatus == ContainerExitStatus.DISKS_FAILED) {
- message = "Container disk failed. ";
- errCause = TaskAttemptTerminationCause.NODE_DISK_ERROR;
- } else if (exitStatus != ContainerExitStatus.SUCCESS){
- message = "Container failed, exitCode=" + exitStatus + ". ";
- }
- if (containerStatus.getDiagnostics() != null) {
- message += containerStatus.getDiagnostics();
- }
- sendEvent(new AMContainerEventCompleted(amContainer.getContainerId(), exitStatus, message, errCause));
- }
- }
-
- public synchronized void containerBeingReleased(int schedulerId, ContainerId containerId) {
- // SchedulerId isn't used here since no node updates are sent out
- AMContainer amContainer = appContext.getAllContainers().get(containerId);
- if (amContainer != null) {
- sendEvent(new AMContainerEventStopRequest(containerId));
- }
- }
-
- @SuppressWarnings("unchecked")
- public synchronized void nodesUpdated(int schedulerId, List<NodeReport> updatedNodes) {
- for (NodeReport nr : updatedNodes) {
- // Scheduler will find out from the node, if at all.
- // Relying on the RM to not allocate containers on an unhealthy node.
- eventHandler.handle(new AMNodeEventStateChanged(nr, schedulerId));
- }
- }
-
- public synchronized void appShutdownRequested(int schedulerId) {
- // This can happen if the RM has been restarted. If it is in that state,
- // this application must clean itself up.
- LOG.info("App shutdown requested by scheduler {}", schedulerId);
- sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.AM_REBOOT));
- }
-
- public synchronized void setApplicationRegistrationData(
- int schedulerId,
- Resource maxContainerCapability,
- Map<ApplicationAccessType, String> appAcls,
- ByteBuffer clientAMSecretKey) {
- this.appContext.getClusterInfo().setMaxContainerCapability(
- maxContainerCapability);
- this.appAcls = appAcls;
- this.clientService.setClientAMSecretKey(clientAMSecretKey);
- }
-
- // Not synchronized to avoid deadlocks from TaskScheduler callbacks.
- // TaskScheduler uses a separate thread for it's callbacks. Since this method
- // returns a value which is required, the TaskScheduler wait for the call to
- // complete and can hence lead to a deadlock if called from within a TSEH lock.
- public AppFinalStatus getFinalAppStatus() {
- FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
- StringBuffer sb = new StringBuffer();
- if (dagAppMaster == null) {
- finishState = FinalApplicationStatus.UNDEFINED;
- sb.append("App not yet initialized");
- } else {
- DAGAppMasterState appMasterState = dagAppMaster.getState();
- if (appMasterState == DAGAppMasterState.SUCCEEDED) {
- finishState = FinalApplicationStatus.SUCCEEDED;
- } else if (appMasterState == DAGAppMasterState.KILLED
- || (appMasterState == DAGAppMasterState.RUNNING && isSignalled)) {
- finishState = FinalApplicationStatus.KILLED;
- } else if (appMasterState == DAGAppMasterState.FAILED
- || appMasterState == DAGAppMasterState.ERROR) {
- finishState = FinalApplicationStatus.FAILED;
- } else {
- finishState = FinalApplicationStatus.UNDEFINED;
- }
- List<String> diagnostics = dagAppMaster.getDiagnostics();
- if(diagnostics != null) {
- for (String s : diagnostics) {
- sb.append(s).append("\n");
- }
- }
- }
- if(LOG.isDebugEnabled()) {
- LOG.debug("Setting job diagnostics to " + sb.toString());
- }
-
- // if history url is set use the same, if historyUrl is set to "" then rm ui disables the
- // history url
- return new AppFinalStatus(finishState, sb.toString(), historyUrl);
- }
-
-
-
- // Not synchronized to avoid deadlocks from TaskScheduler callbacks.
- // TaskScheduler uses a separate thread for it's callbacks. Since this method
- // returns a value which is required, the TaskScheduler wait for the call to
- // complete and can hence lead to a deadlock if called from within a TSEH lock.
- public float getProgress(int schedulerId) {
- // at this point allocate has been called and so node count must be available
- // may change after YARN-1722
- // This is a heartbeat in from the scheduler into the APP, and is being used to piggy-back and
- // node updates from the cluster.
-
- // Doubles as a mechanism to update node counts periodically. Hence schedulerId required.
-
- // TODO Handle this in TEZ-2124. Need a way to know which scheduler is calling in.
- int nodeCount = taskSchedulers[0].getClusterNodeCount();
- if (nodeCount != cachedNodeCount) {
- cachedNodeCount = nodeCount;
- sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount, schedulerId));
- }
- return dagAppMaster.getProgress();
- }
-
- public void onError(int schedulerId, Throwable t) {
- LOG.info("Error reported by scheduler {} - {}", schedulerId, t);
- sendEvent(new DAGAppMasterEventSchedulingServiceError(t));
- }
-
- public void dagCompleted() {
- for (int i = 0 ; i < taskSchedulers.length ; i++) {
- taskSchedulers[i].dagComplete();
- }
- }
-
- public void dagSubmitted() {
- // Nothing to do right now. Indicates that a new DAG has been submitted and
- // the context has updated information.
- }
-
- public void preemptContainer(int schedulerId, ContainerId containerId) {
- // TODO Why is this making a call back into the scheduler, when the call is originating from there.
- // An AMContainer instance should already exist if an attempt is being made to preempt it
- AMContainer amContainer = appContext.getAllContainers().get(containerId);
- taskSchedulers[amContainer.getTaskSchedulerIdentifier()].deallocateContainer(containerId);
- // Inform the Containers about completion.
- sendEvent(new AMContainerEventCompleted(containerId, ContainerExitStatus.INVALID,
- "Container preempted internally", TaskAttemptTerminationCause.INTERNAL_PREEMPTION));
- }
-
- public void setShouldUnregisterFlag() {
- LOG.info("TaskScheduler notified that it should unregister from RM");
- this.shouldUnregisterFlag.set(true);
- for (int i = 0 ; i < taskSchedulers.length ; i++) {
- if (this.taskSchedulers[i] != null) {
- this.taskSchedulers[i].setShouldUnregister();
- }
- }
- }
-
- public ContainerSignatureMatcher getContainerSignatureMatcher() {
- return containerSignatureMatcher;
- }
-
- public boolean hasUnregistered() {
- boolean result = true;
- for (int i = 0 ; i < taskSchedulers.length ; i++) {
- result = result & this.taskSchedulers[i].hasUnregistered();
- if (result == false) {
- return result;
- }
- }
- return result;
- }
-
- @VisibleForTesting
- public String getHistoryUrl() {
- Configuration config = this.appContext.getAMConf();
- String historyUrl = "";
-
- String loggingClass = config.get(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "");
- String historyUrlTemplate = config.get(TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE,
- TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE_DEFAULT);
- String historyUrlBase = config.get(TezConfiguration.TEZ_HISTORY_URL_BASE, "");
-
-
- if (loggingClass.equals("org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService") &&
- !historyUrlTemplate.isEmpty() &&
- !historyUrlBase.isEmpty()) {
- // replace the placeholders, while tolerating extra or missing "/" in input. replace all
- // instances of consecutive "/" with single (except for the http(s):// case
- historyUrl = historyUrlTemplate
- .replaceAll(APPLICATION_ID_PLACEHOLDER, appContext.getApplicationID().toString())
- .replaceAll(HISTORY_URL_BASE, historyUrlBase)
- .replaceAll("([^:])/{2,}", "$1/");
-
- // make sure we have a valid scheme
- if (!historyUrl.startsWith("http")) {
- historyUrl = "http://" + historyUrl;
- }
- }
-
- return historyUrl;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
new file mode 100644
index 0000000..29143a2
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -0,0 +1,786 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.TaskLocationHint.TaskBasedLocationAffinity;
+import org.apache.tez.dag.api.client.DAGClientServer;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.DAGAppMaster;
+import org.apache.tez.dag.app.DAGAppMasterState;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
+import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
+import org.apache.tez.dag.app.rm.container.AMContainer;
+import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
+import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchRequest;
+import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest;
+import org.apache.tez.dag.app.rm.container.AMContainerEventTASucceeded;
+import org.apache.tez.dag.app.rm.container.AMContainerState;
+import org.apache.tez.common.ContainerSignatureMatcher;
+import org.apache.tez.dag.app.rm.node.AMNodeEventContainerAllocated;
+import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
+import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
+import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
+import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;
+import org.apache.tez.dag.app.web.WebUIService;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * Routes scheduling work between the DAG AM and one or more {@link TaskScheduler} plugins.
+ *
+ * <p>{@link AMSchedulerEvent}s are queued by {@link #handle(AMSchedulerEvent)} and processed on a
+ * dedicated thread started in {@link #serviceStart()}. Callbacks from the schedulers
+ * (taskAllocated, containerCompleted, nodesUpdated, ...) are translated into container, node and
+ * DAG events that are sent to the AM's event handler.
+ */
+public class TaskSchedulerManager extends AbstractService implements
+    EventHandler<AMSchedulerEvent> {
+  static final Logger LOG = LoggerFactory.getLogger(TaskSchedulerManager.class);
+
+  static final String APPLICATION_ID_PLACEHOLDER = "__APPLICATION_ID__";
+  static final String HISTORY_URL_BASE = "__HISTORY_URL_BASE__";
+
+  protected final AppContext appContext;
+  @SuppressWarnings("rawtypes")
+  private final EventHandler eventHandler;
+  private final String historyUrl;
+  private DAGAppMaster dagAppMaster;
+  private Map<ApplicationAccessType, String> appAcls = null;
+  private Thread eventHandlingThread;
+  private volatile boolean stopEventHandling;
+  // Has a signal (SIGTERM etc) been issued?
+  protected volatile boolean isSignalled = false;
+  final DAGClientServer clientService;
+  private final ContainerSignatureMatcher containerSignatureMatcher;
+  private int cachedNodeCount = -1;
+  // May be set before the schedulers exist; it is replayed onto them in serviceStart().
+  private final AtomicBoolean shouldUnregisterFlag = new AtomicBoolean(false);
+  private final WebUIService webUI;
+  private final NamedEntityDescriptor[] taskSchedulerDescriptors;
+  protected final TaskScheduler[] taskSchedulers;
+  protected final ServicePluginLifecycleAbstractService[] taskSchedulerServiceWrappers;
+
+  // Single executor service shared by all Schedulers for context callbacks
+  @VisibleForTesting
+  final ExecutorService appCallbackExecutor;
+
+  private final boolean isPureLocalMode;
+  // If running in non local-only mode, the YARN task scheduler will always run to take care of
+  // registration with YARN and heartbeats to YARN.
+  // Splitting registration and heartbeats is not straight-forward due to the taskScheduler being
+  // tied to a ContainerRequestType.
+  // Custom AppIds to avoid container conflicts if there's multiple sources
+  private static final long SCHEDULER_APP_ID_BASE = 111101111;
+  private static final long SCHEDULER_APP_ID_INCREMENT = 111111111;
+
+  BlockingQueue<AMSchedulerEvent> eventQueue
+      = new LinkedBlockingQueue<AMSchedulerEvent>();
+
+  // Not tracking container / task to schedulerId. Instead relying on everything flowing through
+  // the system and being propagated back via events.
+
+  /**
+   * Creates the manager. The schedulers themselves are instantiated later, in
+   * {@link #serviceStart()}.
+   *
+   * @param appContext the AM application context
+   * @param clientService the DAG client server; its bind address is handed to the schedulers,
+   *                      and it receives the client-to-AM secret key on registration
+   * @param eventHandler the AM dispatcher that receives the events generated by this class
+   * @param containerSignatureMatcher the matcher exposed via {@link #getContainerSignatureMatcher()}
+   * @param webUI the web UI service, may be {@code null}; it receives the history URL and
+   *              supplies the tracking URL
+   * @param schedulerDescriptors the list of scheduler descriptors. Tez internal classes will not
+   *                             have the class names populated. Must be non-null and non-empty.
+   * @param isPureLocalMode whether the AM is running in local mode
+   * @throws IllegalArgumentException if {@code schedulerDescriptors} is null or empty
+   */
+  @SuppressWarnings("rawtypes")
+  public TaskSchedulerManager(AppContext appContext,
+                              DAGClientServer clientService, EventHandler eventHandler,
+                              ContainerSignatureMatcher containerSignatureMatcher,
+                              WebUIService webUI,
+                              List<NamedEntityDescriptor> schedulerDescriptors,
+                              boolean isPureLocalMode) {
+    super(TaskSchedulerManager.class.getName());
+    Preconditions.checkArgument(schedulerDescriptors != null && !schedulerDescriptors.isEmpty(),
+        "TaskSchedulerDescriptors must be specified");
+    this.appContext = appContext;
+    this.eventHandler = eventHandler;
+    this.clientService = clientService;
+    this.containerSignatureMatcher = containerSignatureMatcher;
+    this.webUI = webUI;
+    this.historyUrl = getHistoryUrl();
+    this.isPureLocalMode = isPureLocalMode;
+    this.appCallbackExecutor = createAppCallbackExecutorService();
+    if (this.webUI != null) {
+      this.webUI.setHistoryUrl(this.historyUrl);
+    }
+
+    this.taskSchedulerDescriptors =
+        schedulerDescriptors.toArray(new NamedEntityDescriptor[schedulerDescriptors.size()]);
+
+    taskSchedulers = new TaskScheduler[this.taskSchedulerDescriptors.length];
+    taskSchedulerServiceWrappers =
+        new ServicePluginLifecycleAbstractService[this.taskSchedulerDescriptors.length];
+  }
+
+  /** @return the ACLs received at registration, or {@code null} before registration */
+  public Map<ApplicationAccessType, String> getApplicationAcls() {
+    return appAcls;
+  }
+
+  /** Records whether the AM has been signalled; used when computing the final app status. */
+  public void setSignalled(boolean isSignalled) {
+    this.isSignalled = isSignalled;
+    LOG.info("TaskScheduler notified that isSignalled was : " + isSignalled);
+  }
+
+  /** @return the last node count observed in {@link #getProgress(int)}, or -1 if none yet */
+  public int getNumClusterNodes() {
+    return cachedNodeCount;
+  }
+
+  /** @return the resources currently available to the given scheduler */
+  public Resource getAvailableResources(int schedulerId) {
+    return taskSchedulers[schedulerId].getAvailableResources();
+  }
+
+  /** @return the total resources known to the given scheduler */
+  public Resource getTotalResources(int schedulerId) {
+    return taskSchedulers[schedulerId].getTotalResources();
+  }
+
+  /** Creates the single-threaded daemon executor used for scheduler context callbacks. */
+  private ExecutorService createAppCallbackExecutorService() {
+    return Executors.newSingleThreadExecutor(
+        new ThreadFactoryBuilder().setNameFormat("TaskSchedulerAppCallbackExecutor #%d")
+            .setDaemon(true)
+            .build());
+  }
+
+  /**
+   * Dispatches one queued scheduler event to its handler. Runs on the event handling thread.
+   *
+   * @throws TezUncheckedException if an S_TA_ENDED event carries an unexpected state
+   */
+  public synchronized void handleEvent(AMSchedulerEvent sEvent) {
+    LOG.info("Processing the event " + sEvent.toString());
+    switch (sEvent.getType()) {
+      case S_TA_LAUNCH_REQUEST:
+        handleTaLaunchRequest((AMSchedulerEventTALaunchRequest) sEvent);
+        break;
+      case S_TA_ENDED: // TaskAttempt considered complete.
+        AMSchedulerEventTAEnded event = (AMSchedulerEventTAEnded) sEvent;
+        switch (event.getState()) {
+          case FAILED:
+          case KILLED:
+            handleTAUnsuccessfulEnd(event);
+            break;
+          case SUCCEEDED:
+            handleTASucceeded(event);
+            break;
+          default:
+            throw new TezUncheckedException("Unexpected TA_ENDED state: " + event.getState());
+        }
+        break;
+      case S_CONTAINER_DEALLOCATE:
+        handleContainerDeallocate((AMSchedulerEventDeallocateContainer) sEvent);
+        break;
+      case S_NODE_UNBLACKLISTED:
+        // fall through
+      case S_NODE_BLACKLISTED:
+        handleNodeBlacklistUpdate((AMSchedulerEventNodeBlacklistUpdate) sEvent);
+        break;
+      case S_NODE_UNHEALTHY:
+        break;
+      case S_NODE_HEALTHY:
+        // Consider changing this to work like BLACKLISTING.
+        break;
+      default:
+        break;
+    }
+  }
+
+  /**
+   * Enqueues an event for asynchronous processing on the event handling thread.
+   *
+   * @throws TezUncheckedException if interrupted while enqueueing (the interrupt status is kept)
+   */
+  @Override
+  public void handle(AMSchedulerEvent event) {
+    int qSize = eventQueue.size();
+    if (qSize != 0 && qSize % 1000 == 0) {
+      LOG.info("Size of event-queue in TaskSchedulerManager is " + qSize);
+    }
+    // The LinkedBlockingQueue created above is unbounded (remainingCapacity() returns
+    // Integer.MAX_VALUE), so this warning only fires if a bounded queue is substituted.
+    int remCapacity = eventQueue.remainingCapacity();
+    if (remCapacity < 1000) {
+      LOG.warn("Very low remaining capacity in the event-queue "
+          + "of TaskSchedulerManager: " + remCapacity);
+    }
+    try {
+      eventQueue.put(event);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so code further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      throw new TezUncheckedException(e);
+    }
+  }
+
+  /** Forwards an event to the AM dispatcher. */
+  @SuppressWarnings("unchecked")
+  private void sendEvent(Event<?> event) {
+    eventHandler.handle(event);
+  }
+
+  /** Applies a blacklist or unblacklist request to the scheduler named by the event. */
+  private void handleNodeBlacklistUpdate(AMSchedulerEventNodeBlacklistUpdate event) {
+    TaskScheduler scheduler = taskSchedulers[event.getSchedulerId()];
+    if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) {
+      scheduler.blacklistNode(event.getNodeId());
+    } else if (event.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) {
+      scheduler.unblacklistNode(event.getNodeId());
+    } else {
+      throw new TezUncheckedException("Invalid event type: " + event.getType());
+    }
+  }
+
+  /** Deallocates a container in its scheduler and asks the AMContainer to stop. */
+  private void handleContainerDeallocate(
+      AMSchedulerEventDeallocateContainer event) {
+    ContainerId containerId = event.getContainerId();
+    // TODO what happens to the task that was connected to this container?
+    // current assumption is that it will eventually call handleTaStopRequest
+    taskSchedulers[event.getSchedulerId()].deallocateContainer(containerId);
+    // TODO does this container need to be stopped via C_STOP_REQUEST
+    sendEvent(new AMContainerEventStopRequest(containerId));
+  }
+
+  /**
+   * Handles a FAILED or KILLED attempt. It deallocates the task in the scheduler and, if the
+   * attempt has a container, stops that container and notifies its node.
+   */
+  private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) {
+    TaskAttempt attempt = event.getAttempt();
+    // Propagate state and failure cause (if any) when informing the scheduler about the de-allocation.
+    boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()]
+        .deallocateTask(attempt, false, event.getTaskAttemptEndReason(), event.getDiagnostics());
+    // use stored value of container id in case the scheduler has removed this
+    // assignment because the task has been deallocated earlier.
+    // retroactive case
+    ContainerId attemptContainerId = attempt.getAssignedContainerID();
+
+    if (!wasContainerAllocated) {
+      LOG.info("Task: " + attempt.getID() +
+          " has no container assignment in the scheduler");
+      if (attemptContainerId != null) {
+        LOG.error("No container allocated to task: " + attempt.getID()
+            + " according to scheduler. Task reported container id: "
+            + attemptContainerId);
+      }
+    }
+
+    if (attemptContainerId != null) {
+      // TODO either ways send the necessary events
+      // Ask the container to stop.
+      sendEvent(new AMContainerEventStopRequest(attemptContainerId));
+      // Inform the Node - the task has asked to be STOPPED / has already
+      // stopped.
+      // AMNodeImpl blacklisting logic does not account for KILLED attempts.
+      sendEvent(new AMNodeEventTaskAttemptEnded(appContext.getAllContainers().
+          get(attemptContainerId).getContainer().getNodeId(), event.getSchedulerId(),
+          attemptContainerId,
+          attempt.getID(), event.getState() == TaskAttemptState.FAILED));
+    }
+  }
+
+  /**
+   * Handles a SUCCEEDED attempt. It notifies the container and node (when a container was used)
+   * and then deallocates the task in the scheduler.
+   */
+  private void handleTASucceeded(AMSchedulerEventTAEnded event) {
+    TaskAttempt attempt = event.getAttempt();
+    ContainerId usedContainerId = event.getUsedContainerId();
+
+    // This could be null if a task fails / is killed before a container is
+    // assigned to it.
+    if (usedContainerId != null) {
+      sendEvent(new AMContainerEventTASucceeded(usedContainerId,
+          event.getAttemptID()));
+      sendEvent(new AMNodeEventTaskAttemptSucceeded(appContext.getAllContainers().
+          get(usedContainerId).getContainer().getNodeId(), event.getSchedulerId(), usedContainerId,
+          event.getAttemptID()));
+    }
+
+    boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt,
+        true, null, event.getDiagnostics());
+    if (!wasContainerAllocated) {
+      LOG.error("De-allocated successful task: " + attempt.getID()
+          + ", but TaskScheduler reported no container assigned to task");
+    }
+  }
+
+  /**
+   * Asks the event's scheduler to allocate a container for an attempt.
+   *
+   * <p>If the location hint carries task-based affinity and the affinitized task has a successful
+   * attempt, that attempt's container is requested. Otherwise the hint's hosts and racks (if any)
+   * are used.
+   */
+  private void handleTaLaunchRequest(AMSchedulerEventTALaunchRequest event) {
+    TaskAttempt taskAttempt = event.getTaskAttempt();
+    TaskLocationHint locationHint = event.getLocationHint();
+    TaskScheduler scheduler = taskSchedulers[event.getSchedulerId()];
+    String[] hosts = null;
+    String[] racks = null;
+    if (locationHint != null) {
+      TaskBasedLocationAffinity taskAffinity = locationHint.getAffinitizedTask();
+      if (taskAffinity != null) {
+        Vertex vertex = appContext.getCurrentDAG().getVertex(taskAffinity.getVertexName());
+        Preconditions.checkNotNull(vertex, "Invalid vertex in task based affinity " + taskAffinity
+            + " for attempt: " + taskAttempt.getID());
+        int taskIndex = taskAffinity.getTaskIndex();
+        Preconditions.checkState(taskIndex >= 0 && taskIndex < vertex.getTotalTasks(),
+            "Invalid taskIndex in task based affinity " + taskAffinity
+            + " for attempt: " + taskAttempt.getID());
+        TaskAttempt affinityAttempt = vertex.getTask(taskIndex).getSuccessfulAttempt();
+        if (affinityAttempt != null) {
+          Preconditions.checkNotNull(affinityAttempt.getAssignedContainerID(),
+              affinityAttempt.getID());
+          // Request the container that ran the successful affinitized attempt.
+          scheduler.allocateTask(taskAttempt,
+              event.getCapability(),
+              affinityAttempt.getAssignedContainerID(),
+              Priority.newInstance(event.getPriority()),
+              event.getContainerContext(),
+              event);
+          return;
+        }
+        LOG.info("Attempt: " + taskAttempt.getID() + " has task based affinity to " + taskAffinity
+            + " but no locality information exists for it. Ignoring hint.");
+        // fall through with null hosts/racks
+      } else {
+        hosts = (locationHint.getHosts() != null)
+            ? locationHint.getHosts().toArray(new String[locationHint.getHosts().size()])
+            : null;
+        racks = (locationHint.getRacks() != null)
+            ? locationHint.getRacks().toArray(new String[locationHint.getRacks().size()])
+            : null;
+      }
+    }
+
+    scheduler.allocateTask(taskAttempt,
+        event.getCapability(),
+        hosts,
+        racks,
+        Priority.newInstance(event.getPriority()),
+        event.getContainerContext(),
+        event);
+  }
+
+  /**
+   * Creates one scheduler. The YARN and Uber plugin names map to the built-in schedulers; any
+   * other name is loaded reflectively from the descriptor's class name.
+   */
+  @VisibleForTesting
+  TaskScheduler createTaskScheduler(String host, int port, String trackingUrl,
+                                    AppContext appContext,
+                                    NamedEntityDescriptor taskSchedulerDescriptor,
+                                    long customAppIdIdentifier,
+                                    int schedulerId) {
+    TaskSchedulerContext rawContext =
+        new TaskSchedulerContextImpl(this, appContext, schedulerId, trackingUrl,
+            customAppIdIdentifier, host, port, taskSchedulerDescriptor.getUserPayload());
+    TaskSchedulerContext wrappedContext = wrapTaskSchedulerContext(rawContext);
+    String schedulerName = taskSchedulerDescriptor.getEntityName();
+    if (schedulerName.equals(TezConstants.getTezYarnServicePluginName())) {
+      return createYarnTaskScheduler(wrappedContext, schedulerId);
+    } else if (schedulerName.equals(TezConstants.getTezUberServicePluginName())) {
+      return createUberTaskScheduler(wrappedContext, schedulerId);
+    } else {
+      return createCustomTaskScheduler(wrappedContext, taskSchedulerDescriptor, schedulerId);
+    }
+  }
+
+  /** Wraps a raw context so its callbacks run on the shared {@link #appCallbackExecutor}. */
+  @VisibleForTesting
+  TaskSchedulerContext wrapTaskSchedulerContext(TaskSchedulerContext rawContext) {
+    return new TaskSchedulerContextImplWrapper(rawContext, appCallbackExecutor);
+  }
+
+  @VisibleForTesting
+  TaskScheduler createYarnTaskScheduler(TaskSchedulerContext taskSchedulerContext,
+                                        int schedulerId) {
+    LOG.info("Creating TaskScheduler: YarnTaskSchedulerService");
+    return new YarnTaskSchedulerService(taskSchedulerContext);
+  }
+
+  @VisibleForTesting
+  TaskScheduler createUberTaskScheduler(TaskSchedulerContext taskSchedulerContext,
+                                        int schedulerId) {
+    LOG.info("Creating TaskScheduler: Local TaskScheduler");
+    return new LocalTaskSchedulerService(taskSchedulerContext);
+  }
+
+  /**
+   * Instantiates a custom scheduler through its public {@code (TaskSchedulerContext)}
+   * constructor.
+   *
+   * @throws TezUncheckedException if the class cannot be constructed
+   */
+  @SuppressWarnings("unchecked")
+  TaskScheduler createCustomTaskScheduler(TaskSchedulerContext taskSchedulerContext,
+                                          NamedEntityDescriptor taskSchedulerDescriptor,
+                                          int schedulerId) {
+    LOG.info("Creating custom TaskScheduler {}:{}", taskSchedulerDescriptor.getEntityName(),
+        taskSchedulerDescriptor.getClassName());
+    Class<? extends TaskScheduler> taskSchedulerClazz =
+        (Class<? extends TaskScheduler>) ReflectionUtils
+            .getClazz(taskSchedulerDescriptor.getClassName());
+    try {
+      Constructor<? extends TaskScheduler> ctor = taskSchedulerClazz
+          .getConstructor(TaskSchedulerContext.class);
+      return ctor.newInstance(taskSchedulerContext);
+    } catch (NoSuchMethodException | InvocationTargetException | InstantiationException
+        | IllegalAccessException e) {
+      throw new TezUncheckedException(
+          "Failed to create TaskScheduler " + taskSchedulerDescriptor.getClassName(), e);
+    }
+  }
+
+  /**
+   * Creates a scheduler and lifecycle wrapper for every descriptor.
+   *
+   * <p>In local mode, and for the YARN scheduler, the cluster timestamp of the application id is
+   * used as the identifier. Other schedulers get distinct synthetic identifiers derived from
+   * SCHEDULER_APP_ID_BASE/INCREMENT.
+   */
+  @VisibleForTesting
+  protected void instantiateSchedulers(String host, int port, String trackingUrl,
+                                       AppContext appContext) {
+    int customIdIndex = 0;
+    for (int i = 0; i < taskSchedulerDescriptors.length; i++) {
+      long customAppIdIdentifier;
+      if (isPureLocalMode || taskSchedulerDescriptors[i].getEntityName().equals(
+          TezConstants.getTezYarnServicePluginName())) { // Use the app identifier from the appId.
+        customAppIdIdentifier = appContext.getApplicationID().getClusterTimestamp();
+      } else {
+        customAppIdIdentifier =
+            SCHEDULER_APP_ID_BASE + (customIdIndex++ * SCHEDULER_APP_ID_INCREMENT);
+      }
+      LOG.info("ClusterIdentifier for TaskScheduler [" + i + ":"
+          + taskSchedulerDescriptors[i].getEntityName() + "]=" + customAppIdIdentifier);
+      taskSchedulers[i] = createTaskScheduler(host, port,
+          trackingUrl, appContext, taskSchedulerDescriptors[i], customAppIdIdentifier, i);
+      taskSchedulerServiceWrappers[i] =
+          new ServicePluginLifecycleAbstractService<>(taskSchedulers[i]);
+    }
+  }
+
+  /**
+   * Instantiates, initializes and starts all schedulers, then starts the event handling thread.
+   *
+   * <p>A throwable raised while handling an event stops the thread and sends INTERNAL_ERROR to the
+   * AM.
+   */
+  @Override
+  public synchronized void serviceStart() {
+    InetSocketAddress serviceAddr = clientService.getBindAddress();
+    dagAppMaster = appContext.getAppMaster();
+    // if web service is enabled then set tracking url. else disable it (value = "").
+    // the actual url set on the rm web ui will be the proxy url set by WebAppProxyServlet, which
+    // always try to connect to AM and proxy the response. hence it wont work if the webUIService
+    // is not enabled.
+    String trackingUrl = (webUI != null) ? webUI.getTrackingURL() : "";
+    instantiateSchedulers(serviceAddr.getHostName(), serviceAddr.getPort(), trackingUrl, appContext);
+
+    for (int i = 0; i < taskSchedulers.length; i++) {
+      taskSchedulerServiceWrappers[i].init(getConfig());
+      taskSchedulerServiceWrappers[i].start();
+      if (shouldUnregisterFlag.get()) {
+        // Flag may have been set earlier when task scheduler was not initialized
+        // External services could need to talk to some other entity.
+        taskSchedulers[i].setShouldUnregister();
+      }
+    }
+
+    this.eventHandlingThread = new Thread("TaskSchedulerEventHandlerThread") {
+      @Override
+      public void run() {
+
+        AMSchedulerEvent event;
+
+        while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+          try {
+            if (TaskSchedulerManager.this.eventQueue.peek() == null) {
+              notifyForTest();
+            }
+            event = TaskSchedulerManager.this.eventQueue.take();
+          } catch (InterruptedException e) {
+            if (!stopEventHandling) {
+              LOG.warn("Continuing after interrupt : ", e);
+            }
+            continue;
+          }
+
+          try {
+            handleEvent(event);
+          } catch (Throwable t) {
+            LOG.error("Error in handling event type " + event.getType()
+                + " to the TaskScheduler", t);
+            // Kill the AM.
+            sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.INTERNAL_ERROR));
+            return;
+          } finally {
+            notifyForTest();
+          }
+        }
+      }
+    };
+    this.eventHandlingThread.start();
+  }
+
+  /** Test hook invoked when the queue is empty and after each handled event. No-op here. */
+  protected void notifyForTest() {
+  }
+
+  /** Calls {@code initiateStop()} on every scheduler. */
+  public void initiateStop() {
+    for (int i = 0; i < taskSchedulers.length; i++) {
+      taskSchedulers[i].initiateStop();
+    }
+  }
+
+  /**
+   * Stops the event thread and every instantiated scheduler, then shuts down the callback
+   * executor. It waits up to one second for the executor to terminate.
+   */
+  @Override
+  public void serviceStop() throws InterruptedException {
+    synchronized (this) {
+      this.stopEventHandling = true;
+      if (eventHandlingThread != null) {
+        eventHandlingThread.interrupt();
+      }
+    }
+    for (int i = 0; i < taskSchedulers.length; i++) {
+      if (taskSchedulers[i] != null) {
+        taskSchedulerServiceWrappers[i].stop();
+      }
+    }
+    LOG.info("Shutting down AppCallbackExecutor");
+    appCallbackExecutor.shutdownNow();
+    appCallbackExecutor.awaitTermination(1000L, TimeUnit.MILLISECONDS);
+  }
+
+  // TaskSchedulerAppCallback methods with schedulerId, where relevant
+
+  /**
+   * Scheduler callback: a container was allocated for {@code task}.
+   *
+   * <p>This registers the container (and notifies its node) if it is new, and requests a launch
+   * if the container is still ALLOCATED. It then tells the DAG and the container about the
+   * assignment. {@code appCookie} must be the originating {@link AMSchedulerEventTALaunchRequest}.
+   */
+  public synchronized void taskAllocated(int schedulerId, Object task,
+                                         Object appCookie,
+                                         Container container) {
+    AMSchedulerEventTALaunchRequest event =
+        (AMSchedulerEventTALaunchRequest) appCookie;
+    ContainerId containerId = container.getId();
+    if (appContext.getAllContainers()
+        .addContainerIfNew(container, schedulerId, event.getLauncherId(),
+            event.getTaskCommId())) {
+      appContext.getNodeTracker().nodeSeen(container.getNodeId(), schedulerId);
+      sendEvent(new AMNodeEventContainerAllocated(container
+          .getNodeId(), schedulerId, container.getId()));
+    }
+
+    TaskAttempt taskAttempt = event.getTaskAttempt();
+    // TODO - perhaps check if the task still needs this container
+    // because the deallocateTask downcall may have raced with the
+    // taskAllocated() upcall
+    assert task.equals(taskAttempt);
+
+    if (appContext.getAllContainers().get(containerId).getState() == AMContainerState.ALLOCATED) {
+      sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(),
+          event.getContainerContext(), event.getLauncherId(), event.getTaskCommId()));
+    }
+    sendEvent(new DAGEventSchedulerUpdateTAAssigned(taskAttempt, container));
+    sendEvent(new AMContainerEventAssignTA(containerId, taskAttempt.getID(),
+        event.getRemoteTaskSpec(), event.getContainerContext().getLocalResources(), event
+        .getContainerContext().getCredentials(), event.getPriority()));
+  }
+
+  /**
+   * Scheduler callback: a container finished. The exit status is mapped to a diagnostic message
+   * and termination cause, which are sent to the AMContainer if it is known.
+   */
+  public synchronized void containerCompleted(int schedulerId, Object task, ContainerStatus containerStatus) {
+    // SchedulerId isn't used here since no node updates are sent out
+    // Inform the Containers about completion.
+    AMContainer amContainer = appContext.getAllContainers().get(containerStatus.getContainerId());
+    if (amContainer != null) {
+      String message = "Container completed. ";
+      TaskAttemptTerminationCause errCause = TaskAttemptTerminationCause.CONTAINER_EXITED;
+      int exitStatus = containerStatus.getExitStatus();
+      if (exitStatus == ContainerExitStatus.PREEMPTED) {
+        message = "Container preempted externally. ";
+        errCause = TaskAttemptTerminationCause.EXTERNAL_PREEMPTION;
+      } else if (exitStatus == ContainerExitStatus.DISKS_FAILED) {
+        message = "Container disk failed. ";
+        errCause = TaskAttemptTerminationCause.NODE_DISK_ERROR;
+      } else if (exitStatus != ContainerExitStatus.SUCCESS) {
+        message = "Container failed, exitCode=" + exitStatus + ". ";
+      }
+      if (containerStatus.getDiagnostics() != null) {
+        message += containerStatus.getDiagnostics();
+      }
+      sendEvent(new AMContainerEventCompleted(amContainer.getContainerId(), exitStatus, message, errCause));
+    }
+  }
+
+  /** Scheduler callback: a container is being released; asks a known AMContainer to stop. */
+  public synchronized void containerBeingReleased(int schedulerId, ContainerId containerId) {
+    // SchedulerId isn't used here since no node updates are sent out
+    AMContainer amContainer = appContext.getAllContainers().get(containerId);
+    if (amContainer != null) {
+      sendEvent(new AMContainerEventStopRequest(containerId));
+    }
+  }
+
+  /** Scheduler callback: forwards each node report as an {@link AMNodeEventStateChanged}. */
+  public synchronized void nodesUpdated(int schedulerId, List<NodeReport> updatedNodes) {
+    for (NodeReport nr : updatedNodes) {
+      // Scheduler will find out from the node, if at all.
+      // Relying on the RM to not allocate containers on an unhealthy node.
+      sendEvent(new AMNodeEventStateChanged(nr, schedulerId));
+    }
+  }
+
+  /** Scheduler callback: the cluster asked the app to shut down; sends AM_REBOOT. */
+  public synchronized void appShutdownRequested(int schedulerId) {
+    // This can happen if the RM has been restarted. If it is in that state,
+    // this application must clean itself up.
+    LOG.info("App shutdown requested by scheduler {}", schedulerId);
+    sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.AM_REBOOT));
+  }
+
+  /**
+   * Scheduler callback: registration data received. It records the max container capability and
+   * the ACLs, and hands the client-to-AM secret key to the client service.
+   */
+  public synchronized void setApplicationRegistrationData(
+      int schedulerId,
+      Resource maxContainerCapability,
+      Map<ApplicationAccessType, String> appAcls,
+      ByteBuffer clientAMSecretKey) {
+    this.appContext.getClusterInfo().setMaxContainerCapability(
+        maxContainerCapability);
+    this.appAcls = appAcls;
+    this.clientService.setClientAMSecretKey(clientAMSecretKey);
+  }
+
+  // Not synchronized to avoid deadlocks from TaskScheduler callbacks.
+  // TaskScheduler uses a separate thread for its callbacks. Since this method
+  // returns a value which is required, the TaskScheduler waits for the call to
+  // complete, which can lead to a deadlock if called from within a TaskSchedulerManager lock.
+  /**
+   * Computes the final application status from the AM state, together with the AM diagnostics
+   * and the history URL.
+   */
+  public AppFinalStatus getFinalAppStatus() {
+    FinalApplicationStatus finishState;
+    StringBuilder sb = new StringBuilder();
+    if (dagAppMaster == null) {
+      finishState = FinalApplicationStatus.UNDEFINED;
+      sb.append("App not yet initialized");
+    } else {
+      DAGAppMasterState appMasterState = dagAppMaster.getState();
+      if (appMasterState == DAGAppMasterState.SUCCEEDED) {
+        finishState = FinalApplicationStatus.SUCCEEDED;
+      } else if (appMasterState == DAGAppMasterState.KILLED
+          || (appMasterState == DAGAppMasterState.RUNNING && isSignalled)) {
+        finishState = FinalApplicationStatus.KILLED;
+      } else if (appMasterState == DAGAppMasterState.FAILED
+          || appMasterState == DAGAppMasterState.ERROR) {
+        finishState = FinalApplicationStatus.FAILED;
+      } else {
+        finishState = FinalApplicationStatus.UNDEFINED;
+      }
+      List<String> diagnostics = dagAppMaster.getDiagnostics();
+      if (diagnostics != null) {
+        for (String s : diagnostics) {
+          sb.append(s).append("\n");
+        }
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Setting job diagnostics to " + sb);
+    }
+
+    // if history url is set use the same, if historyUrl is set to "" then rm ui disables the
+    // history url
+    return new AppFinalStatus(finishState, sb.toString(), historyUrl);
+  }
+
+  // Not synchronized to avoid deadlocks from TaskScheduler callbacks.
+  // TaskScheduler uses a separate thread for its callbacks. Since this method
+  // returns a value which is required, the TaskScheduler waits for the call to
+  // complete, which can lead to a deadlock if called from within a TaskSchedulerManager lock.
+  /**
+   * Scheduler heartbeat: returns DAG AM progress. It also refreshes the cached cluster node
+   * count and emits {@link AMNodeEventNodeCountUpdated} when the count changes.
+   * NOTE(review): relies on serviceStart() having set dagAppMaster.
+   */
+  public float getProgress(int schedulerId) {
+    // at this point allocate has been called and so node count must be available
+    // may change after YARN-1722
+    // This is a heartbeat in from the scheduler into the APP, and is being used to piggy-back and
+    // node updates from the cluster.
+
+    // Doubles as a mechanism to update node counts periodically. Hence schedulerId required.
+
+    // TODO Handle this in TEZ-2124. Need a way to know which scheduler is calling in.
+    // The count is read from scheduler 0 regardless of schedulerId, since cachedNodeCount is a
+    // single shared value.
+    int nodeCount = taskSchedulers[0].getClusterNodeCount();
+    if (nodeCount != cachedNodeCount) {
+      cachedNodeCount = nodeCount;
+      sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount, schedulerId));
+    }
+    return dagAppMaster.getProgress();
+  }
+
+  /** Scheduler callback: logs the error with its stack trace and reports it to the AM. */
+  public void onError(int schedulerId, Throwable t) {
+    // The throwable is passed as the trailing argument (not a placeholder) so SLF4J logs its
+    // stack trace.
+    LOG.error("Error reported by scheduler {}", schedulerId, t);
+    sendEvent(new DAGAppMasterEventSchedulingServiceError(t));
+  }
+
+  /** Notifies every scheduler that the current DAG has completed. */
+  public void dagCompleted() {
+    for (int i = 0; i < taskSchedulers.length; i++) {
+      taskSchedulers[i].dagComplete();
+    }
+  }
+
+  public void dagSubmitted() {
+    // Nothing to do right now. Indicates that a new DAG has been submitted and
+    // the context has updated information.
+  }
+
+  /**
+   * Preempts a container. It deallocates the container in the scheduler that owns it and reports
+   * the container as completed with INTERNAL_PREEMPTION.
+   */
+  public void preemptContainer(int schedulerId, ContainerId containerId) {
+    // TODO Why is this making a call back into the scheduler, when the call is originating from there.
+    // An AMContainer instance should already exist if an attempt is being made to preempt it
+    AMContainer amContainer = appContext.getAllContainers().get(containerId);
+    taskSchedulers[amContainer.getTaskSchedulerIdentifier()].deallocateContainer(containerId);
+    // Inform the Containers about completion.
+    sendEvent(new AMContainerEventCompleted(containerId, ContainerExitStatus.INVALID,
+        "Container preempted internally", TaskAttemptTerminationCause.INTERNAL_PREEMPTION));
+  }
+
+  /**
+   * Marks that schedulers should unregister. It is applied immediately to instantiated
+   * schedulers, and remembered for any instantiated later in serviceStart().
+   */
+  public void setShouldUnregisterFlag() {
+    LOG.info("TaskScheduler notified that it should unregister from RM");
+    this.shouldUnregisterFlag.set(true);
+    for (int i = 0; i < taskSchedulers.length; i++) {
+      if (this.taskSchedulers[i] != null) {
+        this.taskSchedulers[i].setShouldUnregister();
+      }
+    }
+  }
+
+  public ContainerSignatureMatcher getContainerSignatureMatcher() {
+    return containerSignatureMatcher;
+  }
+
+  /**
+   * @return {@code true} only if every scheduler reports it has unregistered; stops at the first
+   *         that has not
+   */
+  public boolean hasUnregistered() {
+    for (int i = 0; i < taskSchedulers.length; i++) {
+      if (!this.taskSchedulers[i].hasUnregistered()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Builds the Tez UI history URL for this application from the AM configuration.
+   *
+   * <p>A URL is only produced when the ATS history logging service is configured and both the
+   * URL template and the URL base are non-empty. Otherwise an empty string is returned.
+   */
+  @VisibleForTesting
+  public String getHistoryUrl() {
+    Configuration config = this.appContext.getAMConf();
+    String url = "";
+
+    String loggingClass = config.get(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "");
+    String historyUrlTemplate = config.get(TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE,
+        TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE_DEFAULT);
+    String historyUrlBase = config.get(TezConfiguration.TEZ_HISTORY_URL_BASE, "");
+
+    if (loggingClass.equals("org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService") &&
+        !historyUrlTemplate.isEmpty() &&
+        !historyUrlBase.isEmpty()) {
+      // Substitute the placeholders literally. replaceAll() would interpret '$' and '\' in the
+      // configured base URL as regex replacement syntax. Then collapse runs of consecutive "/"
+      // into one, except where preceded by ':' (the http(s):// case).
+      url = historyUrlTemplate
+          .replace(APPLICATION_ID_PLACEHOLDER, appContext.getApplicationID().toString())
+          .replace(HISTORY_URL_BASE, historyUrlBase)
+          .replaceAll("([^:])/{2,}", "$1/");
+
+      // make sure we have a valid scheme
+      if (!url.startsWith("http")) {
+        url = "http://" + url;
+      }
+    }
+
+    return url;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 99cec2b..69c21d4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -50,15 +50,15 @@ import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.ContainerContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminatedBySystem;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
import org.apache.tez.dag.app.rm.AMSchedulerEventDeallocateContainer;
-import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
-import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
+import org.apache.tez.dag.app.rm.ContainerLauncherLaunchRequestEvent;
+import org.apache.tez.dag.app.rm.ContainerLauncherStopRequestEvent;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.events.ContainerStoppedEvent;
@@ -81,7 +81,7 @@ public class AMContainerImpl implements AMContainer {
private final Container container;
private final AppContext appContext;
private final ContainerHeartbeatHandler containerHeartbeatHandler;
- private final TaskAttemptListener taskAttemptListener;
+ private final TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
protected final EventHandler eventHandler;
private final ContainerSignatureMatcher signatureMatcher;
private final int schedulerId;
@@ -308,7 +308,7 @@ public class AMContainerImpl implements AMContainer {
// Attempting to use a container based purely on reosurces required, etc needs
// additional change - JvmID, YarnChild, etc depend on TaskType.
public AMContainerImpl(Container container, ContainerHeartbeatHandler chh,
- TaskAttemptListener tal, ContainerSignatureMatcher signatureMatcher,
+ TaskCommunicatorManagerInterface tal, ContainerSignatureMatcher signatureMatcher,
AppContext appContext, int schedulerId, int launcherId, int taskCommId) {
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
@@ -319,7 +319,7 @@ public class AMContainerImpl implements AMContainer {
this.signatureMatcher = signatureMatcher;
this.appContext = appContext;
this.containerHeartbeatHandler = chh;
- this.taskAttemptListener = tal;
+ this.taskCommunicatorManagerInterface = tal;
this.failedAssignments = new LinkedList<TezTaskAttemptID>();
this.schedulerId = schedulerId;
this.launcherId = launcherId;
@@ -466,7 +466,7 @@ public class AMContainerImpl implements AMContainer {
containerContext.getLocalResources(),
containerContext.getEnvironment(),
containerContext.getJavaOpts(),
- container.taskAttemptListener.getTaskCommunicator(container.taskCommId).getAddress(), containerContext.getCredentials(),
+ container.taskCommunicatorManagerInterface.getTaskCommunicator(container.taskCommId).getAddress(), containerContext.getCredentials(),
container.appContext, container.container.getResource(),
container.appContext.getAMConf());
@@ -1095,28 +1095,28 @@ public class AMContainerImpl implements AMContainer {
}
protected void sendStartRequestToNM(ContainerLaunchContext clc) {
- sendEvent(new NMCommunicatorLaunchRequestEvent(clc, container, launcherId, schedulerId, taskCommId));
+ sendEvent(new ContainerLauncherLaunchRequestEvent(clc, container, launcherId, schedulerId, taskCommId));
}
protected void sendStopRequestToNM() {
- sendEvent(new NMCommunicatorStopRequestEvent(containerId,
+ sendEvent(new ContainerLauncherStopRequestEvent(containerId,
container.getNodeId(), container.getContainerToken(), launcherId, schedulerId, taskCommId));
}
protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId, TaskAttemptEndReason endReason, String diagnostics) {
- taskAttemptListener.unregisterTaskAttempt(attemptId, taskCommId, endReason, diagnostics);
+ taskCommunicatorManagerInterface.unregisterTaskAttempt(attemptId, taskCommId, endReason, diagnostics);
}
protected void registerAttemptWithListener(AMContainerTask amContainerTask) {
- taskAttemptListener.registerTaskAttempt(amContainerTask, this.containerId, taskCommId);
+ taskCommunicatorManagerInterface.registerTaskAttempt(amContainerTask, this.containerId, taskCommId);
}
protected void registerWithTAListener() {
- taskAttemptListener.registerRunningContainer(containerId, taskCommId);
+ taskCommunicatorManagerInterface.registerRunningContainer(containerId, taskCommId);
}
protected void unregisterFromTAListener(ContainerEndReason endReason, String diagnostics) {
- this.taskAttemptListener.unregisterRunningContainer(containerId, taskCommId, endReason, diagnostics);
+ this.taskCommunicatorManagerInterface.unregisterRunningContainer(containerId, taskCommId, endReason, diagnostics);
}
protected void registerWithContainerListener() {
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
index fcb9eaf..ab43db1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
@@ -31,19 +31,19 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
public class AMContainerMap extends AbstractService implements EventHandler<AMContainerEvent> {
private static final Logger LOG = LoggerFactory.getLogger(AMContainerMap.class);
private final ContainerHeartbeatHandler chh;
- private final TaskAttemptListener tal;
+ private final TaskCommunicatorManagerInterface tal;
private final AppContext context;
private final ContainerSignatureMatcher containerSignatureMatcher;
private final ConcurrentHashMap<ContainerId, AMContainer> containerMap;
- public AMContainerMap(ContainerHeartbeatHandler chh, TaskAttemptListener tal,
+ public AMContainerMap(ContainerHeartbeatHandler chh, TaskCommunicatorManagerInterface tal,
ContainerSignatureMatcher containerSignatureMatcher, AppContext context) {
super("AMContainerMaps");
this.chh = chh;
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index b09eb86..3cab2da 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -60,8 +60,8 @@ import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.launcher.ContainerLauncherRouter;
-import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
+import org.apache.tez.dag.app.launcher.ContainerLauncherManager;
+import org.apache.tez.dag.app.rm.ContainerLauncherEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -122,13 +122,13 @@ public class MockDAGAppMaster extends DAGAppMaster {
// It can be used to preempt the container for a given task
public class MockContainerLauncher extends ContainerLauncher implements Runnable {
- BlockingQueue<NMCommunicatorEvent> eventQueue = new LinkedBlockingQueue<NMCommunicatorEvent>();
+ BlockingQueue<ContainerLauncherEvent> eventQueue = new LinkedBlockingQueue<ContainerLauncherEvent>();
Thread eventHandlingThread;
ListeningExecutorService executorService;
Map<ContainerId, ContainerData> containers = Maps.newConcurrentMap();
ArrayBlockingQueue<Worker> workers;
- TaskAttemptListenerImpTezDag taListener;
+ TaskCommunicatorManager taskCommunicatorManager;
TezTaskCommunicatorImpl taskCommunicator;
AtomicBoolean startScheduling = new AtomicBoolean(true);
@@ -187,8 +187,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
@Override
public void start() throws Exception {
- taListener = (TaskAttemptListenerImpTezDag) getTaskAttemptListener();
- taskCommunicator = (TezTaskCommunicatorImpl) taListener.getTaskCommunicator(0);
+ taskCommunicatorManager = (TaskCommunicatorManager) getTaskCommunicatorManager();
+ taskCommunicator = (TezTaskCommunicatorImpl) taskCommunicatorManager.getTaskCommunicator(0);
eventHandlingThread = new Thread(this);
eventHandlingThread.start();
ExecutorService rawExecutor = Executors.newFixedThreadPool(handlerConcurrency,
@@ -256,7 +256,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
}
public void preemptContainer(ContainerData cData) {
- getTaskSchedulerEventHandler().containerCompleted(0, null,
+ getTaskSchedulerManager().containerCompleted(0, null,
ContainerStatus.newInstance(cData.cId, null, "Preempted", ContainerExitStatus.PREEMPTED));
cData.clear();
}
@@ -495,7 +495,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
throw new TezUncheckedException(e);
}
containerLauncherContext =
- new ContainerLauncherContextImpl(getContext(), getTaskAttemptListener(), userPayload);
+ new ContainerLauncherContextImpl(getContext(), getTaskCommunicatorManager(), userPayload);
containerLauncher = new MockContainerLauncher(launcherGoFlag, containerLauncherContext);
shutdownHandler = new MockDAGAppMasterShutdownHandler();
this.initFailFlag = initFailFlag;
@@ -507,10 +507,11 @@ public class MockDAGAppMaster extends DAGAppMaster {
// use mock container launcher for tests
@Override
- protected ContainerLauncherRouter createContainerLauncherRouter(List<NamedEntityDescriptor> containerLauncherDescirptors,
- boolean isLocal)
+ protected ContainerLauncherManager createContainerLauncherManager(
+ List<NamedEntityDescriptor> containerLauncherDescirptors,
+ boolean isLocal)
throws UnknownHostException {
- return new ContainerLauncherRouter(containerLauncher, getContext());
+ return new ContainerLauncherManager(containerLauncher, getContext());
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 7584b4c..b0bc571 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -200,7 +200,7 @@ public class TestMockDAGAppMaster {
mockLauncher.waitTillContainersLaunched();
ContainerData cData = mockLauncher.getContainers().values().iterator().next();
DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
- mockApp.getTaskSchedulerEventHandler().preemptContainer(0, cData.cId);
+ mockApp.getTaskSchedulerManager().preemptContainer(0, cData.cId);
mockLauncher.startScheduling(true);
dagClient.waitForCompletion();