You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC
svn commit: r885145 [16/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233:
./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/
src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobConf.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobConf.java Sat Nov 28 20:26:01 2009
@@ -27,7 +27,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -41,6 +41,8 @@
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.log4j.Level;
@@ -108,12 +110,12 @@
private static final Log LOG = LogFactory.getLog(JobConf.class);
static{
- Configuration.addDefaultResource("mapred-default.xml");
- Configuration.addDefaultResource("mapred-site.xml");
+ ConfigUtil.loadResources();
}
/**
- * @deprecated
+ * @deprecated Use {@link #MAPRED_JOB_MAP_MEMORY_MB_PROPERTY} and
+ * {@link #MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY}
*/
@Deprecated
public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
@@ -145,18 +147,23 @@
* indicates that the options are turned off.
*/
public static final long DISABLED_MEMORY_LIMIT = -1L;
-
+
+ /**
+ * Property name for the configuration property mapreduce.cluster.local.dir
+ */
+ public static final String MAPRED_LOCAL_DIR_PROPERTY = MRConfig.LOCAL_DIR;
+
/**
* Name of the queue to which jobs will be submitted, if no queue
* name is mentioned.
*/
public static final String DEFAULT_QUEUE_NAME = "default";
- static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
- "mapred.job.map.memory.mb";
+ static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
+ JobContext.MAP_MEMORY_MB;
static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
- "mapred.job.reduce.memory.mb";
+ JobContext.REDUCE_MEMORY_MB;
/**
* Configuration key to set the java command line options for the child
@@ -200,7 +207,7 @@
* other environment variables to the map processes.
*/
public static final String MAPRED_MAP_TASK_JAVA_OPTS =
- "mapred.map.child.java.opts";
+ JobContext.MAP_JAVA_OPTS;
/**
* Configuration key to set the java command line options for the reduce tasks.
@@ -220,7 +227,7 @@
* pass process environment variables to the reduce processes.
*/
public static final String MAPRED_REDUCE_TASK_JAVA_OPTS =
- "mapred.reduce.child.java.opts";
+ JobContext.REDUCE_JAVA_OPTS;
public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";
@@ -244,7 +251,7 @@
* Note: This must be greater than or equal to the -Xmx passed to the JavaVM
* via {@link #MAPRED_MAP_TASK_JAVA_OPTS}, else the VM might not start.
*/
- public static final String MAPRED_MAP_TASK_ULIMIT = "mapred.map.child.ulimit";
+ public static final String MAPRED_MAP_TASK_ULIMIT = JobContext.MAP_ULIMIT;
/**
* Configuration key to set the maximum virtual memory available to the
@@ -253,8 +260,9 @@
* Note: This must be greater than or equal to the -Xmx passed to the JavaVM
* via {@link #MAPRED_REDUCE_TASK_JAVA_OPTS}, else the VM might not start.
*/
- public static final String MAPRED_REDUCE_TASK_ULIMIT =
- "mapred.reduce.child.ulimit";
+ public static final String MAPRED_REDUCE_TASK_ULIMIT =
+ JobContext.REDUCE_ULIMIT;
+
/**
* Configuration key to set the environment of the child map/reduce tasks.
@@ -287,7 +295,7 @@
* <li> B=$X:c This is inherit tasktracker's X env variable. </li>
* </ul>
*/
- public static final String MAPRED_MAP_TASK_ENV = "mapred.map.child.env";
+ public static final String MAPRED_MAP_TASK_ENV = JobContext.MAP_ENV;
/**
* Configuration key to set the maximum virtual memory available to the
@@ -302,8 +310,7 @@
* <li> B=$X:c This is inherit tasktracker's X env variable. </li>
* </ul>
*/
- public static final String MAPRED_REDUCE_TASK_ENV =
- "mapred.reduce.child.env";
+ public static final String MAPRED_REDUCE_TASK_ENV = JobContext.REDUCE_ENV;
/**
* Configuration key to set the logging {@link Level} for the map task.
@@ -312,7 +319,7 @@
* OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
*/
public static final String MAPRED_MAP_TASK_LOG_LEVEL =
- "mapred.map.child.log.level";
+ JobContext.MAP_LOG_LEVEL;
/**
* Configuration key to set the logging {@link Level} for the reduce task.
@@ -321,7 +328,7 @@
* OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
*/
public static final String MAPRED_REDUCE_TASK_LOG_LEVEL =
- "mapred.reduce.child.log.level";
+ JobContext.REDUCE_LOG_LEVEL;
/**
* Default logging level for map/reduce tasks.
@@ -403,14 +410,14 @@
*
* @return the user jar for the map-reduce job.
*/
- public String getJar() { return get("mapred.jar"); }
+ public String getJar() { return get(JobContext.JAR); }
/**
* Set the user jar for the map-reduce job.
*
* @param jar the user jar for the map-reduce job.
*/
- public void setJar(String jar) { set("mapred.jar", jar); }
+ public void setJar(String jar) { set(JobContext.JAR, jar); }
/**
* Set the job's jar file by finding an example class location.
@@ -425,7 +432,7 @@
}
public String[] getLocalDirs() throws IOException {
- return getStrings("mapred.local.dir");
+ return getStrings(MRConfig.LOCAL_DIR);
}
public void deleteLocalFiles() throws IOException {
@@ -447,7 +454,7 @@
* local directories.
*/
public Path getLocalPath(String pathString) throws IOException {
- return getLocalPath("mapred.local.dir", pathString);
+ return getLocalPath(MRConfig.LOCAL_DIR, pathString);
}
/**
@@ -456,7 +463,7 @@
* @return the username
*/
public String getUser() {
- return get("user.name");
+ return get(JobContext.USER_NAME);
}
/**
@@ -465,7 +472,7 @@
* @param user the username for this job.
*/
public void setUser(String user) {
- set("user.name", user);
+ set(JobContext.USER_NAME, user);
}
@@ -479,7 +486,7 @@
*
*/
public void setKeepFailedTaskFiles(boolean keep) {
- setBoolean("keep.failed.task.files", keep);
+ setBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, keep);
}
/**
@@ -488,7 +495,7 @@
* @return should the files be kept?
*/
public boolean getKeepFailedTaskFiles() {
- return getBoolean("keep.failed.task.files", false);
+ return getBoolean(JobContext.PRESERVE_FAILED_TASK_FILES, false);
}
/**
@@ -500,7 +507,7 @@
* task names.
*/
public void setKeepTaskFilesPattern(String pattern) {
- set("keep.task.files.pattern", pattern);
+ set(JobContext.PRESERVE_FILES_PATTERN, pattern);
}
/**
@@ -510,7 +517,7 @@
* @return the pattern as a string, if it was set, otherwise null.
*/
public String getKeepTaskFilesPattern() {
- return get("keep.task.files.pattern");
+ return get(JobContext.PRESERVE_FILES_PATTERN);
}
/**
@@ -520,7 +527,7 @@
*/
public void setWorkingDirectory(Path dir) {
dir = new Path(getWorkingDirectory(), dir);
- set("mapred.working.dir", dir.toString());
+ set(JobContext.WORKING_DIR, dir.toString());
}
/**
@@ -529,13 +536,13 @@
* @return the directory name.
*/
public Path getWorkingDirectory() {
- String name = get("mapred.working.dir");
+ String name = get(JobContext.WORKING_DIR);
if (name != null) {
return new Path(name);
} else {
try {
Path dir = FileSystem.get(this).getWorkingDirectory();
- set("mapred.working.dir", dir.toString());
+ set(JobContext.WORKING_DIR, dir.toString());
return dir;
} catch (IOException e) {
throw new RuntimeException(e);
@@ -550,14 +557,14 @@
* -1 signifies no limit
*/
public void setNumTasksToExecutePerJvm(int numTasks) {
- setInt("mapred.job.reuse.jvm.num.tasks", numTasks);
+ setInt(JobContext.JVM_NUMTASKS_TORUN, numTasks);
}
/**
* Get the number of tasks that a spawned JVM should execute
*/
public int getNumTasksToExecutePerJvm() {
- return getInt("mapred.job.reuse.jvm.num.tasks", 1);
+ return getInt(JobContext.JVM_NUMTASKS_TORUN, 1);
}
/**
@@ -635,7 +642,7 @@
* @param compress should the map outputs be compressed?
*/
public void setCompressMapOutput(boolean compress) {
- setBoolean("mapred.compress.map.output", compress);
+ setBoolean(JobContext.MAP_OUTPUT_COMPRESS, compress);
}
/**
@@ -645,7 +652,7 @@
* <code>false</code> otherwise.
*/
public boolean getCompressMapOutput() {
- return getBoolean("mapred.compress.map.output", false);
+ return getBoolean(JobContext.MAP_OUTPUT_COMPRESS, false);
}
/**
@@ -657,7 +664,7 @@
public void
setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
setCompressMapOutput(true);
- setClass("mapred.map.output.compression.codec", codecClass,
+ setClass(JobContext.MAP_OUTPUT_COMPRESS_CODEC, codecClass,
CompressionCodec.class);
}
@@ -672,7 +679,7 @@
public Class<? extends CompressionCodec>
getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
Class<? extends CompressionCodec> codecClass = defaultValue;
- String name = get("mapred.map.output.compression.codec");
+ String name = get(JobContext.MAP_OUTPUT_COMPRESS_CODEC);
if (name != null) {
try {
codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
@@ -692,7 +699,7 @@
* @return the map output key class.
*/
public Class<?> getMapOutputKeyClass() {
- Class<?> retv = getClass("mapred.mapoutput.key.class", null, Object.class);
+ Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
if (retv == null) {
retv = getOutputKeyClass();
}
@@ -707,7 +714,7 @@
* @param theClass the map output key class.
*/
public void setMapOutputKeyClass(Class<?> theClass) {
- setClass("mapred.mapoutput.key.class", theClass, Object.class);
+ setClass(JobContext.MAP_OUTPUT_KEY_CLASS, theClass, Object.class);
}
/**
@@ -718,7 +725,7 @@
* @return the map output value class.
*/
public Class<?> getMapOutputValueClass() {
- Class<?> retv = getClass("mapred.mapoutput.value.class", null,
+ Class<?> retv = getClass(JobContext.MAP_OUTPUT_VALUE_CLASS, null,
Object.class);
if (retv == null) {
retv = getOutputValueClass();
@@ -734,7 +741,7 @@
* @param theClass the map output value class.
*/
public void setMapOutputValueClass(Class<?> theClass) {
- setClass("mapred.mapoutput.value.class", theClass, Object.class);
+ setClass(JobContext.MAP_OUTPUT_VALUE_CLASS, theClass, Object.class);
}
/**
@@ -743,7 +750,7 @@
* @return the key class for the job output data.
*/
public Class<?> getOutputKeyClass() {
- return getClass("mapred.output.key.class",
+ return getClass(JobContext.OUTPUT_KEY_CLASS,
LongWritable.class, Object.class);
}
@@ -753,7 +760,7 @@
* @param theClass the key class for the job output data.
*/
public void setOutputKeyClass(Class<?> theClass) {
- setClass("mapred.output.key.class", theClass, Object.class);
+ setClass(JobContext.OUTPUT_KEY_CLASS, theClass, Object.class);
}
/**
@@ -762,8 +769,8 @@
* @return the {@link RawComparator} comparator used to compare keys.
*/
public RawComparator getOutputKeyComparator() {
- Class<? extends RawComparator> theClass = getClass("mapred.output.key.comparator.class",
- null, RawComparator.class);
+ Class<? extends RawComparator> theClass = getClass(
+ JobContext.KEY_COMPARATOR, null, RawComparator.class);
if (theClass != null)
return ReflectionUtils.newInstance(theClass, this);
return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
@@ -777,7 +784,7 @@
* @see #setOutputValueGroupingComparator(Class)
*/
public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) {
- setClass("mapred.output.key.comparator.class",
+ setClass(JobContext.KEY_COMPARATOR,
theClass, RawComparator.class);
}
@@ -798,14 +805,14 @@
*/
public void setKeyFieldComparatorOptions(String keySpec) {
setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
- set("mapred.text.key.comparator.options", keySpec);
+ set(KeyFieldBasedComparator.COMPARATOR_OPTIONS, keySpec);
}
/**
* Get the {@link KeyFieldBasedComparator} options
*/
public String getKeyFieldComparatorOption() {
- return get("mapred.text.key.comparator.options");
+ return get(KeyFieldBasedComparator.COMPARATOR_OPTIONS);
}
/**
@@ -823,14 +830,14 @@
*/
public void setKeyFieldPartitionerOptions(String keySpec) {
setPartitionerClass(KeyFieldBasedPartitioner.class);
- set("mapred.text.key.partitioner.options", keySpec);
+ set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, keySpec);
}
/**
* Get the {@link KeyFieldBasedPartitioner} options
*/
public String getKeyFieldPartitionerOption() {
- return get("mapred.text.key.partitioner.options");
+ return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
}
/**
@@ -841,8 +848,8 @@
* @see #setOutputValueGroupingComparator(Class) for details.
*/
public RawComparator getOutputValueGroupingComparator() {
- Class<? extends RawComparator> theClass = getClass("mapred.output.value.groupfn.class", null,
- RawComparator.class);
+ Class<? extends RawComparator> theClass = getClass(
+ JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
if (theClass == null) {
return getOutputKeyComparator();
}
@@ -876,8 +883,8 @@
* @see #setOutputKeyComparatorClass(Class)
*/
public void setOutputValueGroupingComparator(
- Class<? extends RawComparator> theClass) {
- setClass("mapred.output.value.groupfn.class",
+ Class<? extends RawComparator> theClass) {
+ setClass(JobContext.GROUP_COMPARATOR_CLASS,
theClass, RawComparator.class);
}
@@ -921,7 +928,7 @@
* @return the value class for job outputs.
*/
public Class<?> getOutputValueClass() {
- return getClass("mapred.output.value.class", Text.class, Object.class);
+ return getClass(JobContext.OUTPUT_VALUE_CLASS, Text.class, Object.class);
}
/**
@@ -930,7 +937,7 @@
* @param theClass the value class for job outputs.
*/
public void setOutputValueClass(Class<?> theClass) {
- setClass("mapred.output.value.class", theClass, Object.class);
+ setClass(JobContext.OUTPUT_VALUE_CLASS, theClass, Object.class);
}
/**
@@ -1081,7 +1088,7 @@
* <code>false</code> otherwise.
*/
public boolean getMapSpeculativeExecution() {
- return getBoolean("mapred.map.tasks.speculative.execution", true);
+ return getBoolean(JobContext.MAP_SPECULATIVE, true);
}
/**
@@ -1092,7 +1099,7 @@
* else <code>false</code>.
*/
public void setMapSpeculativeExecution(boolean speculativeExecution) {
- setBoolean("mapred.map.tasks.speculative.execution", speculativeExecution);
+ setBoolean(JobContext.MAP_SPECULATIVE, speculativeExecution);
}
/**
@@ -1104,7 +1111,7 @@
* <code>false</code> otherwise.
*/
public boolean getReduceSpeculativeExecution() {
- return getBoolean("mapred.reduce.tasks.speculative.execution", true);
+ return getBoolean(JobContext.REDUCE_SPECULATIVE, true);
}
/**
@@ -1115,7 +1122,7 @@
* else <code>false</code>.
*/
public void setReduceSpeculativeExecution(boolean speculativeExecution) {
- setBoolean("mapred.reduce.tasks.speculative.execution",
+ setBoolean(JobContext.REDUCE_SPECULATIVE,
speculativeExecution);
}
@@ -1125,7 +1132,7 @@
*
* @return the number of map tasks for this job.
*/
- public int getNumMapTasks() { return getInt("mapred.map.tasks", 1); }
+ public int getNumMapTasks() { return getInt(JobContext.NUM_MAPS, 1); }
/**
* Set the number of map tasks for this job.
@@ -1152,8 +1159,8 @@
* bytes, of input files. However, the {@link FileSystem} blocksize of the
* input files is treated as an upper bound for input splits. A lower bound
* on the split size can be set via
- * <a href="{@docRoot}/../mapred-default.html#mapred.min.split.size">
- * mapred.min.split.size</a>.</p>
+ * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
+ * mapreduce.input.fileinputformat.split.minsize</a>.</p>
*
* <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB,
* you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is
@@ -1165,7 +1172,7 @@
* @see FileSystem#getDefaultBlockSize()
* @see FileStatus#getBlockSize()
*/
- public void setNumMapTasks(int n) { setInt("mapred.map.tasks", n); }
+ public void setNumMapTasks(int n) { setInt(JobContext.NUM_MAPS, n); }
/**
* Get the configured number of reduce tasks for this job. Defaults to
@@ -1173,7 +1180,7 @@
*
* @return the number of reduce tasks for this job.
*/
- public int getNumReduceTasks() { return getInt("mapred.reduce.tasks", 1); }
+ public int getNumReduceTasks() { return getInt(JobContext.NUM_REDUCES, 1); }
/**
* Set the requisite number of reduce tasks for this job.
@@ -1182,8 +1189,8 @@
*
* <p>The right number of reduces seems to be <code>0.95</code> or
* <code>1.75</code> multiplied by (<<i>no. of nodes</i>> *
- * <a href="{@docRoot}/../mapred-default.html#mapred.tasktracker.reduce.tasks.maximum">
- * mapred.tasktracker.reduce.tasks.maximum</a>).
+ * <a href="{@docRoot}/../mapred-default.html#mapreduce.tasktracker.reduce.tasks.maximum">
+ * mapreduce.tasktracker.reduce.tasks.maximum</a>).
* </p>
*
* <p>With <code>0.95</code> all of the reduces can launch immediately and
@@ -1209,17 +1216,17 @@
*
* @param n the number of reduce tasks for this job.
*/
- public void setNumReduceTasks(int n) { setInt("mapred.reduce.tasks", n); }
+ public void setNumReduceTasks(int n) { setInt(JobContext.NUM_REDUCES, n); }
/**
* Get the configured number of maximum attempts that will be made to run a
- * map task, as specified by the <code>mapred.map.max.attempts</code>
+ * map task, as specified by the <code>mapreduce.map.maxattempts</code>
* property. If this property is not already set, the default is 4 attempts.
*
* @return the max number of attempts per map task.
*/
public int getMaxMapAttempts() {
- return getInt("mapred.map.max.attempts", 4);
+ return getInt(JobContext.MAP_MAX_ATTEMPTS, 4);
}
/**
@@ -1229,18 +1236,18 @@
* @param n the number of attempts per map task.
*/
public void setMaxMapAttempts(int n) {
- setInt("mapred.map.max.attempts", n);
+ setInt(JobContext.MAP_MAX_ATTEMPTS, n);
}
/**
* Get the configured number of maximum attempts that will be made to run a
- * reduce task, as specified by the <code>mapred.reduce.max.attempts</code>
+ * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code>
* property. If this property is not already set, the default is 4 attempts.
*
* @return the max number of attempts per reduce task.
*/
public int getMaxReduceAttempts() {
- return getInt("mapred.reduce.max.attempts", 4);
+ return getInt(JobContext.REDUCE_MAX_ATTEMPTS, 4);
}
/**
* Expert: Set the number of maximum attempts that will be made to run a
@@ -1249,7 +1256,7 @@
* @param n the number of attempts per reduce task.
*/
public void setMaxReduceAttempts(int n) {
- setInt("mapred.reduce.max.attempts", n);
+ setInt(JobContext.REDUCE_MAX_ATTEMPTS, n);
}
/**
@@ -1259,7 +1266,7 @@
* @return the job's name, defaulting to "".
*/
public String getJobName() {
- return get("mapred.job.name", "");
+ return get(JobContext.JOB_NAME, "");
}
/**
@@ -1268,7 +1275,7 @@
* @param name the job's new name.
*/
public void setJobName(String name) {
- set("mapred.job.name", name);
+ set(JobContext.JOB_NAME, name);
}
/**
@@ -1286,6 +1293,7 @@
*
* @return the session identifier, defaulting to "".
*/
+ @Deprecated
public String getSessionId() {
return get("session.id", "");
}
@@ -1295,6 +1303,7 @@
*
* @param sessionId the new session id.
*/
+ @Deprecated
public void setSessionId(String sessionId) {
set("session.id", sessionId);
}
@@ -1307,7 +1316,7 @@
* @param noFailures maximum no. of failures of a given job per tasktracker.
*/
public void setMaxTaskFailuresPerTracker(int noFailures) {
- setInt("mapred.max.tracker.failures", noFailures);
+ setInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, noFailures);
}
/**
@@ -1318,7 +1327,7 @@
* @return the maximum no. of failures of a given job per tasktracker.
*/
public int getMaxTaskFailuresPerTracker() {
- return getInt("mapred.max.tracker.failures", 4);
+ return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 4);
}
/**
@@ -1335,7 +1344,7 @@
* the job being aborted.
*/
public int getMaxMapTaskFailuresPercent() {
- return getInt("mapred.max.map.failures.percent", 0);
+ return getInt(JobContext.MAP_FAILURES_MAX_PERCENT, 0);
}
/**
@@ -1349,7 +1358,7 @@
* the job being aborted.
*/
public void setMaxMapTaskFailuresPercent(int percent) {
- setInt("mapred.max.map.failures.percent", percent);
+ setInt(JobContext.MAP_FAILURES_MAX_PERCENT, percent);
}
/**
@@ -1366,7 +1375,7 @@
* the job being aborted.
*/
public int getMaxReduceTaskFailuresPercent() {
- return getInt("mapred.max.reduce.failures.percent", 0);
+ return getInt(JobContext.REDUCE_FAILURES_MAXPERCENT, 0);
}
/**
@@ -1380,7 +1389,7 @@
* the job being aborted.
*/
public void setMaxReduceTaskFailuresPercent(int percent) {
- setInt("mapred.max.reduce.failures.percent", percent);
+ setInt(JobContext.REDUCE_FAILURES_MAXPERCENT, percent);
}
/**
@@ -1389,7 +1398,7 @@
* @param prio the {@link JobPriority} for this job.
*/
public void setJobPriority(JobPriority prio) {
- set("mapred.job.priority", prio.toString());
+ set(JobContext.PRIORITY, prio.toString());
}
/**
@@ -1398,7 +1407,7 @@
* @return the {@link JobPriority} for this job.
*/
public JobPriority getJobPriority() {
- String prio = get("mapred.job.priority");
+ String prio = get(JobContext.PRIORITY);
if(prio == null) {
return JobPriority.NORMAL;
}
@@ -1411,7 +1420,7 @@
* @return true if some tasks will be profiled
*/
public boolean getProfileEnabled() {
- return getBoolean("mapred.task.profile", false);
+ return getBoolean(JobContext.TASK_PROFILE, false);
}
/**
@@ -1421,7 +1430,7 @@
* @param newValue true means it should be gathered
*/
public void setProfileEnabled(boolean newValue) {
- setBoolean("mapred.task.profile", newValue);
+ setBoolean(JobContext.TASK_PROFILE, newValue);
}
/**
@@ -1433,7 +1442,7 @@
* @return the parameters to pass to the task child to configure profiling
*/
public String getProfileParams() {
- return get("mapred.task.profile.params",
+ return get(JobContext.TASK_PROFILE_PARAMS,
"-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y," +
"verbose=n,file=%s");
}
@@ -1448,7 +1457,7 @@
* @param value the configuration string
*/
public void setProfileParams(String value) {
- set("mapred.task.profile.params", value);
+ set(JobContext.TASK_PROFILE_PARAMS, value);
}
/**
@@ -1457,8 +1466,8 @@
* @return the task ranges
*/
public IntegerRanges getProfileTaskRange(boolean isMap) {
- return getRange((isMap ? "mapred.task.profile.maps" :
- "mapred.task.profile.reduces"), "0-2");
+ return getRange((isMap ? JobContext.NUM_MAP_PROFILES :
+ JobContext.NUM_REDUCE_PROFILES), "0-2");
}
/**
@@ -1468,9 +1477,9 @@
*/
public void setProfileTaskRange(boolean isMap, String newValue) {
// parse the value to make sure it is legal
- new Configuration.IntegerRanges(newValue);
- set((isMap ? "mapred.task.profile.maps" : "mapred.task.profile.reduces"),
- newValue);
+ new Configuration.IntegerRanges(newValue);
+ set((isMap ? JobContext.NUM_MAP_PROFILES : JobContext.NUM_REDUCE_PROFILES),
+ newValue);
}
/**
@@ -1497,7 +1506,7 @@
* @param mDbgScript the script name
*/
public void setMapDebugScript(String mDbgScript) {
- set("mapred.map.task.debug.script", mDbgScript);
+ set(JobContext.MAP_DEBUG_SCRIPT, mDbgScript);
}
/**
@@ -1507,7 +1516,7 @@
* @see #setMapDebugScript(String)
*/
public String getMapDebugScript() {
- return get("mapred.map.task.debug.script");
+ return get(JobContext.MAP_DEBUG_SCRIPT);
}
/**
@@ -1534,7 +1543,7 @@
* @param rDbgScript the script name
*/
public void setReduceDebugScript(String rDbgScript) {
- set("mapred.reduce.task.debug.script", rDbgScript);
+ set(JobContext.REDUCE_DEBUG_SCRIPT, rDbgScript);
}
/**
@@ -1544,7 +1553,7 @@
* @see #setReduceDebugScript(String)
*/
public String getReduceDebugScript() {
- return get("mapred.reduce.task.debug.script");
+ return get(JobContext.REDUCE_DEBUG_SCRIPT);
}
/**
@@ -1556,7 +1565,7 @@
* @see #setJobEndNotificationURI(String)
*/
public String getJobEndNotificationURI() {
- return get("job.end.notification.url");
+ return get(JobContext.END_NOTIFICATION_URL);
}
/**
@@ -1576,7 +1585,7 @@
* JobCompletionAndChaining">Job Completion and Chaining</a>
*/
public void setJobEndNotificationURI(String uri) {
- set("job.end.notification.url", uri);
+ set(JobContext.END_NOTIFICATION_URL, uri);
}
/**
@@ -1585,9 +1594,9 @@
* <p>
* When a job starts, a shared directory is created at location
* <code>
- * ${mapred.local.dir}/taskTracker/jobcache/$jobid/work/ </code>.
+ * ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/ </code>.
* This directory is exposed to the users through
- * <code>job.local.dir </code>.
+ * <code>mapreduce.job.local.dir </code>.
* So, the tasks can use this space
* as scratch space and share files among them. </p>
* This value is available as System property also.
@@ -1595,35 +1604,71 @@
* @return The localized job specific shared directory
*/
public String getJobLocalDir() {
- return get(TaskTracker.JOB_LOCAL_DIR);
+ return get(JobContext.JOB_LOCAL_DIR);
}
+ /**
+ * Get memory required to run a map task of the job, in MB.
+ *
+ * If a value is specified in the configuration, it is returned.
+ * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
+ * <p/>
+ * For backward compatibility, if the job configuration sets the
+ * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
+ * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
+ * after converting it from bytes to MB.
+ * @return memory required to run a map task of the job, in MB,
+ * or {@link #DISABLED_MEMORY_LIMIT} if unset.
+ */
public long getMemoryForMapTask() {
- if (get(MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
- long val = getLong(
- MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
- return (val == DISABLED_MEMORY_LIMIT) ? val :
- ((val < 0) ? DISABLED_MEMORY_LIMIT : val / (1024 * 1024));
+ long value = getDeprecatedMemoryValue();
+ if (value == DISABLED_MEMORY_LIMIT) {
+ value = normalizeMemoryConfigValue(
+ getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
+ DISABLED_MEMORY_LIMIT));
}
- return getLong(
- JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
- DISABLED_MEMORY_LIMIT);
+ return value;
}
public void setMemoryForMapTask(long mem) {
setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
}
+ /**
+ * Get memory required to run a reduce task of the job, in MB.
+ *
+ * If a value is specified in the configuration, it is returned.
+ * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
+ * <p/>
+ * For backward compatibility, if the job configuration sets the
+ * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
+ * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
+ * after converting it from bytes to MB.
+ * @return memory required to run a reduce task of the job, in MB,
+ * or {@link #DISABLED_MEMORY_LIMIT} if unset.
+ */
public long getMemoryForReduceTask() {
- if (get(MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
- long val = getLong(
- MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
- return (val == DISABLED_MEMORY_LIMIT) ? val :
- ((val < 0) ? DISABLED_MEMORY_LIMIT : val / (1024 * 1024));
+ long value = getDeprecatedMemoryValue();
+ if (value == DISABLED_MEMORY_LIMIT) {
+ value = normalizeMemoryConfigValue(
+ getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
+ DISABLED_MEMORY_LIMIT));
}
- return getLong(
- JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
- DISABLED_MEMORY_LIMIT);
+ return value;
+ }
+
+ // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY,
+ // converted into MBs.
+ // Returns DISABLED_MEMORY_LIMIT if unset, or set to a negative
+ // value.
+ private long getDeprecatedMemoryValue() {
+ long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY,
+ DISABLED_MEMORY_LIMIT);
+ oldValue = normalizeMemoryConfigValue(oldValue);
+ if (oldValue != DISABLED_MEMORY_LIMIT) {
+ oldValue /= (1024*1024);
+ }
+ return oldValue;
}
public void setMemoryForReduceTask(long mem) {
@@ -1637,7 +1682,7 @@
* @return name of the queue
*/
public String getQueueName() {
- return get("mapred.job.queue.name", DEFAULT_QUEUE_NAME);
+ return get(JobContext.QUEUE_NAME, DEFAULT_QUEUE_NAME);
}
/**
@@ -1646,7 +1691,7 @@
* @param queueName Name of the queue
*/
public void setQueueName(String queueName) {
- set("mapred.job.queue.name", queueName);
+ set(JobContext.QUEUE_NAME, queueName);
}
/**
@@ -1728,18 +1773,21 @@
/**
- * The maximum amount of memory any task of this job will use. See
+ * Get the memory required to run a task of this job, in bytes. See
* {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
* <p/>
- * mapred.task.maxvmem is split into
- * mapred.job.map.memory.mb
- * and mapred.job.map.memory.mb,mapred
- * each of the new key are set
- * as mapred.task.maxvmem / 1024
- * as new values are in MB
+ * This method is deprecated. Now, different memory limits can be
+ * set for map and reduce tasks of a job, in MB.
+ * <p/>
+ * For backward compatibility, if the job configuration sets the
+ * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
+ * from {@link #DISABLED_MEMORY_LIMIT}, that value is returned.
+ * Otherwise, this method will return the larger of the values returned by
+ * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()}
+ * after converting them into bytes.
*
- * @return The maximum amount of memory any task of this job will use, in
- * bytes.
+ * @return Memory required to run a task of this job, in bytes,
+ * or {@link #DISABLED_MEMORY_LIMIT}, if unset.
* @see #setMaxVirtualMemoryForTask(long)
* @deprecated Use {@link #getMemoryForMapTask()} and
* {@link #getMemoryForReduceTask()}
@@ -1750,24 +1798,16 @@
"getMaxVirtualMemoryForTask() is deprecated. " +
"Instead use getMemoryForMapTask() and getMemoryForReduceTask()");
- if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
- if (get(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY) != null || get(
- JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY) != null) {
- long val = Math.max(getMemoryForMapTask(), getMemoryForReduceTask());
- if (val == JobConf.DISABLED_MEMORY_LIMIT) {
- return val;
- } else {
- if (val < 0) {
- return JobConf.DISABLED_MEMORY_LIMIT;
- }
- return val * 1024 * 1024;
- //Convert MB to byte as new value is in
- // MB and old deprecated method returns bytes
- }
+ long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
+ value = normalizeMemoryConfigValue(value);
+ if (value == DISABLED_MEMORY_LIMIT) {
+ value = Math.max(getMemoryForMapTask(), getMemoryForReduceTask());
+ value = normalizeMemoryConfigValue(value);
+ if (value != DISABLED_MEMORY_LIMIT) {
+ value *= 1024*1024;
}
}
-
- return getLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
+ return value;
}
/**
@@ -1775,8 +1815,8 @@
* {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
* <p/>
* mapred.task.maxvmem is split into
- * mapred.job.map.memory.mb
- * and mapred.job.map.memory.mb,mapred
+ * mapreduce.map.memory.mb
+ * and mapreduce.reduce.memory.mb;
* each of the new keys is set
* as mapred.task.maxvmem / (1024 * 1024)
* as new values are in MB
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobContext.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobContext.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobContext.java Sat Nov 28 20:26:01 2009
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -23,36 +24,18 @@
* @deprecated Use {@link org.apache.hadoop.mapreduce.JobContext} instead.
*/
@Deprecated
-public class JobContext extends org.apache.hadoop.mapreduce.JobContext {
- private JobConf job;
- private Progressable progress;
-
- JobContext(JobConf conf, org.apache.hadoop.mapreduce.JobID jobId,
- Progressable progress) {
- super(conf, jobId);
- this.job = conf;
- this.progress = progress;
- }
-
- JobContext(JobConf conf, org.apache.hadoop.mapreduce.JobID jobId) {
- this(conf, jobId, Reporter.NULL);
- }
-
+public interface JobContext extends org.apache.hadoop.mapreduce.JobContext {
/**
* Get the job Configuration
*
* @return JobConf
*/
- public JobConf getJobConf() {
- return job;
- }
+ public JobConf getJobConf();
/**
* Get the progress mechanism for reporting progress.
*
* @return progress mechanism
*/
- public Progressable getProgressible() {
- return progress;
- }
+ public Progressable getProgressible();
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobEndNotifier.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobEndNotifier.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobEndNotifier.java Sat Nov 28 20:26:01 2009
@@ -100,8 +100,8 @@
String uri = conf.getJobEndNotificationURI();
if (uri != null) {
// +1 to make logic for first notification identical to a retry
- int retryAttempts = conf.getInt("job.end.retry.attempts", 0) + 1;
- long retryInterval = conf.getInt("job.end.retry.interval", 30000);
+ int retryAttempts = conf.getInt(JobContext.END_NOTIFICATION_RETRIES, 0) + 1;
+ long retryInterval = conf.getInt(JobContext.END_NOTIFICATION_RETRIE_INTERVAL, 30000);
if (uri.contains("$jobId")) {
uri = uri.replace("$jobId", status.getJobID().toString());
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobInProgress.java Sat Nov 28 20:26:01 2009
@@ -18,13 +18,15 @@
package org.apache.hadoop.mapred;
import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
@@ -41,10 +43,29 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobHistory.Values;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobPriorityChangeEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobStatusChangedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
+import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
@@ -52,7 +73,7 @@
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.fs.FSDataOutputStream;
/*************************************************************
* JobInProgress maintains all the info for keeping
@@ -99,7 +120,7 @@
int failedMapTasks = 0;
int failedReduceTasks = 0;
- private static float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
+ static final float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
int completedMapsForReduceSlowstart = 0;
// runningMapTasks include speculative tasks, so we need to capture
@@ -119,6 +140,8 @@
JobPriority priority = JobPriority.NORMAL;
protected JobTracker jobtracker;
+
+ JobHistory jobHistory;
// NetworkTopology Node to the set of TIPs
Map<Node, List<TaskInProgress>> nonRunningMapCache;
@@ -151,7 +174,7 @@
* {@link #findNewMapTask(TaskTrackerStatus, int, int, int, double)} should
* schedule any available map tasks for this job, including speculative tasks.
*/
- private int anyCacheLevel;
+ int anyCacheLevel;
/**
* A special value indicating that
@@ -180,7 +203,7 @@
new TreeMap<String, Integer>();
//Confine estimation algorithms to an "oracle" class that JIP queries.
- private ResourceEstimator resourceEstimator;
+ ResourceEstimator resourceEstimator;
long startTime;
long launchTime;
@@ -189,20 +212,20 @@
// Indicates how many times the job got restarted
private final int restartCount;
- private JobConf conf;
+ JobConf conf;
protected AtomicBoolean tasksInited = new AtomicBoolean(false);
private JobInitKillStatus jobInitKillStatus = new JobInitKillStatus();
- private LocalFileSystem localFs;
- private FileSystem fs;
- private JobID jobId;
+ LocalFileSystem localFs;
+ FileSystem fs;
+ JobID jobId;
private boolean hasSpeculativeMaps;
private boolean hasSpeculativeReduces;
- private long inputLength = 0;
+ long inputLength = 0;
- private Counters jobCounters = new Counters();
+ Counters jobCounters = new Counters();
- private MetricsRecord jobMetrics;
+ MetricsRecord jobMetrics;
// Maximum no. of fetch-failure notifications after which map task is killed
private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
@@ -219,9 +242,9 @@
private Object schedulingInfo;
//thresholds for speculative execution
- private float slowTaskThreshold;
- private float speculativeCap;
- private float slowNodeThreshold; //standard deviations
+ float slowTaskThreshold;
+ float speculativeCap;
+ float slowNodeThreshold; //standard deviations
//Statistics are maintained for a couple of things
//mapTaskStats is used for maintaining statistics about
@@ -303,17 +326,26 @@
this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP,
this.profile.getUser(), this.profile.getJobName(),
this.profile.getJobFile(), "");
+ this.jobtracker.getInstrumentation().addPrepJob(conf, jobid);
this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
(numMapTasks + numReduceTasks + 10);
this.slowTaskThreshold = Math.max(0.0f,
- conf.getFloat("mapred.speculative.execution.slowTaskThreshold",1.0f));
+ conf.getFloat(JobContext.SPECULATIVE_SLOWTASK_THRESHOLD,1.0f));
this.speculativeCap = conf.getFloat(
- "mapred.speculative.execution.speculativeCap",0.1f);
+ JobContext.SPECULATIVECAP,0.1f);
this.slowNodeThreshold = conf.getFloat(
- "mapred.speculative.execution.slowNodeThreshold",1.0f);
+ JobContext.SPECULATIVE_SLOWNODE_THRESHOLD,1.0f);
this.jobSetupCleanupNeeded = conf.getBoolean(
- "mapred.committer.job.setup.cleanup.needed", true);
+ JobContext.SETUP_CLEANUP_NEEDED, true);
+ if (tracker != null) { // Some mock tests have null tracker
+ this.jobHistory = tracker.getJobHistory();
+ }
+ }
+
+ /**
+ * Bare constructor: it assigns only the two fields below
+ * (jobSetupCleanupNeeded = false, restartCount = 0); all other fields
+ * keep their declared initializers or Java defaults.
+ * NOTE(review): presumably meant for tests that have no JobTracker -- confirm.
+ */
+ JobInProgress() {
+ this.jobSetupCleanupNeeded = false;
+ this.restartCount = 0;
+ }
/**
@@ -321,18 +353,13 @@
* to the tracker.
*/
public JobInProgress(JobID jobid, JobTracker jobtracker,
- JobConf default_conf) throws IOException {
- this(jobid, jobtracker, default_conf, 0);
- }
-
- public JobInProgress(JobID jobid, JobTracker jobtracker,
JobConf default_conf, int rCount) throws IOException {
this.restartCount = rCount;
this.jobId = jobid;
String url = "http://" + jobtracker.getJobTrackerMachine() + ":"
+ jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;
this.jobtracker = jobtracker;
-
+ this.jobHistory = jobtracker.getJobHistory();
this.startTime = System.currentTimeMillis();
this.localFs = jobtracker.getLocalFileSystem();
@@ -354,6 +381,7 @@
this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP,
profile.getUser(), profile.getJobName(), profile.getJobFile(),
profile.getURL().toString());
+ this.jobtracker.getInstrumentation().addPrepJob(conf, jobid);
status.setStartTime(startTime);
this.status.setJobPriority(this.priority);
@@ -367,7 +395,7 @@
this.numReduceTasks = conf.getNumReduceTasks();
this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
(numMapTasks + numReduceTasks + 10);
- JobContext jobContext = new JobContext(conf, jobId);
+ JobContext jobContext = new JobContextImpl(conf, jobId);
this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
@@ -379,10 +407,8 @@
this.jobMetrics.setTag("sessionId", conf.getSessionId());
this.jobMetrics.setTag("jobName", conf.getJobName());
this.jobMetrics.setTag("jobId", jobid.toString());
- if (!hasRestarted()) { //This is temporary until we fix the restart model
- hasSpeculativeMaps = conf.getMapSpeculativeExecution();
- hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
- }
+ hasSpeculativeMaps = conf.getMapSpeculativeExecution();
+ hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
this.maxLevel = jobtracker.getNumTaskCacheLevels();
this.anyCacheLevel = this.maxLevel+1;
this.nonLocalMaps = new LinkedList<TaskInProgress>();
@@ -398,11 +424,11 @@
this.nonRunningReduces = new LinkedList<TaskInProgress>();
this.runningReduces = new LinkedHashSet<TaskInProgress>();
this.slowTaskThreshold = Math.max(0.0f,
- conf.getFloat("mapred.speculative.execution.slowTaskThreshold",1.0f));
+ conf.getFloat(JobContext.SPECULATIVE_SLOWTASK_THRESHOLD,1.0f));
this.speculativeCap = conf.getFloat(
- "mapred.speculative.execution.speculativeCap",0.1f);
+ JobContext.SPECULATIVECAP,0.1f);
this.slowNodeThreshold = conf.getFloat(
- "mapred.speculative.execution.slowNodeThreshold",1.0f);
+ JobContext.SPECULATIVE_SLOWNODE_THRESHOLD,1.0f);
}
@@ -447,7 +473,7 @@
}
Map<Node, List<TaskInProgress>> createCache(
- JobClient.RawSplit[] splits, int maxLevel) {
+ Job.RawSplit[] splits, int maxLevel) {
Map<Node, List<TaskInProgress>> cache =
new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
@@ -491,10 +517,6 @@
public boolean inited() {
return tasksInited.get();
}
-
- boolean hasRestarted() {
- return restartCount > 0;
- }
/**
* Get the number of slots required to run a single map task-attempt.
@@ -551,17 +573,22 @@
LOG.info("Initializing " + jobId);
- logToJobHistory();
+ logSubmissionToJobHistory();
// log the job priority
setPriority(this.priority);
//
+ // generate security keys needed by Tasks
+ //
+ generateJobTokens(jobtracker.getSystemDirectoryForJob(jobId));
+
+ //
// read input splits and create a map per a split
//
String jobFile = profile.getJobFile();
- JobClient.RawSplit[] splits = createSplits();
+ Job.RawSplit[] splits = createSplits();
numMapTasks = splits.length;
checkTaskLimits();
@@ -584,7 +611,7 @@
// we should start scheduling reduces
completedMapsForReduceSlowstart =
(int)Math.ceil(
- (conf.getFloat("mapred.reduce.slowstart.completed.maps",
+ (conf.getFloat(JobContext.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *
numMapTasks));
@@ -599,8 +626,13 @@
}
tasksInited.set(true);
- JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime,
- numMapTasks, numReduceTasks);
+ JobInitedEvent jie = new JobInitedEvent(
+ profile.getJobID(), this.launchTime,
+ numMapTasks, numReduceTasks,
+ JobStatus.getJobRunState(JobStatus.PREP));
+
+ jobHistory.logEvent(jie, jobId);
+
}
// Returns true if the job is empty (0 maps, 0 reduces and no setup-cleanup)
@@ -623,18 +655,26 @@
setupComplete();
}
- void logToJobHistory() throws IOException {
+ void logSubmissionToJobHistory() throws IOException {
// log job info
- JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(),
- this.startTime, hasRestarted());
+ String username = conf.getUser();
+ if (username == null) { username = ""; }
+ String jobname = conf.getJobName();
+ if (jobname == null) { jobname = ""; }
+ setUpLocalizedJobConf(conf, jobId);
+ jobHistory.setupEventWriter(jobId, conf);
+ JobSubmittedEvent jse = new JobSubmittedEvent(jobId, jobname, username,
+ this.startTime, jobFile.toString());
+ jobHistory.logEvent(jse, jobId);
+
}
- JobClient.RawSplit[] createSplits() throws IOException {
+ Job.RawSplit[] createSplits() throws IOException {
DataInputStream splitFile =
- fs.open(new Path(conf.get("mapred.job.split.file")));
- JobClient.RawSplit[] splits;
+ fs.open(new Path(conf.get(JobContext.SPLIT_FILE)));
+ Job.RawSplit[] splits;
try {
- splits = JobClient.readSplitFile(splitFile);
+ splits = Job.readSplitFile(splitFile);
} finally {
splitFile.close();
}
@@ -655,7 +695,7 @@
}
}
- synchronized void createMapTasks(String jobFile, JobClient.RawSplit[] splits) {
+ synchronized void createMapTasks(String jobFile, Job.RawSplit[] splits) {
maps = new TaskInProgress[numMapTasks];
for(int i=0; i < numMapTasks; ++i) {
inputLength += splits[i].getDataLength();
@@ -690,7 +730,7 @@
// cleanup map tip. This map doesn't use any splits. Just assign an empty
// split.
- JobClient.RawSplit emptySplit = new JobClient.RawSplit();
+ Job.RawSplit emptySplit = new Job.RawSplit();
cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
jobtracker, conf, this, numMapTasks, 1);
cleanup[0].setJobCleanupTask();
@@ -715,11 +755,14 @@
setup[1].setJobSetupTask();
}
- private void setupComplete() {
+ void setupComplete() {
status.setSetupProgress(1.0f);
if (this.status.getRunState() == JobStatus.PREP) {
- this.status.setRunState(JobStatus.RUNNING);
- JobHistory.JobInfo.logStarted(profile.getJobID());
+ changeStateTo(JobStatus.RUNNING);
+ JobStatusChangedEvent jse =
+ new JobStatusChangedEvent(profile.getJobID(),
+ JobStatus.getJobRunState(JobStatus.RUNNING));
+ jobHistory.logEvent(jse, profile.getJobID());
}
}
@@ -767,6 +810,7 @@
return numReduceTasks - runningReduceTasks - failedReduceTIPs -
finishedReduceTasks + speculativeReduceTasks;
}
+
public synchronized int getNumSlotsPerTask(TaskType taskType) {
if (taskType == TaskType.MAP) {
return numSlotsPerMap;
@@ -787,7 +831,11 @@
this.priority = priority;
status.setJobPriority(priority);
// log and change to the job's priority
- JobHistory.JobInfo.logJobPriority(jobId, priority);
+ JobPriorityChangeEvent prEvent =
+ new JobPriorityChangeEvent(jobId, priority);
+
+ jobHistory.logEvent(prEvent, jobId);
+
}
}
@@ -796,7 +844,11 @@
// log and change to the job's start/launch time
this.startTime = startTime;
this.launchTime = launchTime;
- JobHistory.JobInfo.logJobInfo(jobId, startTime, launchTime);
+ JobInfoChangeEvent event =
+ new JobInfoChangeEvent(jobId, startTime, launchTime);
+
+ jobHistory.logEvent(event, jobId);
+
}
/**
@@ -1303,6 +1355,16 @@
Task result = tip.getTaskToRun(tts.getTrackerName());
if (result != null) {
addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
+ if (jobFailed) {
+ result.setJobCleanupTaskState(org.apache.hadoop.mapreduce.JobStatus
+ .State.FAILED);
+ } else if (jobKilled) {
+ result.setJobCleanupTaskState(org.apache.hadoop.mapreduce.JobStatus
+ .State.KILLED);
+ } else {
+ result.setJobCleanupTaskState(org.apache.hadoop.mapreduce
+ .JobStatus.State.SUCCEEDED);
+ }
}
return result;
}
@@ -1476,18 +1538,18 @@
final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
// keeping the earlier ordering intact
- String name;
+ TaskType name;
String splits = "";
Enum counter = null;
if (tip.isJobSetupTask()) {
launchedSetup = true;
- name = Values.SETUP.name();
+ name = TaskType.JOB_SETUP;
} else if (tip.isJobCleanupTask()) {
launchedCleanup = true;
- name = Values.CLEANUP.name();
+ name = TaskType.JOB_CLEANUP;
} else if (tip.isMapTask()) {
++runningMapTasks;
- name = Values.MAP.name();
+ name = TaskType.MAP;
counter = JobCounter.TOTAL_LAUNCHED_MAPS;
splits = tip.getSplitNodes();
if (tip.isSpeculating()) {
@@ -1498,7 +1560,7 @@
metrics.launchMap(id);
} else {
++runningReduceTasks;
- name = Values.REDUCE.name();
+ name = TaskType.REDUCE;
counter = JobCounter.TOTAL_LAUNCHED_REDUCES;
if (tip.isSpeculating()) {
speculativeReduceTasks++;
@@ -1510,8 +1572,12 @@
// Note that the logs are for the scheduled tasks only. Tasks that join on
// restart has already their logs in place.
if (tip.isFirstAttempt(id)) {
- JobHistory.Task.logStarted(tip.getTIPId(), name,
- tip.getExecStartTime(), splits);
+ TaskStartedEvent tse = new TaskStartedEvent(tip.getTIPId(),
+ tip.getExecStartTime(),
+ name, splits);
+
+ jobHistory.logEvent(tse, tip.getJob().jobId);
+
}
if (!tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
jobCounters.incrCounter(counter, 1);
@@ -1551,7 +1617,7 @@
}
}
- static String convertTrackerNameToHostName(String trackerName) {
+ public static String convertTrackerNameToHostName(String trackerName) {
// Ugly!
// Convert the trackerName to it's host name
int indexOfColon = trackerName.indexOf(":");
@@ -1605,8 +1671,10 @@
long now = System.currentTimeMillis();
FallowSlotInfo info = map.get(taskTracker);
+ int reservedSlots = 0;
if (info == null) {
info = new FallowSlotInfo(now, numSlots);
+ reservedSlots = numSlots;
} else {
// Increment metering info if the reservation is changing
if (info.getNumSlots() != numSlots) {
@@ -1618,11 +1686,19 @@
jobCounters.incrCounter(counter, fallowSlotMillis);
// Update
+ reservedSlots = numSlots - info.getNumSlots();
info.setTimestamp(now);
info.setNumSlots(numSlots);
}
}
map.put(taskTracker, info);
+ if (type == TaskType.MAP) {
+ jobtracker.getInstrumentation().addReservedMapSlots(reservedSlots);
+ }
+ else {
+ jobtracker.getInstrumentation().addReservedReduceSlots(reservedSlots);
+ }
+ jobtracker.incrementReservations(type, reservedSlots);
}
public synchronized void unreserveTaskTracker(TaskTracker taskTracker,
@@ -1648,6 +1724,14 @@
jobCounters.incrCounter(counter, fallowSlotMillis);
map.remove(taskTracker);
+ if (type == TaskType.MAP) {
+ jobtracker.getInstrumentation().decReservedMapSlots(info.getNumSlots());
+ }
+ else {
+ jobtracker.getInstrumentation().decReservedReduceSlots(
+ info.getNumSlots());
+ }
+ jobtracker.decrementReservations(type, info.getNumSlots());
}
public int getNumReservedTaskTrackersForMaps() {
@@ -1981,7 +2065,9 @@
String taskTrackerName = tts.getTrackerName();
String taskTrackerHost = tts.getHost();
if (numMapTasks == 0) {
- LOG.info("No maps to schedule for " + profile.getJobID());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("No maps to schedule for " + profile.getJobID());
+ }
return -1;
}
@@ -2171,7 +2257,9 @@
String taskTrackerName = tts.getTrackerName();
String taskTrackerHost = tts.getHost();
if (numReduceTasks == 0) {
- LOG.info("No reduces to schedule for " + profile.getJobID());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("No reduces to schedule for " + profile.getJobID());
+ }
return -1;
}
TaskInProgress tip = null;
@@ -2457,35 +2545,45 @@
TaskTrackerStatus ttStatus =
this.jobtracker.getTaskTrackerStatus(status.getTaskTracker());
String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
- String taskType = getTaskType(tip);
+ TaskType taskType = getTaskType(tip);
+
+ TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent(
+ status.getTaskID(), taskType, status.getStartTime(),
+ status.getTaskTracker(), ttStatus.getHttpPort());
+
+ jobHistory.logEvent(tse, status.getTaskID().getJobID());
+
+
if (status.getIsMap()){
- JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(),
- status.getTaskTracker(),
- ttStatus.getHttpPort(),
- taskType);
- JobHistory.MapAttempt.logFinished(status.getTaskID(),
- status.getMapFinishTime(),
- status.getFinishTime(),
- trackerHostname, taskType,
- status.getStateString(),
- status.getCounters());
+ MapAttemptFinishedEvent mfe = new MapAttemptFinishedEvent(
+ status.getTaskID(), taskType, TaskStatus.State.SUCCEEDED.toString(),
+ status.getMapFinishTime(),
+ status.getFinishTime(), trackerHostname,
+ status.getStateString(),
+ new org.apache.hadoop.mapreduce.Counters(status.getCounters()));
+
+ jobHistory.logEvent(mfe, status.getTaskID().getJobID());
+
}else{
- JobHistory.ReduceAttempt.logStarted( status.getTaskID(), status.getStartTime(),
- status.getTaskTracker(),
- ttStatus.getHttpPort(),
- taskType);
- JobHistory.ReduceAttempt.logFinished(status.getTaskID(), status.getShuffleFinishTime(),
- status.getSortFinishTime(), status.getFinishTime(),
- trackerHostname,
- taskType,
- status.getStateString(),
- status.getCounters());
- }
- JobHistory.Task.logFinished(tip.getTIPId(),
- taskType,
- tip.getExecFinishTime(),
- status.getCounters());
-
+ ReduceAttemptFinishedEvent rfe = new ReduceAttemptFinishedEvent(
+ status.getTaskID(), taskType, TaskStatus.State.SUCCEEDED.toString(),
+ status.getShuffleFinishTime(),
+ status.getSortFinishTime(), status.getFinishTime(),
+ trackerHostname, status.getStateString(),
+ new org.apache.hadoop.mapreduce.Counters(status.getCounters()));
+
+ jobHistory.logEvent(rfe, status.getTaskID().getJobID());
+
+ }
+
+ TaskFinishedEvent tfe = new TaskFinishedEvent(tip.getTIPId(),
+ tip.getExecFinishTime(), taskType,
+ TaskStatus.State.SUCCEEDED.toString(),
+ new org.apache.hadoop.mapreduce.Counters(status.getCounters()));
+
+ jobHistory.logEvent(tfe, tip.getJob().getJobID());
+
+
if (tip.isJobSetupTask()) {
// setup task has finished. kill the extra setup tip
killSetupTip(!tip.isMapTask());
@@ -2591,7 +2689,32 @@
public float getSlowTaskThreshold() {
return slowTaskThreshold;
}
-
+
+ /**
+ * Moves the job's run state to {@code newState}. Run-state changes are
+ * meant to go through this method so that the JobTracker instrumentation
+ * counters for PREP and RUNNING jobs stay in step with the job status.
+ * Calling it with the job's current state is a no-op.
+ * @param newState the {@link JobStatus} run-state constant to switch to
+ */
+ private void changeStateTo(int newState) {
+ final int previous = this.status.getRunState();
+ if (previous != newState) {
+ this.status.setRunState(newState);
+ // stop counting the job under the state it is leaving (if tracked)
+ if (previous == JobStatus.PREP) {
+ this.jobtracker.getInstrumentation().decPrepJob(conf, jobId);
+ } else if (previous == JobStatus.RUNNING) {
+ this.jobtracker.getInstrumentation().decRunningJob(conf, jobId);
+ }
+ // start counting it under the state it is entering (if tracked)
+ if (newState == JobStatus.PREP) {
+ this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
+ } else if (newState == JobStatus.RUNNING) {
+ this.jobtracker.getInstrumentation().addRunningJob(conf, jobId);
+ }
+ }
+ }
+
/**
* The job is done since all it's component tasks are either
* successful or have failed.
@@ -2603,7 +2726,7 @@
//
if (this.status.getRunState() == JobStatus.RUNNING ||
this.status.getRunState() == JobStatus.PREP) {
- this.status.setRunState(JobStatus.SUCCEEDED);
+ changeStateTo(JobStatus.SUCCEEDED);
this.status.setCleanupProgress(1.0f);
if (maps.length == 0) {
this.status.setMapProgress(1.0f);
@@ -2621,10 +2744,18 @@
JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
// Log job-history
- JobHistory.JobInfo.logFinished(this.status.getJobID(), finishTime,
- this.finishedMapTasks,
- this.finishedReduceTasks, failedMapTasks,
- failedReduceTasks, getCounters());
+ JobFinishedEvent jfe =
+ new JobFinishedEvent(this.status.getJobID(),
+ this.finishTime,
+ this.finishedMapTasks,this.finishedReduceTasks, failedMapTasks,
+ failedReduceTasks,
+ new org.apache.hadoop.mapreduce.Counters(getMapCounters()),
+ new org.apache.hadoop.mapreduce.Counters(getReduceCounters()),
+ new org.apache.hadoop.mapreduce.Counters(getCounters()));
+
+ jobHistory.logEvent(jfe, this.status.getJobID());
+ jobHistory.closeWriter(this.status.getJobID());
+
// Note that finalize will close the job history handles which garbage collect
// might try to finalize
garbageCollect();
@@ -2644,30 +2775,34 @@
this.status.setFinishTime(this.finishTime);
if (jobTerminationState == JobStatus.FAILED) {
- this.status.setRunState(JobStatus.FAILED);
-
- // Log the job summary
- JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
-
- // Log to job-history
- JobHistory.JobInfo.logFailed(this.status.getJobID(), finishTime,
- this.finishedMapTasks,
- this.finishedReduceTasks);
+ changeStateTo(JobStatus.FAILED);
} else {
- this.status.setRunState(JobStatus.KILLED);
+ changeStateTo(JobStatus.KILLED);
+ }
+ // Log the job summary
+ JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
- // Log the job summary
- JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
+ JobUnsuccessfulCompletionEvent failedEvent =
+ new JobUnsuccessfulCompletionEvent(this.status.getJobID(),
+ finishTime,
+ this.finishedMapTasks,
+ this.finishedReduceTasks,
+ JobStatus.getJobRunState(jobTerminationState));
+
+ jobHistory.logEvent(failedEvent, this.status.getJobID());
+ jobHistory.closeWriter(this.status.getJobID());
- // Log to job-history
- JobHistory.JobInfo.logKilled(this.status.getJobID(), finishTime,
- this.finishedMapTasks,
- this.finishedReduceTasks);
- }
garbageCollect();
-
+
jobtracker.getInstrumentation().terminateJob(
this.conf, this.status.getJobID());
+ if (jobTerminationState == JobStatus.FAILED) {
+ jobtracker.getInstrumentation().failedJob(
+ this.conf, this.status.getJobID());
+ } else {
+ jobtracker.getInstrumentation().killedJob(
+ this.conf, this.status.getJobID());
+ }
}
}
@@ -2893,28 +3028,18 @@
List<String> taskDiagnosticInfo = tip.getDiagnosticInfo(taskid);
String diagInfo = taskDiagnosticInfo == null ? "" :
StringUtils.arrayToString(taskDiagnosticInfo.toArray(new String[0]));
- String taskType = getTaskType(tip);
- if (taskStatus.getIsMap()) {
- JobHistory.MapAttempt.logStarted(taskid, startTime,
- taskTrackerName, taskTrackerPort, taskType);
- if (taskStatus.getRunState() == TaskStatus.State.FAILED) {
- JobHistory.MapAttempt.logFailed(taskid, finishTime,
- taskTrackerHostName, diagInfo, taskType);
- } else {
- JobHistory.MapAttempt.logKilled(taskid, finishTime,
- taskTrackerHostName, diagInfo, taskType);
- }
- } else {
- JobHistory.ReduceAttempt.logStarted(taskid, startTime,
- taskTrackerName, taskTrackerPort, taskType);
- if (taskStatus.getRunState() == TaskStatus.State.FAILED) {
- JobHistory.ReduceAttempt.logFailed(taskid, finishTime,
- taskTrackerHostName, diagInfo, taskType);
- } else {
- JobHistory.ReduceAttempt.logKilled(taskid, finishTime,
- taskTrackerHostName, diagInfo, taskType);
- }
- }
+ TaskType taskType = getTaskType(tip);
+ TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent(
+ taskid, taskType, startTime, taskTrackerName, taskTrackerPort);
+
+ jobHistory.logEvent(tse, taskid.getJobID());
+
+ TaskAttemptUnsuccessfulCompletionEvent tue =
+ new TaskAttemptUnsuccessfulCompletionEvent(taskid,
+ taskType, taskStatus.getRunState().toString(),
+ finishTime,
+ taskTrackerHostName, diagInfo);
+ jobHistory.logEvent(tue, taskid.getJobID());
// After this, try to assign tasks with the one after this, so that
// the failed task goes to the end of the list.
@@ -2955,10 +3080,13 @@
if (killJob) {
LOG.info("Aborting job " + profile.getJobID());
- JobHistory.Task.logFailed(tip.getTIPId(),
- taskType,
- finishTime,
- diagInfo);
+ TaskFailedEvent tfe =
+ new TaskFailedEvent(tip.getTIPId(), finishTime, taskType, diagInfo,
+ TaskStatus.State.FAILED.toString(),
+ null);
+
+ jobHistory.logEvent(tfe, tip.getJob().getJobID());
+
if (tip.isJobCleanupTask()) {
// kill the other tip
if (tip.isMapTask()) {
@@ -3042,9 +3170,14 @@
updateTaskStatus(tip, status);
boolean isComplete = tip.isComplete();
if (wasComplete && !isComplete) { // mark a successful tip as failed
- String taskType = getTaskType(tip);
- JobHistory.Task.logFailed(tip.getTIPId(), taskType,
- tip.getExecFinishTime(), reason, taskid);
+ TaskType taskType = getTaskType(tip);
+ TaskFailedEvent tfe =
+ new TaskFailedEvent(tip.getTIPId(), tip.getExecFinishTime(), taskType,
+ reason, TaskStatus.State.FAILED.toString(),
+ taskid);
+
+ jobHistory.logEvent(tfe, tip.getJob().getJobID());
+
}
}
@@ -3232,15 +3365,15 @@
/**
* Get the task type for logging it to {@link JobHistory}.
*/
- private String getTaskType(TaskInProgress tip) {
+ private TaskType getTaskType(TaskInProgress tip) {
if (tip.isJobCleanupTask()) {
- return Values.CLEANUP.name();
+ return TaskType.JOB_CLEANUP;
} else if (tip.isJobSetupTask()) {
- return Values.SETUP.name();
+ return TaskType.JOB_SETUP;
} else if (tip.isMapTask()) {
- return Values.MAP.name();
+ return TaskType.MAP;
} else {
- return Values.REDUCE.name();
+ return TaskType.REDUCE;
}
}
@@ -3337,4 +3470,73 @@
);
}
}
+
+ /**
+ * Writes {@code jobConf} as XML to the local job-conf path that the
+ * JobTracker reports for {@code id}. Best-effort: an IOException while
+ * opening or writing the file is logged at error level and swallowed,
+ * and a failure to close the file is logged at info level.
+ * @param jobConf the configuration to persist locally
+ * @param id the job whose local job-conf path is used
+ */
+ void setUpLocalizedJobConf(JobConf jobConf,
+ org.apache.hadoop.mapreduce.JobID id) {
+ File target = new File(jobtracker.getLocalJobFilePath(id));
+ FileOutputStream out = null;
+ try {
+ out = new FileOutputStream(target);
+ jobConf.writeXml(out);
+ if (LOG.isDebugEnabled()) {
+ String where = target.getAbsolutePath();
+ LOG.debug("Job conf for " + id + " stored at " + where);
+ }
+ } catch (IOException writeError) {
+ LOG.error("Failed to store job conf on the local filesystem ", writeError);
+ } finally {
+ // only release the stream if it was actually opened
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException closeError) {
+ LOG.info("Failed to close the job configuration file "
+ + StringUtils.stringifyException(closeError));
+ }
+ }
+ }
+ }
+
+ /**
+ * Removes the local copy of the job configuration kept at the JobTracker's
+ * local job-conf path for {@code id}. Best-effort: if the delete fails,
+ * this is only logged at debug level.
+ * @param id the job whose localized configuration should be removed
+ */
+ void cleanupLocalizedJobConf(org.apache.hadoop.mapreduce.JobID id) {
+ File localConf = new File(jobtracker.getLocalJobFilePath(id));
+ LOG.info("Deleting localized job conf at " + localConf);
+ boolean deleted = localConf.delete();
+ if (!deleted) {
+ LOG.debug("Failed to delete file " + localConf);
+ }
+ }
+
+ /**
+ * Generates the job's security tokens and writes them to
+ * {@link JobTokens#JOB_TOKEN_FILENAME} inside {@code jobDir} on the job's
+ * FileSystem. Currently only the shuffle token, a fresh key from
+ * {@link SecureShuffleUtils#getNewEncodedKey()}, is stored.
+ * @param jobDir directory in which the token file is created
+ * @throws IOException if key generation fails (the security exception is
+ * wrapped), or if the token file cannot be created or written
+ */
+ private void generateJobTokens(Path jobDir) throws IOException {
+ Path keysFile = new Path(jobDir, JobTokens.JOB_TOKEN_FILENAME);
+ // Generate the key before touching the filesystem so that a failure here
+ // neither leaks an open stream nor leaves an empty token file behind.
+ byte[] key;
+ try {
+ key = SecureShuffleUtils.getNewEncodedKey();
+ } catch (java.security.GeneralSecurityException e) {
+ throw new IOException(e);
+ }
+ JobTokens jt = new JobTokens();
+ jt.setShuffleJobToken(key);
+ // other keys..
+ FSDataOutputStream os = fs.create(keysFile);
+ try {
+ jt.write(os);
+ } finally {
+ // always release the stream, even if the write fails
+ os.close();
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("jobTokens generated and stored in " + keysFile.toUri().getPath());
+ }
+ }
+
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobPriority.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobPriority.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobPriority.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobPriority.java Sat Nov 28 20:26:01 2009
@@ -19,8 +19,9 @@
/**
* Used to describe the priority of the running job.
- *
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.JobPriority} instead
*/
+@Deprecated
public enum JobPriority {
VERY_HIGH,
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobQueueClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobQueueClient.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobQueueClient.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobQueueClient.java Sat Nov 28 20:26:01 2009
@@ -18,41 +18,46 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Writer;
+import java.util.List;
+import java.util.ArrayList;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+
/**
- * <code>JobQueueClient</code> is interface provided to the user in order
- * to get JobQueue related information from the {@link JobTracker}
+ * <code>JobQueueClient</code> is the interface provided to the user in order to
+ * get JobQueue-related information from the {@link JobTracker}.
*
- * It provides the facility to list the JobQueues present and ability to
- * view the list of jobs within a specific JobQueue
+ * It provides the facility to list the JobQueues present and the ability to
+ * view the list of jobs within a specific JobQueue.
*
-**/
+ **/
+
+class JobQueueClient extends Configured implements Tool {
-class JobQueueClient extends Configured implements Tool {
-
JobClient jc;
-
+
+ /**
+ * Creates a client with no configuration; main() uses this form and runs
+ * it through ToolRunner.
+ */
public JobQueueClient() {
}
-
+
+ /**
+ * Creates a client that uses the given configuration. The JobClient used
+ * for queue queries is not created here; run() creates it via init().
+ */
public JobQueueClient(JobConf conf) throws IOException {
setConf(conf);
}
-
+
+ /** Stores conf and creates the JobClient used for all queue queries. */
private void init(JobConf conf) throws IOException {
setConf(conf);
jc = new JobClient(conf);
}
-
+
+ /**
+ * Tool entry point (invoked from main() through ToolRunner). Supported
+ * commands are -list, -showacls and -info &lt;job-queue-name&gt; [-showJobs].
+ * On a malformed command line the usage message is printed and -1 is
+ * returned; otherwise the selected command runs and 0 is returned.
+ */
@Override
public int run(String[] argv) throws Exception {
int exitcode = -1;
-
- if(argv.length < 1){
+
+ if (argv.length < 1) {
displayUsage("");
return exitcode;
}
@@ -61,102 +66,130 @@
boolean displayQueueInfoWithJobs = false;
boolean displayQueueInfoWithoutJobs = false;
boolean displayQueueAclsInfoForCurrentUser = false;
-
- if("-list".equals(cmd)){
+
+ // First pass: validate the command line and pick exactly one action.
+ if ("-list".equals(cmd)) {
displayQueueList = true;
- }else if("-showacls".equals(cmd)) {
+ } else if ("-showacls".equals(cmd)) {
displayQueueAclsInfoForCurrentUser = true;
- }else if("-info".equals(cmd)){
- if(argv.length == 2 && !(argv[1].equals("-showJobs"))) {
+ } else if ("-info".equals(cmd)) {
+ if (argv.length == 2 && !(argv[1].equals("-showJobs"))) {
displayQueueInfoWithoutJobs = true;
- } else if(argv.length == 3){
- if(argv[2].equals("-showJobs")){
+ } else if (argv.length == 3) {
+ if (argv[2].equals("-showJobs")) {
displayQueueInfoWithJobs = true;
- }else {
+ } else {
displayUsage(cmd);
return exitcode;
}
- }else {
+ } else {
displayUsage(cmd);
return exitcode;
- }
+ }
} else {
displayUsage(cmd);
return exitcode;
}
+
+ // Second pass: the command line is valid, so connect and run the action.
JobConf conf = new JobConf(getConf());
init(conf);
if (displayQueueList) {
displayQueueList();
exitcode = 0;
- } else if (displayQueueInfoWithoutJobs){
- displayQueueInfo(argv[1],false);
+ } else if (displayQueueInfoWithoutJobs) {
+ displayQueueInfo(argv[1], false);
exitcode = 0;
} else if (displayQueueInfoWithJobs) {
- displayQueueInfo(argv[1],true);
+ displayQueueInfo(argv[1], true);
exitcode = 0;
- }else if (displayQueueAclsInfoForCurrentUser) {
+ } else if (displayQueueAclsInfoForCurrentUser) {
this.displayQueueAclsInfoForCurrentUser();
exitcode = 0;
}
-
return exitcode;
}
-
- /**
- * Method used to display information pertaining to a Single JobQueue
- * registered with the {@link QueueManager}. Display of the Jobs is
- * determine by the boolean
- *
- * @throws IOException
- */
- private void displayQueueInfo(String queue, boolean showJobs) throws IOException {
- JobQueueInfo jobQueueInfo = jc.getQueueInfo(queue);
+ /**
+ * Formats and prints information about the passed-in job queue: its name,
+ * state and scheduling info, then the comma-separated names of its child
+ * queues (only if it has any), followed by a separator line. If
+ * jobQueueInfo is null, only "No queue found." is written. The writer is
+ * flushed before returning in both cases.
+ *
+ * @param jobQueueInfo the queue to describe; may be null
+ * @param writer destination for the formatted output
+ * @throws IOException if writing to the writer fails
+ */
+ void printJobQueueInfo(JobQueueInfo jobQueueInfo, Writer writer)
+ throws IOException {
if (jobQueueInfo == null) {
- System.out.printf("Queue Name : %s has no scheduling information \n", queue);
- } else {
- printJobQueueInfo(jobQueueInfo);
+ writer.write("No queue found.\n");
+ writer.flush();
+ return;
+ }
+ writer.write(String.format("Queue Name : %s \n",
+ jobQueueInfo.getQueueName()));
+ writer.write(String.format("Queue State : %s \n",
+ jobQueueInfo.getQueueState()));
+ writer.write(String.format("Scheduling Info : %s \n",
+ jobQueueInfo.getSchedulingInfo()));
+ List<JobQueueInfo> childQueues = jobQueueInfo.getChildren();
+ if (childQueues != null && childQueues.size() > 0) {
+ writer.write(String.format("Child Queues : "));
+ for (int i = 0; i < childQueues.size(); i++) {
+ JobQueueInfo childQueue = childQueues.get(i);
+ writer.write(String.format("%s", childQueue.getQueueName()));
+ // separator between names, but not after the last one
+ if (i != childQueues.size() - 1) {
+ writer.write(String.format(", "));
+ }
+ }
+ writer.write("\n");
}
- if (showJobs) {
- System.out.printf("Job List\n");
- JobStatus[] jobs = jc.getJobsFromQueue(queue);
- if (jobs == null)
- jobs = new JobStatus[0];
- jc.displayJobList(jobs);
+ writer.write(String.format("======================\n"));
+ writer.flush();
+ }
+
+ /**
+ * Prints every queue in the hierarchy to standard output, starting from the
+ * root queues; each parent is printed before its children.
+ *
+ * @throws IOException if fetching or printing the queue information fails
+ */
+ private void displayQueueList() throws IOException {
+ PrintWriter stdout = new PrintWriter(System.out);
+ for (JobQueueInfo queue : expandQueueList(jc.getRootQueues())) {
+ printJobQueueInfo(queue, stdout);
}
}
-
- // format and print information about the passed in job queue.
- private void printJobQueueInfo(JobQueueInfo jobQueueInfo) {
- System.out.printf("Queue Name : %s \n", jobQueueInfo.getQueueName());
- System.out.printf("Queue State : %s \n", jobQueueInfo.getQueueState());
- System.out.printf("Scheduling Info : %s \n",jobQueueInfo.getSchedulingInfo());
+
+ /**
+ * Flattens a queue hierarchy into one list using a pre-order depth-first
+ * walk: every queue is listed before its descendants, and siblings keep
+ * the order in which they are given.
+ *
+ * @param rootQueues the top-level queues to start from
+ * @return a newly allocated list holding every queue reachable from
+ * rootQueues, in pre-order depth-first order
+ */
+ List<JobQueueInfo> expandQueueList(JobQueueInfo[] rootQueues) {
+ List<JobQueueInfo> flattened = new ArrayList<JobQueueInfo>();
+ for (JobQueueInfo root : rootQueues) {
+ appendSubtree(root, flattened);
+ }
+ return flattened;
}
+
+ /** Appends queue and then, depth-first, each of its descendants to sink. */
+ private void appendSubtree(JobQueueInfo queue, List<JobQueueInfo> sink) {
+ sink.add(queue);
+ List<JobQueueInfo> children = queue.getChildren();
+ if (children == null) {
+ return;
+ }
+ for (JobQueueInfo child : children) {
+ appendSubtree(child, sink);
+ }
+ }
-
+
/**
- * Method used to display the list of the JobQueues registered
- * with the {@link QueueManager}
+ * Method used to display information pertaining to a single JobQueue
+ * registered with the {@link QueueManager}. Whether the jobs in the queue
+ * are listed as well is determined by showJobs; jobs are only listed for a
+ * queue that was found and has no child queues.
*
+ * @param queue name of the queue to describe
+ * @param showJobs whether to also list the jobs in the queue
* @throws IOException
*/
- private void displayQueueList() throws IOException {
- JobQueueInfo[] queues = jc.getQueues();
- for (JobQueueInfo queue : queues) {
- String schedInfo = queue.getSchedulingInfo();
- if(schedInfo.trim().equals("")){
- schedInfo = "N/A";
- }
- printJobQueueInfo(queue);
+ private void displayQueueInfo(String queue, boolean showJobs)
+ throws IOException {
+ JobQueueInfo jobQueueInfo = jc.getQueueInfo(queue);
+ printJobQueueInfo(jobQueueInfo, new PrintWriter(System.out));
+ // printJobQueueInfo already reports an unknown queue ("No queue found.");
+ // skip the job listing then instead of dereferencing the null info.
+ if (showJobs && jobQueueInfo != null
+ && (jobQueueInfo.getChildren() == null
+ || jobQueueInfo.getChildren().size() == 0)) {
+ JobStatus[] jobs = jc.getJobsFromQueue(queue);
+ if (jobs == null) {
+ jobs = new JobStatus[0];
+ }
+ jc.displayJobList(jobs);
}
}
-
+
+ /**
+ * Prints to standard output the queue ACLs that apply to the current user,
+ * where the user is read from this tool's configuration. If the ACL list
+ * is empty, a single "does not have access to any queue" message is
+ * printed instead.
+ *
+ * @throws IOException propagated from the calls that fetch the ACLs or the
+ * user information
+ */
private void displayQueueAclsInfoForCurrentUser() throws IOException {
QueueAclsInfo[] queueAclsInfoList = jc.getQueueAclsForCurrentUser();
UserGroupInformation ugi = UserGroupInformation.readFrom(getConf());
if (queueAclsInfoList.length > 0) {
- System.out.println("Queue acls for user : "
- + ugi.getUserName());
+ System.out.println("Queue acls for user : " + ugi.getUserName());
System.out.println("\nQueue Operations");
System.out.println("=====================");
for (QueueAclsInfo queueInfo : queueAclsInfoList) {
@@ -172,17 +205,16 @@
System.out.println();
}
} else {
- System.out.println("User " +
- ugi.getUserName() +
- " does not have access to any queue. \n");
+ System.out.println("User " + ugi.getUserName()
+ + " does not have access to any queue. \n");
}
}
-
+
+ /**
+ * Prints usage information to standard error: a command-specific line when
+ * cmd is "-queueinfo", otherwise the generic list of supported commands.
+ * NOTE(review): run() reports a malformed -info invocation by calling
+ * displayUsage("-info"), which falls through to the generic branch; the
+ * "-queueinfo" branch is only reached when a user types the unsupported
+ * "-queueinfo" command. Confirm which command name was intended.
+ */
private void displayUsage(String cmd) {
String prefix = "Usage: JobQueueClient ";
- if ("-queueinfo".equals(cmd)){
+ if ("-queueinfo".equals(cmd)) {
System.err.println(prefix + "[" + cmd + "<job-queue-name> [-showJobs]]");
- }else {
+ } else {
System.err.printf(prefix + "<command> <args>\n");
System.err.printf("\t[-list]\n");
System.err.printf("\t[-info <job-queue-name> [-showJobs]]\n");
@@ -195,5 +227,5 @@
int res = ToolRunner.run(new JobQueueClient(), argv);
System.exit(res);
}
-
+
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobQueueInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobQueueInfo.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobQueueInfo.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobQueueInfo.java Sat Nov 28 20:26:01 2009
@@ -17,35 +17,29 @@
*/
package org.apache.hadoop.mapred;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.QueueState;
/**
* Class that contains the information regarding the Job Queues which are
* maintained by the Hadoop Map/Reduce framework.
- *
+ * @deprecated Use {@link QueueInfo} instead
*/
+@Deprecated
+public class JobQueueInfo extends QueueInfo {
-public class JobQueueInfo implements Writable {
-
- private String queueName = "";
- //The scheduling Information object is read back as String.
- //Once the scheduling information is set there is no way to recover it.
- private String schedulingInfo;
-
- private String queueState;
-
/**
* Default constructor for Job Queue Info.
*
*/
public JobQueueInfo() {
-
+ super();
}
+
/**
* Construct a new JobQueueInfo object using the queue name and the
* scheduling information passed.
@@ -55,29 +49,24 @@
* queue
*/
public JobQueueInfo(String queueName, String schedulingInfo) {
- this.queueName = queueName;
- this.schedulingInfo = schedulingInfo;
- // make it running by default.
- this.queueState = Queue.QueueState.RUNNING.getStateName();
+ super(queueName, schedulingInfo);
}
+
+ /**
+ * Creates a JobQueueInfo that copies the name, scheduling info, state,
+ * child list, properties and job statuses of the given {@link QueueInfo}.
+ * NOTE(review): the child list is passed through unchanged, so its
+ * elements may be plain QueueInfo objects rather than JobQueueInfo
+ * instances; confirm that consumers of the children handle that.
+ *
+ * @param queue the queue whose information is copied
+ */
+ JobQueueInfo(QueueInfo queue) {
+ this(queue.getQueueName(), queue.getSchedulingInfo());
+ setQueueState(queue.getState().getStateName());
+ setQueueChildren(queue.getQueueChildren());
+ setProperties(queue.getProperties());
+ setJobStatuses(queue.getJobStatuses());
+ }
/**
* Set the queue name of the JobQueueInfo
*
* @param queueName Name of the job queue.
+ * NOTE(review): this setter was public before this change and is now
+ * protected, so external callers of this (deprecated but still public)
+ * class stop compiling; confirm the narrowed visibility is intended.
*/
- public void setQueueName(String queueName) {
- this.queueName = queueName;
- }
-
- /**
- * Get the queue name from JobQueueInfo
- *
- * @return queue name
- */
- public String getQueueName() {
- return queueName;
+ protected void setQueueName(String queueName) {
+ super.setQueueName(queueName);
}
/**
@@ -85,55 +74,73 @@
*
* @param schedulingInfo
+ * NOTE(review): narrowed from public to protected by this change. The
+ * removed getter returned "N/A" when no scheduling info was set; confirm
+ * that the inherited QueueInfo getter preserves that.
*/
- public void setSchedulingInfo(String schedulingInfo) {
- this.schedulingInfo = schedulingInfo;
+ protected void setSchedulingInfo(String schedulingInfo) {
+ super.setSchedulingInfo(schedulingInfo);
}
/**
- * Gets the scheduling information associated to particular job queue.
- * If nothing is set would return <b>"N/A"</b>
- *
- * @return Scheduling information associated to particular Job Queue
- */
- public String getSchedulingInfo() {
- if(schedulingInfo != null) {
- return schedulingInfo;
- }else {
- return "N/A";
- }
- }
-
- /**
* Set the state of the queue
* @param state state of the queue.
+ * The name is converted with QueueState.getState(); the result for an
+ * unrecognised name is not visible here (presumably null) -- TODO confirm.
+ * NOTE(review): narrowed from public to protected by this change.
*/
- public void setQueueState(String state) {
- queueState = state;
+ protected void setQueueState(String state) {
+ super.setState(QueueState.getState(state));
}
- /**
- * Return the queue state
- * @return the queue state.
- */
- public String getQueueState() {
- return queueState;
+ /**
+ * Returns the queue state as a string, derived from the state held by the
+ * superclass.
+ * NOTE(review): setQueueState() and the copy constructor work with state
+ * names (QueueState.getState / getStateName()), whereas this returns
+ * getState().toString(); confirm the two representations agree. This
+ * method was public before this change and is now package-private.
+ */
+ String getQueueState() {
+ return super.getState().toString();
}
- @Override
- public void readFields(DataInput in) throws IOException {
- queueName = Text.readString(in);
- queueState = Text.readString(in);
- schedulingInfo = Text.readString(in);
+ /**
+ * Replaces this queue's children with the given queues. A copy of the
+ * list is handed to the superclass, so later changes to the argument list
+ * do not reach this object. A null list results in a
+ * NullPointerException.
+ *
+ * @param children the new child queues, in order
+ */
+ protected void setChildren(List<JobQueueInfo> children) {
+ super.setQueueChildren(new ArrayList<QueueInfo>(children));
}
- @Override
- public void write(DataOutput out) throws IOException {
- Text.writeString(out, queueName);
- Text.writeString(out, queueState);
- if(schedulingInfo!= null) {
- Text.writeString(out, schedulingInfo);
- }else {
- Text.writeString(out, "N/A");
+ /**
+ * Returns the child queues of this queue as {@link JobQueueInfo} objects.
+ * The returned list is newly allocated; modifying it does not change this
+ * queue's children (use setChildren for that).
+ *
+ * Children that already are JobQueueInfo instances are returned as-is.
+ * Children that are plain {@link QueueInfo} objects (such as may be passed
+ * through unchanged by the JobQueueInfo(QueueInfo) constructor) are
+ * wrapped in a new JobQueueInfo. A blind cast would throw
+ * ClassCastException for them.
+ *
+ * @return the child queues; an empty list if there are none
+ */
+ public List<JobQueueInfo> getChildren() {
+ List<JobQueueInfo> list = new ArrayList<JobQueueInfo>();
+ List<QueueInfo> children = super.getQueueChildren();
+ if (children == null) {
+ return list;
+ }
+ for (QueueInfo q : children) {
+ list.add(q instanceof JobQueueInfo ? (JobQueueInfo) q
+ : new JobQueueInfo(q));
}
+ return list;
+ }
+
+ /**
+ * Sets the properties of this queue; delegates to {@link QueueInfo}.
+ *
+ * @param props the queue properties
+ */
+ protected void setProperties(Properties props) {
+ super.setProperties(props);
+ }
+
+ /**
+ * Appends child to the end of this queue's list of children. The child's
+ * own queue name is left unchanged.
+ *
+ * Only for testing.
+ *
+ * @param child the queue to add as a child
+ */
+ void addChild(JobQueueInfo child) {
+ List<JobQueueInfo> updated = getChildren();
+ updated.add(child);
+ setChildren(updated);
+ }
+
+ /**
+ * Removes the first child equal to the given queue (per equals) from this
+ * queue's list of children, if present. The child's own queue name is
+ * left unchanged.
+ *
+ * Only for testing.
+ *
+ * @param child the queue to remove
+ */
+ void removeChild(JobQueueInfo child) {
+ List<JobQueueInfo> remaining = getChildren();
+ remaining.remove(child);
+ setChildren(remaining);
+ }
+
+ /**
+ * Sets the statuses of the jobs in this queue; delegates to
+ * {@link QueueInfo}.
+ *
+ * @param stats the job statuses
+ */
+ protected void setJobStatuses(org.apache.hadoop.mapreduce.JobStatus[] stats) {
+ super.setJobStatuses(stats);
+ }
+
}