You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/12/14 00:35:12 UTC
svn commit: r1213987 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/...
Author: vinodkv
Date: Tue Dec 13 23:35:11 2011
New Revision: 1213987
URL: http://svn.apache.org/viewvc?rev=1213987&view=rev
Log:
MAPREDUCE-3426. Fixed MR AM in uber mode to write map intermediate outputs in the correct directory to work properly in secure mode. Contributed by Hitesh Shah.
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Dec 13 23:35:11 2011
@@ -282,6 +282,10 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3542. Support "FileSystemCounter" legacy counter group name for
compatibility. (tomwhite)
+ MAPREDUCE-3426. Fixed MR AM in uber mode to write map intermediate outputs
+ in the correct directory to work properly in secure mode. (Hitesh Shah via
+ vinodkv)
+
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java Tue Dec 13 23:35:11 2011
@@ -22,20 +22,19 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
-import java.net.URI;
import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
@@ -47,13 +46,12 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.app.job.Task;
-import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.service.AbstractService;
/**
@@ -80,7 +78,10 @@ public class LocalContainerLauncher exte
super(LocalContainerLauncher.class.getName());
this.context = context;
this.umbilical = umbilical;
- // umbilical: MRAppMaster creates (taskAttemptListener), passes to us (TODO/FIXME: pointless to use RPC to talk to self; should create LocalTaskAttemptListener or similar: implement umbilical protocol but skip RPC stuff)
+ // umbilical: MRAppMaster creates (taskAttemptListener), passes to us
+ // (TODO/FIXME: pointless to use RPC to talk to self; should create
+ // LocalTaskAttemptListener or similar: implement umbilical protocol
+ // but skip RPC stuff)
try {
curFC = FileContext.getFileContext(curDir.toURI());
@@ -152,7 +153,6 @@ public class LocalContainerLauncher exte
* ]]
* - runs Task (runSubMap() or runSubReduce())
* - TA can safely send TA_UPDATE since in RUNNING state
- * [modulo possible TA-state-machine race noted below: CHECK (TODO)]
*/
private class SubtaskRunner implements Runnable {
@@ -162,6 +162,7 @@ public class LocalContainerLauncher exte
SubtaskRunner() {
}
+ @SuppressWarnings("unchecked")
@Override
public void run() {
ContainerLauncherEvent event = null;
@@ -183,7 +184,7 @@ public class LocalContainerLauncher exte
ContainerRemoteLaunchEvent launchEv =
(ContainerRemoteLaunchEvent)event;
- TaskAttemptId attemptID = launchEv.getTaskAttemptID(); //FIXME: can attemptID ever be null? (only if retrieved over umbilical?)
+ TaskAttemptId attemptID = launchEv.getTaskAttemptID();
Job job = context.getAllJobs().get(attemptID.getTaskId().getJobId());
int numMapTasks = job.getTotalMaps();
@@ -204,7 +205,6 @@ public class LocalContainerLauncher exte
// port number is set to -1 in this case.
context.getEventHandler().handle(
new TaskAttemptContainerLaunchedEvent(attemptID, -1));
- //FIXME: race condition here? or do we have same kind of lock on TA handler => MapTask can't send TA_UPDATE before TA_CONTAINER_LAUNCHED moves TA to RUNNING state? (probably latter)
if (numMapTasks == 0) {
doneWithMaps = true;
@@ -259,6 +259,7 @@ public class LocalContainerLauncher exte
}
}
+ @SuppressWarnings("deprecation")
private void runSubtask(org.apache.hadoop.mapred.Task task,
final TaskType taskType,
TaskAttemptId attemptID,
@@ -270,6 +271,19 @@ public class LocalContainerLauncher exte
try {
JobConf conf = new JobConf(getConfig());
+ conf.set(JobContext.TASK_ID, task.getTaskID().toString());
+ conf.set(JobContext.TASK_ATTEMPT_ID, classicAttemptID.toString());
+ conf.setBoolean(JobContext.TASK_ISMAP, (taskType == TaskType.MAP));
+ conf.setInt(JobContext.TASK_PARTITION, task.getPartition());
+ conf.set(JobContext.ID, task.getJobID().toString());
+
+ // Use the AM's local dir env to generate the intermediate step
+ // output files
+ String[] localSysDirs = StringUtils.getTrimmedStrings(
+ System.getenv(ApplicationConstants.LOCAL_DIR_ENV));
+ conf.setStrings(MRConfig.LOCAL_DIR, localSysDirs);
+ LOG.info(MRConfig.LOCAL_DIR + " for uber task: "
+ + conf.get(MRConfig.LOCAL_DIR));
// mark this as an uberized subtask so it can set task counter
// (longer-term/FIXME: could redefine as job counter and send
@@ -285,12 +299,12 @@ public class LocalContainerLauncher exte
if (doneWithMaps) {
LOG.error("CONTAINER_REMOTE_LAUNCH contains a map task ("
+ attemptID + "), but should be finished with maps");
- // throw new RuntimeException() (FIXME: what's appropriate here?)
+ throw new RuntimeException();
}
MapTask map = (MapTask)task;
+ map.setConf(conf);
- //CODE-REVIEWER QUESTION: why not task.getConf() or map.getConf() instead of conf? do we need Task's localizeConfiguration() run on this first?
map.run(conf, umbilical);
if (renameOutputs) {
@@ -305,19 +319,23 @@ public class LocalContainerLauncher exte
} else /* TaskType.REDUCE */ {
if (!doneWithMaps) {
- //check if event-queue empty? whole idea of counting maps vs. checking event queue is a tad wacky...but could enforce ordering (assuming no "lost events") at LocalMRAppMaster [CURRENT BUG(?): doesn't send reduce event until maps all done]
+ // check if event-queue empty? whole idea of counting maps vs.
+ // checking event queue is a tad wacky...but could enforce ordering
+ // (assuming no "lost events") at LocalMRAppMaster [CURRENT BUG(?):
+ // doesn't send reduce event until maps all done]
LOG.error("CONTAINER_REMOTE_LAUNCH contains a reduce task ("
+ attemptID + "), but not yet finished with maps");
- // throw new RuntimeException() (FIXME) // or push reduce event back onto end of queue? (probably former)
+ throw new RuntimeException();
}
- ReduceTask reduce = (ReduceTask)task;
-
// a.k.a. "mapreduce.jobtracker.address" in LocalJobRunner:
// set framework name to local to make task local
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
conf.set(MRConfig.MASTER_ADDRESS, "local"); // bypass shuffle
+ ReduceTask reduce = (ReduceTask)task;
+ reduce.setConf(conf);
+
reduce.run(conf, umbilical);
//relocalize(); // needed only if more than one reducer supported (is MAPREDUCE-434 fixed yet?)
}
@@ -334,18 +352,7 @@ public class LocalContainerLauncher exte
try {
if (task != null) {
// do cleanup for the task
-// if (childUGI == null) { // no need to job into doAs block
- task.taskCleanup(umbilical);
-// } else {
-// final Task taskFinal = task;
-// childUGI.doAs(new PrivilegedExceptionAction<Object>() {
-// @Override
-// public Object run() throws Exception {
-// taskFinal.taskCleanup(umbilical);
-// return null;
-// }
-// });
-// }
+ task.taskCleanup(umbilical);
}
} catch (Exception e) {
LOG.info("Exception cleaning up: "
@@ -354,51 +361,21 @@ public class LocalContainerLauncher exte
// Report back any failures, for diagnostic purposes
ByteArrayOutputStream baos = new ByteArrayOutputStream();
exception.printStackTrace(new PrintStream(baos));
-// if (classicAttemptID != null) {
- umbilical.reportDiagnosticInfo(classicAttemptID, baos.toString());
-// }
+ umbilical.reportDiagnosticInfo(classicAttemptID, baos.toString());
throw new RuntimeException();
} catch (Throwable throwable) {
LOG.fatal("Error running local (uberized) 'child' : "
+ StringUtils.stringifyException(throwable));
-// if (classicAttemptID != null) {
- Throwable tCause = throwable.getCause();
- String cause = (tCause == null)
- ? throwable.getMessage()
- : StringUtils.stringifyException(tCause);
- umbilical.fatalError(classicAttemptID, cause);
-// }
+ Throwable tCause = throwable.getCause();
+ String cause = (tCause == null)
+ ? throwable.getMessage()
+ : StringUtils.stringifyException(tCause);
+ umbilical.fatalError(classicAttemptID, cause);
throw new RuntimeException();
-
- } finally {
-/*
-FIXME: do we need to do any of this stuff? (guessing not since not in own JVM)
- RPC.stopProxy(umbilical);
- DefaultMetricsSystem.shutdown();
- // Shutting down log4j of the child-vm...
- // This assumes that on return from Task.run()
- // there is no more logging done.
- LogManager.shutdown();
- */
}
}
-
-/* FIXME: may not need renameMapOutputForReduce() anymore? TEST!
-
-${local.dir}/usercache/$user/appcache/$appId/$contId/ == $cwd for containers;
-contains launch_container.sh script, which, when executed, creates symlinks and
-sets up env
- "$local.dir"/usercache/$user/appcache/$appId/$contId/file.out
- "$local.dir"/usercache/$user/appcache/$appId/$contId/file.out.idx (?)
- "$local.dir"/usercache/$user/appcache/$appId/output/$taskId/ is where file.out* is moved after MapTask done
-
- OHO! no further need for this at all? $taskId is unique per subtask
- now => should work fine to leave alone. TODO: test with teragen or
- similar
- */
-
/**
* Within the _local_ filesystem (not HDFS), all activity takes place within
* a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/),
@@ -409,14 +386,21 @@ sets up env
* filenames instead of "file.out". (All of this is entirely internal,
* so there are no particular compatibility issues.)
*/
+ @SuppressWarnings("deprecation")
private void renameMapOutputForReduce(JobConf conf, TaskAttemptId mapId,
MapOutputFile subMapOutputFile)
throws IOException {
FileSystem localFs = FileSystem.getLocal(conf);
// move map output to reduce input
Path mapOut = subMapOutputFile.getOutputFile();
+ FileStatus mStatus = localFs.getFileStatus(mapOut);
Path reduceIn = subMapOutputFile.getInputFileForWrite(
- TypeConverter.fromYarn(mapId).getTaskID(), localFs.getLength(mapOut));
+ TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Renaming map output file for task attempt "
+ + mapId.toString() + " from original location " + mapOut.toString()
+ + " to destination " + reduceIn.toString());
+ }
if (!localFs.mkdirs(reduceIn.getParent())) {
throw new IOException("Mkdirs failed to create "
+ reduceIn.getParent().toString());
@@ -429,8 +413,7 @@ sets up env
* Also within the local filesystem, we need to restore the initial state
* of the directory as much as possible. Compare current contents against
* the saved original state and nuke everything that doesn't belong, with
- * the exception of the renamed map outputs (see above).
-FIXME: do we really need to worry about renamed map outputs, or already moved to output dir on commit? if latter, fix comment
+ * the exception of the renamed map outputs.
*
* Any jobs that go out of their way to rename or delete things from the
* local directory are considered broken and deserve what they get...
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java Tue Dec 13 23:35:11 2011
@@ -236,6 +236,13 @@ public class MapReduceChildJVM {
getTaskLogFile(TaskLog.LogName.PROFILE)
)
);
+ if (task.isMapTask()) {
+ vargs.add(conf.get(MRJobConfig.TASK_MAP_PROFILE_PARAMS, ""));
+ }
+ else {
+ vargs.add(conf.get(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, ""));
+ }
+
}
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Tue Dec 13 23:35:11 2011
@@ -156,6 +156,7 @@ public class MRAppMaster extends Composi
private OutputCommitter committer;
private JobEventDispatcher jobEventDispatcher;
private boolean inRecovery = false;
+ private SpeculatorEventDispatcher speculatorEventDispatcher;
private Job job;
private Credentials fsTokens = new Credentials(); // Filled during init
@@ -265,8 +266,9 @@ public class MRAppMaster extends Composi
addIfService(speculator);
}
+ speculatorEventDispatcher = new SpeculatorEventDispatcher(conf);
dispatcher.register(Speculator.EventType.class,
- new SpeculatorEventDispatcher(conf));
+ speculatorEventDispatcher);
// service to allocate containers from RM (if non-uber) or to fake it (uber)
containerAllocator = createContainerAllocator(clientService, context);
@@ -386,7 +388,7 @@ public class MRAppMaster extends Composi
// This will also send the final report to the ResourceManager
LOG.info("Calling stop for all the services");
stop();
-
+
// Send job-end notification
try {
LOG.info("Job end notification started for jobID : "
@@ -401,14 +403,14 @@ public class MRAppMaster extends Composi
} catch (Throwable t) {
LOG.warn("Graceful stop failed ", t);
}
-
+
// Cleanup staging directory
try {
cleanupStagingDir();
} catch(IOException io) {
LOG.warn("Failed to delete staging dir");
}
-
+
//Bring the process down by force.
//Not needed after HADOOP-7140
LOG.info("Exiting MR AppMaster..GoodBye!");
@@ -790,10 +792,6 @@ public class MRAppMaster extends Composi
// job-init to be done completely here.
jobEventDispatcher.handle(initJobEvent);
- // send init to speculator. This won't yest start as dispatcher isn't
- // started yet.
- dispatcher.getEventHandler().handle(
- new SpeculatorEvent(job.getID(), clock.getTime()));
// JobImpl's InitTransition is done (call above is synchronous), so the
// "uber-decision" (MR-1220) has been made. Query job and switch to
@@ -801,9 +799,15 @@ public class MRAppMaster extends Composi
// and container-launcher services/event-handlers).
if (job.isUber()) {
+ speculatorEventDispatcher.disableSpeculation();
LOG.info("MRAppMaster uberizing job " + job.getID()
- + " in local container (\"uber-AM\").");
+ + " in local container (\"uber-AM\") on node "
+ + nmHost + ":" + nmPort + ".");
} else {
+ // send init to speculator only for non-uber jobs.
+ // This won't yet start as dispatcher isn't started yet.
+ dispatcher.getEventHandler().handle(
+ new SpeculatorEvent(job.getID(), clock.getTime()));
LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
+ "job " + job.getID() + ".");
}
@@ -865,17 +869,24 @@ public class MRAppMaster extends Composi
private class SpeculatorEventDispatcher implements
EventHandler<SpeculatorEvent> {
private final Configuration conf;
+ private volatile boolean disabled;
public SpeculatorEventDispatcher(Configuration config) {
this.conf = config;
}
@Override
public void handle(SpeculatorEvent event) {
- if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
- || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
+ if (!disabled &&
+ (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
+ || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false))) {
// Speculator IS enabled, direct the event to there.
speculator.handle(event);
}
}
+
+ public void disableSpeculation() {
+ disabled = true;
+ }
+
}
private static void validateInputParam(String value, String param)
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Dec 13 23:35:11 2011
@@ -54,6 +54,7 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -583,13 +584,13 @@ public class JobImpl implements org.apac
if (getState() == JobState.NEW) {
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
appSubmitTime, startTime, finishTime, setupProgress, 0.0f, 0.0f,
- cleanupProgress, remoteJobConfFile.toString(), amInfos);
+ cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
}
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
appSubmitTime, startTime, finishTime, setupProgress,
computeProgress(mapTasks), computeProgress(reduceTasks),
- cleanupProgress, remoteJobConfFile.toString(), amInfos);
+ cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber);
} finally {
readLock.unlock();
}
@@ -812,6 +813,129 @@ public class JobImpl implements org.apac
return amInfos;
}
+ /**
+ * Decide whether job can be run in uber mode based on various criteria.
+ * @param dataInputLength Total length for all splits
+ */
+ private void makeUberDecision(long dataInputLength) {
+ //FIXME: need new memory criterion for uber-decision (oops, too late here;
+ // until AM-resizing supported,
+ // must depend on job client to pass fat-slot needs)
+ // these are no longer "system" settings, necessarily; user may override
+ int sysMaxMaps = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
+
+ //FIXME: handling multiple reduces within a single AM does not seem to
+ //work.
+ // int sysMaxReduces =
+ // job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
+ int sysMaxReduces = 1;
+
+ long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
+ conf.getLong("dfs.block.size", 64*1024*1024)); //FIXME: this is
+ // wrong; get FS from [File?]InputFormat and default block size from that
+
+ long sysMemSizeForUberSlot =
+ conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
+ MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
+
+ boolean uberEnabled =
+ conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ boolean smallNumMapTasks = (numMapTasks <= sysMaxMaps);
+ boolean smallNumReduceTasks = (numReduceTasks <= sysMaxReduces);
+ boolean smallInput = (dataInputLength <= sysMaxBytes);
+ // ignoring overhead due to UberAM and statics as negligible here:
+ boolean smallMemory =
+ ( (Math.max(conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0),
+ conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0))
+ <= sysMemSizeForUberSlot)
+ || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT));
+ boolean notChainJob = !isChainJob(conf);
+
+ // User has overall veto power over uberization, or user can modify
+ // limits (overriding system settings and potentially shooting
+ // themselves in the head). Note that ChainMapper/Reducer are
+ // fundamentally incompatible with MR-1220; they employ a blocking
+ // queue between the maps/reduces and thus require parallel execution,
+ // while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks
+ // and thus requires sequential execution.
+ isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
+ && smallInput && smallMemory && notChainJob;
+
+ if (isUber) {
+ LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+"
+ + numReduceTasks + "r tasks (" + dataInputLength
+ + " input bytes) will run sequentially on single node.");
+
+ // make sure reduces are scheduled only after all maps are completed
+ conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
+ 1.0f);
+ // uber-subtask attempts all get launched on same node; if one fails,
+ // probably should retry elsewhere, i.e., move entire uber-AM: ergo,
+ // limit attempts to 1 (or at most 2? probably not...)
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+ conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
+
+ // disable speculation
+ conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
+ conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+ } else {
+ StringBuilder msg = new StringBuilder();
+ msg.append("Not uberizing ").append(jobId).append(" because:");
+ if (!uberEnabled)
+ msg.append(" not enabled;");
+ if (!smallNumMapTasks)
+ msg.append(" too many maps;");
+ if (!smallNumReduceTasks)
+ msg.append(" too many reduces;");
+ if (!smallInput)
+ msg.append(" too much input;");
+ if (!smallMemory)
+ msg.append(" too much RAM;");
+ if (!notChainJob)
+ msg.append(" chainjob");
+ LOG.info(msg.toString());
+ }
+ }
+
+ /**
+ * ChainMapper and ChainReducer must execute in parallel, so they're not
+ * compatible with uberization/LocalContainerLauncher (100% sequential).
+ */
+ private boolean isChainJob(Configuration conf) {
+ boolean isChainJob = false;
+ try {
+ String mapClassName = conf.get(MRJobConfig.MAP_CLASS_ATTR);
+ if (mapClassName != null) {
+ Class<?> mapClass = Class.forName(mapClassName);
+ if (ChainMapper.class.isAssignableFrom(mapClass))
+ isChainJob = true;
+ }
+ } catch (ClassNotFoundException cnfe) {
+ // don't care; assume it's not derived from ChainMapper
+ }
+ try {
+ String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR);
+ if (reduceClassName != null) {
+ Class<?> reduceClass = Class.forName(reduceClassName);
+ if (ChainReducer.class.isAssignableFrom(reduceClass))
+ isChainJob = true;
+ }
+ } catch (ClassNotFoundException cnfe) {
+ // don't care; assume it's not derived from ChainReducer
+ }
+ return isChainJob;
+ }
+
+ /*
+ private int getBlockSize() {
+ String inputClassName = conf.get(MRJobConfig.INPUT_FORMAT_CLASS_ATTR);
+ if (inputClassName != null) {
+ Class<?> inputClass - Class.forName(inputClassName);
+ if (FileInputFormat<K, V>)
+ }
+ }
+ */
+
public static class InitTransition
implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
@@ -863,81 +987,8 @@ public class JobImpl implements org.apac
inputLength += taskSplitMetaInfo[i].getInputDataLength();
}
- //FIXME: need new memory criterion for uber-decision (oops, too late here;
- // until AM-resizing supported, must depend on job client to pass fat-slot needs)
- // these are no longer "system" settings, necessarily; user may override
- int sysMaxMaps = job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);
- int sysMaxReduces =
- job.conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);
- long sysMaxBytes = job.conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
- job.conf.getLong("dfs.block.size", 64*1024*1024)); //FIXME: this is
- // wrong; get FS from [File?]InputFormat and default block size from that
- //long sysMemSizeForUberSlot = JobTracker.getMemSizeForReduceSlot();
- // FIXME [could use default AM-container memory size...]
-
- boolean uberEnabled =
- job.conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
- boolean smallNumMapTasks = (job.numMapTasks <= sysMaxMaps);
- boolean smallNumReduceTasks = (job.numReduceTasks <= sysMaxReduces);
- boolean smallInput = (inputLength <= sysMaxBytes);
- boolean smallMemory = true; //FIXME (see above)
- // ignoring overhead due to UberTask and statics as negligible here:
- // FIXME && (Math.max(memoryPerMap, memoryPerReduce) <= sysMemSizeForUberSlot
- // || sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT)
- boolean notChainJob = !isChainJob(job.conf);
-
- // User has overall veto power over uberization, or user can modify
- // limits (overriding system settings and potentially shooting
- // themselves in the head). Note that ChainMapper/Reducer are
- // fundamentally incompatible with MR-1220; they employ a blocking
-
- // User has overall veto power over uberization, or user can modify
- // limits (overriding system settings and potentially shooting
- // themselves in the head). Note that ChainMapper/Reducer are
- // fundamentally incompatible with MR-1220; they employ a blocking
- // queue between the maps/reduces and thus require parallel execution,
- // while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks
- // and thus requires sequential execution.
- job.isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
- && smallInput && smallMemory && notChainJob;
-
- if (job.isUber) {
- LOG.info("Uberizing job " + job.jobId + ": " + job.numMapTasks + "m+"
- + job.numReduceTasks + "r tasks (" + inputLength
- + " input bytes) will run sequentially on single node.");
- //TODO: also note which node?
-
- // make sure reduces are scheduled only after all map are completed
- job.conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
- 1.0f);
- // uber-subtask attempts all get launched on same node; if one fails,
- // probably should retry elsewhere, i.e., move entire uber-AM: ergo,
- // limit attempts to 1 (or at most 2? probably not...)
- job.conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
- job.conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
-
- // disable speculation: makes no sense to speculate an entire job
- //canSpeculateMaps = canSpeculateReduces = false; // [TODO: in old
- //version, ultimately was from conf.getMapSpeculativeExecution(),
- //conf.getReduceSpeculativeExecution()]
- } else {
- StringBuilder msg = new StringBuilder();
- msg.append("Not uberizing ").append(job.jobId).append(" because:");
- if (!uberEnabled)
- msg.append(" not enabled;");
- if (!smallNumMapTasks)
- msg.append(" too many maps;");
- if (!smallNumReduceTasks)
- msg.append(" too many reduces;");
- if (!smallInput)
- msg.append(" too much input;");
- if (!smallMemory)
- msg.append(" too much RAM;");
- if (!notChainJob)
- msg.append(" chainjob");
- LOG.info(msg.toString());
- }
-
+ job.makeUberDecision(inputLength);
+
job.taskAttemptCompletionEvents =
new ArrayList<TaskAttemptCompletionEvent>(
job.numMapTasks + job.numReduceTasks + 10);
@@ -1008,35 +1059,6 @@ public class JobImpl implements org.apac
}
}
- /**
- * ChainMapper and ChainReducer must execute in parallel, so they're not
- * compatible with uberization/LocalContainerLauncher (100% sequential).
- */
- boolean isChainJob(Configuration conf) {
- boolean isChainJob = false;
- try {
- String mapClassName = conf.get(MRJobConfig.MAP_CLASS_ATTR);
- if (mapClassName != null) {
- Class<?> mapClass = Class.forName(mapClassName);
- if (ChainMapper.class.isAssignableFrom(mapClass))
- isChainJob = true;
- }
- } catch (ClassNotFoundException cnfe) {
- // don't care; assume it's not derived from ChainMapper
- }
- try {
- String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR);
- if (reduceClassName != null) {
- Class<?> reduceClass = Class.forName(reduceClassName);
- if (ChainReducer.class.isAssignableFrom(reduceClass))
- isChainJob = true;
- }
- } catch (ClassNotFoundException cnfe) {
- // don't care; assume it's not derived from ChainReducer
- }
- return isChainJob;
- }
-
private void createMapTasks(JobImpl job, long inputLength,
TaskSplitMetaInfo[] splits) {
for (int i=0; i < job.numMapTasks; ++i) {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Tue Dec 13 23:35:11 2011
@@ -60,8 +60,8 @@ public class LocalContainerAllocator ext
private static final Log LOG =
LogFactory.getLog(LocalContainerAllocator.class);
+ @SuppressWarnings("rawtypes")
private final EventHandler eventHandler;
-// private final ApplicationId appID;
private AtomicInteger containerCount = new AtomicInteger();
private long retryInterval;
private long retrystartTime;
@@ -73,8 +73,6 @@ public class LocalContainerAllocator ext
AppContext context) {
super(clientService, context);
this.eventHandler = context.getEventHandler();
-// this.appID = context.getApplicationID();
-
}
@Override
@@ -88,6 +86,7 @@ public class LocalContainerAllocator ext
retrystartTime = System.currentTimeMillis();
}
+ @SuppressWarnings("unchecked")
@Override
protected synchronized void heartbeat() throws Exception {
AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
@@ -124,6 +123,7 @@ public class LocalContainerAllocator ext
}
}
+ @SuppressWarnings("unchecked")
@Override
public void handle(ContainerAllocatorEvent event) {
if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Tue Dec 13 23:35:11 2011
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.a
import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -65,7 +66,7 @@ public abstract class RMCommunicator ext
private int rmPollInterval;//millis
protected ApplicationId applicationId;
protected ApplicationAttemptId applicationAttemptId;
- private volatile boolean stopped;
+ private AtomicBoolean stopped;
protected Thread allocatorThread;
protected EventHandler eventHandler;
protected AMRMProtocol scheduler;
@@ -88,6 +89,7 @@ public abstract class RMCommunicator ext
this.eventHandler = context.getEventHandler();
this.applicationId = context.getApplicationID();
this.applicationAttemptId = context.getApplicationAttemptId();
+ this.stopped = new AtomicBoolean(false);
}
@Override
@@ -213,7 +215,10 @@ public abstract class RMCommunicator ext
@Override
public void stop() {
- stopped = true;
+ if (stopped.getAndSet(true)) {
+ // return if already stopped
+ return;
+ }
allocatorThread.interrupt();
try {
allocatorThread.join();
@@ -228,7 +233,7 @@ public abstract class RMCommunicator ext
allocatorThread = new Thread(new Runnable() {
@Override
public void run() {
- while (!stopped && !Thread.currentThread().isInterrupted()) {
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(rmPollInterval);
try {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java Tue Dec 13 23:35:11 2011
@@ -183,6 +183,7 @@ public class TestMRClientService {
Assert.assertEquals(1, amInfo.getContainerId().getApplicationAttemptId()
.getAttemptId());
Assert.assertTrue(amInfo.getStartTime() > 0);
+ Assert.assertEquals(false, jr.isUber());
}
private void verifyTaskAttemptReport(TaskAttemptReport tar) {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Tue Dec 13 23:35:11 2011
@@ -118,7 +118,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
- 0, 0, 0, 0, 0, 0, "jobfile", null));
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@@ -195,7 +195,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
- 0, 0, 0, 0, 0, 0, "jobfile", null));
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@@ -261,7 +261,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
- 0, 0, 0, 0, 0, 0, "jobfile", null));
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@@ -375,7 +375,7 @@ public class TestRMContainerAllocator {
public JobReport getReport() {
return MRBuilderUtils.newJobReport(this.jobId, "job", "user",
JobState.RUNNING, 0, 0, 0, this.setupProgress, this.mapProgress,
- this.reduceProgress, this.cleanupProgress, "jobfile", null);
+ this.reduceProgress, this.cleanupProgress, "jobfile", null, false);
}
}
@@ -511,7 +511,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
- 0, 0, 0, 0, 0, 0, "jobfile", null));
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@@ -610,7 +610,7 @@ public class TestRMContainerAllocator {
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
- 0, 0, 0, 0, 0, 0, "jobfile", null));
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java Tue Dec 13 23:35:11 2011
@@ -288,7 +288,7 @@ public class TypeConverter {
.getMapProgress(), jobreport.getReduceProgress(), jobreport
.getCleanupProgress(), fromYarn(jobreport.getJobState()),
jobPriority, jobreport.getUser(), jobreport.getJobName(), jobreport
- .getJobFile(), trackingUrl);
+ .getJobFile(), trackingUrl, jobreport.isUber());
jobStatus.setFailureInfo(jobreport.getDiagnostics());
return jobStatus;
}
@@ -421,7 +421,7 @@ public class TypeConverter {
TypeConverter.fromYarn(application.getYarnApplicationState()),
org.apache.hadoop.mapreduce.JobPriority.NORMAL,
application.getUser(), application.getName(),
- application.getQueue(), jobFile, trackingUrl
+ application.getQueue(), jobFile, trackingUrl, false
);
jobStatus.setSchedulingInfo(trackingUrl); // Set AM tracking url
jobStatus.setStartTime(application.getStartTime());
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java Tue Dec 13 23:35:11 2011
@@ -36,6 +36,7 @@ public interface JobReport {
public abstract String getDiagnostics();
public abstract String getJobFile();
public abstract List<AMInfo> getAMInfos();
+ public abstract boolean isUber();
public abstract void setJobId(JobId jobId);
public abstract void setJobState(JobState jobState);
@@ -52,4 +53,5 @@ public interface JobReport {
public abstract void setDiagnostics(String diagnostics);
public abstract void setJobFile(String jobFile);
public abstract void setAMInfos(List<AMInfo> amInfos);
+ public abstract void setIsUber(boolean isUber);
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java Tue Dec 13 23:35:11 2011
@@ -332,4 +332,16 @@ public class JobReportPBImpl extends Pro
private JobState convertFromProtoFormat(JobStateProto e) {
return MRProtoUtils.convertFromProtoFormat(e);
}
+
+ @Override
+ public synchronized boolean isUber() {
+ JobReportProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getIsUber();
+ }
+
+ @Override
+ public synchronized void setIsUber(boolean isUber) {
+ maybeInitBuilder();
+ builder.setIsUber(isUber);
+ }
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java Tue Dec 13 23:35:11 2011
@@ -60,7 +60,8 @@ public class MRBuilderUtils {
public static JobReport newJobReport(JobId jobId, String jobName,
String userName, JobState state, long submitTime, long startTime, long finishTime,
float setupProgress, float mapProgress, float reduceProgress,
- float cleanupProgress, String jobFile, List<AMInfo> amInfos) {
+ float cleanupProgress, String jobFile, List<AMInfo> amInfos,
+ boolean isUber) {
JobReport report = Records.newRecord(JobReport.class);
report.setJobId(jobId);
report.setJobName(jobName);
@@ -75,6 +76,7 @@ public class MRBuilderUtils {
report.setReduceProgress(reduceProgress);
report.setJobFile(jobFile);
report.setAMInfos(amInfos);
+ report.setIsUber(isUber);
return report;
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto Tue Dec 13 23:35:11 2011
@@ -152,6 +152,7 @@ message JobReportProto {
optional string jobFile = 13;
repeated AMInfoProto am_infos = 14;
optional int64 submit_time = 15;
+ optional bool is_uber = 16 [default = false];
}
message AMInfoProto {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java Tue Dec 13 23:35:11 2011
@@ -97,7 +97,7 @@ public class JobStatus extends org.apach
String user, String jobName,
String jobFile, String trackingUrl) {
this(jobid, mapProgress, reduceProgress, cleanupProgress, runState,
- JobPriority.NORMAL, user, jobName, jobFile, trackingUrl);
+ JobPriority.NORMAL, user, jobName, jobFile, trackingUrl);
}
/**
@@ -135,7 +135,8 @@ public class JobStatus extends org.apach
String user, String jobName, String jobFile,
String trackingUrl) {
this(jobid, 0.0f, mapProgress, reduceProgress,
- cleanupProgress, runState, jp, user, jobName, jobFile, trackingUrl);
+ cleanupProgress, runState, jp, user, jobName, jobFile,
+ trackingUrl);
}
/**
@@ -157,9 +158,31 @@ public class JobStatus extends org.apach
int runState, JobPriority jp, String user, String jobName,
String jobFile, String trackingUrl) {
this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
- runState, jp,
- user, jobName, "default", jobFile, trackingUrl);
+ runState, jp, user, jobName, "default", jobFile, trackingUrl);
}
+
+ /**
+ * Create a job status object for a given jobid.
+ * @param jobid The jobid of the job
+ * @param setupProgress The progress made on the setup
+ * @param mapProgress The progress made on the maps
+ * @param reduceProgress The progress made on the reduces
+ * @param cleanupProgress The progress made on the cleanup
+ * @param runState The current state of the job
+ * @param jp Priority of the job.
+ * @param user userid of the person who submitted the job.
+ * @param jobName user-specified job name.
+ * @param jobFile job configuration file.
+ * @param trackingUrl link to the web-ui for details of the job.
+ * @param isUber Whether job running in uber mode
+ */
+ public JobStatus(JobID jobid, float setupProgress, float mapProgress,
+ float reduceProgress, float cleanupProgress,
+ int runState, JobPriority jp, String user, String jobName,
+ String jobFile, String trackingUrl, boolean isUber) {
+ this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
+ runState, jp, user, jobName, "default", jobFile, trackingUrl, isUber);
+ }
/**
* Create a job status object for a given jobid.
@@ -173,27 +196,52 @@ public class JobStatus extends org.apach
* @param user userid of the person who submitted the job.
* @param jobName user-specified job name.
* @param queue job queue name.
+ * @param jobFile job configuration file.
+ * @param trackingUrl link to the web-ui for details of the job.
+ */
+ public JobStatus(JobID jobid, float setupProgress, float mapProgress,
+ float reduceProgress, float cleanupProgress,
+ int runState, JobPriority jp,
+ String user, String jobName, String queue,
+ String jobFile, String trackingUrl) {
+ this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
+ runState, jp,
+ user, jobName, queue, jobFile, trackingUrl, false);
+ }
+
+ /**
+ * Create a job status object for a given jobid.
+ * @param jobid The jobid of the job
+ * @param setupProgress The progress made on the setup
+ * @param mapProgress The progress made on the maps
+ * @param reduceProgress The progress made on the reduces
+ * @param cleanupProgress The progress made on the cleanup
+ * @param runState The current state of the job
+ * @param jp Priority of the job.
+ * @param user userid of the person who submitted the job.
+ * @param jobName user-specified job name.
+ * @param queue job queue name.
* @param jobFile job configuration file.
* @param trackingUrl link to the web-ui for details of the job.
+ * @param isUber Whether job running in uber mode
*/
public JobStatus(JobID jobid, float setupProgress, float mapProgress,
float reduceProgress, float cleanupProgress,
int runState, JobPriority jp,
String user, String jobName, String queue,
- String jobFile, String trackingUrl) {
+ String jobFile, String trackingUrl, boolean isUber) {
super(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
getEnum(runState), org.apache.hadoop.mapreduce.JobPriority.valueOf(jp.name()),
- user, jobName, queue, jobFile, trackingUrl);
+ user, jobName, queue, jobFile, trackingUrl, isUber);
}
-
public static JobStatus downgrade(org.apache.hadoop.mapreduce.JobStatus stat){
JobStatus old = new JobStatus(JobID.downgrade(stat.getJobID()),
stat.getSetupProgress(), stat.getMapProgress(), stat.getReduceProgress(),
stat.getCleanupProgress(), stat.getState().getValue(),
JobPriority.valueOf(stat.getPriority().name()),
stat.getUsername(), stat.getJobName(), stat.getJobFile(),
- stat.getTrackingUrl());
+ stat.getTrackingUrl(), stat.isUber());
old.setStartTime(stat.getStartTime());
old.setFinishTime(stat.getFinishTime());
old.setSchedulingInfo(stat.getSchedulingInfo());
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Tue Dec 13 23:35:11 2011
@@ -467,6 +467,7 @@ public class Job extends JobContextImpl
sb.append("Job File: ").append(status.getJobFile()).append("\n");
sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
sb.append("\n");
+ sb.append("Uber job : ").append(status.isUber()).append("\n");
sb.append("map() completion: ");
sb.append(status.getMapProgress()).append("\n");
sb.append("reduce() completion: ");
@@ -1268,12 +1269,20 @@ public class Job extends JobContextImpl
Job.getProgressPollInterval(clientConf);
/* make sure to report full progress after the job is done */
boolean reportedAfterCompletion = false;
+ boolean reportedUberMode = false;
while (!isComplete() || !reportedAfterCompletion) {
if (isComplete()) {
reportedAfterCompletion = true;
} else {
Thread.sleep(progMonitorPollIntervalMillis);
}
+ if (status.getState() == JobStatus.State.PREP) {
+ continue;
+ }
+ if (!reportedUberMode) {
+ reportedUberMode = true;
+ LOG.info("Job " + jobId + " running in uber mode : " + isUber());
+ }
String report =
(" map " + StringUtils.formatPercent(mapProgress(), 0)+
" reduce " +
@@ -1497,4 +1506,10 @@ public class Job extends JobContextImpl
conf.set(Job.OUTPUT_FILTER, newValue.toString());
}
+ public boolean isUber() throws IOException, InterruptedException {
+ ensureState(JobState.RUNNING);
+ updateStatus();
+ return status.isUber();
+ }
+
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java Tue Dec 13 23:35:11 2011
@@ -97,7 +97,7 @@ public class JobStatus implements Writab
private int usedMem;
private int reservedMem;
private int neededMem;
-
+ private boolean isUber;
/**
*/
@@ -115,17 +115,17 @@ public class JobStatus implements Writab
* @param jp Priority of the job.
* @param user userid of the person who submitted the job.
* @param jobName user-specified job name.
- * @param jobFile job configuration file.
+ * @param jobFile job configuration file.
* @param trackingUrl link to the web-ui for details of the job.
*/
public JobStatus(JobID jobid, float setupProgress, float mapProgress,
- float reduceProgress, float cleanupProgress,
+ float reduceProgress, float cleanupProgress,
State runState, JobPriority jp, String user, String jobName,
String jobFile, String trackingUrl) {
this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
- runState, jp, user, jobName, "default", jobFile, trackingUrl);
+ runState, jp, user, jobName, "default", jobFile, trackingUrl, false);
}
-
+
/**
* Create a job status object for a given jobid.
* @param jobid The jobid of the job
@@ -138,14 +138,39 @@ public class JobStatus implements Writab
* @param user userid of the person who submitted the job.
* @param jobName user-specified job name.
* @param queue queue name
- * @param jobFile job configuration file.
+ * @param jobFile job configuration file.
* @param trackingUrl link to the web-ui for details of the job.
*/
public JobStatus(JobID jobid, float setupProgress, float mapProgress,
- float reduceProgress, float cleanupProgress,
- State runState, JobPriority jp,
- String user, String jobName, String queue,
+ float reduceProgress, float cleanupProgress,
+ State runState, JobPriority jp,
+ String user, String jobName, String queue,
String jobFile, String trackingUrl) {
+ this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
+ runState, jp, user, jobName, queue, jobFile, trackingUrl, false);
+ }
+
+ /**
+ * Create a job status object for a given jobid.
+ * @param jobid The jobid of the job
+ * @param setupProgress The progress made on the setup
+ * @param mapProgress The progress made on the maps
+ * @param reduceProgress The progress made on the reduces
+ * @param cleanupProgress The progress made on the cleanup
+ * @param runState The current state of the job
+ * @param jp Priority of the job.
+ * @param user userid of the person who submitted the job.
+ * @param jobName user-specified job name.
+ * @param queue queue name
+ * @param jobFile job configuration file.
+ * @param trackingUrl link to the web-ui for details of the job.
+ * @param isUber Whether job running in uber mode
+ */
+ public JobStatus(JobID jobid, float setupProgress, float mapProgress,
+ float reduceProgress, float cleanupProgress,
+ State runState, JobPriority jp,
+ String user, String jobName, String queue,
+ String jobFile, String trackingUrl, boolean isUber) {
this.jobid = jobid;
this.setupProgress = setupProgress;
this.mapProgress = mapProgress;
@@ -161,8 +186,9 @@ public class JobStatus implements Writab
this.jobName = jobName;
this.jobFile = jobFile;
this.trackingUrl = trackingUrl;
+ this.isUber = isUber;
}
-
+
/**
* Sets the map progress of this job
@@ -411,6 +437,7 @@ public class JobStatus implements Writab
Text.writeString(out, jobName);
Text.writeString(out, trackingUrl);
Text.writeString(out, jobFile);
+ out.writeBoolean(isUber);
// Serialize the job's ACLs
out.writeInt(jobACLs.size());
@@ -438,6 +465,7 @@ public class JobStatus implements Writab
this.jobName = Text.readString(in);
this.trackingUrl = Text.readString(in);
this.jobFile = Text.readString(in);
+ this.isUber = in.readBoolean();
// De-serialize the job's ACLs
int numACLs = in.readInt();
@@ -562,9 +590,26 @@ public class JobStatus implements Writab
this.neededMem = n;
}
+ /**
+ * Whether job running in uber mode
+ * @return job in uber-mode
+ */
+ public synchronized boolean isUber() {
+ return isUber;
+ }
+
+ /**
+ * Set uber-mode flag
+ * @param isUber Whether job running in uber-mode
+ */
+ public synchronized void setUber(boolean isUber) {
+ this.isUber = isUber;
+ }
+
public String toString() {
StringBuffer buffer = new StringBuffer();
buffer.append("job-id : " + jobid);
+ buffer.append("uber-mode : " + isUber);
buffer.append("map-progress : " + mapProgress);
buffer.append("reduce-progress : " + reduceProgress);
buffer.append("cleanup-progress : " + cleanupProgress);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue Dec 13 23:35:11 2011
@@ -150,6 +150,10 @@ public interface MRJobConfig {
public static final String NUM_REDUCE_PROFILES = "mapreduce.task.profile.reduces";
+ public static final String TASK_MAP_PROFILE_PARAMS = "mapreduce.task.profile.map.params";
+
+ public static final String TASK_REDUCE_PROFILE_PARAMS = "mapreduce.task.profile.reduce.params";
+
public static final String TASK_TIMEOUT = "mapreduce.task.timeout";
public static final String TASK_ID = "mapreduce.task.id";
@@ -298,12 +302,6 @@ public interface MRJobConfig {
"mapreduce.job.ubertask.maxreduces";
public static final String JOB_UBERTASK_MAXBYTES =
"mapreduce.job.ubertask.maxbytes";
- public static final String UBERTASK_JAVA_OPTS =
- "mapreduce.ubertask.child.java.opts"; // or mapreduce.uber.java.opts?
- public static final String UBERTASK_ULIMIT =
- "mapreduce.ubertask.child.ulimit"; // or mapreduce.uber.ulimit?
- public static final String UBERTASK_ENV =
- "mapreduce.ubertask.child.env"; // or mapreduce.uber.env?
public static final String MR_PREFIX = "yarn.app.mapreduce.";
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobMonitorAndPrint.java Tue Dec 13 23:35:11 2011
@@ -63,17 +63,20 @@ public class TestJobMonitorAndPrint exte
when(cluster.getConf()).thenReturn(conf);
when(cluster.getClient()).thenReturn(clientProtocol);
JobStatus jobStatus = new JobStatus(new JobID("job_000", 1), 0f, 0f, 0f, 0f,
- State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname", "tmp-jobfile", "tmp-url");
+ State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname",
+ "tmp-jobfile", "tmp-url");
job = Job.getInstance(cluster, jobStatus, conf);
job = spy(job);
}
@Test
public void testJobMonitorAndPrint() throws Exception {
- JobStatus jobStatus_1 = new JobStatus(new JobID("job_000", 1), 1f, 0.1f, 0.1f, 0f,
- State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname", "tmp-jobfile", "tmp-url");
- JobStatus jobStatus_2 = new JobStatus(new JobID("job_000", 1), 1f, 1f, 1f, 1f,
- State.SUCCEEDED, JobPriority.HIGH, "tmp-user", "tmp-jobname", "tmp-jobfile", "tmp-url");
+ JobStatus jobStatus_1 = new JobStatus(new JobID("job_000", 1), 1f, 0.1f,
+ 0.1f, 0f, State.RUNNING, JobPriority.HIGH, "tmp-user", "tmp-jobname",
+ "tmp-queue", "tmp-jobfile", "tmp-url", true);
+ JobStatus jobStatus_2 = new JobStatus(new JobID("job_000", 1), 1f, 1f,
+ 1f, 1f, State.SUCCEEDED, JobPriority.HIGH, "tmp-user", "tmp-jobname",
+ "tmp-queue", "tmp-jobfile", "tmp-url", true);
doAnswer(
new Answer<TaskCompletionEvent[]>() {
@@ -102,15 +105,21 @@ public class TestJobMonitorAndPrint exte
String line;
boolean foundHundred = false;
boolean foundComplete = false;
- String match_1 = "map 100% reduce 100%";
- String match_2 = "completed successfully";
+ boolean foundUber = false;
+ String match_1 = "uber mode : true";
+ String match_2 = "map 100% reduce 100%";
+ String match_3 = "completed successfully";
while ((line = r.readLine()) != null) {
- foundHundred = line.contains(match_1);
+ if (line.contains(match_1)) {
+ foundUber = true;
+ }
+ foundHundred = line.contains(match_2);
if (foundHundred)
break;
}
line = r.readLine();
- foundComplete = line.contains(match_2);
+ foundComplete = line.contains(match_3);
+ assertTrue(foundUber);
assertTrue(foundHundred);
assertTrue(foundComplete);
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Tue Dec 13 23:35:11 2011
@@ -107,6 +107,7 @@ public class CompletedJob implements org
report.setTrackingUrl(JobHistoryUtils.getHistoryUrl(conf, TypeConverter
.toYarn(TypeConverter.fromYarn(jobId)).getAppId()));
report.setAMInfos(getAMInfos());
+ report.setIsUber(isUber());
}
@Override
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Tue Dec 13 23:35:11 2011
@@ -168,7 +168,7 @@ public class TestClientServiceDelegate {
GetJobReportResponse jobReportResponse1 = mock(GetJobReportResponse.class);
when(jobReportResponse1.getJobReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "jobName-firstGen", "user",
- JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null));
+ JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null, false));
// First AM returns a report with jobName firstGen and simulates AM shutdown
// on second invocation.
@@ -180,7 +180,7 @@ public class TestClientServiceDelegate {
GetJobReportResponse jobReportResponse2 = mock(GetJobReportResponse.class);
when(jobReportResponse2.getJobReport()).thenReturn(
MRBuilderUtils.newJobReport(jobId, "jobName-secondGen", "user",
- JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null));
+ JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null, false));
// Second AM generation returns a report with jobName secondGen
MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java?rev=1213987&r1=1213986&r2=1213987&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java Tue Dec 13 23:35:11 2011
@@ -49,6 +49,7 @@ public class TestUberAM extends TestMRJo
}
@Override
+ @Test
public void testSleepJob()
throws IOException, InterruptedException, ClassNotFoundException {
if (mrCluster != null) {
@@ -84,6 +85,7 @@ public class TestUberAM extends TestMRJo
}
@Override
+ @Test
public void testRandomWriter()
throws IOException, InterruptedException, ClassNotFoundException {
super.testRandomWriter();
@@ -101,6 +103,7 @@ public class TestUberAM extends TestMRJo
}
@Override
+ @Test
public void testFailingMapper()
throws IOException, InterruptedException, ClassNotFoundException {
LOG.info("\n\n\nStarting uberized testFailingMapper().");