You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/04/08 04:54:57 UTC
svn commit: r1090092 [1/2] - in /hadoop/mapreduce/branches/MR-279:
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
mr-client/hadoop-ma...
Author: mahadev
Date: Fri Apr 8 02:54:56 2011
New Revision: 1090092
URL: http://svn.apache.org/viewvc?rev=1090092&view=rev
Log:
MAPREDUCE-2405: Implement uber-AppMaster (in-cluster LocalJobRunner for MRv2) Greg Roelofs via mahadev
Added:
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
Modified:
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobState.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java?rev=1090092&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java Fri Apr 8 02:54:56 2011
@@ -0,0 +1,432 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMasterConstants;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+/**
+ * Runs the container task locally in a thread.
+ * Since all (sub)tasks share the same local directory, they must be executed
+ * sequentially in order to avoid creating/deleting the same files/dirs.
+ */
+public class LocalContainerLauncher extends AbstractService implements
+ ContainerLauncher {
+
+ private static final File curDir = new File(new String("."));
+ private static final Log LOG = LogFactory.getLog(LocalContainerLauncher.class);
+
+ private FileContext curFC = null;
+ private final HashSet<File> localizedFiles;
+ private final AppContext context;
+ private final TaskUmbilicalProtocol umbilical;
+ private Thread eventHandlingThread;
+ private BlockingQueue<ContainerLauncherEvent> eventQueue =
+ new LinkedBlockingQueue<ContainerLauncherEvent>();
+
+ public LocalContainerLauncher(AppContext context,
+ TaskUmbilicalProtocol umbilical) {
+ super(LocalContainerLauncher.class.getName());
+ this.context = context;
+ this.umbilical = umbilical;
+ // umbilical: MRAppMaster creates (taskAttemptListener), passes to us (TODO/FIXME: pointless to use RPC to talk to self; should create LocalTaskAttemptListener or similar: implement umbilical protocol but skip RPC stuff)
+
+ try {
+ curFC = FileContext.getFileContext(curDir.toURI());
+ } catch (UnsupportedFileSystemException ufse) {
+ LOG.error("Local filesystem " + curDir.toURI().toString()
+ + " is unsupported?? (should never happen)");
+ }
+
+ // Save list of files/dirs that are supposed to be present so can delete
+ // any extras created by one task before starting subsequent task. Note
+ // that there's no protection against deleted or renamed localization;
+ // users who do that get what they deserve (and will have to disable
+ // uberization in order to run correctly).
+ File[] curLocalFiles = curDir.listFiles();
+ localizedFiles = new HashSet<File>(curLocalFiles.length);
+ for (int j = 0; j < curLocalFiles.length; ++j) {
+ localizedFiles.add(curLocalFiles[j]);
+ }
+
+ // Relocalization note/future FIXME (per chrisdo, 20110315): At moment,
+ // full localization info is in AppSubmissionContext passed from client to
+ // RM and then to NM for AM-container launch: no difference between AM-
+ // localization and MapTask- or ReduceTask-localization, so can assume all
+ // OK. Longer-term, will need to override uber-AM container-localization
+ // request ("needed resources") with union of regular-AM-resources + task-
+ // resources (and, if maps and reduces ever differ, then union of all three
+ // types), OR will need localizer service/API that uber-AM can request
+ // after running (e.g., "localizeForTask()" or "localizeForMapTask()").
+ }
+
+ public void start() {
+ eventHandlingThread = new Thread(new SubtaskRunner(), "uber-SubtaskRunner");
+ eventHandlingThread.start();
+ super.start();
+ }
+
+ public void stop() {
+ eventHandlingThread.interrupt();
+ super.stop();
+ }
+
+ @Override
+ public void handle(ContainerLauncherEvent event) {
+ try {
+ eventQueue.put(event);
+ } catch (InterruptedException e) {
+ throw new YarnException(e); // FIXME? YarnException is "for runtime exceptions only"
+ }
+ }
+
+
+ /*
+ * Uber-AM lifecycle/ordering ("normal" case):
+ *
+ * - [somebody] sends TA_ASSIGNED
+ * - handled by ContainerAssignedTransition (TaskAttemptImpl.java)
+ * - creates "remoteTask" for us == real Task
+ * - sends CONTAINER_REMOTE_LAUNCH
+ * - TA: UNASSIGNED -> ASSIGNED
+ * - CONTAINER_REMOTE_LAUNCH handled by LocalContainerLauncher (us!)
+ * - sucks "remoteTask" out of TaskAttemptImpl via getRemoteTask()
+ * - sends TA_CONTAINER_LAUNCHED
+ * [[ elsewhere...
+ * - TA_CONTAINER_LAUNCHED handled by LaunchedContainerTransition
+ * - registers "remoteTask" with TaskAttemptListener (== umbilical)
+ * - NUKES "remoteTask"
+ * - sends T_ATTEMPT_LAUNCHED (Task: SCHEDULED -> RUNNING)
+ * - TA: ASSIGNED -> RUNNING
+ * ]]
+ * - runs Task (runSubMap() or runSubReduce())
+ * - TA can safely send TA_UPDATE since in RUNNING state
+ * [modulo possible TA-state-machine race noted below: CHECK (TODO)]
+ */
+ private class SubtaskRunner implements Runnable {
+
+ private boolean doneWithMaps = false;
+ private int finishedSubMaps = 0;
+
+ SubtaskRunner() {
+ }
+
+ @Override
+ public void run() {
+ ContainerLauncherEvent event = null;
+
+ // _must_ either run subtasks sequentially or accept expense of new JVMs
+ // (i.e., fork()), else will get weird failures when maps try to create/
+ // write same dirname or filename: no chdir() in Java
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ event = eventQueue.take();
+ } catch (InterruptedException e) { // mostly via T_KILL? JOB_KILL?
+ LOG.error("Returning, interrupted : " + e);
+ return;
+ }
+
+ LOG.info("Processing the event " + event.toString());
+
+ if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
+
+ ContainerRemoteLaunchEvent launchEv =
+ (ContainerRemoteLaunchEvent)event;
+ TaskAttemptId attemptID = launchEv.getTaskAttemptID(); //FIXME: can attemptID ever be null? (only if retrieved over umbilical?)
+
+ Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId());
+ int numMapTasks = job.getTotalMaps();
+ int numReduceTasks = job.getTotalReduces();
+
+ // YARN (tracking) Task:
+ org.apache.hadoop.mapreduce.v2.app.job.Task ytask =
+ job.getTask(attemptID.getTaskId());
+ // classic mapred Task:
+ org.apache.hadoop.mapred.Task remoteTask = launchEv.getRemoteTask();
+
+ // after "launching," send launched event to task attempt to move
+ // state from ASSIGNED to RUNNING (also nukes "remoteTask", so must
+ // do getRemoteTask() call first)
+ context.getEventHandler().handle(
+ new TaskAttemptEvent(attemptID,
+ TaskAttemptEventType.TA_CONTAINER_LAUNCHED)); //FIXME: race condition here? or do we have same kind of lock on TA handler => MapTask can't send TA_UPDATE before TA_CONTAINER_LAUNCHED moves TA to RUNNING state? (probably latter)
+
+ if (numMapTasks == 0) {
+ doneWithMaps = true;
+ }
+
+ try {
+ runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
+ (numReduceTasks > 0));
+ } catch (RuntimeException re) {
+ // this is our signal that the subtask failed in some way, so
+ // simulate a failed JVM/container and send a container-completed
+ // event to task attempt (i.e., move state machine from RUNNING
+ // to FAIL_CONTAINER_CLEANUP [and ultimately to FAILED])
+ context.getEventHandler().handle(new TaskAttemptEvent(attemptID,
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+ } catch (IOException ioe) {
+ // if umbilical itself barfs (in error-handler of runSubMap()),
+ // we're pretty much hosed, so do what YarnChild main() does
+ // (i.e., exit clumsily--but can never happen, so no worries!)
+ LOG.fatal("oopsie... this can never happen: "
+ + StringUtils.stringifyException(ioe));
+ System.exit(-1);
+ }
+
+ } else if (event.getType() == EventType.CONTAINER_REMOTE_CLEANUP) {
+
+ // no container to kill, so just send "cleaned" event to task attempt
+ // to move us from SUCCESS_CONTAINER_CLEANUP to SUCCEEDED state
+ // (or {FAIL|KILL}_CONTAINER_CLEANUP to {FAIL|KILL}_TASK_CLEANUP)
+ context.getEventHandler().handle(
+ new TaskAttemptEvent(event.getTaskAttemptID(),
+ TaskAttemptEventType.TA_CONTAINER_CLEANED));
+
+ } else {
+ LOG.warn("Ignoring unexpected event " + event.toString());
+ }
+
+ }
+ }
+
+ private void runSubtask(org.apache.hadoop.mapred.Task task,
+ final TaskType taskType,
+ TaskAttemptId attemptID,
+ final int numMapTasks,
+ boolean renameOutputs)
+ throws RuntimeException, IOException {
+ org.apache.hadoop.mapred.TaskAttemptID classicAttemptID =
+ TypeConverter.fromYarn(attemptID);
+
+ try {
+ JobConf conf = new JobConf(getConfig());
+
+ // META-FIXME: do we want the extra sanity-checking (doneWithMaps,
+ // etc.), or just assume/hope the state machine(s) and uber-AM work
+ // as expected?
+ if (taskType == TaskType.MAP) {
+ if (doneWithMaps) {
+ LOG.error("CONTAINER_REMOTE_LAUNCH contains a map task ("
+ + attemptID + "), but should be finished with maps");
+ // throw new RuntimeException() (FIXME: what's appropriate here?)
+ }
+
+ MapTask map = (MapTask)task;
+
+ //CODE-REVIEWER QUESTION: why not task.getConf() or map.getConf() instead of conf? do we need Task's localizeConfiguration() run on this first?
+ map.run(conf, umbilical);
+
+ if (renameOutputs) {
+ renameMapOutputForReduce(conf, attemptID, map.getMapOutputFile());
+ }
+ relocalize();
+
+ if (++finishedSubMaps == numMapTasks) {
+ doneWithMaps = true;
+ }
+
+ } else /* TaskType.REDUCE */ {
+
+ if (!doneWithMaps) {
+ //check if event-queue empty? whole idea of counting maps vs. checking event queue is a tad wacky...but could enforce ordering (assuming no "lost events") at LocalMRAppMaster [CURRENT BUG(?): doesn't send reduce event until maps all done]
+ LOG.error("CONTAINER_REMOTE_LAUNCH contains a reduce task ("
+ + attemptID + "), but not yet finished with maps");
+ // throw new RuntimeException() (FIXME) // or push reduce event back onto end of queue? (probably former)
+ }
+
+ ReduceTask reduce = (ReduceTask)task;
+
+ // a.k.a. "mapreduce.jobtracker.address" in LocalJobRunner:
+ conf.set(MRConfig.MASTER_ADDRESS, "local"); // bypass shuffle
+
+ reduce.run(conf, umbilical);
+ //relocalize(); // needed only if more than one reducer supported (is MAPREDUCE-434 fixed yet?)
+ }
+
+ } catch (FSError e) {
+ LOG.fatal("FSError from child", e);
+ // umbilical: MRAppMaster creates (taskAttemptListener), passes to us
+ umbilical.fsError(classicAttemptID, e.getMessage());
+ throw new RuntimeException();
+
+ } catch (Exception exception) {
+ LOG.warn("Exception running local (uberized) 'child' : "
+ + StringUtils.stringifyException(exception));
+ try {
+ if (task != null) {
+ // do cleanup for the task
+// if (childUGI == null) { // no need to job into doAs block
+ task.taskCleanup(umbilical);
+// } else {
+// final Task taskFinal = task;
+// childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+// @Override
+// public Object run() throws Exception {
+// taskFinal.taskCleanup(umbilical);
+// return null;
+// }
+// });
+// }
+ }
+ } catch (Exception e) {
+ LOG.info("Exception cleaning up: "
+ + StringUtils.stringifyException(e));
+ }
+ // Report back any failures, for diagnostic purposes
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ exception.printStackTrace(new PrintStream(baos));
+// if (classicAttemptID != null) {
+ umbilical.reportDiagnosticInfo(classicAttemptID, baos.toString());
+// }
+ throw new RuntimeException();
+
+ } catch (Throwable throwable) {
+ LOG.fatal("Error running local (uberized) 'child' : "
+ + StringUtils.stringifyException(throwable));
+// if (classicAttemptID != null) {
+ Throwable tCause = throwable.getCause();
+ String cause = (tCause == null)
+ ? throwable.getMessage()
+ : StringUtils.stringifyException(tCause);
+ umbilical.fatalError(classicAttemptID, cause);
+// }
+ throw new RuntimeException();
+
+ } finally {
+/*
+FIXME: do we need to do any of this stuff? (guessing not since not in own JVM)
+ RPC.stopProxy(umbilical);
+ DefaultMetricsSystem.shutdown();
+ // Shutting down log4j of the child-vm...
+ // This assumes that on return from Task.run()
+ // there is no more logging done.
+ LogManager.shutdown();
+ */
+ }
+ }
+
+
+/* FIXME: may not need renameMapOutputForReduce() anymore? TEST!
+
+${local.dir}/usercache/$user/appcache/$appId/$contId/ == $cwd for tasks;
+contains task.sh script, which, when executed, creates symlinks and sets up env
+ "$local.dir"/usercache/$user/appcache/$appId/$contId/file.out
+ "$local.dir"/usercache/$user/appcache/$appId/$contId/file.out.idx (?)
+ "$local.dir"/usercache/$user/appcache/$appId/output/$taskId/ is where file.out* is moved after MapTask done
+
+ OHO! no further need for this at all? $taskId is unique per subtask
+ now => should work fine to leave alone. TODO: test with teragen or
+ similar
+ */
+
+ /**
+ * Within the _local_ filesystem (not HDFS), all activity takes place within
+ * a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/),
+ * and all sub-MapTasks create the same filename ("file.out"). Rename that
+ * to something unique (e.g., "map_0.out") to avoid collisions.
+ *
+ * Longer-term, we'll modify [something] to use TaskAttemptID-based
+ * filenames instead of "file.out". (All of this is entirely internal,
+ * so there are no particular compatibility issues.)
+ */
+ private void renameMapOutputForReduce(JobConf conf, TaskAttemptId mapId,
+ MapOutputFile subMapOutputFile)
+ throws IOException {
+ FileSystem localFs = FileSystem.getLocal(conf);
+ // move map output to reduce input
+ Path mapOut = subMapOutputFile.getOutputFile();
+ Path reduceIn = subMapOutputFile.getInputFileForWrite(
+ TypeConverter.fromYarn(mapId).getTaskID(), localFs.getLength(mapOut));
+ if (!localFs.mkdirs(reduceIn.getParent())) {
+ throw new IOException("Mkdirs failed to create "
+ + reduceIn.getParent().toString());
+ }
+ if (!localFs.rename(mapOut, reduceIn))
+ throw new IOException("Couldn't rename " + mapOut);
+ }
+
+ /**
+ * Also within the local filesystem, we need to restore the initial state
+ * of the directory as much as possible. Compare current contents against
+ * the saved original state and nuke everything that doesn't belong, with
+ * the exception of the renamed map outputs (see above).
+FIXME: do we really need to worry about renamed map outputs, or already moved to output dir on commit? if latter, fix comment
+ *
+ * Any jobs that go out of their way to rename or delete things from the
+ * local directory are considered broken and deserve what they get...
+ */
+ private void relocalize() {
+ File[] curLocalFiles = curDir.listFiles();
+ for (int j = 0; j < curLocalFiles.length; ++j) {
+ if (!localizedFiles.contains(curLocalFiles[j])) {
+ // found one that wasn't there before: delete it
+ boolean deleted = false;
+ try {
+ if (curFC != null) {
+ // this is recursive, unlike File delete():
+ deleted = curFC.delete(new Path(curLocalFiles[j].getName()),true);
+ }
+ } catch (Exception e) {
+ deleted = false;
+ }
+ if (!deleted) {
+ LOG.warn("Unable to delete unexpected local file/dir "
+ + curLocalFiles[j].getName() + ": insufficient permissions?");
+ }
+ }
+ }
+ }
+
+ } // end SubtaskRunner
+
+}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Fri Apr 8 02:54:56 2011
@@ -31,7 +31,9 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.LocalContainerLauncher;
import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
@@ -53,6 +55,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
+import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
@@ -111,6 +114,8 @@ public class MRAppMaster extends Composi
private JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
+ private Job job;
+
public MRAppMaster(ApplicationId applicationId) {
this(applicationId, null);
}
@@ -139,26 +144,19 @@ public class MRAppMaster extends Composi
//service to do the task cleanup
taskCleaner = createTaskCleaner(context);
addIfService(taskCleaner);
- //service to launch allocated containers via NodeManager
- containerLauncher = createContainerLauncher(context);
- addIfService(containerLauncher);
-
+
//service to handle requests from JobClient
clientService = createClientService(context);
addIfService(clientService);
- //service to allocate containers from RM
- containerAllocator = createContainerAllocator(clientService, context);
- addIfService(containerAllocator);
-
//service to log job history events
EventHandler<JobHistoryEvent> historyService =
- createJobHistoryHandler(conf);
+ createJobHistoryHandler(conf);
+
+ JobEventDispatcher synchronousJobEventDispatcher = new JobEventDispatcher();
//register the event dispatchers
- dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
- dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
- dispatcher.register(JobEventType.class, new JobEventDispatcher());
+ dispatcher.register(JobEventType.class, synchronousJobEventDispatcher);
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
dispatcher.register(TaskAttemptEventType.class,
new TaskAttemptEventDispatcher());
@@ -178,7 +176,118 @@ public class MRAppMaster extends Composi
}
super.init(conf);
- }
+
+ //---- start of what used to be startJobs() code:
+
+ Configuration config = getConfig();
+
+ job = createJob(config);
+
+ /** create a job event for job intialization */
+ JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
+ /** send init to the job (this does NOT trigger job execution) */
+ synchronousJobEventDispatcher.handle(initJobEvent);
+
+ // JobImpl's InitTransition is done (call above is synchronous), so the
+ // "uber-decision" (MR-1220) has been made. Query job and switch to
+ // ubermode if appropriate (by registering different container-allocator
+ // and container-launcher services/event-handlers).
+
+ if (job.isUber()) {
+ LOG.info("MRAppMaster uberizing job " + job.getID()
+ + " in local container (\"uber-AM\").");
+ } else {
+ LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
+ + "job " + job.getID() + ".");
+ }
+
+ // service to allocate containers from RM (if non-uber) or to fake it (uber)
+ containerAllocator =
+ createContainerAllocator(clientService, context, job.isUber());
+ addIfService(containerAllocator);
+ dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
+ if (containerAllocator instanceof Service) {
+ ((Service) containerAllocator).init(config);
+ }
+
+ // corresponding service to launch allocated containers via NodeManager
+ containerLauncher = createContainerLauncher(context, job.isUber());
+ addIfService(containerLauncher);
+ dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
+ if (containerLauncher instanceof Service) {
+ ((Service) containerLauncher).init(config);
+ }
+
+ } // end of init()
+
+  /**
+   * Creates the application's single job, registers it in the running app
+   * context, and registers the JobFinishEvent handler that shuts this
+   * AppMaster down once the job finishes. The job is neither initialized nor
+   * started here; JOB_INIT is sent separately.
+   *
+   * @param conf job configuration (also used to read the tokens file when
+   *             security is enabled)
+   * @return the newly created job
+   * @throws YarnException if security is enabled and the tokens file cannot
+   *                       be read
+   */
+  public Job createJob(Configuration conf) {
+    Credentials fsTokens = new Credentials();
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      // Read the file-system tokens from the localized tokens-file.
+      try {
+        Path jobSubmitDir =
+            FileContext.getLocalFSFileContext().makeQualified(
+                new Path(new File(YARNApplicationConstants.JOB_SUBMIT_DIR)
+                    .getAbsolutePath()));
+        Path jobTokenFile =
+            new Path(jobSubmitDir, YarnConfiguration.APPLICATION_TOKENS_FILE);
+        fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
+        LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
+            + jobTokenFile);
+
+        UserGroupInformation currentUser =
+            UserGroupInformation.getCurrentUser();
+        for (Token<? extends TokenIdentifier> tk : fsTokens.getAllTokens()) {
+          // leading space fixes the previously run-together "...kind Xin..."
+          LOG.info(" --- DEBUG: Token of kind " + tk.getKind()
+              + " in current ugi in the AppMaster for service "
+              + tk.getService());
+          currentUser.addToken(tk); // For use by AppMaster itself.
+        }
+      } catch (IOException e) {
+        throw new YarnException(e);
+      }
+    }
+
+    // create single job
+    Job newJob = new JobImpl(appID, conf, dispatcher.getEventHandler(),
+        taskAttemptListener, jobTokenSecretManager, fsTokens);
+    ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
+
+    dispatcher.register(JobFinishEvent.Type.class,
+        new EventHandler<JobFinishEvent>() {
+          @Override
+          public void handle(JobFinishEvent event) {
+            // job has finished
+            // this is the only job, so shut down the Appmaster
+            // note in a workflow scenario, this may lead to creation of a new
+            // job (FIXME?)
+
+            // TODO:currently just wait for some time so clients can know the
+            // final states. Will be removed once RM come on.
+            try {
+              Thread.sleep(15000);
+            } catch (InterruptedException e) {
+              // Shutdown proceeds regardless (the process exits below), so
+              // the interrupt is logged rather than propagated.
+              LOG.warn("Interrupted while waiting before shutdown", e);
+            }
+            LOG.info("Calling stop for all the services");
+            try {
+              stop();
+            } catch (Throwable t) {
+              LOG.warn("Graceful stop failed ", t);
+            }
+            //TODO: this is required because rpc server does not shut down
+            // in spite of calling server.stop().
+            //Bring the process down by force.
+            //Not needed after HADOOP-7140
+            LOG.info("Exiting MR AppMaster..GoodBye!");
+            System.exit(0);
+          }
+        });
+
+    return newJob;
+  } // end createJob()
static class NullSpeculatorEventHandler
implements EventHandler<SpeculatorEvent> {
@@ -245,14 +354,20 @@ public class MRAppMaster extends Composi
return new TaskCleanerImpl(context);
}
- protected ContainerLauncher createContainerLauncher(AppContext context) {
- return new ContainerLauncherImpl(context);
- }
-
- protected ContainerAllocator createContainerAllocator(ClientService
- clientService, AppContext context) {
+  /**
+   * Picks the container allocator for this AM: a local one that fakes
+   * allocation when the job is uberized, otherwise one that talks to the RM.
+   */
+  protected ContainerAllocator createContainerAllocator(
+      ClientService clientService, AppContext context, boolean isLocal) {
 //return new StaticContainerAllocator(context);
+    if (isLocal) {
+      return new LocalContainerAllocator(clientService, context);
+    }
+    return new RMContainerAllocator(clientService, context);
+  }
+
+  /**
+   * Picks the container launcher for this AM: the NodeManager-backed
+   * launcher for normal jobs, or an in-process launcher for uberized jobs.
+   */
+  protected ContainerLauncher createContainerLauncher(AppContext context,
+      boolean isLocal) {
+    if (!isLocal) {
+      return new ContainerLauncherImpl(context);
+    }
+    // uber mode: subtasks report through this AM's own task-attempt
+    // listener, which is handed over as the umbilical
+    TaskUmbilicalProtocol localUmbilical =
+        (TaskUmbilicalProtocol) taskAttemptListener;
+    return new LocalContainerLauncher(context, localUmbilical);
 }
//TODO:should have an interface for MRClientService
@@ -292,7 +407,7 @@ public class MRAppMaster extends Composi
class RunningAppContext implements AppContext {
private Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();
-
+
@Override
public ApplicationId getApplicationID() {
return appID;
@@ -330,80 +445,17 @@ public class MRAppMaster extends Composi
/**
* This can be overridden to instantiate multiple jobs and create a
* workflow.
+ *
+ * TODO: Rework the design to actually support this. Currently much of the
+ * job stuff has been moved to init() above to support uberization (MR-1220).
+ * In a typical workflow, one presumably would want to uberize only a subset
+ * of the jobs (the "small" ones), which is awkward with the current design.
*/
protected void startJobs() {
-
- Configuration config = getConfig();
-
- Credentials fsTokens = new Credentials();
-
- if (UserGroupInformation.isSecurityEnabled()) {
- // Read the file-system tokens from the localized tokens-file.
- try {
- Path jobSubmitDir =
- FileContext.getLocalFSFileContext().makeQualified(
- new Path(new File(YARNApplicationConstants.JOB_SUBMIT_DIR)
- .getAbsolutePath()));
- Path jobTokenFile =
- new Path(jobSubmitDir, YarnConfiguration.APPLICATION_TOKENS_FILE);
- fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, config));
- LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
- + jobTokenFile);
-
- UserGroupInformation currentUser =
- UserGroupInformation.getCurrentUser();
- for (Token<? extends TokenIdentifier> tk : fsTokens.getAllTokens()) {
- LOG.info(" --- DEBUG: Token of kind " + tk.getKind()
- + "in current ugi in the AppMaster for service "
- + tk.getService());
- currentUser.addToken(tk); // For use by AppMaster itself.
- }
- } catch (IOException e) {
- throw new YarnException(e);
- }
- }
-
- //create single job
- Job job =
- new JobImpl(appID, config, dispatcher.getEventHandler(),
- taskAttemptListener, jobTokenSecretManager, fsTokens);
- ((RunningAppContext) context).jobs.put(job.getID(), job);
-
- dispatcher.register(JobFinishEvent.Type.class,
- new EventHandler<JobFinishEvent>() {
- @Override
- public void handle(JobFinishEvent event) {
- // job has finished
- // this is the only job, so shutdown the Appmaster
- // note in a workflow scenario, this may lead to creation of a new
- // job
-
- // TODO:currently just wait for sometime so clients can know the
- // final states. Will be removed once RM come on.
- try {
- Thread.sleep(15000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- LOG.info("Calling stop for all the services");
- try {
- stop();
- } catch (Throwable t) {
- LOG.warn("Graceful stop failed ", t);
- }
- //TODO: this is required because rpc server does not shutdown
- //inspite of calling server.stop().
- //Bring the process down by force.
- //Not needed after HADOOP-7140
- LOG.info("Exiting MR AppMaster..GoodBye!");
- System.exit(0);
- }
- });
-
- /** create a job event for job intialization **/
- JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
- /** send init on the job. this triggers the job execution.**/
- dispatcher.getEventHandler().handle(initJobEvent);
+ // Create a job-start event to get this ball rolling. The job is expected
+ // to have been created and inited already, in init() (see the TODO above).
+ JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);
+ // Send the job-start event; this triggers the job execution (JOB_START
+ // moves the job from INITED to RUNNING and schedules its map tasks).
+ dispatcher.getEventHandler().handle(startJobEvent);
}
private class JobEventDispatcher implements EventHandler<JobEvent> {
@@ -411,7 +463,8 @@ public class MRAppMaster extends Composi
public void handle(JobEvent event) {
((EventHandler<JobEvent>)context.getJob(event.getJobId())).handle(event);
}
- }
+ }
+
private class TaskEventDispatcher implements EventHandler<TaskEvent> {
@Override
public void handle(TaskEvent event) {
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java Fri Apr 8 02:54:56 2011
@@ -48,6 +48,7 @@ public interface Job {
int getTotalReduces();
int getCompletedMaps();
int getCompletedReduces();
+ /**
+ * @return true if this job was uberized, i.e. its tasks run sequentially
+ * on a single node inside the AM instead of in separately allocated
+ * containers
+ */
+ boolean isUber();
TaskAttemptCompletionEvent[]
getTaskAttemptCompletionEvents(int fromEventId, int maxEvents);
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java Fri Apr 8 02:54:56 2011
@@ -28,11 +28,12 @@ public enum JobEventType {
//Producer:MRAppMaster
JOB_INIT,
+ JOB_START,
//Producer:Task
JOB_TASK_COMPLETED,
JOB_MAP_TASK_RESCHEDULED,
- JOB_TASK_ATTEMPT_COMPLETED_EVENT,
+ JOB_TASK_ATTEMPT_COMPLETED_EVENT, // why "_EVENT" only on this one?
//Producer:Any component
JOB_DIAGNOSTIC_UPDATE,
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Fri Apr 8 02:54:56 2011
@@ -53,6 +53,8 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
+import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
+import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -111,10 +113,10 @@ public class JobImpl implements org.apac
private static final Log LOG = LogFactory.getLog(JobImpl.class);
public static final
- float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
+ float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
- //The maximum percentage of fetch failures allowed for a map
- private static final double MAX_ALLOWED_FETCH_FAILURES_PERCENT = 0.5;
+ //The maximum fraction of fetch failures allowed for a map
+ private static final double MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5;
// Maximum no. of fetch-failure notifications after which map task is failed
private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
@@ -153,12 +155,14 @@ public class JobImpl implements org.apac
private final Map<TaskAttemptId, Integer> fetchFailuresMapping =
new HashMap<TaskAttemptId, Integer>();
- private static final TaskAttemptCompletedEventTransition TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
- new TaskAttemptCompletedEventTransition();
- private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION =
- new DiagnosticsUpdateTransition();
- private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION =
- new InternalErrorTransition();
+ private static final DiagnosticsUpdateTransition
+ DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
+ private static final InternalErrorTransition
+ INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
+ private static final TaskAttemptCompletedEventTransition
+ TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
+ new TaskAttemptCompletedEventTransition();
+
protected static final
StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent>
stateMachineFactory
@@ -170,31 +174,45 @@ public class JobImpl implements org.apac
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition
- (JobState.NEW,
- EnumSet.of(JobState.RUNNING, JobState.FAILED),
+ (JobState.NEW,
+ EnumSet.of(JobState.INITED, JobState.FAILED),
JobEventType.JOB_INIT,
new InitTransition())
.addTransition(JobState.NEW, JobState.KILLED,
- JobEventType.JOB_KILL, new KillNewJobTransition())
- .addTransition(
- JobState.NEW,
- JobState.ERROR, JobEventType.INTERNAL_ERROR,
- INTERNAL_ERROR_TRANSITION)
+ JobEventType.JOB_KILL,
+ new KillNewJobTransition())
+ .addTransition(JobState.NEW, JobState.ERROR,
+ JobEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+
+ // Transitions from INITED state
+ .addTransition(JobState.INITED, JobState.INITED,
+ JobEventType.JOB_DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(JobState.INITED, JobState.RUNNING,
+ JobEventType.JOB_START,
+ new StartTransition())
+ .addTransition(JobState.INITED, JobState.KILLED,
+ JobEventType.JOB_KILL,
+ new KillInitedJobTransition())
+ .addTransition(JobState.INITED, JobState.ERROR,
+ JobEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
// Transitions from RUNNING state
.addTransition(JobState.RUNNING, JobState.RUNNING,
- JobEventType.JOB_TASK_ATTEMPT_COMPLETED_EVENT,
- TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
+ JobEventType.JOB_TASK_ATTEMPT_COMPLETED_EVENT,
+ TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
.addTransition
- (JobState.RUNNING,
+ (JobState.RUNNING,
EnumSet.of(JobState.RUNNING, JobState.SUCCEEDED, JobState.FAILED),
JobEventType.JOB_TASK_COMPLETED,
new TaskCompletedTransition())
.addTransition(JobState.RUNNING, JobState.KILL_WAIT,
- JobEventType.JOB_KILL, new KillTasksTransition())
+ JobEventType.JOB_KILL, new KillTasksTransition())
.addTransition(JobState.RUNNING, JobState.RUNNING,
- JobEventType.JOB_MAP_TASK_RESCHEDULED,
- new MapTaskRescheduledTransition())
+ JobEventType.JOB_MAP_TASK_RESCHEDULED,
+ new MapTaskRescheduledTransition())
.addTransition(JobState.RUNNING, JobState.RUNNING,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
@@ -203,12 +221,12 @@ public class JobImpl implements org.apac
new TaskAttemptFetchFailureTransition())
.addTransition(
JobState.RUNNING,
- JobState.ERROR, JobEventType.INTERNAL_ERROR,
+ JobState.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Transitions from KILL_WAIT state.
.addTransition
- (JobState.KILL_WAIT,
+ (JobState.KILL_WAIT,
EnumSet.of(JobState.KILL_WAIT, JobState.KILLED),
JobEventType.JOB_TASK_COMPLETED,
new KillWaitTaskCompletedTransition())
@@ -220,7 +238,7 @@ public class JobImpl implements org.apac
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(
JobState.KILL_WAIT,
- JobState.ERROR, JobEventType.INTERNAL_ERROR,
+ JobState.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
@@ -234,7 +252,7 @@ public class JobImpl implements org.apac
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(
JobState.SUCCEEDED,
- JobState.ERROR, JobEventType.INTERNAL_ERROR,
+ JobState.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
@@ -247,7 +265,7 @@ public class JobImpl implements org.apac
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(
JobState.FAILED,
- JobState.ERROR, JobEventType.INTERNAL_ERROR,
+ JobState.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(JobState.FAILED, JobState.FAILED,
@@ -260,7 +278,7 @@ public class JobImpl implements org.apac
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(
JobState.KILLED,
- JobState.ERROR, JobEventType.INTERNAL_ERROR,
+ JobState.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Ignore-able events
.addTransition(JobState.KILLED, JobState.KILLED,
@@ -300,6 +318,7 @@ public class JobImpl implements org.apac
private float setupProgress;
private float cleanupProgress;
private boolean reducesScheduled;
+ private boolean isUber = false;
private Credentials fsTokens;
private Token<JobTokenIdentifier> jobToken;
@@ -375,6 +394,11 @@ public class JobImpl implements org.apac
}
@Override
+ public boolean isUber() {
+ // false until InitTransition evaluates the uber criteria and sets it
+ return isUber;
+ }
+
+ @Override
public Counters getCounters() {
Counters counters = newCounters();
// TODO: compute job level counters
@@ -602,16 +626,25 @@ public class JobImpl implements org.apac
@Override
public int getTotalMaps() {
- return mapTasks.size();
+ // FIXME: consider returning numMapTasks directly. NOTE(review): both are
+ // filled in by InitTransition, which sets numMapTasks before
+ // createMapTasks() adds the tasks (presumably via addTask()), so the two
+ // may differ if this is called mid-init -- confirm before switching.
+ return mapTasks.size();
}
@Override
public int getTotalReduces() {
- return reduceTasks.size();
+ // FIXME: consider returning numReduceTasks directly. NOTE(review):
+ // InitTransition sets numReduceTasks before createReduceTasks() creates
+ // the tasks, so the two may differ if this is called mid-init -- confirm
+ // before switching.
+ return reduceTasks.size();
}
public static class InitTransition
- implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
+ implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
+
+ /**
+ * Note that this transition method is called directly (and synchronously)
+ * by MRAppMaster's init() method (i.e., no RPC, no thread-switching;
+ * just plain sequential call within AM context), so we can trigger
+ * modifications in AM state from here (at least, if AM is written that
+ * way; MR version is).
+ */
@Override
public JobState transition(JobImpl job, JobEvent event) {
job.startTime = System.currentTimeMillis();
@@ -644,28 +677,96 @@ public class JobImpl implements org.apac
"test", constructJobACLs(job.conf),
job.conf.get(MRJobConfig.QUEUE_NAME,"test"));
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
- TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job,
- job.jobId);
+
+ TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
job.numMapTasks = taskSplitMetaInfo.length;
- job.numReduceTasks =
- job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);
+ job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);
checkTaskLimits();
+ long inputLength = 0;
+ for (int i = 0; i < job.numMapTasks; ++i) {
+ inputLength += taskSplitMetaInfo[i].getInputDataLength();
+ }
+
+//FIXME: need new memory criterion for uber-decision (oops, too late here; until AM-resizing supported, must depend on job client to pass fat-slot needs)
+ // these are no longer "system" settings, necessarily; user may override
+ int sysMaxMaps = job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
+ int sysMaxReduces =
+ job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
+ long sysMaxBytes = job.conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
+ job.conf.getLong("dfs.block.size", 64*1024*1024)); //FIXME: this is wrong; get FS from [File?]InputFormat and default block size from that
+ //long sysMemSizeForUberSlot = JobTracker.getMemSizeForReduceSlot(); // FIXME [could use default AM-container memory size...]
+
+ boolean uberEnabled =
+ job.conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+ boolean smallNumMapTasks = (job.numMapTasks <= sysMaxMaps);
+ boolean smallNumReduceTasks = (job.numReduceTasks <= sysMaxReduces);
+ boolean smallInput = (inputLength <= sysMaxBytes);
+ boolean smallMemory = true; //FIXME (see above)
+ // ignoring overhead due to UberTask and statics as negligible here:
+// FIXME && (Math.max(memoryPerMap, memoryPerReduce) <= sysMemSizeForUberSlot
+// || sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT)
+ boolean notChainJob = !isChainJob(job.conf);
+
+ // User has overall veto power over uberization, or user can modify
+ // limits (overriding system settings and potentially shooting
+ // themselves in the head). Note that ChainMapper/Reducer are
+ // fundamentally incompatible with MR-1220; they employ a blocking
+ // queue between the maps/reduces and thus require parallel execution,
+ // while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks
+ // and thus requires sequential execution.
+ job.isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
+ && smallInput && smallMemory && notChainJob;
+
+ if (job.isUber) {
+ LOG.info("Uberizing job " + job.jobId + ": " + job.numMapTasks + "m+"
+ + job.numReduceTasks + "r tasks (" + inputLength
+ + " input bytes) will run sequentially on single node.");
+ //TODO: also note which node?
+
+ // make sure reduces are scheduled only after all map are completed
+ job.conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
+ 1.0f);
+ // uber-subtask attempts all get launched on same node; if one fails,
+ // probably should retry elsewhere, i.e., move entire uber-AM: ergo,
+ // limit attempts to 1 (or at most 2? probably not...)
+ job.conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+ job.conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
+
+ // disable speculation: makes no sense to speculate an entire job
+// canSpeculateMaps = canSpeculateReduces = false; // [TODO: in old version, ultimately was from conf.getMapSpeculativeExecution(), conf.getReduceSpeculativeExecution()]
+ } else {
+ StringBuilder msg = new StringBuilder();
+ msg.append("Not uberizing ").append(job.jobId).append(" because:");
+ if (!uberEnabled)
+ msg.append(" not enabled;");
+ if (!smallNumMapTasks)
+ msg.append(" too many maps;");
+ if (!smallNumReduceTasks)
+ msg.append(" too many reduces;");
+ if (!smallInput)
+ msg.append(" too much input;");
+ if (!smallMemory)
+ msg.append(" too much RAM;");
+ if (!notChainJob)
+ msg.append(" chainjob");
+ LOG.info(msg.toString());
+ }
+
job.taskAttemptCompletionEvents =
- new ArrayList<TaskAttemptCompletionEvent>(
- job.numMapTasks + job.numReduceTasks
- + 10);
-
- createMapTasks(job, taskSplitMetaInfo);
- job.allowedMapFailuresPercent = job.conf.getInt(
- MRJobConfig.MAP_FAILURES_MAX_PERCENT,
- 0);
+ new ArrayList<TaskAttemptCompletionEvent>(
+ job.numMapTasks + job.numReduceTasks + 10);
- createReduceTasks(job);
- job.allowedReduceFailuresPercent = job.conf.getInt(
- MRJobConfig.REDUCE_FAILURES_MAXPERCENT,
- 0);
+ job.allowedMapFailuresPercent =
+ job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
+ job.allowedReduceFailuresPercent =
+ job.conf.getInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);
// Calculate the minimum number of maps to be complete before
// we should start scheduling reduces
@@ -676,21 +777,16 @@ public class JobImpl implements org.apac
DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *
job.numMapTasks));
- //do the setup
+ // do the setup
job.committer.setupJob(job.jobContext);
job.setupProgress = 1.0f;
- //schedule the maps
- job.scheduleTasks(job.mapTasks);
- JobInitedEvent jie =
- new JobInitedEvent(TypeConverter.fromYarn(job.jobId),
- job.startTime,
- job.numMapTasks, job.numReduceTasks,
- false, 0, 0,
- JobState.NEW.toString());
- job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
+ // create the Tasks but don't start them yet
+ createMapTasks(job, inputLength, taskSplitMetaInfo);
+ createReduceTasks(job);
+
+ return JobState.INITED;
- return JobState.RUNNING;
} catch (Exception e) {
LOG.warn("Job init failed", e);
job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
@@ -709,9 +805,9 @@ public class JobImpl implements org.apac
job.remoteJobSubmitDir =
FileSystem.get(job.conf).makeQualified(
- new Path(job.conf
- .get(YARNApplicationConstants.APPS_STAGING_DIR_KEY),
- job.oldJobId.toString()));
+ new Path(
+ job.conf.get(YARNApplicationConstants.APPS_STAGING_DIR_KEY),
+ oldJobIDString));
job.remoteJobConfFile =
new Path(job.remoteJobSubmitDir,
YARNApplicationConstants.JOB_CONF_FILE);
@@ -742,14 +838,42 @@ public class JobImpl implements org.apac
new Path(job.remoteJobSubmitDir,
YarnConfiguration.APPLICATION_TOKENS_FILE);
tokenStorage.writeTokenStorageFile(remoteJobTokenFile, job.conf);
- LOG.info("Written back the job-token file on the remote file system:"
+ LOG.info("Writing back the job-token file on the remote file system:"
+ remoteJobTokenFile.toString());
}
- private void createMapTasks(JobImpl job, TaskSplitMetaInfo[] splits) {
- long inputLength = 0;
- for(int i=0; i < job.numMapTasks; ++i) {
- inputLength += splits[i].getInputDataLength();
+ /**
+ * ChainMapper and ChainReducer must execute in parallel, so they're not
+ * compatible with uberization/LocalContainerLauncher (100% sequential).
+ *
+ * @param conf the job configuration naming the map and reduce classes
+ * @return true if the configured map class derives from ChainMapper or the
+ * configured reduce class derives from ChainReducer; a class that is
+ * unset or cannot be loaded counts as "not a chain class"
+ */
+ boolean isChainJob(Configuration conf) {
+ return isSubclassOf(conf, MRJobConfig.MAP_CLASS_ATTR, ChainMapper.class)
+ || isSubclassOf(conf, MRJobConfig.REDUCE_CLASS_ATTR,
+ ChainReducer.class);
+ }
+
+ /**
+ * Checks whether the class named by a configuration key derives from
+ * {@code base}.
+ *
+ * The class is resolved through the job configuration's class loader and
+ * is NOT initialized. Class.forName(String) would run the user's static
+ * initializers inside the AM just to answer this question, and an
+ * ExceptionInInitializerError thrown there is an Error, so it would
+ * escape a catch (Exception).
+ *
+ * @param conf the job configuration
+ * @param key configuration key holding a fully qualified class name
+ * @param base the base class to test against
+ * @return true only if the key is set, the class loads and links, and it
+ * is assignable to {@code base}
+ */
+ private static boolean isSubclassOf(Configuration conf, String key,
+ Class<?> base) {
+ String className = conf.get(key);
+ if (className == null) {
+ return false;
+ }
+ try {
+ Class<?> clazz = Class.forName(className, false, conf.getClassLoader());
+ return base.isAssignableFrom(clazz);
+ } catch (ClassNotFoundException cnfe) {
+ // don't care; assume it's not derived from the chain class
+ return false;
+ } catch (LinkageError le) {
+ // e.g. NoClassDefFoundError for a missing dependency: same treatment
+ return false;
+ }
+ }
+
+ private void createMapTasks(JobImpl job, long inputLength,
+ TaskSplitMetaInfo[] splits) {
+ for (int i=0; i < job.numMapTasks; ++i) {
TaskImpl task =
new MapTaskImpl(job.jobId, i,
job.eventHandler,
@@ -774,8 +898,8 @@ public class JobImpl implements org.apac
job.fsTokens.getAllTokens());
job.addTask(task);
}
- LOG.info("Number of reduces for job " + job.
- jobId + " = " + job.numReduceTasks);
+ LOG.info("Number of reduces for job " + job.jobId + " = "
+ + job.numReduceTasks);
}
protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
@@ -798,6 +922,26 @@ public class JobImpl implements org.apac
private void checkTaskLimits() {
// no code, for now
}
+ } // end of InitTransition
+
+ public static class StartTransition
+ implements SingleArcTransition<JobImpl, JobEvent> {
+ /**
+ * Handles JOB_START in the INITED state (INITED -> RUNNING): records the
+ * start time, schedules the map tasks that InitTransition created, and
+ * sends a JobInitedEvent to the job-history event handler.
+ *
+ * This transition executes in the event-dispatcher thread, though it's
+ * triggered in MRAppMaster's startJobs() method.
+ */
+ @Override
+ public void transition(JobImpl job, JobEvent event) {
+ // NOTE(review): overwrites the startTime already set by InitTransition;
+ // the JobInitedEvent below reports this later time -- confirm intended.
+ job.startTime = System.currentTimeMillis();
+ job.scheduleTasks(job.mapTasks); // schedule (i.e., start) the maps
+ // FIXME: lose the two trailing int args (0, 0) again; they are
+ // old-style uber junk and need to go along with 98% of the other
+ // old-style uber junk.
+ // NOTE(review): reports JobState.NEW even though the job is moving from
+ // INITED to RUNNING -- confirm which state history should record.
+ JobInitedEvent jie =
+ new JobInitedEvent(TypeConverter.fromYarn(job.jobId),
+ job.startTime,
+ job.numMapTasks, job.numReduceTasks,
+ job.isUber, 0, 0,
+ JobState.NEW.toString());
+ job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
+ }
}
private void abortJob(
@@ -817,23 +961,31 @@ public class JobImpl implements org.apac
eventHandler.handle(new JobHistoryEvent(jobId, unsuccessfulJobEvent));
}
- private static class KillNewJobTransition
- implements SingleArcTransition<JobImpl, JobEvent> {
+ // JOB_KILL while NEW: the job has not been inited, so no tasks exist yet
+ // (InitTransition creates them); this arc therefore hardcodes 0 for both
+ // finished maps and finished reduces.
+ private static class KillNewJobTransition
+ implements SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
JobUnsuccessfulCompletionEvent failedEvent =
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(job.jobId),
- job.finishTime,
- job.succeededMapTaskCount,
- job.numReduceTasks, //TODO finishedReduceTasks
+ job.finishTime, 0, 0,
org.apache.hadoop.mapreduce.JobStatus.State.FAILED.toString()); //TODO correct state
job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
+ job.finished();
+ }
+ }
+ // JOB_KILL while INITED: tasks have been created but not yet scheduled
+ // (StartTransition schedules them), so there are no running tasks to
+ // kill; just abort the job with state KILLED and mark it finished.
+ private static class KillInitedJobTransition
+ implements SingleArcTransition<JobImpl, JobEvent> {
+ @Override
+ public void transition(JobImpl job, JobEvent event) {
+ job.abortJob(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
job.finished();
}
}
- private static class KillTasksTransition
+ private static class KillTasksTransition
implements SingleArcTransition<JobImpl, JobEvent> {
@Override
public void transition(JobImpl job, JobEvent event) {
@@ -893,11 +1045,9 @@ public class JobImpl implements org.apac
float failureRate = (float) fetchFailures / runningReduceTasks;
// declare faulty if fetch-failures >= max-allowed-failures
- boolean isMapFaulty = (failureRate >= MAX_ALLOWED_FETCH_FAILURES_PERCENT)
- ? true
- : false;
- if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS
- && isMapFaulty) {
+ boolean isMapFaulty =
+ (failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);
+ if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS && isMapFaulty) {
LOG.info("Too many fetch-failures for output of task attempt: " +
mapId + " ... raising fetch failure to map");
job.eventHandler.handle(new TaskAttemptEvent(mapId,
@@ -914,7 +1064,7 @@ public class JobImpl implements org.apac
@Override
public JobState transition(JobImpl job, JobEvent event) {
job.completedTaskCount++;
- LOG.info("No of completed Tasks:" + job.completedTaskCount);
+ LOG.info("Num completed Tasks: " + job.completedTaskCount);
JobTaskEvent taskEvent = (JobTaskEvent) event;
Task task = job.tasks.get(taskEvent.getTaskID());
if (taskEvent.getState() == TaskState.SUCCEEDED) {
@@ -970,7 +1120,7 @@ public class JobImpl implements org.apac
new JobFinishedEvent(TypeConverter.fromYarn(job.jobId),
job.finishTime,
job.succeededMapTaskCount, job.numReduceTasks, job.failedMapTaskCount,
- job.numReduceTasks, //TODO replace finsihedReduceTasks
+ job.numReduceTasks, //TODO replace finishedReduceTasks
TypeConverter.fromYarn(job.getCounters()), //TODO replace with MapCounter
TypeConverter.fromYarn(job.getCounters()), // TODO reduceCounters
TypeConverter.fromYarn(job.getCounters()));
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Fri Apr 8 02:54:56 2011
@@ -419,7 +419,7 @@ public abstract class TaskAttemptImpl im
memory = conf.getInt(MRJobConfig.REDUCE_MEMORY_MB, 1024);
}
- return 1024;
+ return 1024; //FIXME: why not "return memory;" ?
}
private static LocalResource getLocalResource(FileContext fc, Path file,
@@ -443,7 +443,7 @@ public abstract class TaskAttemptImpl im
FileContext remoteFS = FileContext.getFileContext(conf);
Path localizedJobConf = new Path(YARNApplicationConstants.JOB_CONF_FILE);
- remoteTask.setJobFile(localizedJobConf.toString()); // Screwed!!!!!!
+ remoteTask.setJobFile(localizedJobConf.toString()); // Screwed!!!!!! (presumably this means "FIXME"...)
URL jobConfFileOnRemoteFS = ConverterUtils.getYarnUrlFromPath(localizedJobConf);
LOG.info("The job-conf file on the remote FS is " + jobConfFileOnRemoteFS);
@@ -842,6 +842,7 @@ public abstract class TaskAttemptImpl im
taskAttempt.containerID = cEvent.getContainerID();
taskAttempt.containerMgrAddress = cEvent.getContainerManagerAddress();
taskAttempt.containerToken = cEvent.getContainerToken();
+ // this is a _real_ Task (classic Hadoop mapred flavor):
taskAttempt.remoteTask = taskAttempt.createRemoteTask();
taskAttempt.jvmID = new WrappedJvmID(
taskAttempt.remoteTask.getTaskID().getJobID(),
@@ -858,7 +859,7 @@ public abstract class TaskAttemptImpl im
return taskAttempt.getContainer();
}
@Override
- public Task getRemoteTask() {
+ public Task getRemoteTask() { // classic mapred Task, not YARN version
return taskAttempt.remoteTask;
}
});
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Fri Apr 8 02:54:56 2011
@@ -189,7 +189,8 @@ public class ContainerLauncherImpl exten
startRequest.setContainerLaunchContext(containerLaunchContext);
proxy.startContainer(startRequest);
- // after launching send launched event to taskattempt
+ // after launching, send launched event to task attempt to move
+ // it from ASSIGNED to RUNNING state
context.getEventHandler().handle(
new TaskAttemptEvent(launchEv.getTaskAttemptID(),
TaskAttemptEventType.TA_CONTAINER_LAUNCHED));
@@ -201,9 +202,9 @@ public class ContainerLauncherImpl exten
}
break;
- case CONTAINER_REMOTE_CLEANUP:
- // We will have to remove the launch event if it is still in eventQueue
+ case CONTAINER_REMOTE_CLEANUP:
+ // We will have to remove the launch (meant "cleanup"? FIXME) event if it is still in eventQueue
// and not yet processed
if (eventQueue.contains(event)) {
eventQueue.remove(event); // TODO: Any synchro needed?
@@ -228,10 +229,10 @@ public class ContainerLauncherImpl exten
e);
}
- // after killing send killed event to taskattempt
- context.getEventHandler().handle(
- new TaskAttemptEvent(event.getTaskAttemptID(),
- TaskAttemptEventType.TA_CONTAINER_CLEANED));
+ // after killing, send killed event to taskattempt
+ context.getEventHandler().handle(
+ new TaskAttemptEvent(event.getTaskAttemptID(),
+ TaskAttemptEventType.TA_CONTAINER_CLEANED));
}
break;
}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1090092&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Fri Apr 8 02:54:56 2011
@@ -0,0 +1,77 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.local;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
+/**
+ * Allocates containers locally. Doesn't allocate a real container;
+ * instead sends an allocated event for all requests.
+ *
+ * <p>Every {@code CONTAINER_REQ} is answered immediately with a synthetic
+ * {@link ContainerId} whose id is strictly negative (-1, -2, ...), so local
+ * containers can be distinguished by sign. Registration with, and
+ * heartbeating to, the RM are inherited from {@link RMCommunicator}.
+ */
+public class LocalContainerAllocator extends RMCommunicator
+    implements ContainerAllocator {
+
+  private static final Log LOG =
+      LogFactory.getLog(LocalContainerAllocator.class);
+
+  // Source of synthetic container ids; only ever incremented.
+  private final AtomicInteger containerCount = new AtomicInteger();
+
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  /**
+   * @param clientService the AM's client service, handed to
+   *     {@link RMCommunicator} for registration
+   * @param context the application context; its event handler and
+   *     application id are captured by the superclass
+   */
+  public LocalContainerAllocator(ClientService clientService,
+      AppContext context) {
+    // RMCommunicator already stores context.getEventHandler() and
+    // context.getApplicationID() in its protected eventHandler and
+    // applicationId fields; reuse them instead of hiding them here.
+    super(clientService, context);
+  }
+
+  /**
+   * Answers a {@code CONTAINER_REQ} by immediately sending a
+   * {@link TaskAttemptContainerAssignedEvent} for a new synthetic local
+   * container on "localhost". All other event types are ignored.
+   */
+  @Override
+  public void handle(ContainerAllocatorEvent event) {
+    if (event.getType() != ContainerAllocator.EventType.CONTAINER_REQ) {
+      return;
+    }
+    LOG.info("Processing the event " + event.toString());
+    ContainerId cID = recordFactory.newRecordInstance(ContainerId.class);
+    cID.setAppId(applicationId);
+    // Use strictly negative ids to denote that these are local: pre-increment
+    // so the first id is -1 rather than 0 (0 is not negative).
+    // TODO: find a better way to mark local containers.
+    cID.setId(-containerCount.incrementAndGet());
+    // send the container-assigned event to task attempt
+    eventHandler.handle(new TaskAttemptContainerAssignedEvent(
+        event.getAttemptID(), cID,
+        "localhost", // TODO: put the AppMaster hostname here
+        null));
+  }
+
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1090092&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Fri Apr 8 02:54:56 2011
@@ -0,0 +1,225 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
+import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.api.records.ApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+/**
+ * Registers/unregisters to RM and sends heartbeats to RM.
+ */
+public class RMCommunicator extends AbstractService {
+ private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
+ private static int rmPollInterval;//millis
+ protected ApplicationId applicationId;
+ private volatile boolean stopped;
+ protected Thread allocatorThread;
+ protected EventHandler eventHandler;
+ private ApplicationMaster applicationMaster;
+ protected AMRMProtocol scheduler;
+ private final ClientService clientService;
+ private int lastResponseID;
+
+ private final RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+
+ public RMCommunicator(ClientService clientService, AppContext context) {
+ super("RMCommunicator");
+ this.clientService = clientService;
+ this.eventHandler = context.getEventHandler();
+ this.applicationId = context.getApplicationID();
+ this.applicationMaster =
+ recordFactory.newRecordInstance(ApplicationMaster.class);
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ super.init(conf);
+ rmPollInterval = conf.getInt(YarnConfiguration.AM_EXPIRY_INTERVAL, 10000)/3;
+ }
+
+ @Override
+ public void start() {
+ scheduler= createSchedulerProxy();
+ //LOG.info("Scheduler is " + scheduler);
+ register();
+ startAllocatorThread();
+ super.start();
+ }
+
+ protected void register() {
+ //Register
+ applicationMaster.setApplicationId(applicationId);
+ applicationMaster.setHost(
+ clientService.getBindAddress().getAddress().getHostAddress());
+ applicationMaster.setRpcPort(clientService.getBindAddress().getPort());
+ applicationMaster.setState(ApplicationState.RUNNING);
+ applicationMaster.setHttpPort(clientService.getHttpPort());
+ applicationMaster.setStatus(
+ recordFactory.newRecordInstance(ApplicationStatus.class));
+ applicationMaster.getStatus().setApplicationId(applicationId);
+ applicationMaster.getStatus().setProgress(0.0f);
+ try {
+ RegisterApplicationMasterRequest request =
+ recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
+ request.setApplicationMaster(applicationMaster);
+ scheduler.registerApplicationMaster(request);
+ } catch (Exception are) {
+ LOG.info("Exception while registering", are);
+ throw new YarnException(are);
+ }
+ }
+
+ protected void unregister() {
+ try {
+ applicationMaster.setState(ApplicationState.COMPLETED);
+ FinishApplicationMasterRequest request =
+ recordFactory.newRecordInstance(FinishApplicationMasterRequest.class);
+ request.setApplicationMaster(applicationMaster);
+ scheduler.finishApplicationMaster(request);
+ } catch(Exception are) {
+ LOG.info("Exception while unregistering ", are);
+ }
+ }
+
+ @Override
+ public void stop() {
+ stopped = true;
+ allocatorThread.interrupt();
+ try {
+ allocatorThread.join();
+ } catch (InterruptedException ie) {
+ LOG.info("InterruptedException while stopping", ie);
+ }
+ unregister();
+ super.stop();
+ }
+
+ protected void startAllocatorThread() {
+ allocatorThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (!stopped && !Thread.currentThread().isInterrupted()) {
+ try {
+ Thread.sleep(rmPollInterval);
+ try {
+ heartbeat();
+ } catch (Exception e) {
+ LOG.error("ERROR IN CONTACTING RM. ", e);
+ }
+ } catch (InterruptedException e) {
+ LOG.info("Allocated thread interrupted. Returning.");
+ return;
+ }
+ }
+ }
+ });
+ allocatorThread.start();
+ }
+
+ protected AMRMProtocol createSchedulerProxy() {
+ final YarnRPC rpc = YarnRPC.create(getConfig());
+ final Configuration conf = new Configuration(getConfig());
+ final String serviceAddr = conf.get(
+ YarnConfiguration.SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_SCHEDULER_BIND_ADDRESS);
+
+ UserGroupInformation currentUser;
+ try {
+ currentUser = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+ SchedulerSecurityInfo.class, SecurityInfo.class);
+
+ String tokenURLEncodedStr = System.getenv().get(
+ YarnConfiguration.APPLICATION_MASTER_TOKEN_ENV_NAME);
+ LOG.debug("AppMasterToken is " + tokenURLEncodedStr);
+ Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
+
+ try {
+ token.decodeFromUrlString(tokenURLEncodedStr);
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+
+ currentUser.addToken(token);
+ }
+
+ return currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
+ @Override
+ public AMRMProtocol run() {
+ return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
+ NetUtils.createSocketAddr(serviceAddr), conf);
+ }
+ });
+ }
+
+ protected synchronized void heartbeat() throws Exception {
+ ApplicationStatus status =
+ recordFactory.newRecordInstance(ApplicationStatus.class);
+ status.setApplicationId(applicationId);
+ status.setResponseId(lastResponseID);
+
+ AllocateRequest allocateRequest =
+ recordFactory.newRecordInstance(AllocateRequest.class);
+ allocateRequest.setApplicationStatus(status);
+ allocateRequest.addAllAsks(new ArrayList<ResourceRequest>());
+ allocateRequest.addAllReleases(new ArrayList<Container>());
+ AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
+ AMResponse response = allocateResponse.getAMResponse();
+ }
+
+}