You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ac...@apache.org on 2013/03/15 22:26:48 UTC
svn commit: r1457129 [25/38] - in /incubator/tez: ./ tez-ampool/
tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/
tez-ampool/src/main/conf/ tez-ampool/src/main/java/
tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,1555 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventCounterUpdate;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventDiagnosticsUpdate;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.NotRunningJob;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor.ContainerRequest;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainer;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventAssignTA;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunchRequest;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerState;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventTASucceeded;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventTaskAttemptEnded;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventTaskAttemptSucceeded;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.task.impl.MRTaskContext;
+
+/**
+ * Allocates the container from the ResourceManager scheduler.
+ */
+public class RMContainerAllocator extends AbstractService
+ implements ContainerAllocator {
+
+ static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
+
+ // Default fraction of maps that must complete before reduces are ramped up.
+ public static final
+ float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
+
+ // Request priorities sent to the RM. A lower number is a higher priority
+ // (see preemptReducesIfNeeded: reduces are scheduled at higher priority
+ // than maps), so fast-fail maps beat reduces, which beat fresh maps.
+ public static final Priority PRIORITY_FAST_FAIL_MAP;
+ public static final Priority PRIORITY_REDUCE;
+ public static final Priority PRIORITY_MAP;
+
+ // Thread draining eventQueue; stopEventHandling makes its loop exit.
+ private Thread eventHandlingThread;
+ private volatile boolean stopEventHandling;
+
+ static {
+ PRIORITY_FAST_FAIL_MAP = BuilderUtils.newPriority(5);
+ PRIORITY_REDUCE = BuilderUtils.newPriority(10);
+ PRIORITY_MAP = BuilderUtils.newPriority(20);
+ }
+
+ protected final AppContext appContext;
+ protected final Clock clock;
+ // Cached job reference, resolved lazily via getJob(); may be reset to null.
+ protected Job job = null;
+ protected final JobId jobId;
+ private final ContainerRequestor requestor;
+ @SuppressWarnings("rawtypes")
+ private final EventHandler eventHandler;
+ private final AMContainerMap containerMap;
+
+ // Run the scheduler if it hasn't run for this interval.
+ private long scheduleInterval = 1000l;
+
+ Timer scheduleTimer;
+ ScheduleTimerTask scheduleTimerTask;
+ private long lastScheduleTime = 0l;
+ // Completion counters, refreshed from the Job on each allocation event.
+ private int lastCompletedTasks = 0;
+ private int completedMaps = 0;
+ private int completedReduces = 0;
+
+ /*
+ Vocabulary Used:
+ pending -> requests which are NOT yet sent to RM
+ scheduled -> requests which are sent to RM but not yet assigned
+ assigned -> requests which are assigned to a container
+ completed -> request corresponding to which container has completed
+
+ Lifecycle of map
+ scheduled->assigned->completed
+
+ Lifecycle of reduce
+ pending->scheduled->assigned->completed
+
+ Maps are scheduled as soon as their requests are received. Reduces are
+ added to the pending and are ramped up (added to scheduled) based
+ on completed maps and current availability in the cluster.
+ */
+
+ //reduces which are not yet scheduled
+ private final LinkedList<ContainerRequestInfo> pendingReduces =
+ new LinkedList<ContainerRequestInfo>();
+
+ // holds information about the assigned containers to task attempts.
+ // Effectively the currently running tasks.
+ private final AssignedRequests assignedRequests = new AssignedRequests();
+
+ //holds scheduled requests to be fulfilled by RM
+ private final ScheduledRequests scheduledRequests = new ScheduledRequests();
+
+ // Populated whenever a new container is available - effectively from the RM.
+ private LinkedHashSet<ContainerId> availableUnlaunchedContainerIds =
+ new LinkedHashSet<ContainerId>();
+ // Populated whenever an already running container becomes available for use.
+ private LinkedHashSet<ContainerId> availableLaunchedContainerIds =
+ new LinkedHashSet<ContainerId>();
+
+ // List of pre-allocated containerIds.
+ // Concurrent sets: these are also touched from outside handleEvent's lock.
+ private Set<ContainerId> preAllocatedAvailableContainerIds =
+ Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
+ // List of pre-allocated containerIds which have been seen, and moved over to
+ // the available containerId pool.
+ private Set<ContainerId> preAllocatedSeenContainerIds =
+ Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
+
+ // Original launch request per attempt, kept until the attempt ends.
+ private final Map<TaskAttemptId, AMSchedulerTALaunchRequestEvent>
+ attemptToLaunchRequestMap = new HashMap<TaskAttemptId, AMSchedulerTALaunchRequestEvent>();
+
+ // Statistics counters, reported via getStat().
+ private int containersAllocated = 0;
+ private int newContainerAllocations = 0;
+ private int existingContainerAllocations = 0;
+ private int containersReleased = 0;
+ private int hostLocalAssigned = 0;
+ private int rackLocalAssigned = 0;
+
+ private boolean recalculateReduceSchedule = false;
+ private int mapResourceReqt;//memory
+ private int reduceResourceReqt;//memory
+
+ private boolean reduceStarted = false;
+ private float maxReduceRampupLimit = 0;
+ private float maxReducePreemptionLimit = 0;
+ private float reduceSlowStart = 0;
+
+ // Container re-use configuration; see init() for validation.
+ private boolean shouldReUse;
+ private int maxAttemptsPerContainer;
+
+ BlockingQueue<AMSchedulerEvent> eventQueue
+ = new LinkedBlockingQueue<AMSchedulerEvent>();
+
+ /**
+ * Creates an allocator backed by the given requestor and application
+ * context. Pulls the clock, event handler and container map from the
+ * context, and derives the JobId from the ApplicationId.
+ */
+ public RMContainerAllocator(ContainerRequestor requestor,
+ AppContext appContext) {
+ super("RMContainerAllocator");
+ this.requestor = requestor;
+ this.appContext = appContext;
+ this.clock = appContext.getClock();
+ this.eventHandler = appContext.getEventHandler();
+ this.containerMap = appContext.getAllContainers();
+ // A JobId should not really be required here. It is currently used for
+ // error notification, clc construction, etc.
+ ApplicationId applicationId = appContext.getApplicationID();
+ JobID mrJobId = TypeConverter.fromYarn(applicationId);
+ this.jobId = TypeConverter.toYarn(mrJobId);
+ }
+
+ /**
+ * Reads scheduler configuration: reduce slow-start, ramp-up and preemption
+ * limits, the scheduling interval, and container re-use settings.
+ *
+ * @throws YarnException if re-use is enabled and max-attempts-per-container
+ * is 0 or less than -1 (-1 means unlimited, otherwise a positive count)
+ */
+ @Override
+ public void init(Configuration conf) {
+ super.init(conf);
+ reduceSlowStart = conf.getFloat(
+ MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
+ DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART);
+ maxReduceRampupLimit = conf.getFloat(
+ MRJobConfig.MR_AM_JOB_REDUCE_RAMPUP_UP_LIMIT,
+ MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_RAMP_UP_LIMIT);
+ maxReducePreemptionLimit = conf.getFloat(
+ MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
+ MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
+ scheduleInterval = conf.getLong(
+ MRJobConfig.MR_AM_SCHEDULER_INTERVAL,
+ MRJobConfig.DEFAULT_MR_AM_SCHEDULER_INTERVAL);
+ shouldReUse =
+ conf.getBoolean(MRJobConfig.MR_AM_SCHEDULER_REUSE_ENABLE,
+ MRJobConfig.DEFAULT_MR_AM_SCHEDULER_REUSE_ENABLE);
+ maxAttemptsPerContainer =
+ conf.getInt(
+ MRJobConfig.MR_AM_SCHEDULER_REUSE_MAX_ATTEMPTS_PER_CONTAINER,
+ MRJobConfig.DEFAULT_MR_AM_SCHEDULER_REUSE_MAX_ATTEMPTS_PER_CONTAINER);
+ if (!shouldReUse) { // For the next log message.
+ maxAttemptsPerContainer = 1;
+ } else {
+ if (maxAttemptsPerContainer == 0 || maxAttemptsPerContainer < -1) {
+ // Typo fix: message previously read "Invlaid".
+ throw new YarnException("Invalid configuration: "
+ + MRJobConfig.MR_AM_SCHEDULER_REUSE_MAX_ATTEMPTS_PER_CONTAINER
+ + " cannot be " + maxAttemptsPerContainer);
+ }
+ }
+ LOG.info("AMSchedulerConfiguration: " + "ReUseEnabled: " + shouldReUse
+ + ", maxAttemptsPerContainer: " + maxAttemptsPerContainer
+ + ", reduceSlowStart: " + reduceSlowStart + ", maxReduceRampupLimit: "
+ + maxReduceRampupLimit + ", maxReducePreemptionLimit: "
+ + maxReducePreemptionLimit + ", scheduleThreadInterval: "
+ + scheduleInterval + " ms");
+ RackResolver.init(conf);
+ }
+
+ /**
+ * Starts the event-handling thread and the periodic schedule timer.
+ * The event thread drains eventQueue; any Throwable escaping handleEvent()
+ * is treated as fatal and triggers an INTERNAL_ERROR job event.
+ */
+ @Override
+ public void start() {
+ this.eventHandlingThread = new Thread() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void run() {
+
+ AMSchedulerEvent event;
+
+ while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+ try {
+ event = RMContainerAllocator.this.eventQueue.take();
+ } catch (InterruptedException e) {
+ LOG.error("Returning, interrupted : " + e);
+ // Restore the interrupt status before exiting.
+ Thread.currentThread().interrupt();
+ return;
+ }
+
+ try {
+ handleEvent(event);
+ } catch (Throwable t) {
+ // Typo fix in log message: was "ContainreAllocator".
+ LOG.error("Error in handling event type " + event.getType()
+ + " to the ContainerAllocator", t);
+ // Kill the AM.
+ eventHandler.handle(new JobEvent(getJob().getID(),
+ JobEventType.INTERNAL_ERROR));
+ return;
+ }
+ }
+ }
+ };
+ this.eventHandlingThread.start();
+
+ // Daemon timer that nudges the scheduler if it has been idle too long.
+ scheduleTimer = new Timer("AMSchedulerTimer", true);
+ scheduleTimerTask = new ScheduleTimerTask();
+ scheduleTimer.scheduleAtFixedRate(scheduleTimerTask, scheduleInterval,
+ scheduleInterval);
+ this.job = getJob();
+
+ super.start();
+ }
+
+ /**
+ * Stops the event thread and the schedule timer, then logs final stats.
+ * Fix: also cancel the Timer itself - cancelling only the task leaves the
+ * timer thread alive.
+ */
+ @Override
+ public void stop() {
+ this.stopEventHandling = true;
+ if (eventHandlingThread != null) {
+ eventHandlingThread.interrupt();
+ }
+ if (scheduleTimerTask != null) {
+ scheduleTimerTask.stop();
+ }
+ if (scheduleTimer != null) {
+ scheduleTimer.cancel();
+ }
+ super.stop();
+ LOG.info("Final Scheduler Stats: " + getStat());
+ }
+
+ /** Forwards an event to the central dispatcher; unchecked because the
+ * EventHandler field is declared as a rawtype. */
+ @SuppressWarnings("unchecked")
+ private void sendEvent(Event<?> event) {
+ eventHandler.handle(event);
+ }
+
+ /**
+ * Lazily resolves and caches the Job from the AppContext. A NotRunningJob
+ * placeholder is treated as "no job yet" and resets the cache, so this may
+ * return null - callers must handle that.
+ */
+ protected Job getJob() {
+ if (this.job == null) {
+ this.job = appContext.getJob(jobId);
+ }
+ if (this.job instanceof NotRunningJob) {
+ this.job = null;
+ }
+ return this.job;
+ }
+
+ private class ScheduleTimerTask extends TimerTask {
+ private volatile boolean shouldRun = true;
+
+ @Override
+ public void run() {
+ if (clock.getTime() - lastScheduleTime > scheduleInterval && shouldRun) {
+ handle(new AMSchedulerEventContainersAllocated(
+ Collections.<ContainerId> emptyList(), false));
+ }
+ }
+
+ public void stop() {
+ shouldRun = false;
+ this.cancel();
+ }
+ }
+
+ /** @return true once reduce scheduling has passed the slow-start threshold. */
+ public boolean getIsReduceStarted() {
+ return reduceStarted;
+ }
+
+ /** Marks reduce ramp-up as started; set once the slow-start threshold is met. */
+ public void setIsReduceStarted(boolean reduceStarted) {
+ this.reduceStarted = reduceStarted;
+ }
+
+ /**
+ * Queues a scheduler event for the handler thread. Logs every time the
+ * queue crosses a multiple of 1000 entries and warns when the remaining
+ * capacity runs low; blocks if the queue is full.
+ */
+ @Override
+ public void handle(AMSchedulerEvent event) {
+ int queuedEvents = eventQueue.size();
+ if (queuedEvents != 0 && queuedEvents % 1000 == 0) {
+ LOG.info("Size of event-queue in RMContainerAllocator is " + queuedEvents);
+ }
+ int spareCapacity = eventQueue.remainingCapacity();
+ if (spareCapacity < 1000) {
+ LOG.warn("Very low remaining capacity in the event-queue "
+ + "of RMContainerAllocator: " + spareCapacity);
+ }
+ try {
+ eventQueue.put(event);
+ } catch (InterruptedException e) {
+ throw new YarnException(e);
+ }
+ }
+
+ // Synchronization required for now. At some point, change this to make
+ // individual sections synchronized. Adding to the available container list is
+ // one bit which will need to considered.
+ /**
+ * Central dispatch for scheduler events, run on the event-handling thread.
+ * Most event types force a recalculation of the reduce schedule.
+ */
+ protected synchronized void handleEvent(AMSchedulerEvent sEvent) {
+ LOG.info("XXX: Processing the event " + sEvent.toString());
+ switch (sEvent.getType()) {
+ case S_TA_LAUNCH_REQUEST:
+ recalculateReduceSchedule = true;
+ handleTaLaunchRequest((AMSchedulerTALaunchRequestEvent) sEvent);
+ break;
+ case S_TA_ENDED: // TaskAttempt considered complete.
+ recalculateReduceSchedule = true;
+ AMSchedulerEventTAEnded event = (AMSchedulerEventTAEnded) sEvent;
+ switch (event.getState()) {
+ case FAILED:
+ case KILLED:
+ // Consistency fix: reuse the already-cast local instead of re-casting.
+ handleTAUnsuccessfulEnd(event);
+ break;
+ case SUCCEEDED:
+ handleTASucceeded(event);
+ break;
+ default:
+ // Typo fix: message previously read "Unexecpted".
+ throw new YarnException("Unexpected TA_ENDED state: " + event.getState());
+ }
+ break;
+ case S_CONTAINERS_ALLOCATED:
+ // Conditional recalculateReduceSchedule
+ handleContainersAllocated((AMSchedulerEventContainersAllocated) sEvent);
+ break;
+ case S_CONTAINER_COMPLETED:
+ // Remove if present in availableContainerList. Container stop can be
+ // from other sources.
+ handleContainerCompleted((AMSchedulerEventContainerCompleted) sEvent);
+ recalculateReduceSchedule = true;
+ break;
+ case S_NODE_BLACKLISTED:
+ handleNodeBlacklisted((AMSchedulerEventNodeBlacklisted) sEvent);
+ break;
+ case S_NODE_UNHEALTHY:
+ // Ignore. RM will not allocated containers on this node.
+ // Consider changing this to work like BLACKLISTING.
+ break;
+ case S_NODE_HEALTHY:
+ // Ignore. RM will start allocating containers if there's pending
+ // requests.
+ break;
+ }
+ }
+
+ /**
+ * Registers a new task-attempt launch request. Maps are scheduled (asked
+ * from the RM) immediately; reduces are parked in pendingReduces and ramped
+ * up later by scheduleReduces(). Rescheduled (previously failed) reduces go
+ * to the front of the pending list so they are ramped up first.
+ */
+ protected void handleTaLaunchRequest(AMSchedulerTALaunchRequestEvent event) {
+ // Add to queue of pending tasks.
+ recalculateReduceSchedule = true;
+ attemptToLaunchRequestMap.put(event.getAttemptID(), event);
+ if (event.getAttemptID().getTaskId().getTaskType() == TaskType.MAP) {
+ // Normalize the memory ask once per task type (see
+ // maybeComputeNormalizedRequestForType) and pin it on the event.
+ mapResourceReqt = maybeComputeNormalizedRequestForType(event,
+ TaskType.MAP, mapResourceReqt);
+ event.getCapability().setMemory(mapResourceReqt);
+ scheduledRequests.addMap(event);
+ } else { // Reduce
+ reduceResourceReqt = maybeComputeNormalizedRequestForType(event,
+ TaskType.REDUCE, reduceResourceReqt);
+ event.getCapability().setMemory(reduceResourceReqt);
+ if (event.isRescheduled()) {
+ pendingReduces.addFirst(new ContainerRequestInfo(new ContainerRequest(
+ event.getCapability(), event.getHosts(), event.getRacks(),
+ PRIORITY_REDUCE), event));
+ } else {
+ pendingReduces.addLast(new ContainerRequestInfo(new ContainerRequest(
+ event.getCapability(), event.getHosts(), event.getRacks(),
+ PRIORITY_REDUCE), event));
+ }
+ }
+ }
+
+ /**
+ * Handles a FAILED/KILLED task attempt. The attempt may be pending
+ * (reduces only), scheduled, or already assigned to a container; in the
+ * assigned case the container is asked to stop and the node is informed.
+ */
+ protected void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) {
+ TaskAttemptId aId = event.getAttemptID();
+ attemptToLaunchRequestMap.remove(aId);
+ // Bug fix: pendingReduces holds ContainerRequestInfo elements, so the
+ // previous pendingReduces.remove(aId) - with aId a TaskAttemptId - could
+ // never match anything. Search for the entry of this attempt instead.
+ boolean removed = removePendingReduce(aId);
+ if (!removed) {
+ // likely a pre-empted reduce from within the scheduler itself ?
+ removed = scheduledRequests.remove(aId);
+ if (!removed) {
+ // Maybe assigned.
+ ContainerId containerId = assignedRequests.remove(aId);
+ if (containerId == null) {
+ // Failure after Success. Pull the ContainerId from the TaskAttempt.
+ containerId = event.getUsedContainerId();
+ }
+ if (containerId != null) {
+ // Ask the container to stop.
+ sendEvent(new AMContainerEvent(containerId,
+ AMContainerEventType.C_STOP_REQUEST));
+ // Inform the Node - the task has asked to be STOPPED / has already
+ // stopped.
+ sendEvent(new AMNodeEventTaskAttemptEnded(containerMap
+ .get(containerId).getContainer().getNodeId(), containerId,
+ event.getAttemptID(), event.getState() == TaskAttemptState.FAILED));
+ } else {
+ LOG.warn("Received a STOP request for unknown taskAttempt: "
+ + event.getAttemptID() + ". Event: " + event);
+ // This could be generated in case of recovery, with unhealthy nodes/
+ // fetch failures. Can be ignored, since Recovered containers don't
+ // need to be stopped.
+ }
+ }
+ }
+ }
+
+ /**
+ * Removes the pending (not yet scheduled) reduce request belonging to the
+ * given attempt, if present.
+ *
+ * @return true if an entry was removed
+ */
+ private boolean removePendingReduce(TaskAttemptId aId) {
+ Iterator<ContainerRequestInfo> it = pendingReduces.iterator();
+ while (it.hasNext()) {
+ if (it.next().launchRequestEvent.getAttemptID().equals(aId)) {
+ it.remove();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Handles a successful task attempt: drops its bookkeeping, notifies the
+ * container and its node of the success, and offers the container for
+ * potential re-use via containerAvailable().
+ */
+ protected void handleTASucceeded(AMSchedulerEventTAEnded event) {
+ attemptToLaunchRequestMap.remove(event.getAttemptID());
+ ContainerId containerId = assignedRequests.remove(event.getAttemptID());
+ if (containerId != null) {
+ sendEvent(new AMContainerEventTASucceeded(containerId,
+ event.getAttemptID()));
+ sendEvent(new AMNodeEventTaskAttemptSucceeded(containerMap
+ .get(containerId).getContainer().getNodeId(), containerId,
+ event.getAttemptID()));
+ containerAvailable(containerId);
+ } else {
+ LOG.warn("Received TaskAttemptSucceededEvent for unmapped TaskAttempt: "
+ + event.getAttemptID() + ". Event: " + event);
+ }
+ }
+
+ /**
+ * Processes newly allocated containers (or an empty nudge from the timer),
+ * refreshes completion counters from the Job, and runs a schedule() pass.
+ * Containers arriving before the Job exists go to the pre-allocated pool.
+ */
+ protected void handleContainersAllocated(
+ AMSchedulerEventContainersAllocated event) {
+
+ Job job = getJob();
+ if (job != null) {
+ // Fix: use the checked local reference; calling getJob() again could
+ // return null (NotRunningJob resets the cache) and NPE here.
+ completedMaps = job.getCompletedMaps();
+ completedReduces = job.getCompletedReduces();
+ // Not considering such containers for the pre-allocated pool since this
+ // set would have come in from the RMCommunicator.
+ availableUnlaunchedContainerIds.addAll(event.getContainerIds());
+ } else {
+ completedMaps = 0;
+ completedReduces = 0;
+ preAllocatedAvailableContainerIds.addAll(event.getContainerIds());
+ for (ContainerId cId : event.getContainerIds()) {
+ LOG.info("Got pre-allocated container: " + cId);
+ }
+ }
+ int completedTasks = completedMaps + completedReduces;
+
+ if (lastCompletedTasks != completedTasks) {
+ recalculateReduceSchedule = true;
+ lastCompletedTasks = completedTasks;
+ }
+
+ if (event.didHeadroomChange() || event.getContainerIds().size() > 0) {
+ recalculateReduceSchedule = true;
+ }
+ schedule();
+ }
+
+ /**
+ * Drops a completed container from every availability pool; the container
+ * may have been stopped by another component.
+ */
+ protected void handleContainerCompleted(
+ AMSchedulerEventContainerCompleted event) {
+ ContainerId completedId = event.getContainerId();
+ availableUnlaunchedContainerIds.remove(completedId);
+ availableLaunchedContainerIds.remove(completedId);
+ preAllocatedAvailableContainerIds.remove(completedId);
+ }
+
+ // TODO Add a test later if TestRMContainerAllocator does not have one for
+ // blacklisting.
+ /**
+ * Re-requests every map that was asked for on a newly blacklisted host so
+ * the requestor can place it elsewhere.
+ */
+ protected void handleNodeBlacklisted(AMSchedulerEventNodeBlacklisted event) {
+ NodeId nodeId = event.getNodeId();
+ String host = nodeId.getHost();
+ // Only maps would have asked for containers on a specific node.
+ List<TaskAttemptId> affectedAttemptIds = scheduledRequests.mapsHostMapping.get(host);
+ if (affectedAttemptIds == null) {
+ // Fix: no scheduled map ever mentioned this host; avoid an NPE.
+ return;
+ }
+ // Fix: iterate over a copy - addMap() appends to the very list mapped
+ // under this host, which would otherwise cause a
+ // ConcurrentModificationException mid-iteration.
+ for (TaskAttemptId taId : new ArrayList<TaskAttemptId>(affectedAttemptIds)) {
+ ContainerRequestInfo cr = scheduledRequests.maps.get(taId);
+ if (cr == null) {
+ // Stale entry: remove() does not clean up mapsHostMapping (see TODO
+ // there), so skip attempts that are no longer scheduled.
+ continue;
+ }
+ scheduledRequests.remove(taId);
+ scheduledRequests.addMap(cr.launchRequestEvent);
+ }
+ // Instead of removing / re-adding each individual request, it may be more
+ // efficient to modify internal data structures, and send a request to the
+ // RMComm to completely forget about a host.
+ }
+
+ // TODO Override for re-use.
+ /**
+ * Hands every currently-available container to the scheduled requests for
+ * assignment, then clears both availability pools. Launched containers are
+ * added first so warm containers are preferred over unlaunched ones.
+ */
+ protected synchronized void assignContainers() {
+ if (LOG.isDebugEnabled()) {
+ for (ContainerId cId : preAllocatedAvailableContainerIds) {
+ LOG.debug("Pre-allocated and available container: " + cId);
+ }
+ for (ContainerId cId : preAllocatedSeenContainerIds) {
+ LOG.debug("Pre-allocated and seen container: " + cId);
+ }
+ for (ContainerId cId : availableUnlaunchedContainerIds) {
+ LOG.debug("Available container: " + cId);
+ }
+ for (ContainerId cId : availableLaunchedContainerIds) {
+ LOG.debug("Available launched container: " + cId);
+ }
+ }
+ // Insertion order matters: launched (warm) containers come first.
+ LinkedHashSet<ContainerId> allAvailableContainers =
+ new LinkedHashSet<ContainerId>(availableUnlaunchedContainerIds.size()
+ + availableLaunchedContainerIds.size());
+ allAvailableContainers.addAll(availableLaunchedContainerIds);
+ allAvailableContainers.addAll(availableUnlaunchedContainerIds);
+
+ if (allAvailableContainers.size() > 0) {
+ LOG.info("Before Assign: " + getStat());
+ scheduledRequests.assign(allAvailableContainers);
+ availableUnlaunchedContainerIds.clear();
+ availableLaunchedContainerIds.clear();
+ LOG.info("After Assign: " + getStat());
+ }
+ }
+
+ // TODO Override for re-use.
+ /**
+ * Recomputes the reduce ramp-up/ramp-down when anything relevant changed
+ * since the last pass. Map asks need no action here - they are forwarded
+ * to the requestor as soon as they arrive. No-op until the Job exists.
+ */
+ protected void requestContainers() {
+ Job j = getJob();
+ if (j == null) {
+ return;
+ }
+ // Nothing else here. All requests are sent to the Requester immediately.
+ if (recalculateReduceSchedule) {
+ preemptReducesIfNeeded();
+ scheduleReduces(
+ j.getTotalMaps(), j.getCompletedMaps(),
+ scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
+ assignedRequests.maps.size(), assignedRequests.reduces.size(),
+ mapResourceReqt, reduceResourceReqt,
+ pendingReduces.size(),
+ maxReduceRampupLimit, reduceSlowStart);
+ recalculateReduceSchedule = false;
+ }
+ }
+
+ /* availableUnlaunchedContainerIds contains the currently available containers.
+ * Should be cleared appropriately.
+ */
+ // One full scheduling pass: assign what we already have, then (re)compute
+ // asks, and record the pass time for the idle-nudge timer.
+ private synchronized void schedule() {
+ assignContainers();
+ requestContainers();
+ lastScheduleTime = clock.getTime();
+ }
+
+ /**
+ * Decides what to do with a container whose task just finished: stop it if
+ * re-use is disabled or the per-container attempt limit is reached,
+ * otherwise return it to the appropriate availability pool and trigger a
+ * scheduling pass.
+ */
+ protected void containerAvailable(ContainerId containerId) {
+ if (!shouldReUse) {
+ sendEvent(new AMContainerEvent(containerId,
+ AMContainerEventType.C_STOP_REQUEST));
+ return;
+ }
+ AMContainer amContainer = containerMap.get(containerId);
+ // -1 means no limit on attempts per container.
+ boolean underAttemptLimit = maxAttemptsPerContainer == -1
+ || amContainer.getAllTaskAttempts().size() < maxAttemptsPerContainer;
+ if (!underAttemptLimit) {
+ sendEvent(new AMContainerEvent(containerId,
+ AMContainerEventType.C_STOP_REQUEST));
+ return;
+ }
+ if (amContainer.getState() == AMContainerState.ALLOCATED) {
+ availableUnlaunchedContainerIds.add(containerId);
+ } else {
+ availableLaunchedContainerIds.add(containerId);
+ }
+ // Empty allocation event simply kicks off another schedule() pass.
+ handle(new AMSchedulerEventContainersAllocated(
+ Collections.<ContainerId> emptyList(), true));
+ }
+
+ /**
+ * On the first request of a task type (prevComputedSize == 0), rounds the
+ * asked memory up to a multiple of the cluster's minimum container size,
+ * records a NormalizedResourceEvent in job history, and kills the job if
+ * the normalized size exceeds the cluster's maximum container capability.
+ * Later calls return the previously computed size unchanged.
+ */
+ @SuppressWarnings("unchecked")
+ private int maybeComputeNormalizedRequestForType(
+ AMSchedulerTALaunchRequestEvent event, TaskType taskType,
+ int prevComputedSize) {
+ if (prevComputedSize == 0) {
+ int supportedMaxContainerCapability = appContext.getClusterInfo()
+ .getMaxContainerCapability().getMemory();
+ prevComputedSize = event.getCapability().getMemory();
+ int minSlotMemSize = appContext.getClusterInfo()
+ .getMinContainerCapability().getMemory();
+ // Round up to the nearest multiple of the minimum slot size.
+ prevComputedSize = (int) Math.ceil((float) prevComputedSize
+ / minSlotMemSize)
+ * minSlotMemSize;
+ eventHandler.handle(new JobHistoryEvent(getJob().getID(),
+ new NormalizedResourceEvent(TypeConverter.fromYarn(taskType),
+ prevComputedSize)));
+ LOG.info(taskType + "ResourceReqt:" + prevComputedSize);
+ if (prevComputedSize > supportedMaxContainerCapability) {
+ String diagMsg = taskType
+ + " capability required is more than the supported "
+ + "max container capability in the cluster. Killing the Job. "
+ + taskType + "ResourceReqt: " + prevComputedSize
+ + " maxContainerCapability:" + supportedMaxContainerCapability;
+ LOG.info(diagMsg);
+ eventHandler.handle(new JobEventDiagnosticsUpdate(getJob().getID(), diagMsg));
+ eventHandler.handle(new JobEvent(getJob().getID(), JobEventType.JOB_KILL));
+ }
+ }
+ return prevComputedSize;
+ }
+
+ /**
+ * If scheduled maps cannot fit in the memory left over by assigned (and
+ * not already preempting) reduces, first ramps down all scheduled-but-
+ * unassigned reduces, then preempts enough assigned reduces to make room
+ * for at least one map, bounded by maxReducePreemptionLimit.
+ */
+ private void preemptReducesIfNeeded() {
+ if (reduceResourceReqt == 0) {
+ return; //no reduces
+ }
+ //check if reduces have taken over the whole cluster and there are
+ //unassigned maps
+ if (scheduledRequests.maps.size() > 0) {
+ int memLimit = getMemLimit();
+ int availableMemForMap = memLimit - ((assignedRequests.reduces.size() -
+ assignedRequests.preemptionWaitingReduces.size()) * reduceResourceReqt);
+ //availableMemForMap must be sufficient to run atleast 1 map
+ if (availableMemForMap < mapResourceReqt) {
+ //to make sure new containers are given to maps and not reduces
+ //ramp down all scheduled reduces if any
+ //(since reduces are scheduled at higher priority than maps)
+ LOG.info("Ramping down all scheduled reduces:" + scheduledRequests.reduces.size());
+ for (ContainerRequestInfo req : scheduledRequests.reduces.values()) {
+ pendingReduces.add(req);
+ }
+ scheduledRequests.reduces.clear();
+
+ //preempt for making space for atleast one map
+ int premeptionLimit = Math.max(mapResourceReqt,
+ (int) (maxReducePreemptionLimit * memLimit));
+
+ int preemptMem = Math.min(scheduledRequests.maps.size() * mapResourceReqt,
+ premeptionLimit);
+
+ int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt);
+ toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
+
+ LOG.info("Going to preempt " + toPreempt);
+ assignedRequests.preemptReduce(toPreempt);
+ }
+ }
+ }
+
+ /**
+ * Decides how many pending reduces to move into the scheduled set (or back
+ * out of it). Steps: enforce the slow-start threshold; if no maps remain
+ * scheduled, ramp up everything; otherwise split the total memory limit
+ * between maps and reduces in proportion to map completion (capped by
+ * maxReduceRampupLimit), give unused map headroom to reduces, and ramp the
+ * scheduled reduces up or down to match the reduce share.
+ */
+ @Private
+ public void scheduleReduces(
+ int totalMaps, int completedMaps,
+ int scheduledMaps, int scheduledReduces,
+ int assignedMaps, int assignedReduces,
+ int mapResourceReqt, int reduceResourceReqt,
+ int numPendingReduces,
+ float maxReduceRampupLimit, float reduceSlowStart) {
+
+ if (numPendingReduces == 0) {
+ return;
+ }
+
+ LOG.info("Recalculating schedule...");
+
+ //check for slow start
+ if (!getIsReduceStarted()) {//not set yet
+ int completedMapsForReduceSlowstart = (int)Math.ceil(reduceSlowStart *
+ totalMaps);
+ if(completedMaps < completedMapsForReduceSlowstart) {
+ LOG.info("Reduce slow start threshold not met. " +
+ "completedMapsForReduceSlowstart " +
+ completedMapsForReduceSlowstart);
+ return;
+ } else {
+ LOG.info("Reduce slow start threshold reached. Scheduling reduces.");
+ setIsReduceStarted(true);
+ }
+ }
+
+ //if all maps are assigned, then ramp up all reduces irrespective of the
+ //headroom
+ if (scheduledMaps == 0 && numPendingReduces > 0) {
+ LOG.info("All maps assigned. " +
+ "Ramping up all remaining reduces:" + numPendingReduces);
+ scheduleAllReduces();
+ return;
+ }
+
+ float completedMapPercent = 0f;
+ if (totalMaps != 0) {//support for 0 maps
+ completedMapPercent = (float)completedMaps/totalMaps;
+ } else {
+ completedMapPercent = 1;
+ }
+
+ int netScheduledMapMem =
+ (scheduledMaps + assignedMaps) * mapResourceReqt;
+
+ int netScheduledReduceMem =
+ (scheduledReduces + assignedReduces) * reduceResourceReqt;
+
+ int finalMapMemLimit = 0;
+ int finalReduceMemLimit = 0;
+
+ // ramp up the reduces based on completed map percentage
+ int totalMemLimit = getMemLimit();
+ int idealReduceMemLimit =
+ Math.min(
+ (int)(completedMapPercent * totalMemLimit),
+ (int) (maxReduceRampupLimit * totalMemLimit));
+ int idealMapMemLimit = totalMemLimit - idealReduceMemLimit;
+
+ // check if there aren't enough maps scheduled, give the free map capacity
+ // to reduce
+ if (idealMapMemLimit > netScheduledMapMem) {
+ int unusedMapMemLimit = idealMapMemLimit - netScheduledMapMem;
+ finalReduceMemLimit = idealReduceMemLimit + unusedMapMemLimit;
+ finalMapMemLimit = totalMemLimit - finalReduceMemLimit;
+ } else {
+ finalMapMemLimit = idealMapMemLimit;
+ finalReduceMemLimit = idealReduceMemLimit;
+ }
+
+ LOG.info("completedMapPercent " + completedMapPercent +
+ " totalMemLimit:" + totalMemLimit +
+ " finalMapMemLimit:" + finalMapMemLimit +
+ " finalReduceMemLimit:" + finalReduceMemLimit +
+ " netScheduledMapMem:" + netScheduledMapMem +
+ " netScheduledReduceMem:" + netScheduledReduceMem);
+
+ // Positive -> schedule more reduces; negative -> pull some back.
+ int rampUp =
+ (finalReduceMemLimit - netScheduledReduceMem) / reduceResourceReqt;
+
+ if (rampUp > 0) {
+ rampUp = Math.min(rampUp, numPendingReduces);
+ LOG.info("Ramping up " + rampUp);
+ rampUpReduces(rampUp);
+ } else if (rampUp < 0){
+ int rampDown = -1 * rampUp;
+ rampDown = Math.min(rampDown, scheduledReduces);
+ LOG.info("Ramping down " + rampDown);
+ rampDownReduces(rampDown);
+ }
+ }
+
+ /** Moves every pending reduce into the scheduled set (unconditional ramp-up). */
+ @Private
+ public void scheduleAllReduces() {
+ while (!pendingReduces.isEmpty()) {
+ scheduledRequests.addReduce(pendingReduces.removeFirst());
+ }
+ }
+
+ @Private
+ public void rampUpReduces(int rampUp) {
+ //more reduce to be scheduled
+ for (int i = 0; i < rampUp; i++) {
+ ContainerRequestInfo request = pendingReduces.removeFirst();
+ scheduledRequests.addReduce(request);
+ }
+ }
+
+ /** Pulls {@code rampDown} reduces out of the scheduled set back into pending. */
+ @Private
+ public void rampDownReduces(int rampDown) {
+ for (int remaining = rampDown; remaining > 0; remaining--) {
+ pendingReduces.add(scheduledRequests.removeReduce());
+ }
+ }
+
+ /**
+ * One-line snapshot of the scheduler's bookkeeping for logging.
+ * Synchronized to avoid findbugs warnings
+ */
+ private synchronized String getStat() {
+ return "PendingReduces:" + pendingReduces.size() +
+ " ScheduledMaps:" + scheduledRequests.maps.size() +
+ " ScheduledReduces:" + scheduledRequests.reduces.size() +
+ " AssignedMaps:" + assignedRequests.maps.size() +
+ " AssignedReduces:" + assignedRequests.reduces.size() +
+ " completedMaps:" + completedMaps +
+ " completedReduces:" + completedReduces +
+ " containersAllocated:" + containersAllocated + //Not super useful.
+ " newContainersAllocated: " + newContainerAllocations +
+ " existingContainersAllocated: " + existingContainerAllocations +
+ " containersReleased:" + containersReleased +
+ " hostLocalAssigned:" + hostLocalAssigned +
+ " rackLocalAssigned:" + rackLocalAssigned +
+ " availableResources(headroom):" + requestor.getAvailableResources();
+ // TODO Can hostLocal/rackLocal be handled elsewhere.
+ }
+
+
+
+ /**
+ * Upper bound on memory the job may use: the headroom currently reported
+ * by the RM plus memory already held by assigned maps and reduces.
+ */
+ @Private
+ public int getMemLimit() {
+ // Fix: read the headroom once - the previous code called
+ // getAvailableResources() twice and could observe two different values.
+ Resource available = requestor.getAvailableResources();
+ int headRoom = available != null ? available.getMemory() : 0;
+ return headRoom + assignedRequests.maps.size() * mapResourceReqt
+ + assignedRequests.reduces.size() * reduceResourceReqt;
+ }
+
+
+ /**
+ * Tracks attempts for which a Container ask has been sent to the
+ * RMCommunicator.
+ */
+ private class ScheduledRequests {
+
+ // Maps that failed earlier and were re-requested at fast-fail priority.
+ private final LinkedList<TaskAttemptId> earlierFailedMaps =
+ new LinkedList<TaskAttemptId>();
+
+ /** Maps from a host to a list of Map tasks with data on the host */
+ private final Map<String, LinkedList<TaskAttemptId>> mapsHostMapping =
+ new HashMap<String, LinkedList<TaskAttemptId>>();
+ // Same idea, keyed by rack.
+ private final Map<String, LinkedList<TaskAttemptId>> mapsRackMapping =
+ new HashMap<String, LinkedList<TaskAttemptId>>();
+ // Outstanding asks per attempt; insertion-ordered for FIFO handling.
+ private final Map<TaskAttemptId, ContainerRequestInfo> maps =
+ new LinkedHashMap<TaskAttemptId, ContainerRequestInfo>();
+
+ private final LinkedHashMap<TaskAttemptId, ContainerRequestInfo> reduces =
+ new LinkedHashMap<TaskAttemptId, ContainerRequestInfo>();
+
+
// Drops the not-yet-allocated request tracked for this attempt and tells the
// requestor to decrement the corresponding ask with the RM.
// Returns false when no request was being tracked for the attempt.
boolean remove(TaskAttemptId tId) {
  ContainerRequestInfo req = null;
  if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
    req = maps.remove(tId);
  } else {
    req = reduces.remove(tId);
  }
  // TODO XXX: Remove from mapsHostMapping and mapsRackMapping
  // NOTE(review): stale attempt ids left behind in the host/rack mappings
  // appear to be tolerated elsewhere via maps.containsKey(...) checks — but
  // confirm no code iterates those lists assuming liveness.

  if (req == null) {
    return false;
  } else {
    requestor.decContainerReq(req.getContainerRequest());
    return true;
  }
}
+
+ ContainerRequestInfo removeReduce() {
+ Iterator<Entry<TaskAttemptId, ContainerRequestInfo>> it = reduces.entrySet().iterator();
+ if (it.hasNext()) {
+ Entry<TaskAttemptId, ContainerRequestInfo> entry = it.next();
+ it.remove();
+ requestor.decContainerReq(entry.getValue().getContainerRequest());
+ return entry.getValue();
+ }
+ return null;
+ }
+
+ /**
+ * Considers node blacklisting while create container ask requests for the
+ * RMContainerAllocator.
+ */
+ void addMap(AMSchedulerTALaunchRequestEvent event) {
+ ContainerRequest request = null;
+
+ if (event.isRescheduled()) {
+ earlierFailedMaps.add(event.getAttemptID());
+ request = new ContainerRequest(event.getCapability(), event.getHosts(),
+ event.getRacks(), PRIORITY_FAST_FAIL_MAP);
+ LOG.info("Added "+event.getAttemptID()+" to list of failed maps");
+ } else {
+ List<String> hosts = new LinkedList<String>();
+ for (String host : event.getHosts()) {
+ LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
+ if (list == null) {
+ list = new LinkedList<TaskAttemptId>();
+ mapsHostMapping.put(host, list);
+ }
+ list.add(event.getAttemptID());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added attempt req to host " + host);
+ }
+ if (!appContext.getAllNodes().isHostBlackListed(host)) {
+ hosts.add(host);
+ } else {
+ // Leaving the entries in mapsHostMapping etc. Will allow allocation
+ // in case all nodes get blacklisted / blacklisting gets enabled.
+ LOG.info("XXX: Host: " + host
+ + " is blacklisted. Not including in Container request");
+ }
+ }
+ for (String rack: event.getRacks()) {
+ LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
+ if (list == null) {
+ list = new LinkedList<TaskAttemptId>();
+ mapsRackMapping.put(rack, list);
+ }
+ list.add(event.getAttemptID());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added attempt req to rack " + rack);
+ }
+ }
+ request = new ContainerRequest(event.getCapability(),
+ hosts.toArray(new String[0]), event.getRacks(), PRIORITY_MAP);
+ }
+ // ContainerRequestInfo ends up with the correct ContainerRequest, and the
+ // original event.
+ // Remove works on the basis of the ContainerRequest while asking the
+ // RMComm to decrement a container request.
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Checking pre-allocations, isMapPrio="
+ + !event.isRescheduled());
+ }
+ boolean usingPreAllocated = false;
+ if (request.priority.equals(PRIORITY_MAP)
+ && !preAllocatedAvailableContainerIds.isEmpty()) {
+ for (ContainerId cId : preAllocatedAvailableContainerIds) {
+ AMContainer amContainer = containerMap.get(cId);
+ if (amContainer == null
+ || amContainer.getContainer() == null) {
+ preAllocatedAvailableContainerIds.remove(cId);
+ continue;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Checking against pre-alloc container"
+ + ", containerId=" + cId.getId()
+ + ", containerNode=" + amContainer.getContainer().getNodeId());
+ }
+
+ TaskAttemptId tAttemptId = event.getAttemptID();
+ if (event.getHosts().length == 0) {
+ // Nothing to do
+ // Should fall under wild card match
+ break;
+ } else {
+ String containerHost =
+ amContainer.getContainer().getNodeId().getHost();
+ for (String host : event.getHosts()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Checking against pre-alloc container"
+ + ", containerId=" + cId.getId()
+ + ", containerHost=" + containerHost
+ + ", taskAttemptId=" + tAttemptId
+ + ", taskHost=" + host);
+ }
+ if (host.equals(containerHost)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Expecting to use pre-allocated container for this map"
+ + " request, containerId=" + cId
+ + ", matchType=host");
+ }
+ usingPreAllocated = true;
+ break;
+ }
+ }
+ }
+ if (!usingPreAllocated) {
+ // do a rack match
+ String containerRack =
+ RackResolver.resolve(
+ amContainer.getContainer().getNodeId().getHost())
+ .getNetworkLocation();
+ for (String rack : event.getRacks()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Checking against pre-alloc container"
+ + ", containerId=" + cId.getId()
+ + ", containerRack=" + containerRack
+ + ", taskAttemptId=" + tAttemptId
+ + ", taskRack=" + rack);
+ }
+ if (rack.equals(containerRack)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Expecting to use pre-allocated container for this map"
+ + " request, containerId=" + cId
+ + ", matchType=rack");
+ }
+ usingPreAllocated = true;
+ break;
+ }
+ }
+ }
+
+ if (usingPreAllocated) {
+ preAllocatedAvailableContainerIds.remove(cId);
+ preAllocatedSeenContainerIds.add(cId);
+ availableUnlaunchedContainerIds.add(cId);
+ maps.put(event.getAttemptID(),
+ new ContainerRequestInfo(request, event));
+ break;
+ }
+ }
+ if (!usingPreAllocated
+ && !preAllocatedAvailableContainerIds.isEmpty()) {
+ // catch all for an attempt that does not match any host/rack combo
+ ContainerId cId = preAllocatedAvailableContainerIds.iterator().next();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Expecting to use pre-allocated container for this map"
+ + " request, containerId=" + cId
+ + ", matchType=any");
+ }
+ preAllocatedAvailableContainerIds.remove(cId);
+ preAllocatedSeenContainerIds.add(cId);
+ availableUnlaunchedContainerIds.add(cId);
+ maps.put(event.getAttemptID(),
+ new ContainerRequestInfo(request, event));
+ usingPreAllocated = true;
+ }
+ }
+
+ if (!usingPreAllocated) {
+ maps.put(event.getAttemptID(), new ContainerRequestInfo(request,
+ event));
+ requestor.addContainerReq(request);
+ }
+
+ }
+
+
+ void addReduce(ContainerRequestInfo req) {
+ reduces.put(req.getAttemptId(), req);
+ requestor.addContainerReq(req.getContainerRequest());
+ }
+
/**
 * Tries to assign each allocated container to a scheduled task attempt.
 * Containers that are too small, at an unexpected priority, or on
 * unusable (blacklisted / unhealthy) nodes are released - for unusable
 * nodes a replacement ask is re-filed with the RM first. Whatever remains
 * unassigned after assignContainers() is released back to the RM.
 */
