You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ac...@apache.org on 2013/03/15 22:26:48 UTC
svn commit: r1457129 [24/38] - in /incubator/tez: ./ tez-ampool/
tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/
tez-ampool/src/main/conf/ tez-ampool/src/main/java/
tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,617 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.recover;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.ControlledClock;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventDiagnosticsUpdate;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventFailRequest;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStartedRemotely;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStatusUpdate;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventTAUpdate;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventContainerCompleted;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventTAEnded;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorContainerDeAllocateRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventAssignTA;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunched;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventCompleted;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunchRequest;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerState;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventTASucceeded;
+import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleaner;
+import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleanupEvent;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+//TODO:
+//task cleanup for all non completed tasks
+/**
+ * Recovers the completed tasks from the previous life of the Application
+ * Master.
+ * <p>
+ * The completed tasks are deciphered from the job history file written by the
+ * previous application attempt. While recovery is in progress, the service
+ * intercepts and replays the events for those tasks, delays the scheduling of
+ * new tasks by buffering their T_SCHEDULE events, and drives a controlled
+ * clock so that replayed events observe the recorded launch, start and finish
+ * times.
+ */
+public class RecoveryService extends CompositeService implements Recovery {
+
+  private static final Log LOG = LogFactory.getLog(RecoveryService.class);
+
+  private final ApplicationAttemptId applicationAttemptId;
+  private final OutputCommitter committer;
+  private final Dispatcher dispatcher;
+  private final ControlledClock clock;
+  private final AppContext appContext;
+
+  /** History of the previous attempt; stays null if it could not be read. */
+  private JobInfo jobInfo = null;
+  /**
+   * Tasks that SUCCEEDED in the previous attempt and have not been fully
+   * replayed yet. A task is removed once all of its recorded attempts have
+   * been recovered.
+   */
+  private final Map<TaskId, TaskInfo> completedTasks =
+      new HashMap<TaskId, TaskInfo>();
+
+  /** T_SCHEDULE events of tasks that are not being recovered. */
+  private final List<TaskEvent> pendingTaskScheduleEvents =
+      new ArrayList<TaskEvent>();
+  /** Containers used by replayed attempts that have not completed yet. */
+  private final Map<ContainerId, ContainerInfo> containerInfo =
+      new HashMap<ContainerId, ContainerInfo>();
+  /** Container each replayed attempt was assigned to. */
+  private final Map<TaskAttemptId, ContainerId> attemptToContainerMap =
+      new HashMap<TaskAttemptId, ContainerId>();
+
+  /** True while previously completed tasks are being replayed. */
+  private volatile boolean recoveryMode = false;
+
+  /**
+   * @param appContext context of the current application attempt
+   * @param committer output committer used to recover task output
+   */
+  public RecoveryService(AppContext appContext, OutputCommitter committer) {
+    super("RecoveringDispatcher");
+    this.appContext = appContext;
+    this.applicationAttemptId = appContext.getApplicationAttemptId();
+    this.committer = committer;
+    // NOTE: overridable factory invoked from the constructor; overriding
+    // implementations must not rely on subclass fields being initialized.
+    this.dispatcher = createRecoveryDispatcher();
+    this.clock = new ControlledClock(appContext.getClock());
+    addService((Service) dispatcher);
+  }
+
+  /**
+   * Initializes the service and parses the previous attempt's history file.
+   * If any successfully completed task is found, recovery mode is turned on
+   * and the clock is set to the job's original launch time. Any failure while
+   * parsing aborts recovery and the job starts afresh.
+   */
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    // parse the history file
+    try {
+      parse();
+    } catch (Exception e) {
+      LOG.warn("Could not parse the old history file. Aborting recovery. "
+          + "Starting afresh.", e);
+      // Actually abort: never replay a partially collected set of tasks.
+      completedTasks.clear();
+    }
+    if (!completedTasks.isEmpty()) {
+      recoveryMode = true;
+      LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS "
+          + "TO RECOVER " + completedTasks.size());
+      LOG.info("Job launch time " + jobInfo.getLaunchTime());
+      clock.setTime(jobInfo.getLaunchTime());
+    }
+  }
+
+  /** @return the dispatcher that intercepts and replays recovered events */
+  @Override
+  public Dispatcher getDispatcher() {
+    return dispatcher;
+  }
+
+  /** @return the clock controlled by recovery (reset once recovery ends) */
+  @Override
+  public Clock getClock() {
+    return clock;
+  }
+
+  /**
+   * @return the live map of completed tasks still awaiting replay; entries
+   *     are removed as their attempts are recovered
+   */
+  @Override
+  public Map<TaskId, TaskInfo> getCompletedTasks() {
+    return completedTasks;
+  }
+
+  /**
+   * @return the AM attempts recorded in the previous history file, converted
+   *     to {@link AMInfo} records; an empty list if none are available
+   */
+  @Override
+  public List<AMInfo> getAMInfos() {
+    if (jobInfo == null || jobInfo.getAMInfos() == null) {
+      return new LinkedList<AMInfo>();
+    }
+    List<AMInfo> amInfos = new LinkedList<AMInfo>();
+    for (org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo jhAmInfo : jobInfo
+        .getAMInfos()) {
+      AMInfo amInfo =
+          MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
+              jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
+              jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
+              jhAmInfo.getNodeManagerHttpPort());
+
+      amInfos.add(amInfo);
+    }
+    return amInfos;
+  }
+
+  /**
+   * Reads the job history file written by the previous application attempt
+   * (attempt id - 1) from the history staging directory, and records every
+   * task whose status is SUCCEEDED in {@code completedTasks}. Parse errors
+   * reported by the parser are logged and the incomplete events are ignored.
+   *
+   * @throws IOException if the history file cannot be opened or read
+   */
+  private void parse() throws IOException {
+    // TODO: parse history file based on startCount
+    String jobId =
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()).toString();
+    String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(
+        getConfig(), jobId);
+    Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified(
+        new Path(jobhistoryDir));
+    FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
+        getConfig());
+    // read the previous attempt's history file
+    Path historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
+        histDirPath, jobId, (applicationAttemptId.getAttemptId() - 1)));
+    LOG.info("History file is at " + historyFile);
+    FSDataInputStream in = fc.open(historyFile);
+    try {
+      JobHistoryParser parser = new JobHistoryParser(in);
+      jobInfo = parser.parse();
+      Exception parseException = parser.getParseException();
+      if (parseException != null) {
+        LOG.info("Got an error parsing job-history file " + historyFile +
+            ", ignoring incomplete events.", parseException);
+      }
+    } finally {
+      // The whole file has been consumed into jobInfo; release the stream.
+      try {
+        in.close();
+      } catch (IOException e) {
+        LOG.warn("Failed to close job-history file " + historyFile, e);
+      }
+    }
+    Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
+        .getAllTasks();
+    for (TaskInfo taskInfo : taskInfos.values()) {
+      if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
+        TaskId yarnTaskId = TypeConverter.toYarn(taskInfo.getTaskId());
+        completedTasks.put(yarnTaskId, taskInfo);
+        LOG.info("Read from history task " + yarnTaskId);
+      }
+    }
+    LOG.info("Read completed tasks from history "
+        + completedTasks.size());
+  }
+
+  /** Factory for the recovery dispatcher; protected so it can be replaced. */
+  protected Dispatcher createRecoveryDispatcher() {
+    return new RecoveryDispatcher();
+  }
+
+  /**
+   * AsyncDispatcher that, while in recovery mode, inspects events as they are
+   * drained from its queue (so queue order is respected). It advances the
+   * controlled clock to recorded attempt start/finish times, tracks which
+   * completed tasks have been fully replayed, and ends recovery once all tasks
+   * are recovered and all of their containers have completed. Events handed
+   * to {@link #getEventHandler()} first pass through an
+   * {@link InterceptingEventHandler}.
+   */
+  @SuppressWarnings("rawtypes")
+  class RecoveryDispatcher extends AsyncDispatcher {
+    // Intercept events when they're being drained from the queue - so order
+    // is considered.
+    // The AsyncDispatcher's own handler, i.e. the un-intercepted path.
+    private final EventHandler actualHandler;
+    // The handler handed out to clients; wraps actualHandler.
+    private final EventHandler handler;
+
+    RecoveryDispatcher() {
+      super();
+      actualHandler = super.getEventHandler();
+      handler = new InterceptingEventHandler(actualHandler);
+    }
+
+    @Override
+    public void dispatch(Event event) {
+      if (recoveryMode) {
+        if (event.getType() == TaskAttemptEventType.TA_STARTED_REMOTELY) {
+          // These events are split between the intercepted handle() call, and
+          // just before the dispatch.
+          TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
+              .getTaskAttemptID());
+          LOG.info("Recovered Attempt start time " + attInfo.getStartTime());
+          clock.setTime(attInfo.getStartTime());
+        } else if (event.getType() == TaskAttemptEventType.TA_DONE
+            || event.getType() == TaskAttemptEventType.TA_FAIL_REQUEST
+            || event.getType() == TaskAttemptEventType.TA_KILL_REQUEST) {
+          TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
+              .getTaskAttemptID());
+          LOG.info("Recovered Attempt finish time " + attInfo.getFinishTime());
+          clock.setTime(attInfo.getFinishTime());
+        } else if (event.getType() == TaskEventType.T_ATTEMPT_FAILED
+            || event.getType() == TaskEventType.T_ATTEMPT_KILLED
+            || event.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED) {
+          TaskEventTAUpdate tEvent = (TaskEventTAUpdate) event;
+          LOG.info("Recovered Task attempt " + tEvent.getTaskAttemptID());
+          TaskInfo taskInfo = completedTasks.get(tEvent.getTaskAttemptID()
+              .getTaskId());
+          taskInfo.getAllTaskAttempts().remove(
+              TypeConverter.fromYarn(tEvent.getTaskAttemptID()));
+          // remove the task info from completed tasks if all attempts are
+          // recovered
+          if (taskInfo.getAllTaskAttempts().size() == 0) {
+            completedTasks.remove(tEvent.getTaskAttemptID().getTaskId());
+            // checkForRecoveryComplete
+            LOG.info("CompletedTasks() " + completedTasks.size());
+            if (allTasksRecovered()) {
+              if (!allContainersStopped()) {
+                // Recovery ends once the resulting S_CONTAINER_COMPLETED
+                // events have drained the containerInfo map.
+                stopRemainingContainers(actualHandler);
+              } else {
+                endRecovery(actualHandler);
+              }
+            }
+          }
+        } else if (event.getType() == AMSchedulerEventType.S_CONTAINER_COMPLETED) {
+          // This is the last event after a container completes. TA_TERMINATED
+          // messages to tasks would have gone out, and been processed before
+          // this. As a result, TASK_CLEAN generated by TA_TERMINATED would
+          // reach the InterceptingEventHandler (and are ignored) before this
+          // event type is dispatched.
+          // At this point, it's safe to remove this container from the
+          // containerInfo map.
+          AMSchedulerEventContainerCompleted cEvent = (AMSchedulerEventContainerCompleted) event;
+          ContainerId containerId = cEvent.getContainerId();
+          LOG.info("In Recovery, Container with id: " + containerId + " completed");
+          containerInfo.remove(containerId);
+
+          // Check if recovery is complete.
+          if (allTasksRecovered() && allContainersStopped()) {
+            endRecovery(actualHandler);
+          }
+          return; // S_CONTAINER_COMPLETED does not need to reach the scheduler.
+        }
+      }
+      realDispatch(event);
+    }
+
+    /** Dispatches the event through AsyncDispatcher without any inspection. */
+    public void realDispatch(Event event) {
+      super.dispatch(event);
+    }
+
+    /** @return the intercepting handler rather than the raw queue handler */
+    @Override
+    public EventHandler getEventHandler() {
+      return handler;
+    }
+  }
+
+  /**
+   * @return the recorded history of the given attempt; assumes its task is
+   *     still present in {@code completedTasks}
+   */
+  private TaskAttemptInfo getTaskAttemptInfo(TaskAttemptId id) {
+    TaskInfo taskInfo = completedTasks.get(id.getTaskId());
+    return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id));
+  }
+
+  /** @return true when no container used by a replayed attempt is pending */
+  protected boolean allContainersStopped() {
+    return containerInfo.size() == 0;
+  }
+
+  /** @return true when every completed task has been fully replayed */
+  protected boolean allTasksRecovered() {
+    return completedTasks.size() == 0;
+  }
+
+  /** Sends C_STOP_REQUEST to every container still tracked by recovery. */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  protected void stopRemainingContainers(EventHandler eventHandler) {
+    for (ContainerId containerId : containerInfo.keySet()) {
+      eventHandler.handle(new AMContainerEvent(containerId,
+          AMContainerEventType.C_STOP_REQUEST));
+    }
+  }
+
+  /**
+   * Leaves recovery mode: resets the controlled clock and releases the
+   * T_SCHEDULE events that were buffered while recovery was in progress.
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  protected void endRecovery(EventHandler eventHandler) {
+    recoveryMode = false;
+    clock.reset();
+    LOG.info("Setting the recovery mode to false. " + "Recovery is complete!");
+
+    // send all pending tasks schedule events
+    for (TaskEvent tEv : pendingTaskScheduleEvents) {
+      eventHandler.handle(tEv);
+    }
+    // The buffered events have been handed off; drop the references.
+    pendingTaskScheduleEvents.clear();
+  }
+
+
+  /**
+   * Intercepts events before they are put onto the dispatcher queue. While
+   * in recovery mode it buffers T_SCHEDULE for tasks that are not being
+   * recovered. For replayed attempts it answers scheduler, NM and RM requests
+   * itself by synthesizing the responses: assignment, launch, status update,
+   * done/kill/fail, and container completion. All other events, and every
+   * event outside recovery mode, go straight to the actual handler.
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private class InterceptingEventHandler implements EventHandler {
+    //Intercept events before they're put onto the queue.
+    EventHandler actualHandler;
+
+    InterceptingEventHandler(EventHandler actualHandler) {
+      this.actualHandler = actualHandler;
+    }
+
+    @Override
+    public void handle(Event event) {
+      if (!recoveryMode) {
+        // delegate to the dispatcher one
+        actualHandler.handle(event);
+        return;
+      }
+
+      // Schedule previous finished attempts. Delay new ones till after recovery.
+      else if (event.getType() == TaskEventType.T_SCHEDULE) {
+        TaskEvent taskEvent = (TaskEvent) event;
+        // delay the scheduling of new tasks till previous ones are recovered
+        if (completedTasks.get(taskEvent.getTaskID()) == null) {
+          LOG.debug("Adding to pending task events "
+              + taskEvent.getTaskID());
+          pendingTaskScheduleEvents.add(taskEvent);
+          return;
+        }
+      }
+
+      // Intercept TaskAttempt start request.
+      else if (event.getType() == AMSchedulerEventType.S_TA_LAUNCH_REQUEST) {
+        TaskAttemptId aId = ((AMSchedulerTALaunchRequestEvent) event).getAttemptID();
+        TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
+        LOG.debug("TA_LAUNCH_REQUEST " + aId);
+        sendAssignedEvent(aId, attInfo, (AMSchedulerTALaunchRequestEvent)event);
+        return;
+      }
+
+      // Container Launch request. Mock and send back launched.
+      else if (event.getType() == NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST) {
+        ContainerId cId = ((NMCommunicatorLaunchRequestEvent) event)
+            .getContainerId();
+        // Simulate container launch.
+        ContainerInfo cInfo = containerInfo.get(cId);
+        actualHandler.handle(new AMContainerEventLaunched(cId, cInfo
+            .getShufflePort()));
+
+        // Simulate a pull from the TaskAttempt
+        actualHandler.handle(new AMContainerEvent(cId,
+            AMContainerEventType.C_PULL_TA));
+
+        // Inform the TaskAttempt about the assignment.
+        actualHandler.handle(new TaskAttemptEventStartedRemotely(cInfo
+            .getNextAttemptId(), cId, null, cInfo.getShufflePort()));
+
+        // TaskAttempt doesn't generate any useful event while in RUNNING.
+        // Generate events for next state here.
+        TaskAttemptId aId = cInfo.getNextAttemptId();
+
+        TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
+        // send the status update event
+        sendStatusUpdateEvent(aId, attInfo);
+
+        TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getTaskStatus());
+        switch (state) {
+        case SUCCEEDED:
+          //recover the task output
+          TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
+              attInfo.getAttemptId());
+          try {
+            TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
+            int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1);
+            // Only attempts that write final job output (reducers, or maps of
+            // a map-only job) have output to recover.
+            if(type == TaskType.REDUCE || (type == TaskType.MAP && numReducers <= 0)) {
+              committer.recoverTask(taskContext);
+              LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
+            } else {
+              LOG.info("Will not try to recover output for "
+                  + taskContext.getTaskAttemptID());
+            }
+          } catch (IOException e) {
+            LOG.error("Caught an exception while trying to recover task "+aId, e);
+            actualHandler.handle(new JobEventDiagnosticsUpdate(
+                aId.getTaskId().getJobId(), "Error in recovering task output " +
+                e.getMessage()));
+            actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(),
+                JobEventType.INTERNAL_ERROR));
+          }
+
+          // send the done event
+          LOG.info("Sending done event to recovered attempt " + aId);
+          actualHandler.handle(new TaskAttemptEvent(aId,
+              TaskAttemptEventType.TA_DONE));
+          // TODO thh.unregister happens here. Ensure THH handles it
+          // correctly in case of recovery.
+          break;
+        case KILLED:
+          LOG.info("Sending kill event to recovered attempt " + aId);
+          actualHandler.handle(new TaskAttemptEventKillRequest(aId, ""));
+          break;
+        default:
+          LOG.info("Sending fail event to recovered attempt " + aId);
+          actualHandler.handle(new TaskAttemptEventFailRequest(aId, ""));
+          break;
+        }
+        return;
+      }
+
+      // Handle Events which may be sent to the scheduler.
+      else if (event.getType() == AMSchedulerEventType.S_TA_ENDED) {
+        // Tell the container to stop.
+        AMSchedulerEventTAEnded sEvent = (AMSchedulerEventTAEnded) event;
+        ContainerId containerId = attemptToContainerMap.get(sEvent
+            .getAttemptID());
+        switch (sEvent.getState()) {
+        case FAILED:
+        case KILLED:
+          // TODO chh.unregister happens here. Ensure THH handles it
+          // correctly in case of recovery.
+          actualHandler.handle(new AMContainerEvent(containerId,
+              AMContainerEventType.C_STOP_REQUEST));
+          return;
+        case SUCCEEDED:
+          // Inform the container that the task attempt succeeded.
+          // Leaving the event in the map - for TA failure after success.
+          // TODO tal.unregister happens here. Ensure THH handles it
+          // correctly in case of recovery.
+          actualHandler.handle(new AMContainerEventTASucceeded(containerId,
+              sEvent.getAttemptID()));
+          return;
+        default:
+          throw new YarnException("Invalid state " + sEvent.getState());
+        }
+      }
+
+      // De-allocate containers used by previous attempts immediately.
+      else if (event.getType() == NMCommunicatorEventType.CONTAINER_STOP_REQUEST) {
+        // Ignore. Unless we start relying on a successful NM.stopContainer() call.
+        NMCommunicatorEvent nEvent = (NMCommunicatorEvent)event;
+        ContainerId cId = nEvent.getContainerId();
+        ContainerStatus cs = BuilderUtils.newContainerStatus(cId,
+            ContainerState.COMPLETE, "", 0);
+        actualHandler.handle(new AMContainerEventCompleted(cs));
+        return;
+      }
+
+      // De-allocate containers used by previous attempts immediately.
+      else if (event.getType() == RMCommunicatorEventType.CONTAINER_DEALLOCATE) {
+        RMCommunicatorContainerDeAllocateRequestEvent dEvent = (RMCommunicatorContainerDeAllocateRequestEvent) event;
+        ContainerId cId = dEvent.getContainerId();
+        // exitStatus not known, diagnostics not known.
+        ContainerStatus cs = BuilderUtils.newContainerStatus(cId,
+            ContainerState.COMPLETE, "", 0);
+        actualHandler.handle(new AMContainerEventCompleted(cs));
+        return;
+      }
+
+      // Received for FAILED/KILLED tasks after C_COMPLETED.
+      else if (event.getType() == TaskCleaner.EventType.TASK_CLEAN) {
+        TaskAttemptId aId = ((TaskCleanupEvent) event).getAttemptID();
+        LOG.debug("TASK_CLEAN for attemptId: " + aId);
+        return;
+      }
+
+
+      // delegate to the actual handler
+      actualHandler.handle(event);
+    }
+
+    /**
+     * Sends a final (progress 1.0, CLEANUP phase) status update for a replayed
+     * attempt, carrying the state string and counters recorded in history.
+     */
+    private void sendStatusUpdateEvent(TaskAttemptId yarnAttemptID,
+        TaskAttemptInfo attemptInfo) {
+      LOG.info("Sending status update event to " + yarnAttemptID);
+      TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
+      taskAttemptStatus.id = yarnAttemptID;
+      taskAttemptStatus.progress = 1.0f;
+      taskAttemptStatus.stateString = attemptInfo.getTaskStatus();
+      // taskAttemptStatus.outputSize = attemptInfo.getOutputSize();
+      taskAttemptStatus.phase = Phase.CLEANUP;
+      // May be null when no counters were recorded for the attempt.
+      taskAttemptStatus.counters = attemptInfo.getCounters();
+      actualHandler.handle(new TaskAttemptEventStatusUpdate(
+          taskAttemptStatus.id, taskAttemptStatus));
+    }
+
+    /**
+     * Re-creates the container a replayed attempt ran in (from the host, port
+     * and container id recorded in history), requests its launch if it is
+     * still ALLOCATED, and assigns the attempt to it.
+     */
+    private void sendAssignedEvent(TaskAttemptId yarnAttemptID,
+        TaskAttemptInfo attemptInfo, AMSchedulerTALaunchRequestEvent event) {
+      LOG.info("Sending assigned event to " + yarnAttemptID);
+      ContainerId cId = attemptInfo.getContainerId();
+
+      NodeId nodeId =
+          ConverterUtils.toNodeId(attemptInfo.getHostname() + ":"
+              + attemptInfo.getPort());
+      // Resource/Priority/ApplicationACLs are only needed while launching the
+      // container on an NM, these are already completed tasks, so setting them
+      // to null
+      Container container = BuilderUtils.newContainer(cId, nodeId,
+          attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort(),
+          null, null, null);
+
+      // Track shufflePort, attemptId for container - would normally be done by the scheduler.
+      ContainerInfo cInfo = containerInfo.get(cId);
+      if (cInfo == null) {
+        cInfo = new ContainerInfo(attemptInfo.getShufflePort());
+        containerInfo.put(cId, cInfo);
+      }
+      cInfo.setAttemptId(yarnAttemptID);
+      attemptToContainerMap.put(yarnAttemptID, cId);
+
+      appContext.getAllNodes().nodeSeen(nodeId);
+      appContext.getAllContainers().addContainerIfNew(container);
+
+
+      // Request container launch for new containers.
+      if (appContext.getAllContainers().get(cId).getState() == AMContainerState.ALLOCATED) {
+        TaskId taskId = yarnAttemptID.getTaskId();
+        AMContainerEventLaunchRequest lrEvent = new AMContainerEventLaunchRequest(
+            cId, taskId.getJobId(), taskId.getTaskType(), event.getJobToken(),
+            event.getCredentials(), false, new JobConf(appContext.getJob(
+                taskId.getJobId()).getConf()));
+        actualHandler.handle(lrEvent);
+      }
+      // Assign the task attempt to this container.
+      actualHandler.handle(new AMContainerEventAssignTA(cId, yarnAttemptID,
+          event.getRemoteTaskContext()));
+    }
+    // TODO: Handle container launch request
+  }
+
+  /**
+   * Book-keeping for a container used by replayed attempts: the shuffle port
+   * recorded in history and the attempt most recently assigned to it.
+   */
+  private static class ContainerInfo {
+    private final int shufflePort;
+    private TaskAttemptId nextAttemptId;
+
+    ContainerInfo(int shufflePort) {
+      this.shufflePort = shufflePort;
+    }
+
+    void setAttemptId(TaskAttemptId attemptId) {
+      this.nextAttemptId = attemptId;
+    }
+
+    int getShufflePort() {
+      return shufflePort;
+    }
+
+    TaskAttemptId getNextAttemptId() {
+      return nextAttemptId;
+    }
+  }
+
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/package-info.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/package-info.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/package-info.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/package-info.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.mapreduce.v2.app2.recover;
+import org.apache.hadoop.classification.InterfaceAudience;
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEvent.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,30 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+
+/**
+ * Base class for events delivered to the AM scheduler, typed by
+ * {@link AMSchedulerEventType}.
+ */
+public class AMSchedulerEvent extends AbstractEvent<AMSchedulerEventType> {
+
+  /**
+   * @param eventType the scheduler event type carried by this event
+   */
+  // TODO Not a very useful class...
+  public AMSchedulerEvent(AMSchedulerEventType eventType) {
+    super(eventType);
+  }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainerCompleted.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainerCompleted.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainerCompleted.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainerCompleted.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Scheduler event of type S_CONTAINER_COMPLETED, identifying the container
+ * that completed.
+ */
+public class AMSchedulerEventContainerCompleted extends AMSchedulerEvent {
+
+  private final ContainerId containerId;
+
+  /**
+   * @param containerId id of the container that completed
+   */
+  public AMSchedulerEventContainerCompleted(ContainerId containerId) {
+    super(AMSchedulerEventType.S_CONTAINER_COMPLETED);
+    this.containerId = containerId;
+  }
+
+  /** @return id of the container that completed */
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,48 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Scheduler event of type
+ * {@link AMSchedulerEventType#S_CONTAINERS_ALLOCATED}, carrying the ids of
+ * the allocated containers and a headroom-changed flag.
+ *
+ * TODO Maybe distinguish between newly allocated containers and existing
+ * containers being re-used.
+ */
+public class AMSchedulerEventContainersAllocated extends AMSchedulerEvent {
+
+  // Held by reference; the caller's list is not copied.
+  private final List<ContainerId> allocatedIds;
+  private final boolean headroomChanged;
+
+  /**
+   * @param containerIds ids of the allocated containers (stored as-is).
+   * @param headRoomChanged whether the headroom changed. NOTE: headRoomChanged
+   *          is a strange API - it makes an assumption about how the
+   *          scheduler will use this info.
+   */
+  public AMSchedulerEventContainersAllocated(List<ContainerId> containerIds,
+      boolean headRoomChanged) {
+    super(AMSchedulerEventType.S_CONTAINERS_ALLOCATED);
+    this.headroomChanged = headRoomChanged;
+    this.allocatedIds = containerIds;
+  }
+
+  /**
+   * @return the same list instance that was passed to the constructor.
+   */
+  public List<ContainerId> getContainerIds() {
+    return allocatedIds;
+  }
+
+  /**
+   * @return the headroom flag passed to the constructor.
+   */
+  public boolean didHeadroomChange() {
+    return this.headroomChanged;
+  }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventNodeBlacklisted.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventNodeBlacklisted.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventNodeBlacklisted.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventNodeBlacklisted.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+/**
+ * Scheduler event of type {@link AMSchedulerEventType#S_NODE_BLACKLISTED},
+ * naming the node that was blacklisted.
+ */
+public class AMSchedulerEventNodeBlacklisted extends AMSchedulerEvent {
+
+  private final NodeId blacklistedNode;
+
+  /**
+   * @param nodeId the node being reported as blacklisted.
+   */
+  public AMSchedulerEventNodeBlacklisted(NodeId nodeId) {
+    super(AMSchedulerEventType.S_NODE_BLACKLISTED);
+    blacklistedNode = nodeId;
+  }
+
+  /**
+   * @return the node id passed to the constructor.
+   */
+  public NodeId getNodeId() {
+    return blacklistedNode;
+  }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventTAEnded.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventTAEnded.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventTAEnded.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventTAEnded.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Scheduler event of type {@link AMSchedulerEventType#S_TA_ENDED}, reporting
+ * that a task attempt has ended, the container it used and its final state.
+ */
+public class AMSchedulerEventTAEnded extends AMSchedulerEvent {
+
+  private final TaskAttemptId attemptId;
+  private final ContainerId containerId;
+  // final, like the other fields: the state is only assigned in the constructor.
+  private final TaskAttemptState state;
+
+  /**
+   * @param attemptId the attempt that ended.
+   * @param containerId the container the attempt used.
+   * @param state the state the attempt ended in (presumably one of
+   *          FAILED/KILLED/SUCCEEDED - confirm with producers).
+   */
+  public AMSchedulerEventTAEnded(TaskAttemptId attemptId,
+      ContainerId containerId, TaskAttemptState state) {
+    super(AMSchedulerEventType.S_TA_ENDED);
+    this.attemptId = attemptId;
+    this.containerId = containerId;
+    this.state = state;
+  }
+
+  /** @return the attempt id passed to the constructor. */
+  public TaskAttemptId getAttemptID() {
+    return this.attemptId;
+  }
+
+  /** @return the container id passed to the constructor. */
+  public ContainerId getUsedContainerId() {
+    return this.containerId;
+  }
+
+  /** @return the attempt state passed to the constructor. */
+  public TaskAttemptState getState() {
+    return this.state;
+  }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+/**
+ * Event types handled by the AM scheduler, grouped by the component that
+ * produces them.
+ *
+ * The scheduler should have a way of knowing about unusable nodes. Acting on
+ * this information to change requests etc is scheduler specific.
+ */
+public enum AMSchedulerEventType {
+
+  /** Producer: TaskAttempt. */
+  S_TA_LAUNCH_REQUEST,
+  /** Producer: TaskAttempt. Annotated with FAILED/KILLED/SUCCEEDED. */
+  S_TA_ENDED,
+
+  /** Producer: RMCommunicator. */
+  S_CONTAINERS_ALLOCATED,
+
+  /** Producer: Container (maybe RMCommunicator). */
+  S_CONTAINER_COMPLETED,
+
+  /** Producer: Node. */
+  S_NODE_BLACKLISTED,
+  /** Producer: Node. */
+  S_NODE_UNHEALTHY,
+  /** Producer: Node. */
+  S_NODE_HEALTHY
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTALaunchRequestEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTALaunchRequestEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTALaunchRequestEvent.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTALaunchRequestEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.mapreduce.task.impl.MRTaskContext;
+
+/**
+ * Scheduler event of type {@link AMSchedulerEventType#S_TA_LAUNCH_REQUEST},
+ * carrying the attempt, its resource capability, task context, security
+ * material and location hints for a container request.
+ */
+public class AMSchedulerTALaunchRequestEvent extends AMSchedulerEvent {
+
+  // TODO Get rid of remoteTask from here. Can be forgotten after it has been assigned.
+  //.... Maybe have the Container talk to the TaskAttempt to pull in the remote task.
+
+  private final TaskAttemptId attemptId;
+  private final boolean rescheduled;
+  private final Resource capability;
+  private final MRTaskContext remoteTaskContext;
+  private final TaskAttempt taskAttempt;
+  private final Credentials credentials;
+  // final, like every other field: only assigned in the constructor.
+  private final Token<JobTokenIdentifier> jobToken;
+  // Arrays are stored by reference (not copied).
+  private final String[] hosts;
+  private final String[] racks;
+
+
+  /**
+   * @param attemptId the attempt to launch.
+   * @param rescheduled flag exposed via {@link #isRescheduled()}.
+   * @param capability resource capability to request.
+   * @param remoteTaskContext task context for the attempt.
+   * @param ta the attempt object itself.
+   * @param credentials credentials for the attempt.
+   * @param jobToken job token for the attempt.
+   * @param hosts host names; presumably data-local hints - TODO confirm.
+   * @param racks rack names; presumably rack-local hints - TODO confirm.
+   */
+  public AMSchedulerTALaunchRequestEvent(TaskAttemptId attemptId,
+      boolean rescheduled, Resource capability,
+      MRTaskContext remoteTaskContext, TaskAttempt ta,
+      Credentials credentials, Token<JobTokenIdentifier> jobToken,
+      String[] hosts, String[] racks) {
+    super(AMSchedulerEventType.S_TA_LAUNCH_REQUEST);
+    this.attemptId = attemptId;
+    this.rescheduled = rescheduled;
+    this.capability = capability;
+    this.remoteTaskContext = remoteTaskContext;
+    this.taskAttempt = ta;
+    this.credentials = credentials;
+    this.jobToken = jobToken;
+    this.hosts = hosts;
+    this.racks = racks;
+  }
+
+  public TaskAttemptId getAttemptID() {
+    return this.attemptId;
+  }
+
+  public Resource getCapability() {
+    return capability;
+  }
+
+  /** @return the hosts array passed to the constructor (same instance). */
+  public String[] getHosts() {
+    return hosts;
+  }
+
+  /** @return the racks array passed to the constructor (same instance). */
+  public String[] getRacks() {
+    return racks;
+  }
+
+  public boolean isRescheduled() {
+    return rescheduled;
+  }
+
+  public MRTaskContext getRemoteTaskContext() {
+    return remoteTaskContext;
+  }
+
+  public TaskAttempt getTaskAttempt() {
+    return this.taskAttempt;
+  }
+
+  public Credentials getCredentials() {
+    return this.credentials;
+  }
+
+  public Token<JobTokenIdentifier> getJobToken() {
+    return this.jobToken;
+  }
+
+  // Parameter replacement: @taskid@ will not be usable
+  // ProfileTaskRange not available along with ContainerReUse
+
+  /*Requirements to determine a container request.
+   * + Data-local + Rack-local hosts.
+   * + Resource capability
+   * + Env - mapreduce.map.env / mapreduce.reduce.env can change. M/R log level.
+   * - JobConf and JobJar file - same location.
+   * - Distributed Cache - identical for map / reduce tasks at the moment.
+   * - Credentials, tokens etc are identical.
+   * + Command - dependent on map / reduce java.opts
+   */
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerAllocator.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerAllocator.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerAllocator.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerAllocator.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,33 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+
+// TODO XXX Rename to AMScheduler.
+/**
+ * The AM scheduler: an {@link EventHandler} for {@link AMSchedulerEvent}s.
+ */
+// TODO XXX Rename to AMScheduler.
+public interface ContainerAllocator extends EventHandler<AMSchedulerEvent> {
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerRequestor.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor.ContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+/**
+ * Handler for {@link RMCommunicatorEvent}s that also manages container
+ * requests. Interface methods are implicitly public.
+ */
+public interface ContainerRequestor extends EventHandler<RMCommunicatorEvent> {
+
+  /** @return the available resources as reported by the implementation. */
+  Resource getAvailableResources();
+
+  /** Adds the given container request. */
+  void addContainerReq(ContainerRequest req);
+
+  /** Decrements (presumably removes one instance of) the given request. */
+  void decContainerReq(ContainerRequest req);
+
+  /** @return the application ACLs, keyed by access type. */
+  Map<ApplicationAccessType, String> getApplicationACLs();
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEvent.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class NMCommunicatorEvent extends AbstractEvent<NMCommunicatorEventType> {
+
+ private final ContainerId containerId;
+ private final NodeId nodeId;
+ private final ContainerToken containerToken;
+
+ public NMCommunicatorEvent(ContainerId containerId, NodeId nodeId,
+ ContainerToken containerToken, NMCommunicatorEventType type) {
+ super(type);
+ this.containerId = containerId;
+ this.nodeId = nodeId;
+ this.containerToken = containerToken;
+ }
+
+ public ContainerId getContainerId() {
+ return this.containerId;
+ }
+
+ public NodeId getNodeId() {
+ return this.nodeId;
+ }
+
+ public ContainerToken getContainerToken() {
+ return this.containerToken;
+ }
+
+ public String toSrting() {
+ return super.toString() + " for container " + containerId + ", nodeId: "
+ + nodeId;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result
+ + ((containerId == null) ? 0 : containerId.hashCode());
+ result = prime * result
+ + ((containerToken == null) ? 0 : containerToken.hashCode());
+ result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ NMCommunicatorEvent other = (NMCommunicatorEvent) obj;
+ if (containerId == null) {
+ if (other.containerId != null)
+ return false;
+ } else if (!containerId.equals(other.containerId))
+ return false;
+ if (containerToken == null) {
+ if (other.containerToken != null)
+ return false;
+ } else if (!containerToken.equals(other.containerToken))
+ return false;
+ if (nodeId == null) {
+ if (other.nodeId != null)
+ return false;
+ } else if (!nodeId.equals(other.nodeId))
+ return false;
+ return true;
+ }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEventType.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEventType.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEventType.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEventType.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+/**
+ * Request types sent to the NM communicator.
+ *
+ * TODO - Re-use the events in ContainerLauncher..
+ */
+public enum NMCommunicatorEventType {
+  /** A request to launch a container. */
+  CONTAINER_LAUNCH_REQUEST,
+  /** A request to stop a container. */
+  CONTAINER_STOP_REQUEST
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorLaunchRequestEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorLaunchRequestEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorLaunchRequestEvent.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorLaunchRequestEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+
+/**
+ * {@link NMCommunicatorEventType#CONTAINER_LAUNCH_REQUEST} event. The
+ * container id comes from the launch context; the node id and container
+ * token come from the container.
+ */
+public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent {
+
+  private final ContainerLaunchContext launchContext;
+
+  public NMCommunicatorLaunchRequestEvent(ContainerLaunchContext clc,
+      Container container) {
+    super(clc.getContainerId(),
+        container.getNodeId(),
+        container.getContainerToken(),
+        NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST);
+    launchContext = clc;
+  }
+
+  /** @return the launch context passed to the constructor. */
+  public ContainerLaunchContext getContainerLaunchContext() {
+    return launchContext;
+  }
+
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorStopRequestEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorStopRequestEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorStopRequestEvent.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorStopRequestEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+/**
+ * {@link NMCommunicatorEventType#CONTAINER_STOP_REQUEST} event for the given
+ * container, node and container token.
+ */
+public class NMCommunicatorStopRequestEvent extends NMCommunicatorEvent {
+
+  public NMCommunicatorStopRequestEvent(ContainerId containerId,
+      NodeId nodeId, ContainerToken containerToken) {
+    super(containerId,
+        nodeId,
+        containerToken,
+        NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
+  }
+
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,315 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.NotRunningJob;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Registers/unregisters to RM and sends heartbeats to RM.
+ */
+public abstract class RMCommunicator extends AbstractService {
+ private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
+ private int rmPollInterval;//millis
+ protected ApplicationId applicationId;
+ protected ApplicationAttemptId applicationAttemptId;
+ private AtomicBoolean stopped;
+ protected Thread allocatorThread;
+ @SuppressWarnings("rawtypes")
+ protected EventHandler eventHandler;
+ protected AMRMProtocol scheduler;
+ private final ClientService clientService;
+ protected int lastResponseID;
+ private Resource minContainerCapability;
+ private Resource maxContainerCapability;
+ protected Map<ApplicationAccessType, String> applicationACLs;
+
+ protected final AppContext context;
+ private Job job = null;
+ private JobId jobId;
+ // Has a signal (SIGTERM etc) been issued?
+ protected volatile boolean isSignalled = false;
+
+ public RMCommunicator(ClientService clientService, AppContext context) {
+ super(RMCommunicator.class.getSimpleName());
+ this.clientService = clientService;
+ this.context = context;
+ this.eventHandler = context.getEventHandler();
+ this.applicationId = context.getApplicationID();
+ this.applicationAttemptId = context.getApplicationAttemptId();
+ this.stopped = new AtomicBoolean(false);
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ super.init(conf);
+ rmPollInterval =
+ conf.getInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS,
+ MRJobConfig.DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS);
+ }
+
+ @Override
+ public void start() {
+ scheduler= createSchedulerProxy();
+ register();
+ startAllocatorThread();
+ JobID id = TypeConverter.fromYarn(this.applicationId);
+ jobId = TypeConverter.toYarn(id);
+ job = getJob();
+ super.start();
+ }
+
+ protected AppContext getContext() {
+ return context;
+ }
+
+ protected Job getJob() {
+ if (job == null) {
+ job = context.getJob(jobId);
+ }
+ if (job instanceof NotRunningJob) {
+ job = null;
+ }
+ return job;
+ }
+
+ /**
+ * Get the appProgress. Can be used only after this component is started.
+ * @return the appProgress.
+ */
+ protected float getApplicationProgress() {
+ // For now just a single job. In future when we have a DAG, we need an
+ // aggregate progress.
+ Job j = getJob();
+ return (float) ((j != null) ? j.getProgress() : 0.0);
+ }
+
+ // TODO EVENTUALLY. get rid of this, when RMComma dn RMContainerRequestor are
+ // collapsed.
+ public Map<ApplicationAccessType, String> getApplicationAcls() {
+ return this.applicationACLs;
+ }
+
+ // TODO (After 3902): Get rid of the dependencies on the ClientService.
+ protected void register() {
+ //Register
+ InetSocketAddress serviceAddr = clientService.getBindAddress();
+ try {
+ RegisterApplicationMasterRequest request = Records
+ .newRecord(RegisterApplicationMasterRequest.class);
+ request.setApplicationAttemptId(applicationAttemptId);
+ request.setHost(serviceAddr.getHostName());
+ request.setRpcPort(serviceAddr.getPort());
+ request.setTrackingUrl(serviceAddr.getHostName() + ":" + clientService.getHttpPort());
+ RegisterApplicationMasterResponse response =
+ scheduler.registerApplicationMaster(request);
+ minContainerCapability = response.getMinimumResourceCapability();
+ maxContainerCapability = response.getMaximumResourceCapability();
+ this.context.getClusterInfo().setMinContainerCapability(
+ minContainerCapability);
+ this.context.getClusterInfo().setMaxContainerCapability(
+ maxContainerCapability);
+ this.applicationACLs = response.getApplicationACLs();
+ LOG.info("minContainerCapability: " + minContainerCapability.getMemory());
+ LOG.info("maxContainerCapability: " + maxContainerCapability.getMemory());
+ } catch (Exception are) {
+ LOG.error("Exception while registering", are);
+ throw new YarnException(are);
+ }
+ }
+
+ protected void unregister() {
+ try {
+ Job j = getJob();
+ StringBuffer sb = new StringBuffer();
+ String historyUrl = "";
+ FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
+ if (j == null) {
+ finishState = FinalApplicationStatus.FAILED;
+ sb.append("Application failed due to unknown reason."
+ + " No job submitted to this MR AM");
+ } else {
+ if (j.getState() == JobState.SUCCEEDED) {
+ finishState = FinalApplicationStatus.SUCCEEDED;
+ } else if (j.getState() == JobState.KILLED
+ || (j.getState() == JobState.RUNNING && isSignalled)) {
+ finishState = FinalApplicationStatus.KILLED;
+ } else if (j.getState() == JobState.FAILED
+ || j.getState() == JobState.ERROR) {
+ finishState = FinalApplicationStatus.FAILED;
+ }
+ for (String s : j.getDiagnostics()) {
+ sb.append(s).append("\n");
+ }
+ LOG.info("Setting job diagnostics to " + sb.toString());
+
+ historyUrl = JobHistoryUtils.getHistoryUrl(getConfig(),
+ context.getApplicationID());
+ LOG.info("History url is " + historyUrl);
+ }
+
+ FinishApplicationMasterRequest request = Records
+ .newRecord(FinishApplicationMasterRequest.class);
+ request.setAppAttemptId(this.applicationAttemptId);
+ request.setFinishApplicationStatus(finishState);
+ request.setDiagnostics(sb.toString());
+ request.setTrackingUrl(historyUrl);
+ scheduler.finishApplicationMaster(request);
+ } catch(Exception are) {
+ LOG.error("Exception while unregistering ", are);
+ }
+ }
+
+ protected Resource getMinContainerCapability() {
+ return minContainerCapability;
+ }
+
+ protected Resource getMaxContainerCapability() {
+ return maxContainerCapability;
+ }
+
+ @Override
+ public void stop() {
+ if (stopped.getAndSet(true)) {
+ // return if already stopped
+ return;
+ }
+ allocatorThread.interrupt();
+ try {
+ allocatorThread.join();
+ } catch (InterruptedException ie) {
+ LOG.warn("InterruptedException while stopping", ie);
+ }
+ unregister();
+ super.stop();
+ }
+
+ protected void startAllocatorThread() {
+ allocatorThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+ try {
+ Thread.sleep(rmPollInterval);
+ try {
+ heartbeat();
+ } catch (YarnException e) {
+ LOG.error("Error communicating with RM: " + e.getMessage() , e);
+ return;
+ } catch (Exception e) {
+ LOG.error("ERROR IN CONTACTING RM. ", e);
+ // TODO: for other exceptions
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Allocated thread interrupted. Returning.");
+ return;
+ }
+ }
+ }
+ });
+ allocatorThread.setName("RMCommunicator");
+ allocatorThread.start();
+ }
+
+ protected AMRMProtocol createSchedulerProxy() {
+ final Configuration conf = getConfig();
+ final YarnRPC rpc = YarnRPC.create(conf);
+ final InetSocketAddress serviceAddr = conf.getSocketAddr(
+ YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+
+ UserGroupInformation currentUser;
+ try {
+ currentUser = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ String tokenURLEncodedStr = System.getenv().get(
+ ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
+ Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
+
+ try {
+ token.decodeFromUrlString(tokenURLEncodedStr);
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+
+ SecurityUtil.setTokenService(token, serviceAddr);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("AppMasterToken is " + token);
+ }
+ currentUser.addToken(token);
+ }
+
+ return currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
+ @Override
+ public AMRMProtocol run() {
+ return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
+ serviceAddr, conf);
+ }
+ });
+ }
+
+ protected abstract void heartbeat() throws Exception;
+
+ public void setSignalled(boolean isSignalled) {
+ this.isSignalled = isSignalled;
+ LOG.info("RMCommunicator notified that iSignalled was : " + isSignalled);
+ }
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorContainerDeAllocateRequestEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorContainerDeAllocateRequestEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorContainerDeAllocateRequestEvent.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorContainerDeAllocateRequestEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class RMCommunicatorContainerDeAllocateRequestEvent extends
+ RMCommunicatorEvent {
+
+ private final ContainerId containerId;
+
+ public RMCommunicatorContainerDeAllocateRequestEvent(ContainerId containerId) {
+ super(RMCommunicatorEventType.CONTAINER_DEALLOCATE);
+ this.containerId = containerId;
+ }
+
+ public ContainerId getContainerId() {
+ return this.containerId;
+ }
+
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEvent.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class RMCommunicatorEvent extends AbstractEvent<RMCommunicatorEventType> {
+
+ public RMCommunicatorEvent(RMCommunicatorEventType type) {
+ super(type);
+ }
+
+}
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEventType.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEventType.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEventType.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEventType.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
/**
 * Kinds of RM-communicator events.
 */
public enum RMCommunicatorEventType {
  /** Request to de-allocate a container. */
  CONTAINER_DEALLOCATE
}