private void assign(LinkedHashSet<ContainerId> allocatedContainerIds) {
  // this method will change the list of allocatedContainers.
  Iterator<ContainerId> it = allocatedContainerIds.iterator();
  LOG.info("Got allocated containers " + allocatedContainerIds.size());
  containersAllocated += allocatedContainerIds.size();
  while (it.hasNext()) {
    ContainerId containerId = it.next();
    AMContainer amContainer = containerMap.get(containerId);
    Container allocated = amContainer.getContainer();
    // Distinguish brand-new RM allocations from re-used containers.
    if (amContainer.getState() == AMContainerState.ALLOCATED) {
      newContainerAllocations++;
    } else {
      existingContainerAllocations++;
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Assigning container " + allocated.getId()
          + " with priority " + allocated.getPriority() + " to NM "
          + allocated.getNodeId());
    }

    // check if allocated container meets memory requirements
    // and whether we have any scheduled tasks that need
    // a container to be assigned
    boolean isAssignable = true;
    Priority priority = allocated.getPriority();
    int allocatedMemory = allocated.getResource().getMemory();
    if (PRIORITY_FAST_FAIL_MAP.equals(priority)
        || PRIORITY_MAP.equals(priority)) {
      if (allocatedMemory < mapResourceReqt
          || maps.isEmpty()) {
        LOG.info("Cannot assign container " + allocated
            + " for a map as either "
            + " container memory less than required " + mapResourceReqt
            + " or no pending map tasks - maps.isEmpty="
            + maps.isEmpty());
        isAssignable = false;
      }
    }
    else if (PRIORITY_REDUCE.equals(priority)) {
      if (allocatedMemory < reduceResourceReqt
          || reduces.isEmpty()) {
        LOG.info("Cannot assign container " + allocated
            + " for a reduce as either "
            + " container memory less than required " + reduceResourceReqt
            + " or no pending reduce tasks - reduces.isEmpty="
            + reduces.isEmpty());
        isAssignable = false;
      }
    } else {
      // Unknown priority - nothing in this scheduler asked for it.
      LOG.warn("Container allocated at unwanted priority: " + priority +
          ". Returning to RM...");
      isAssignable = false;
    }

    if(!isAssignable) {
      //release container if we could not assign it
      containerNotAssigned(allocated);
      it.remove();
      continue;
    }

    String allocatedHost = allocated.getNodeId().getHost();

    // TODO Differentiation between blacklisted versus unusable nodes ?
    // Ideally there should be no assignments on unhealthy nodes.
    boolean nodeUnhealthy = false;
    boolean blackListed = false;
    blackListed = appContext.getAllNodes().isHostBlackListed(
        allocatedHost);
    nodeUnhealthy = appContext.getAllNodes().get(allocated.getNodeId())
        .isUnhealthy();

    if (nodeUnhealthy || blackListed) {
      // we need to request for a new container
      // and release the current one
      LOG.info("Got allocated container on an unusable " + " host "
          + allocatedHost + ". Releasing container " + allocated
          + " NodeUnhealthy: " + nodeUnhealthy + ", NodeBlackListed: "
          + blackListed);

      // find the request matching this allocated container and replace it
      // with a new one. Have to ensure a request goes out to the RM
      // asking for a new container. Hence a decRequest + addRequest.
      ContainerRequestInfo toBeReplacedReq = getContainerReqToReplace(allocated);

      if (toBeReplacedReq != null) {
        LOG.info("Placing a new container request for task attempt "
            + toBeReplacedReq.getAttemptId());
        // This isn't necessarily needed, since the request should have
        // changed
        // when the node blacklist event was received.
        ContainerRequestInfo newReq = getFilteredContainerRequest(toBeReplacedReq);
        requestor.decContainerReq(toBeReplacedReq.getContainerRequest());
        if (toBeReplacedReq.getAttemptId().getTaskId().getTaskType() == TaskType.MAP) {
          maps.put(newReq.getAttemptId(), newReq);
        } else {
          reduces.put(newReq.getAttemptId(), newReq);
        }
        requestor.addContainerReq(newReq.getContainerRequest());
      } else {
        LOG.info("Could not map allocated container to a valid request."
            + " Releasing allocated container " + allocated);
      }

      // release container if we could not assign it
      containerNotAssigned(allocated);
      it.remove();
      continue;
    }
  }

  // Hand the surviving containers to the actual assignment logic
  // (priority-based first, then locality-aware map assignment).
  assignContainers(allocatedContainerIds);

  // release container if we could not assign it
  it = allocatedContainerIds.iterator();
  while (it.hasNext()) {
    Container allocated = containerMap.get(it.next()).getContainer();
    LOG.info("Releasing unassigned and invalid container "
        + allocated + ". RM may have assignment issues");
    containerNotAssigned(allocated);
  }
}
+
+ @SuppressWarnings("unchecked")
+ private void containerAssigned(Container allocated,
+ ContainerRequestInfo assigned) {
+ // Update resource requests
+ if (preAllocatedSeenContainerIds.contains(allocated.getId())) {
+ final String[] emptyArray = new String[0];
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(getConfig().getInt(MRJobConfig.MAP_MEMORY_MB,
+ MRJobConfig.DEFAULT_MAP_MEMORY_MB));
+ capability.setVirtualCores(1);
+ final ContainerRequest preAllocReq =
+ new ContainerRequest(capability, emptyArray, emptyArray,
+ RMContainerAllocator.PRIORITY_MAP);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using pre-allocated container, not changing asks in"
+ + " requestor except to drecement ANY by 1");
+ }
+ preAllocatedSeenContainerIds.remove(allocated.getId());
+ requestor.decContainerReq(preAllocReq);
+ } else {
+ requestor.decContainerReq(assigned.getContainerRequest());
+ }
+
+ // TODO Maybe: ApplicationACLs should be populated into the appContext from the RMCommunicator.
+ ContainerId containerId = allocated.getId();
+ if (appContext.getAllContainers().get(containerId).getState() == AMContainerState.ALLOCATED) {
+ AMSchedulerTALaunchRequestEvent tlrEvent = attemptToLaunchRequestMap
+ .get(assigned.getAttemptId());
+ JobConf jobConf = new JobConf(getJob().getConf());
+
+ AMContainerEventLaunchRequest launchRequest = new AMContainerEventLaunchRequest(
+ containerId, jobId, assigned.getAttemptId().getTaskId()
+ .getTaskType(), tlrEvent.getJobToken(),
+ tlrEvent.getCredentials(), shouldProfileTaskAttempt(
+ jobConf, tlrEvent.getRemoteTaskContext()), jobConf);
+
+ eventHandler.handle(launchRequest);
+ }
+ eventHandler.handle(new AMContainerEventAssignTA(containerId,
+ assigned.getAttemptId(), attemptToLaunchRequestMap.get(
+ assigned.getAttemptId()).getRemoteTaskContext()));
+
+ assignedRequests.add(allocated, assigned.getAttemptId());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Assigned container (" + allocated + ") "
+ + " to task " + assigned.getAttemptId() + " on node "
+ + allocated.getNodeId().toString());
+ }
+ }
+
+ private void containerNotAssigned(Container allocated) {
+ if (preAllocatedAvailableContainerIds.contains(allocated.getId())) {
+ LOG.debug("Not releasing pre-allocated container until it can be"
+ + " assigned");
+ } else if (preAllocatedSeenContainerIds.contains(allocated.getId())) {
+ LOG.debug("Not able to assign a pre-allocated container"
+ + ", putting it back into available pool");
+ preAllocatedAvailableContainerIds.add(allocated.getId());
+ } else {
+ containersReleased++;
+ sendEvent(new AMContainerEvent(allocated.getId(),
+ AMContainerEventType.C_STOP_REQUEST));
+ }
+ }
+
+ private ContainerRequestInfo assignWithoutLocality(Container allocated) {
+ ContainerRequestInfo assigned = null;
+
+ Priority priority = allocated.getPriority();
+ if (PRIORITY_FAST_FAIL_MAP.equals(priority)) {
+ LOG.info("Assigning container " + allocated + " to fast fail map");
+ assigned = assignToFailedMap(allocated);
+ } else if (PRIORITY_REDUCE.equals(priority)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Assigning container " + allocated + " to reduce");
+ }
+ assigned = assignToReduce(allocated);
+ }
+
+ return assigned;
+ }
+
+ private void assignContainers(LinkedHashSet<ContainerId> allocatedContainers) {
+ Iterator<ContainerId> it = allocatedContainers.iterator();
+ while (it.hasNext()) {
+ Container allocated = containerMap.get(it.next()).getContainer();
+ ContainerRequestInfo assigned = assignWithoutLocality(allocated);
+ if (assigned != null) {
+ containerAssigned(allocated, assigned);
+ it.remove();
+ }
+ }
+
+ assignMapsWithLocality(allocatedContainers);
+ }
+
+ private ContainerRequestInfo getContainerReqToReplace(Container allocated) {
+ LOG.info("Finding containerReq for allocated container: " + allocated);
+ Priority priority = allocated.getPriority();
+ ContainerRequestInfo toBeReplaced = null;
+ if (PRIORITY_FAST_FAIL_MAP.equals(priority)) {
+ LOG.info("Replacing FAST_FAIL_MAP container " + allocated.getId());
+ Iterator<TaskAttemptId> iter = earlierFailedMaps.iterator();
+ while (toBeReplaced == null && iter.hasNext()) {
+ toBeReplaced = maps.get(iter.next());
+ }
+ LOG.info("Found replacement: " + toBeReplaced);
+ return toBeReplaced;
+ }
+ else if (PRIORITY_MAP.equals(priority)) {
+ LOG.info("Replacing MAP container " + allocated.getId());
+ // allocated container was for a map
+ String host = allocated.getNodeId().getHost();
+ LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
+ if (list != null && list.size() > 0) {
+ TaskAttemptId tId = list.removeLast();
+ if (maps.containsKey(tId)) {
+ toBeReplaced = maps.remove(tId);
+ }
+ }
+ else {
+ TaskAttemptId tId = maps.keySet().iterator().next();
+ toBeReplaced = maps.remove(tId);
+ }
+ }
+ else if (PRIORITY_REDUCE.equals(priority)) {
+ TaskAttemptId tId = reduces.keySet().iterator().next();
+ toBeReplaced = reduces.remove(tId);
+ }
+ LOG.info("Found replacement: " + toBeReplaced);
+ return toBeReplaced;
+ }
+
+
+ @SuppressWarnings("unchecked")
+ private ContainerRequestInfo assignToFailedMap(Container allocated) {
+ //try to assign to earlierFailedMaps if present
+ ContainerRequestInfo assigned = null;
+ while (assigned == null && earlierFailedMaps.size() > 0) {
+ TaskAttemptId tId = earlierFailedMaps.removeFirst();
+ if (maps.containsKey(tId)) {
+ assigned = maps.remove(tId);
+ JobEventCounterUpdate jce =
+ new JobEventCounterUpdate(assigned.getAttemptId().getTaskId().getJobId());
+ jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
+ eventHandler.handle(jce);
+ LOG.info("Assigned from earlierFailedMaps");
+ break;
+ }
+ }
+ return assigned;
+ }
+
+ private ContainerRequestInfo assignToReduce(Container allocated) {
+ ContainerRequestInfo assigned = null;
+ //try to assign to reduces if present
+ if (assigned == null && reduces.size() > 0) {
+ TaskAttemptId tId = reduces.keySet().iterator().next();
+ assigned = reduces.remove(tId);
+ LOG.info("Assigned to reduce");
+ }
+ return assigned;
+ }
+
/**
 * Assigns the remaining PRIORITY_MAP containers to waiting map requests in
 * three passes: node-local, then rack-local, then any remaining map.
 * Assigned containers are removed from allocatedContainerIds; the
 * corresponding DATA_LOCAL / RACK_LOCAL / OTHER_LOCAL counters are updated.
 */
@SuppressWarnings("unchecked")
private void assignMapsWithLocality(LinkedHashSet<ContainerId> allocatedContainerIds) {
  // try to assign to all nodes first to match node local
  Iterator<ContainerId> it = allocatedContainerIds.iterator();
  while(it.hasNext() && maps.size() > 0){
    Container allocated = containerMap.get(it.next()).getContainer();
    Priority priority = allocated.getPriority();
    assert PRIORITY_MAP.equals(priority);
    // "if (maps.containsKey(tId))" below should be almost always true.
    // hence this while loop would almost always have O(1) complexity
    // (stale attempt ids can linger in mapsHostMapping after remove()).
    String host = allocated.getNodeId().getHost();
    LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
    while (list != null && list.size() > 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Host matched to the request list " + host);
      }
      TaskAttemptId tId = list.removeFirst();
      if (maps.containsKey(tId)) {
        ContainerRequestInfo assigned = maps.remove(tId);
        containerAssigned(allocated, assigned);
        it.remove();
        JobEventCounterUpdate jce =
            new JobEventCounterUpdate(tId.getTaskId().getJobId());
        jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
        eventHandler.handle(jce);
        hostLocalAssigned++;
        if (LOG.isDebugEnabled()) {
          LOG.debug("Assigned based on host match " + host);
        }
        break;
      }
    }
  }

  // try to match all rack local
  it = allocatedContainerIds.iterator();
  while(it.hasNext() && maps.size() > 0){
    Container allocated = containerMap.get(it.next()).getContainer();
    Priority priority = allocated.getPriority();
    assert PRIORITY_MAP.equals(priority);
    // "if (maps.containsKey(tId))" below should be almost always true.
    // hence this while loop would almost always have O(1) complexity
    String host = allocated.getNodeId().getHost();
    String rack = RackResolver.resolve(host).getNetworkLocation();
    LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
    while (list != null && list.size() > 0) {
      TaskAttemptId tId = list.removeFirst();
      if (maps.containsKey(tId)) {
        ContainerRequestInfo assigned = maps.remove(tId);
        containerAssigned(allocated, assigned);
        it.remove();
        JobEventCounterUpdate jce =
            new JobEventCounterUpdate(tId.getTaskId().getJobId());
        jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
        eventHandler.handle(jce);
        rackLocalAssigned++;
        if (LOG.isDebugEnabled()) {
          LOG.debug("Assigned based on rack match " + rack);
        }
        break;
      }
    }
  }

  // assign remaining (wildcard match - any waiting map gets the container)
  it = allocatedContainerIds.iterator();
  while(it.hasNext() && maps.size() > 0){
    Container allocated = containerMap.get(it.next()).getContainer();
    Priority priority = allocated.getPriority();
    assert PRIORITY_MAP.equals(priority);
    TaskAttemptId tId = maps.keySet().iterator().next();
    ContainerRequestInfo assigned = maps.remove(tId);
    containerAssigned(allocated, assigned);
    it.remove();
    JobEventCounterUpdate jce =
        new JobEventCounterUpdate(tId.getTaskId().getJobId());
    jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
    eventHandler.handle(jce);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Assigned based on * match");
    }
  }
}
+ }
+
+ /**
+ * Tracks TaskAttempts which have been assigned a Container.
+ */
+ private class AssignedRequests {
+ private final LinkedHashMap<TaskAttemptId, Container> maps =
+ new LinkedHashMap<TaskAttemptId, Container>();
+ private final LinkedHashMap<TaskAttemptId, Container> reduces =
+ new LinkedHashMap<TaskAttemptId, Container>();
+ private final Set<TaskAttemptId> preemptionWaitingReduces =
+ new HashSet<TaskAttemptId>();
+
+ void add(Container container, TaskAttemptId tId) {
+ LOG.info("Assigned container " + container.getId().toString() + " to " + tId);
+ if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
+ maps.put(tId, container);
+ } else {
+ reduces.put(tId, container);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ void preemptReduce(int toPreempt) {
+ List<TaskAttemptId> reduceList = new ArrayList<TaskAttemptId>
+ (reduces.keySet());
+ //sort reduces on progress
+ Collections.sort(reduceList,
+ new Comparator<TaskAttemptId>() {
+ @Override
+ public int compare(TaskAttemptId o1, TaskAttemptId o2) {
+ float p = getJob().getTask(o1.getTaskId()).getAttempt(o1).getProgress() -
+ getJob().getTask(o2.getTaskId()).getAttempt(o2).getProgress();
+ return p >= 0 ? 1 : -1;
+ }
+ });
+
+ for (int i = 0; i < toPreempt && reduceList.size() > 0; i++) {
+ TaskAttemptId id = reduceList.remove(0);//remove the one on top
+ LOG.info("Preempting " + id);
+ preemptionWaitingReduces.add(id);
+ eventHandler.handle(new TaskAttemptEventKillRequest(id, "Pre-empting reduce"));
+ }
+ }
+
+ ContainerId remove(TaskAttemptId tId) {
+ ContainerId containerId = null;
+ if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
+ containerId = maps.remove(tId).getId();
+ } else {
+ containerId = reduces.remove(tId).getId();
+ if (containerId != null) {
+ boolean preempted = preemptionWaitingReduces.remove(tId);
+ if (preempted) {
+ LOG.info("Reduce preemption successful " + tId);
+ }
+ }
+ }
+
+ return containerId;
+ }
+ }
+
+ protected ContainerRequestInfo getFilteredContainerRequest(
+ ContainerRequestInfo origRequestInfo) {
+ ContainerRequest orig = origRequestInfo.getContainerRequest();
+ ArrayList<String> newHosts = new ArrayList<String>();
+ for (String host : orig.hosts) {
+ if (!appContext.getAllNodes().isHostBlackListed(host)) {
+ newHosts.add(host);
+ }
+ }
+ String[] hosts = newHosts.toArray(new String[newHosts.size()]);
+ ContainerRequestInfo newReq = new ContainerRequestInfo(
+ new ContainerRequest(orig.capability, hosts, orig.racks, orig.priority),
+ origRequestInfo.launchRequestEvent);
+ return newReq;
+ }
+
/*
 * Not very useful for a re-use scheduler.
 */
// Whether the attempt should run with profiling enabled. Currently always
// false; the commented-out block sketches the eventual profile-range check
// (see the TODO below).
protected boolean shouldProfileTaskAttempt(JobConf conf,
    MRTaskContext remoteTaskContext) {
  // TODO EVENTUALLY
//  TaskType taskType = TezTypeConverters.toYarn(remoteTaskContext.getTaskAttemptId()
//      .getTaskType());
//
//  if (conf.getProfileEnabled()) {
//    if (conf.getProfileTaskRange(taskType == TaskType.MAP).isIncluded(
//        remoteTask.getPartition())) {
//      return true;
//    }
//  }
  return false;
}
+
+ private static class ContainerRequestInfo {
+ ContainerRequestInfo(ContainerRequest containerRequest,
+ AMSchedulerTALaunchRequestEvent launchRequestEvent) {
+ this.containerRequest = containerRequest;
+ this.launchRequestEvent = launchRequestEvent;
+ }
+
+ ContainerRequest containerRequest;
+ AMSchedulerTALaunchRequestEvent launchRequestEvent;
+
+ TaskAttemptId getAttemptId() {
+ return launchRequestEvent.getAttemptID();
+ }
+
+ ContainerRequest getContainerRequest() {
+ return this.containerRequest;
+ }
+ }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,555 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventCompleted;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventContainerAllocated;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventNodeCountUpdated;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventStateChanged;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+
+/**
+ * Keeps the data structures to send container requests to RM.
+ */
+// TODO XXX: Eventually rename to RMCommunicator
+public class RMContainerRequestor extends RMCommunicator implements ContainerRequestor {
+
+ private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
+ public static final String ANY = "*";
+
+ private final Clock clock;
+
+ private Resource availableResources; // aka headroom.
+ private long retrystartTime;
+ private long retryInterval;
+
+ private int numContainerReleaseRequests;
+ private int numContainersAllocated;
+ private int numFinishedContainers; // Not very useful.
+
+
+ //Key -> Priority
+ //Value -> Map
+ // Key->ResourceName (e.g., hostname, rackname, *)
+ // Value->Map
+ // Key->Resource Capability
+ // Value->ResourceRequest
+ private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
+ remoteRequestsTable =
+ new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
+
+ private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>();
+ private final Set<ContainerId> release = new TreeSet<ContainerId>();
+
+ private Lock releaseLock = new ReentrantLock();
+ private Lock askLock = new ReentrantLock();
+ private final List<ContainerId> emptyReleaseList = new ArrayList<ContainerId>(0);
+ private final List<ResourceRequest> emptyAskList = new ArrayList<ResourceRequest>();
+
+ private int clusterNmCount = 0;
+
+ // TODO Consider allowing sync comm between the requestor and allocator...
+
+ // TODO Why does the RMRequestor require the ClientService ??
+ // (for the RPC address. get rid of this.)
public RMContainerRequestor(ClientService clientService, AppContext context) {
  super(clientService, context);
  // Clock comes from the app context so tests can inject a fake one;
  // used in init() to record the retry-window start time.
  this.clock = context.getClock();
}
+
+ public static class ContainerRequest {
+ final Resource capability;
+ final String[] hosts;
+ final String[] racks;
+ final Priority priority;
+
+ public ContainerRequest(Resource capability, String[] hosts,
+ String[] racks, Priority priority) {
+ this.capability = capability;
+ this.hosts = hosts;
+ this.racks = racks;
+ this.priority = priority;
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Capability[").append(capability).append("]");
+ sb.append("Priority[").append(priority).append("]");
+ return sb.toString();
+ }
+ }
+
@Override
public void init(Configuration conf) {
  super.init(conf);
  // Start of the window within which AM->RM communication failures are
  // retried before giving up (presumably checked on heartbeat errors -
  // the consuming code is not in this view; confirm against heartbeat()).
  retrystartTime = clock.getTime();
  retryInterval = getConfig().getLong(
      MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
      MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
}
+
@Override
public Map<ApplicationAccessType, String> getApplicationACLs() {
  // applicationACLs is inherited from the superclass; presumably populated
  // during registration with the RM (not visible in this file section).
  return this.applicationACLs;
}
+
+ public void stop(Configuration conf) {
+ LOG.info("NumAllocatedContainers: " + numContainersAllocated
+ + "NumFinihsedContainers: " + numFinishedContainers
+ + "NumReleaseRequests: " + numContainerReleaseRequests);
+ super.stop();
+ }
+
@Override
public Resource getAvailableResources() {
  // Headroom last reported by the RM. May be null before the first
  // allocate response is processed - callers must handle null (see
  // RMContainerAllocator.getMemLimit()).
  return availableResources;
}
+
+ public void addContainerReq(ContainerRequest req) {
+ // Create resource requests
+ for (String host : req.hosts) {
+ // Data-local
+ // Assumes the scheduler is handling bad nodes. Tracking them here would
+ // lead to an out-of-sync scheduler / requestor.
+ addResourceRequest(req.priority, host, req.capability);
+ }
+
+ // Nothing Rack-local for now
+ for (String rack : req.racks) {
+ addResourceRequest(req.priority, rack, req.capability);
+ }
+
+ // Off-switch
+ addResourceRequest(req.priority, ANY, req.capability);
+ }
+
+ public void decContainerReq(ContainerRequest req) {
+ // Update resource requests
+ for (String hostName : req.hosts) {
+ decResourceRequest(req.priority, hostName, req.capability);
+ }
+
+ for (String rack : req.racks) {
+ decResourceRequest(req.priority, rack, req.capability);
+ }
+
+ decResourceRequest(req.priority, ANY, req.capability);
+ }
+
// Convenience overload: increment the ask for this
// (priority, resourceName, capability) tuple by one container.
private void addResourceRequest(Priority priority, String resourceName,
    Resource capability) {
  addResourceRequest(priority, resourceName, capability, 1);
}
+
// Bumps the container count for the (priority, resourceName, capability)
// tuple in the nested remoteRequestsTable, lazily creating the inner maps,
// and marks the ResourceRequest to be sent on the next RM interaction.
private void addResourceRequest(Priority priority, String resourceName,
    Resource capability, int increment) {
  Map<String, Map<Resource, ResourceRequest>> remoteRequests =
      this.remoteRequestsTable.get(priority);
  if (remoteRequests == null) {
    remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
    this.remoteRequestsTable.put(priority, remoteRequests);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Added priority=" + priority);
    }
  }
  Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
  if (reqMap == null) {
    reqMap = new HashMap<Resource, ResourceRequest>();
    remoteRequests.put(resourceName, reqMap);
  }
  ResourceRequest remoteRequest = reqMap.get(capability);
  if (remoteRequest == null) {
    remoteRequest = Records.newRecord(ResourceRequest.class);
    remoteRequest.setPriority(priority);
    remoteRequest.setHostName(resourceName);
    remoteRequest.setCapability(capability);
    remoteRequest.setNumContainers(0);
    reqMap.put(capability, remoteRequest);
  }
  remoteRequest.setNumContainers(remoteRequest.getNumContainers() + increment);
  // 0 is a special case to re-add the request to the ask table.

  // Note this down for next interaction with ResourceManager.
  // 'ask' is guarded by askLock - presumably shared with the thread that
  // drains it into AllocateRequests (not visible here; confirm).
  int askSize = 0;
  askLock.lock();
  try {
    ask.add(remoteRequest);
    askSize = ask.size();
  } finally {
    askLock.unlock();
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("addResourceRequest:" + " applicationId="
        + applicationId.getId() + " priority=" + priority.getPriority()
        + " resourceName=" + resourceName + " numContainers="
        + remoteRequest.getNumContainers() + " #asks=" + askSize);
  }
}
+
+  /**
+   * Decrements the outstanding container count for the given
+   * priority / location / capability, removing now-empty table entries and
+   * keeping the pending 'ask' set in sync.
+   *
+   * Missing table entries are tolerated (logged at debug, then skipped):
+   * requests may have been filtered at add time (e.g. blacklisted hosts),
+   * so a decrement can arrive for an entry that was never registered.
+   */
+  private void decResourceRequest(Priority priority, String resourceName,
+      Resource capability) {
+    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+      this.remoteRequestsTable.get(priority);
+    if (remoteRequests == null) {
+      // The whole priority level may already have been removed; guard
+      // against an NPE on the lookup below.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as priority " + priority
+            + " is not present in request table");
+      }
+      return;
+    }
+    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+    if (reqMap == null) {
+      // as we modify the resource requests by filtering out blacklisted hosts
+      // when they are added, this value may be null when being
+      // decremented
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as " + resourceName
+            + " is not present in request table");
+      }
+      return;
+    }
+    ResourceRequest remoteRequest = reqMap.get(capability);
+    if (remoteRequest == null) {
+      // No outstanding request at this capability; previously this was
+      // dereferenced unguarded and could NPE.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as no request for capability "
+            + capability + " at " + resourceName
+            + " is present in request table");
+      }
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+          + applicationId.getId() + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + remoteRequest.getNumContainers() + " #asks=" + getAskSize());
+    }
+
+    remoteRequest.setNumContainers(remoteRequest.getNumContainers() - 1);
+    if (remoteRequest.getNumContainers() == 0) {
+      // Last outstanding container for this tuple: drop the empty table
+      // entries and stop advertising the request to the RM.
+      reqMap.remove(capability);
+      if (reqMap.size() == 0) {
+        remoteRequests.remove(resourceName);
+      }
+      if (remoteRequests.size() == 0) {
+        remoteRequestsTable.remove(priority);
+      }
+      // Remove from ask if it is present there.
+      askLock.lock();
+      try {
+        ask.remove(remoteRequest);
+      } finally {
+        askLock.unlock();
+      }
+    } else {
+      askLock.lock();
+      try {
+        ask.add(remoteRequest); // this will override the request if ask
+                                // doesn't already have it.
+      } finally {
+        askLock.unlock();
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      // Was LOG.info inside an isDebugEnabled guard; use debug level to
+      // match the guard and the BEFORE message above.
+      LOG.debug("AFTER decResourceRequest:" + " applicationId="
+          + applicationId.getId() + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + remoteRequest.getNumContainers() + " #asks=" + getAskSize());
+    }
+  }
+
+  /** Returns the number of outstanding asks, read under askLock. */
+  private int getAskSize() {
+    int size;
+    askLock.lock();
+    try {
+      size = ask.size();
+    } finally {
+      askLock.unlock();
+    }
+    return size;
+  }
+
+  /** Builds a one-line summary of allocation counters for heartbeat logs. */
+  private String getStat() {
+    StringBuilder stats = new StringBuilder("ContainersAllocated: ");
+    stats.append(numContainersAllocated);
+    stats.append(", ContainersFinished: ").append(numFinishedContainers);
+    stats.append(", NumContainerReleaseRequests: ");
+    stats.append(numContainerReleaseRequests);
+    return stats.toString();
+  }
+
+  /**
+   * One allocator heartbeat: sends the accumulated ask/release lists to the
+   * RM, then fans the response out as events — completions to AMContainers,
+   * new allocations to the scheduler, node-report changes to AMNodes.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  protected void heartbeat() throws Exception {
+    LOG.info("BeforeHeartbeat: " + getStat());
+    // Headroom snapshot taken before the request, to detect a change below.
+    int headRoom = getAvailableResources() != null ? getAvailableResources()
+        .getMemory() : 0;// first time it would be null
+    int lastClusterNmCount = clusterNmCount;
+    AMResponse response = errorCheckedMakeRemoteRequest();
+
+    int newHeadRoom = getAvailableResources() != null ? getAvailableResources()
+        .getMemory() : 0;
+    List<Container> newContainers = response.getAllocatedContainers();
+    logNewContainers(newContainers);
+    numContainersAllocated += newContainers.size();
+
+    List<ContainerStatus> finishedContainers = response
+        .getCompletedContainersStatuses();
+    logFinishedContainers(finishedContainers);
+    numFinishedContainers += finishedContainers.size();
+
+    List<NodeReport> updatedNodeReports = response.getUpdatedNodes();
+    logUpdatedNodes(updatedNodeReports);
+
+    LOG.info("AfterHeartbeat: " + getStat());
+
+    // Propagate a change in cluster size to the node tracker.
+    if (clusterNmCount != lastClusterNmCount) {
+      LOG.info("Num cluster nodes changed from " + lastClusterNmCount + " to "
+          + clusterNmCount);
+      eventHandler.handle(new AMNodeEventNodeCountUpdated(clusterNmCount));
+    }
+
+    // Inform the Containers about completion..
+    for (ContainerStatus c : finishedContainers) {
+      eventHandler.handle(new AMContainerEventCompleted(c));
+    }
+
+    // Inform the scheduler about new containers. Each container is first
+    // registered with the container/node trackers so the scheduler sees a
+    // consistent view when the allocated event arrives.
+    List<ContainerId> newContainerIds;
+    if (newContainers.size() > 0) {
+      newContainerIds = new ArrayList<ContainerId>(newContainers.size());
+      for (Container container : newContainers) {
+        context.getAllContainers().addContainerIfNew(container);
+        newContainerIds.add(container.getId());
+        context.getAllNodes().nodeSeen(container.getNodeId());
+        eventHandler.handle(new AMNodeEventContainerAllocated(container
+            .getNodeId(), container.getId()));
+      }
+      // The boolean flags whether headroom changed during this heartbeat.
+      eventHandler.handle(new AMSchedulerEventContainersAllocated(
+          newContainerIds, (newHeadRoom - headRoom != 0)));
+    }
+
+    // Inform the nodes about state changes.
+    for (NodeReport nr : updatedNodeReports) {
+      eventHandler.handle(new AMNodeEventStateChanged(nr));
+      // Allocator will find out from the node, if at all.
+      // Relying on the RM to not allocate containers on an unhealthy node.
+    }
+  }
+
+
+  /**
+   * Wraps {@link #makeRemoteRequest()} with error policy:
+   * on a communication failure the job is failed only once the retry window
+   * ({@code retryInterval}) since the last successful contact has elapsed —
+   * the exception is always rethrown so the caller may keep retrying.
+   * An RM reboot signal is treated as immediately fatal for this attempt.
+   */
+  @SuppressWarnings("unchecked")
+  protected AMResponse errorCheckedMakeRemoteRequest() throws Exception {
+    AMResponse response = null;
+    try {
+      response = makeRemoteRequest();
+      // Reset retry count if no exception occurred.
+      retrystartTime = clock.getTime();
+    } catch (Exception e) {
+      // This can happen when the connection to the RM has gone down. Keep
+      // re-trying until the retryInterval has expired.
+      if (clock.getTime() - retrystartTime >= retryInterval) {
+        LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
+        eventHandler.handle(new JobEvent(this.getJob().getID(),
+            JobEventType.INTERNAL_ERROR));
+        throw new YarnException("Could not contact RM after " +
+            retryInterval + " milliseconds.");
+      }
+      // Throw this up to the caller, which may decide to ignore it and
+      // continue to attempt to contact the RM.
+      throw e;
+    }
+    if (response.getReboot()) {
+      // This can happen if the RM has been restarted. If it is in that state,
+      // this application must clean itself up.
+      eventHandler.handle(new JobEvent(this.getJob().getID(),
+          JobEventType.INTERNAL_ERROR));
+      throw new YarnException("Resource Manager doesn't recognize AttemptId: "
+          + this.getContext().getApplicationID());
+    }
+    return response;
+  }
+
+
+  /**
+   * Performs a single allocate() call to the RM scheduler.
+   *
+   * The pending ask/release sets are snapshotted and cleared before the
+   * call; if the call throws, both snapshots are merged back (see
+   * rePopulateListsOnError) so no request is lost across a retry. On
+   * success, updates lastResponseID, availableResources (headroom) and
+   * clusterNmCount from the response.
+   */
+  protected AMResponse makeRemoteRequest() throws Exception {
+    List<ContainerId> clonedReleaseList = cloneAndClearReleaseList();
+    List<ResourceRequest> clonedAskList = cloneAndClearAskList();
+
+    AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
+        applicationAttemptId, lastResponseID, super.getApplicationProgress(),
+        clonedAskList, clonedReleaseList);
+    AllocateResponse allocateResponse = null;
+    try {
+      allocateResponse = scheduler.allocate(allocateRequest);
+    } catch (Exception e) {
+      // Restore the snapshots so the requests go out on the next attempt.
+      rePopulateListsOnError(clonedReleaseList, clonedAskList);
+      throw e;
+    }
+    AMResponse response = allocateResponse.getAMResponse();
+    lastResponseID = response.getResponseId();
+    availableResources = response.getAvailableResources();
+    clusterNmCount = allocateResponse.getNumClusterNodes();
+
+    // Only log heartbeats that actually carried or returned something.
+    if (clonedAskList.size() > 0 || clonedReleaseList.size() > 0) {
+      LOG.info("getResources() for " + applicationId + ":" + " ask="
+          + clonedAskList.size() + " release= " + clonedReleaseList.size()
+          + " newContainers=" + response.getAllocatedContainers().size()
+          + " finishedContainers="+ response.getCompletedContainersStatuses().size()
+          + " resourcelimit=" + availableResources + " knownNMs=" + clusterNmCount);
+    }
+
+    return response;
+  }
+
+  /**
+   * Handles allocator events. Only CONTAINER_DEALLOCATE is acted upon: the
+   * container id is queued (under releaseLock) for release on the next
+   * heartbeat. All other event types are ignored.
+   */
+  @Override
+  public void handle(RMCommunicatorEvent rawEvent) {
+    switch (rawEvent.getType()) {
+    case CONTAINER_DEALLOCATE:
+      RMCommunicatorContainerDeAllocateRequestEvent deallocateEvent =
+          (RMCommunicatorContainerDeAllocateRequestEvent) rawEvent;
+      releaseLock.lock();
+      try {
+        release.add(deallocateEvent.getContainerId());
+        numContainerReleaseRequests++;
+      } finally {
+        releaseLock.unlock();
+      }
+      break;
+    default:
+      // Not interesting to this allocator.
+      break;
+    }
+  }
+
+
+  /**
+   * Atomically snapshots and empties the pending release set. Returns the
+   * shared empty list when there is nothing to release, avoiding a copy.
+   */
+  private List<ContainerId> cloneAndClearReleaseList() {
+    releaseLock.lock();
+    try {
+      if (release.isEmpty()) {
+        return emptyReleaseList;
+      }
+      List<ContainerId> snapshot = new ArrayList<ContainerId>(release);
+      release.clear();
+      return snapshot;
+    } finally {
+      releaseLock.unlock();
+    }
+  }
+
+  /**
+   * Atomically snapshots and empties the pending ask set. Returns the
+   * shared empty list when there is nothing to ask for, avoiding a copy.
+   */
+  private List<ResourceRequest> cloneAndClearAskList() {
+    askLock.lock();
+    try {
+      if (ask.isEmpty()) {
+        return emptyAskList;
+      }
+      List<ResourceRequest> snapshot = new ArrayList<ResourceRequest>(ask);
+      ask.clear();
+      return snapshot;
+    } finally {
+      askLock.unlock();
+    }
+  }
+
+  /**
+   * Restores the snapshots taken by cloneAndClearReleaseList /
+   * cloneAndClearAskList after a failed allocate() call, so the requests
+   * are retransmitted on the next heartbeat.
+   */
+  private void rePopulateListsOnError(List<ContainerId> clonedReleaseList,
+      List<ResourceRequest> clonedAskList) {
+    releaseLock.lock();
+    try {
+      release.addAll(clonedReleaseList);
+    } finally {
+      releaseLock.unlock();
+    }
+    askLock.lock();
+    try {
+      // Asks for a particular resource could have changed (increased or
+      // decreased) during the failure. Re-pull the list from the
+      // remoteRequestTable. ask being a HashSet and using the same objects
+      // avoids duplicates.
+      rePopulateAskList(clonedAskList);
+    } finally {
+      askLock.unlock();
+    }
+  }
+
+  /**
+   * Re-registers each snapshotted request with an increment of zero: this
+   * re-inserts the current entry from the remote-request table into 'ask'
+   * without changing any container counts.
+   */
+  private void rePopulateAskList(List<ResourceRequest> clonedAskList) {
+    for (ResourceRequest request : clonedAskList) {
+      addResourceRequest(request.getPriority(), request.getHostName(),
+          request.getCapability(), 0);
+    }
+  }
+
+  /** Logs newly allocated containers; silent when the list is empty. */
+  private void logNewContainers(List<Container> newContainers) {
+    if (newContainers.isEmpty()) {
+      return;
+    }
+    LOG.info("Got allocated " + newContainers.size() + " containers");
+    for (Container container : newContainers) {
+      LOG.info("AllocatedContainer: " + container);
+    }
+  }
+
+  /** Logs completed container statuses; silent when the list is empty. */
+  private void logFinishedContainers(List<ContainerStatus> finishedContainers) {
+    if (finishedContainers.size() > 0) {
+      LOG.info(finishedContainers.size() + " containers finished");
+      for (ContainerStatus cs : finishedContainers) {
+        // Fixed log-message typo: was "FinihsedContainer".
+        LOG.info("FinishedContainer: " + cs);
+      }
+    }
+  }
+
+  /** Logs node reports whose state changed; silent when the list is empty. */
+  private void logUpdatedNodes(List<NodeReport> nodeReports) {
+    if (nodeReports.isEmpty()) {
+      return;
+    }
+    LOG.info(nodeReports.size() + " nodes changed state");
+    for (NodeReport report : nodeReports) {
+      LOG.info("UpdatedNodeReport: " + report);
+    }
+  }
+
+  // Exposes the live nested request table (priority -> location ->
+  // capability -> request). Package-private, intended for tests only.
+  @Private
+  Map<Priority, Map<String, Map<Resource, ResourceRequest>>> getRemoteRequestTable() {
+    return remoteRequestsTable;
+  }
+
+  // Exposes the live pending-ask set. Package-private, for tests only;
+  // callers must not mutate it without holding askLock.
+  @Private
+  Set<ResourceRequest> getAskSet() {
+    return ask;
+  }
+
+  // Exposes the live pending-release set. Package-private, for tests only;
+  // callers must not mutate it without holding releaseLock.
+  @Private
+  Set<ContainerId> getReleaseSet() {
+    return release;
+  }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainer.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainer.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainer.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainer.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+/**
+ * Read-only view of a container managed by the AM: its YARN container
+ * record, lifecycle state, and the task attempts queued, running and
+ * completed on it. Implementations also receive AMContainerEvents through
+ * the EventHandler interface.
+ */
+public interface AMContainer extends EventHandler<AMContainerEvent>{
+
+  public AMContainerState getState();
+  public ContainerId getContainerId();
+  public Container getContainer();
+  //TODO Rename - CompletedTaskAttempts, ideally means FAILED / KILLED as well.
+  public List<TaskAttemptId> getCompletedTaskAttempts();
+  public TaskAttemptId getRunningTaskAttempt();
+  public List<TaskAttemptId> getQueuedTaskAttempts();
+  public List<TaskAttemptId> getAllTaskAttempts();
+
+  // Port the shuffle service associated with this container listens on.
+  public int getShufflePort();
+
+  // TODO Add a method to get the containers capabilities - to match taskAttempts.
+
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEvent.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/container/AMContainerEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Base event routed to an AMContainer. Carries the id of the target
+ * container so the dispatcher can locate the right instance; the event
+ * type itself is held by the AbstractEvent superclass.
+ */
+public class AMContainerEvent extends AbstractEvent<AMContainerEventType> {
+
+  // Id of the container this event is addressed to; immutable.
+  private final ContainerId containerId;
+
+  public AMContainerEvent(ContainerId containerId, AMContainerEventType type) {
+    super(type);
+    this.containerId = containerId;
+  }
+
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+}