You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC
svn commit: r885145 [22/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233:
./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/
src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java Sat Nov 28 20:26:01 2009
@@ -105,7 +105,7 @@
}
@Override
- void addFetchFailedMap(TaskAttemptID mapTaskId) {
+ public void addFetchFailedMap(TaskAttemptID mapTaskId) {
failedFetchTasks.add(mapTaskId);
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Reducer.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Reducer.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Reducer.java Sat Nov 28 20:26:01 2009
@@ -109,7 +109,7 @@
* private int noKeys = 0;
*
* public void configure(JobConf job) {
- * reduceTaskId = job.get("mapred.task.id");
+ * reduceTaskId = job.get(JobContext.TASK_ATTEMPT_ID);
* }
*
* public void reduce(K key, Iterator<V> values,
@@ -185,8 +185,8 @@
* takes an insignificant amount of time to process individual key/value
* pairs, this is crucial since the framework might assume that the task has
* timed-out and kill that task. The other way of avoiding this is to set
- * <a href="{@docRoot}/../mapred-default.html#mapred.task.timeout">
- * mapred.task.timeout</a> to a high-enough value (or even zero for no
+ * <a href="{@docRoot}/../mapred-default.html#mapreduce.task.timeout">
+ * mapreduce.task.timeout</a> to a high-enough value (or even zero for no
* time-outs).</p>
*
* @param key the key.
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ResourceEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ResourceEstimator.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ResourceEstimator.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ResourceEstimator.java Sat Nov 28 20:26:01 2009
@@ -56,9 +56,11 @@
completedMapsInputSize+=(tip.getMapInputSize()+1);
completedMapsOutputSize+=ts.getOutputSize();
- LOG.info("completedMapsUpdates:"+completedMapsUpdates+" "+
- "completedMapsInputSize:"+completedMapsInputSize+" " +
- "completedMapsOutputSize:"+completedMapsOutputSize);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("completedMapsUpdates:"+completedMapsUpdates+" "+
+ "completedMapsInputSize:"+completedMapsInputSize+" " +
+ "completedMapsOutputSize:"+completedMapsOutputSize);
+ }
}
}
@@ -73,7 +75,9 @@
//add desiredMaps() so that randomwriter case doesn't blow up
long estimate = Math.round((inputSize *
completedMapsOutputSize * 2.0)/completedMapsInputSize);
- LOG.debug("estimate total map output will be " + estimate);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("estimate total map output will be " + estimate);
+ }
return estimate;
}
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/RunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/RunningJob.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/RunningJob.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/RunningJob.java Sat Nov 28 20:26:01 2009
@@ -30,7 +30,9 @@
* progress etc.</p>
*
* @see JobClient
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.Job} instead
*/
+@Deprecated
public interface RunningJob {
/**
* Get the job identifier.
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java Sat Nov 28 20:26:01 2009
@@ -61,7 +61,8 @@
*/
static public void setSequenceFileOutputKeyClass(JobConf conf,
Class<?> theClass) {
- conf.setClass("mapred.seqbinary.output.key.class", theClass, Object.class);
+ conf.setClass(org.apache.hadoop.mapreduce.lib.output.
+ SequenceFileAsBinaryOutputFormat.KEY_CLASS, theClass, Object.class);
}
/**
@@ -74,8 +75,8 @@
*/
static public void setSequenceFileOutputValueClass(JobConf conf,
Class<?> theClass) {
- conf.setClass("mapred.seqbinary.output.value.class",
- theClass, Object.class);
+ conf.setClass(org.apache.hadoop.mapreduce.lib.output.
+ SequenceFileAsBinaryOutputFormat.VALUE_CLASS, theClass, Object.class);
}
/**
@@ -84,9 +85,10 @@
* @return the key class of the {@link SequenceFile}
*/
static public Class<? extends WritableComparable> getSequenceFileOutputKeyClass(JobConf conf) {
- return conf.getClass("mapred.seqbinary.output.key.class",
- conf.getOutputKeyClass().asSubclass(WritableComparable.class),
- WritableComparable.class);
+ return conf.getClass(org.apache.hadoop.mapreduce.lib.output.
+ SequenceFileAsBinaryOutputFormat.KEY_CLASS,
+ conf.getOutputKeyClass().asSubclass(WritableComparable.class),
+ WritableComparable.class);
}
/**
@@ -95,9 +97,9 @@
* @return the value class of the {@link SequenceFile}
*/
static public Class<? extends Writable> getSequenceFileOutputValueClass(JobConf conf) {
- return conf.getClass("mapred.seqbinary.output.value.class",
- conf.getOutputValueClass().asSubclass(Writable.class),
- Writable.class);
+ return conf.getClass(org.apache.hadoop.mapreduce.lib.output.
+ SequenceFileAsBinaryOutputFormat.VALUE_CLASS,
+ conf.getOutputValueClass().asSubclass(Writable.class), Writable.class);
}
@Override
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java Sat Nov 28 20:26:01 2009
@@ -102,8 +102,8 @@
* defaulting to {@link CompressionType#RECORD}
*/
public static CompressionType getOutputCompressionType(JobConf conf) {
- String val = conf.get("mapred.output.compression.type",
- CompressionType.RECORD.toString());
+ String val = conf.get(org.apache.hadoop.mapreduce.lib.output.
+ FileOutputFormat.COMPRESS_TYPE, CompressionType.RECORD.toString());
return CompressionType.valueOf(val);
}
@@ -116,7 +116,8 @@
public static void setOutputCompressionType(JobConf conf,
CompressionType style) {
setCompressOutput(conf, true);
- conf.set("mapred.output.compression.type", style.toString());
+ conf.set(org.apache.hadoop.mapreduce.lib.output.
+ FileOutputFormat.COMPRESS_TYPE, style.toString());
}
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SkipBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SkipBadRecords.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SkipBadRecords.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/SkipBadRecords.java Sat Nov 28 20:26:01 2009
@@ -72,16 +72,16 @@
"ReduceProcessedGroups";
private static final String ATTEMPTS_TO_START_SKIPPING =
- "mapred.skip.attempts.to.start.skipping";
+ JobContext.SKIP_START_ATTEMPTS;
private static final String AUTO_INCR_MAP_PROC_COUNT =
- "mapred.skip.map.auto.incr.proc.count";
+ JobContext.MAP_SKIP_INCR_PROC_COUNT;
private static final String AUTO_INCR_REDUCE_PROC_COUNT =
- "mapred.skip.reduce.auto.incr.proc.count";
- private static final String OUT_PATH = "mapred.skip.out.dir";
+ JobContext.REDUCE_SKIP_INCR_PROC_COUNT;
+ private static final String OUT_PATH = JobContext.SKIP_OUTDIR;
private static final String MAPPER_MAX_SKIP_RECORDS =
- "mapred.skip.map.max.skip.records";
+ JobContext.MAP_SKIP_MAX_RECORDS;
private static final String REDUCER_MAX_SKIP_GROUPS =
- "mapred.skip.reduce.max.skip.groups";
+ JobContext.REDUCE_SKIP_MAXGROUPS;
/**
* Get the number of Task attempts AFTER which skip mode
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Task.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Task.java Sat Nov 28 20:26:01 2009
@@ -21,8 +21,6 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Iterator;
@@ -42,15 +40,24 @@
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
+import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
/**
* Base class for tasks.
@@ -61,6 +68,30 @@
private static final Log LOG =
LogFactory.getLog(Task.class);
+ // Counters used by Task subclasses
+ protected static enum Counter {
+ MAP_INPUT_RECORDS,
+ MAP_OUTPUT_RECORDS,
+ MAP_SKIPPED_RECORDS,
+ MAP_INPUT_BYTES,
+ MAP_OUTPUT_BYTES,
+ COMBINE_INPUT_RECORDS,
+ COMBINE_OUTPUT_RECORDS,
+ REDUCE_INPUT_GROUPS,
+ REDUCE_SHUFFLE_BYTES,
+ REDUCE_INPUT_RECORDS,
+ REDUCE_OUTPUT_RECORDS,
+ REDUCE_SKIPPED_GROUPS,
+ REDUCE_SKIPPED_RECORDS,
+ SPILLED_RECORDS,
+ FAILED_SHUFFLE,
+ SHUFFLED_MAPS,
+ MERGED_MAP_OUTPUTS,
+ }
+
+ public static String MERGED_OUTPUT_PREFIX = ".merged";
+
+
/**
* Counters to measure the usage of the different file systems.
* Always return the String array with two elements. First one is the name of
@@ -97,9 +128,11 @@
////////////////////////////////////////////
private String jobFile; // job configuration file
+ private String user; // user running the job
private TaskAttemptID taskId; // unique, includes job id
private int partition; // id within job
TaskStatus taskStatus; // current status of the task
+ protected JobStatus.State jobRunStateForCleanup;
protected boolean jobCleanup = false;
protected boolean jobSetup = false;
protected boolean taskCleanup = false;
@@ -123,7 +156,11 @@
protected org.apache.hadoop.mapreduce.OutputFormat<?,?> outputFormat;
protected org.apache.hadoop.mapreduce.OutputCommitter committer;
protected final Counters.Counter spilledRecordsCounter;
+ protected final Counters.Counter failedShuffleCounter;
+ protected final Counters.Counter mergedMapOutputsCounter;
private int numSlotsRequired;
+ protected TaskUmbilicalProtocol umbilical;
+ protected JobTokens jobTokens=null; // storage of the secret keys
////////////////////////////////////////////
// Constructors
@@ -133,6 +170,8 @@
taskStatus = TaskStatus.createTaskStatus(isMapTask());
taskId = new TaskAttemptID();
spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
+ failedShuffleCounter = counters.findCounter(Counter.FAILED_SHUFFLE);
+ mergedMapOutputsCounter = counters.findCounter(Counter.MERGED_MAP_OUTPUTS);
}
public Task(String jobFile, TaskAttemptID taskId, int partition,
@@ -151,6 +190,8 @@
TaskStatus.Phase.SHUFFLE,
counters);
spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
+ failedShuffleCounter = counters.findCounter(Counter.FAILED_SHUFFLE);
+ mergedMapOutputsCounter = counters.findCounter(Counter.MERGED_MAP_OUTPUTS);
}
////////////////////////////////////////////
@@ -172,6 +213,23 @@
public JobID getJobID() {
return taskId.getJobID();
}
+
+ /**
+ * set JobToken storage
+ * @param jt
+ */
+ public void setJobTokens(JobTokens jt) {
+ this.jobTokens = jt;
+ }
+
+ /**
+ * get JobToken storage
+ * @return storage object
+ */
+ public JobTokens getJobTokens() {
+ return this.jobTokens;
+ }
+
/**
* Get the index of this task within the job.
@@ -211,6 +269,24 @@
}
/**
+ * Report a fatal error to the parent (task) tracker.
+ */
+ protected void reportFatalError(TaskAttemptID id, Throwable throwable,
+ String logMsg) {
+ LOG.fatal(logMsg);
+ Throwable tCause = throwable.getCause();
+ String cause = tCause == null
+ ? StringUtils.stringifyException(throwable)
+ : StringUtils.stringifyException(tCause);
+ try {
+ umbilical.fatalError(id, cause);
+ } catch (IOException ioe) {
+ LOG.fatal("Failed to contact the tasktracker", ioe);
+ System.exit(-1);
+ }
+ }
+
+ /**
* Get skipRanges.
*/
public SortedRanges getSkipRanges() {
@@ -268,6 +344,14 @@
return jobCleanup;
}
+ boolean isJobAbortTask() {
+ // the task is an abort task if it's marked for cleanup and the final
+ // expected state is either failed or killed.
+ return isJobCleanupTask()
+ && (jobRunStateForCleanup == JobStatus.State.KILLED
+ || jobRunStateForCleanup == JobStatus.State.FAILED);
+ }
+
boolean isJobSetupTask() {
return jobSetup;
}
@@ -280,10 +364,29 @@
jobCleanup = true;
}
+ /**
+ * Sets the final run state of the job, which decides whether cleanup commits or aborts it.
+ * @param status the final runstate of the job.
+ */
+ void setJobCleanupTaskState(JobStatus.State status) {
+ jobRunStateForCleanup = status;
+ }
+
boolean isMapOrReduce() {
return !jobSetup && !jobCleanup && !taskCleanup;
}
-
+
+ /**
+ * Get the name of the user running the job/task. TaskTracker needs task's
+ * user name even before its JobConf is localized. So we explicitly serialize
+ * the user name.
+ *
+ * @return user
+ */
+ String getUser() {
+ return user;
+ }
+
////////////////////////////////////////////
// Writable methods
////////////////////////////////////////////
@@ -297,9 +400,13 @@
skipRanges.write(out);
out.writeBoolean(skipping);
out.writeBoolean(jobCleanup);
+ if (jobCleanup) {
+ WritableUtils.writeEnum(out, jobRunStateForCleanup);
+ }
out.writeBoolean(jobSetup);
out.writeBoolean(writeSkipRecs);
- out.writeBoolean(taskCleanup);
+ out.writeBoolean(taskCleanup);
+ Text.writeString(out, user);
}
public void readFields(DataInput in) throws IOException {
@@ -313,12 +420,17 @@
currentRecStartIndex = currentRecIndexIterator.next();
skipping = in.readBoolean();
jobCleanup = in.readBoolean();
+ if (jobCleanup) {
+ jobRunStateForCleanup =
+ WritableUtils.readEnum(in, JobStatus.State.class);
+ }
jobSetup = in.readBoolean();
writeSkipRecs = in.readBoolean();
taskCleanup = in.readBoolean();
if (taskCleanup) {
setPhase(TaskStatus.Phase.CLEANUP);
}
+ user = Text.readString(in);
}
@Override
@@ -328,11 +440,11 @@
* Localize the given JobConf to be specific for this task.
*/
public void localizeConfiguration(JobConf conf) throws IOException {
- conf.set("mapred.tip.id", taskId.getTaskID().toString());
- conf.set("mapred.task.id", taskId.toString());
- conf.setBoolean("mapred.task.is.map", isMapTask());
- conf.setInt("mapred.task.partition", partition);
- conf.set("mapred.job.id", taskId.getJobID().toString());
+ conf.set(JobContext.TASK_ID, taskId.getTaskID().toString());
+ conf.set(JobContext.TASK_ATTEMPT_ID, taskId.toString());
+ conf.setBoolean(JobContext.TASK_ISMAP, isMapTask());
+ conf.setInt(JobContext.TASK_PARTITION, partition);
+ conf.set(JobContext.ID, taskId.getJobID().toString());
}
/** Run this task as a part of the named job. This method is executed in the
@@ -368,8 +480,8 @@
boolean useNewApi) throws IOException,
ClassNotFoundException,
InterruptedException {
- jobContext = new JobContext(job, id, reporter);
- taskContext = new TaskAttemptContext(job, taskId, reporter);
+ jobContext = new JobContextImpl(job, id, reporter);
+ taskContext = new TaskAttemptContextImpl(job, taskId, reporter);
if (getState() == TaskStatus.State.UNASSIGNED) {
setState(TaskStatus.State.RUNNING);
}
@@ -674,7 +786,12 @@
sendDone(umbilical);
}
- protected void statusUpdate(TaskUmbilicalProtocol umbilical)
+ /**
+ * Send a status update to the task tracker
+ * @param umbilical
+ * @throws IOException
+ */
+ public void statusUpdate(TaskUmbilicalProtocol umbilical)
throws IOException {
int retries = MAX_RETRIES;
while (true) {
@@ -800,7 +917,27 @@
getProgress().setStatus("cleanup");
statusUpdate(umbilical);
// do the cleanup
- committer.cleanupJob(jobContext);
+ LOG.info("Cleaning up job");
+ if (jobRunStateForCleanup == JobStatus.State.FAILED
+ || jobRunStateForCleanup == JobStatus.State.KILLED) {
+ LOG.info("Aborting job with runstate : " + jobRunStateForCleanup.name());
+ if (conf.getUseNewMapper()) {
+ committer.abortJob(jobContext, jobRunStateForCleanup);
+ } else {
+ org.apache.hadoop.mapred.OutputCommitter oldCommitter =
+ (org.apache.hadoop.mapred.OutputCommitter)committer;
+ oldCommitter.abortJob(jobContext, jobRunStateForCleanup);
+ }
+ } else if (jobRunStateForCleanup == JobStatus.State.SUCCEEDED){
+ LOG.info("Committing job");
+ committer.commitJob(jobContext);
+ } else {
+ throw new IOException("Invalid state of the job for cleanup. State found "
+ + jobRunStateForCleanup + " expecting "
+ + JobStatus.State.SUCCEEDED + ", "
+ + JobStatus.State.FAILED + " or "
+ + JobStatus.State.KILLED);
+ }
done(umbilical, reporter);
}
@@ -820,11 +957,11 @@
this.conf = new JobConf(conf);
}
this.mapOutputFile.setConf(this.conf);
- this.lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+ this.lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
// add the static resolutions (this is required for the junit to
// work on testcases that simulate multiple nodes on a single physical
// node.
- String hostToResolved[] = conf.getStrings("hadoop.net.static.resolutions");
+ String hostToResolved[] = conf.getStrings(TTConfig.TT_STATIC_RESOLUTIONS);
if (hostToResolved != null) {
for (String str : hostToResolved) {
String name = str.substring(0, str.indexOf('='));
@@ -832,6 +969,7 @@
NetUtils.addStaticResolution(name, resolvedName);
}
}
+ this.user = this.conf.getUser();
}
public Configuration getConf() {
@@ -841,7 +979,7 @@
/**
* OutputCollector for the combiner.
*/
- protected static class CombineOutputCollector<K extends Object, V extends Object>
+ public static class CombineOutputCollector<K extends Object, V extends Object>
implements OutputCollector<K, V> {
private Writer<K, V> writer;
private Counters.Counter outCounter;
@@ -919,7 +1057,7 @@
/// Auxiliary methods
/** Start processing next unique key. */
- void nextKey() throws IOException {
+ public void nextKey() throws IOException {
// read until we find a new key
while (hasNext) {
readNextKey();
@@ -934,12 +1072,12 @@
}
/** True iff more keys remain. */
- boolean more() {
+ public boolean more() {
return more;
}
/** The current key. */
- KEY getKey() {
+ public KEY getKey() {
return key;
}
@@ -969,7 +1107,8 @@
}
}
- protected static class CombineValuesIterator<KEY,VALUE>
+ /** Iterator to return Combined values */
+ public static class CombineValuesIterator<KEY,VALUE>
extends ValuesIterator<KEY,VALUE> {
private final Counters.Counter combineInputCounter;
@@ -988,28 +1127,6 @@
}
}
- private static final Constructor<org.apache.hadoop.mapreduce.Reducer.Context>
- contextConstructor;
- static {
- try {
- contextConstructor =
- org.apache.hadoop.mapreduce.Reducer.Context.class.getConstructor
- (new Class[]{org.apache.hadoop.mapreduce.Reducer.class,
- Configuration.class,
- org.apache.hadoop.mapreduce.TaskAttemptID.class,
- RawKeyValueIterator.class,
- org.apache.hadoop.mapreduce.Counter.class,
- org.apache.hadoop.mapreduce.RecordWriter.class,
- org.apache.hadoop.mapreduce.OutputCommitter.class,
- org.apache.hadoop.mapreduce.StatusReporter.class,
- RawComparator.class,
- Class.class,
- Class.class});
- } catch (NoSuchMethodException nme) {
- throw new IllegalArgumentException("Can't find constructor");
- }
- }
-
@SuppressWarnings("unchecked")
protected static <INKEY,INVALUE,OUTKEY,OUTVALUE>
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
@@ -1018,26 +1135,33 @@
Configuration job,
org.apache.hadoop.mapreduce.TaskAttemptID taskId,
RawKeyValueIterator rIter,
- org.apache.hadoop.mapreduce.Counter inputCounter,
+ org.apache.hadoop.mapreduce.Counter inputKeyCounter,
+ org.apache.hadoop.mapreduce.Counter inputValueCounter,
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output,
org.apache.hadoop.mapreduce.OutputCommitter committer,
org.apache.hadoop.mapreduce.StatusReporter reporter,
RawComparator<INKEY> comparator,
Class<INKEY> keyClass, Class<INVALUE> valueClass
- ) throws IOException, ClassNotFoundException {
- try {
+ ) throws IOException, InterruptedException {
+ org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
+ reduceContext =
+ new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, taskId,
+ rIter,
+ inputKeyCounter,
+ inputValueCounter,
+ output,
+ committer,
+ reporter,
+ comparator,
+ keyClass,
+ valueClass);
+
+ org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
+ reducerContext =
+ new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext(
+ reduceContext);
- return contextConstructor.newInstance(reducer, job, taskId,
- rIter, inputCounter, output,
- committer, reporter, comparator,
- keyClass, valueClass);
- } catch (InstantiationException e) {
- throw new IOException("Can't create Context", e);
- } catch (InvocationTargetException e) {
- throw new IOException("Can't invoke Context constructor", e);
- } catch (IllegalAccessException e) {
- throw new IOException("Can't invoke Context constructor", e);
- }
+ return reducerContext;
}
protected static abstract class CombinerRunner<K,V> {
@@ -1079,7 +1203,7 @@
}
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
- new org.apache.hadoop.mapreduce.TaskAttemptContext(job, taskId);
+ new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId);
Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls =
(Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
taskContext.getCombinerClass();
@@ -1188,13 +1312,12 @@
ReflectionUtils.newInstance(reducerClass, job);
org.apache.hadoop.mapreduce.Reducer.Context
reducerContext = createReduceContext(reducer, job, taskId,
- iterator, inputCounter,
+ iterator, null, inputCounter,
new OutputConverter(collector),
committer,
reporter, comparator, keyClass,
valueClass);
reducer.run(reducerContext);
- }
-
+ }
}
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskAttemptContext.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskAttemptContext.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskAttemptContext.java Sat Nov 28 20:26:01 2009
@@ -1,4 +1,5 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -24,39 +25,12 @@
* instead.
*/
@Deprecated
-public class TaskAttemptContext
+public interface TaskAttemptContext
extends org.apache.hadoop.mapreduce.TaskAttemptContext {
- private Progressable progress;
- TaskAttemptContext(JobConf conf, TaskAttemptID taskid) {
- this(conf, taskid, Reporter.NULL);
- }
-
- TaskAttemptContext(JobConf conf, TaskAttemptID taskid,
- Progressable progress) {
- super(conf, taskid);
- this.progress = progress;
- }
-
- /**
- * Get the taskAttemptID.
- *
- * @return TaskAttemptID
- */
- public TaskAttemptID getTaskAttemptID() {
- return (TaskAttemptID) super.getTaskAttemptID();
- }
-
- public Progressable getProgressible() {
- return progress;
- }
-
- public JobConf getJobConf() {
- return (JobConf) getConfiguration();
- }
+ public TaskAttemptID getTaskAttemptID();
- @Override
- public void progress() {
- progress.progress();
- }
+ public Progressable getProgressible();
+
+ public JobConf getJobConf();
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Sat Nov 28 20:26:01 2009
@@ -18,35 +18,25 @@
package org.apache.hadoop.mapred;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
/**
* This is used to track task completion events on
* job tracker.
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.TaskCompletionEvent} instead
*/
-public class TaskCompletionEvent implements Writable{
+@Deprecated
+public class TaskCompletionEvent
+ extends org.apache.hadoop.mapreduce.TaskCompletionEvent {
static public enum Status {FAILED, KILLED, SUCCEEDED, OBSOLETE, TIPFAILED};
-
- private int eventId;
- private String taskTrackerHttp;
- private int taskRunTime; // using int since runtime is the time difference
- private TaskAttemptID taskId;
- Status status;
- boolean isMap = false;
- private int idWithinJob;
+
public static final TaskCompletionEvent[] EMPTY_ARRAY =
- new TaskCompletionEvent[0];
+ new TaskCompletionEvent[0];
/**
* Default constructor for Writable.
*
*/
- public TaskCompletionEvent(){
- taskId = new TaskAttemptID();
+ public TaskCompletionEvent() {
+ super();
}
/**
@@ -64,20 +54,16 @@
boolean isMap,
Status status,
String taskTrackerHttp){
-
- this.taskId = taskId;
- this.idWithinJob = idWithinJob;
- this.isMap = isMap;
- this.eventId = eventId;
- this.status =status;
- this.taskTrackerHttp = taskTrackerHttp;
+ super(eventId, taskId, idWithinJob, isMap, org.apache.hadoop.mapreduce.
+ TaskCompletionEvent.Status.valueOf(status.name()), taskTrackerHttp);
}
- /**
- * Returns event Id.
- * @return event id
- */
- public int getEventId() {
- return eventId;
+
+ static TaskCompletionEvent downgrade(
+ org.apache.hadoop.mapreduce.TaskCompletionEvent event) {
+ return new TaskCompletionEvent(event.getEventId(),
+ TaskAttemptID.downgrade(event.getTaskAttemptId()),event.idWithinJob(),
+ event.isMapTask(), Status.valueOf(event.getStatus().name()),
+ event.getTaskTrackerHttp());
}
/**
* Returns task id.
@@ -86,7 +72,7 @@
*/
@Deprecated
public String getTaskId() {
- return taskId.toString();
+ return getTaskAttemptId().toString();
}
/**
@@ -94,7 +80,7 @@
* @return task id
*/
public TaskAttemptID getTaskAttemptId() {
- return taskId;
+ return TaskAttemptID.downgrade(super.getTaskAttemptId());
}
/**
@@ -102,133 +88,57 @@
* @return task tracker status
*/
public Status getTaskStatus() {
- return status;
- }
- /**
- * http location of the tasktracker where this task ran.
- * @return http location of tasktracker user logs
- */
- public String getTaskTrackerHttp() {
- return taskTrackerHttp;
- }
-
- /**
- * Returns time (in millisec) the task took to complete.
- */
- public int getTaskRunTime() {
- return taskRunTime;
- }
-
- /**
- * Set the task completion time
- * @param taskCompletionTime time (in millisec) the task took to complete
- */
- public void setTaskRunTime(int taskCompletionTime) {
- this.taskRunTime = taskCompletionTime;
- }
-
- /**
- * set event Id. should be assigned incrementally starting from 0.
- * @param eventId
- */
- public void setEventId(
- int eventId) {
- this.eventId = eventId;
+ return Status.valueOf(super.getStatus().name());
}
+
/**
* Sets task id.
* @param taskId
- * @deprecated use {@link #setTaskID(TaskAttemptID)} instead.
+ * @deprecated use {@link #setTaskAttemptId(TaskAttemptID)} instead.
*/
@Deprecated
public void setTaskId(String taskId) {
- this.taskId = TaskAttemptID.forName(taskId);
+ this.setTaskAttemptId(TaskAttemptID.forName(taskId));
}
/**
* Sets task id.
* @param taskId
*/
- public void setTaskID(TaskAttemptID taskId) {
- this.taskId = taskId;
+ protected void setTaskAttemptId(TaskAttemptID taskId) {
+ super.setTaskAttemptId(taskId);
}
/**
* Set task status.
* @param status
*/
- public void setTaskStatus(
- Status status) {
- this.status = status;
+ protected void setTaskStatus(Status status) {
+ super.setTaskStatus(org.apache.hadoop.mapreduce.
+ TaskCompletionEvent.Status.valueOf(status.name()));
}
+
/**
- * Set task tracker http location.
- * @param taskTrackerHttp
+ * Set the task completion time
+ * @param taskCompletionTime time (in millisec) the task took to complete
*/
- public void setTaskTrackerHttp(
- String taskTrackerHttp) {
- this.taskTrackerHttp = taskTrackerHttp;
- }
-
- @Override
- public String toString(){
- StringBuffer buf = new StringBuffer();
- buf.append("Task Id : ");
- buf.append(taskId);
- buf.append(", Status : ");
- buf.append(status.name());
- return buf.toString();
- }
-
- @Override
- public boolean equals(Object o) {
- if(o == null)
- return false;
- if(o.getClass().equals(this.getClass())) {
- TaskCompletionEvent event = (TaskCompletionEvent) o;
- return this.isMap == event.isMapTask()
- && this.eventId == event.getEventId()
- && this.idWithinJob == event.idWithinJob()
- && this.status.equals(event.getTaskStatus())
- && this.taskId.equals(event.getTaskAttemptId())
- && this.taskRunTime == event.getTaskRunTime()
- && this.taskTrackerHttp.equals(event.getTaskTrackerHttp());
- }
- return false;
+ protected void setTaskRunTime(int taskCompletionTime) {
+ super.setTaskRunTime(taskCompletionTime);
}
- @Override
- public int hashCode() {
- return toString().hashCode();
+ /**
+ * set event Id. should be assigned incrementally starting from 0.
+ * @param eventId
+ */
+ protected void setEventId(int eventId) {
+ super.setEventId(eventId);
}
- public boolean isMapTask() {
- return isMap;
- }
-
- public int idWithinJob() {
- return idWithinJob;
- }
- //////////////////////////////////////////////
- // Writable
- //////////////////////////////////////////////
- public void write(DataOutput out) throws IOException {
- taskId.write(out);
- WritableUtils.writeVInt(out, idWithinJob);
- out.writeBoolean(isMap);
- WritableUtils.writeEnum(out, status);
- WritableUtils.writeString(out, taskTrackerHttp);
- WritableUtils.writeVInt(out, taskRunTime);
- WritableUtils.writeVInt(out, eventId);
- }
-
- public void readFields(DataInput in) throws IOException {
- taskId.readFields(in);
- idWithinJob = WritableUtils.readVInt(in);
- isMap = in.readBoolean();
- status = WritableUtils.readEnum(in, Status.class);
- taskTrackerHttp = WritableUtils.readString(in);
- taskRunTime = WritableUtils.readVInt(in);
- eventId = WritableUtils.readVInt(in);
+ /**
+ * Set task tracker http location.
+ * @param taskTrackerHttp
+ */
+ protected void setTaskTrackerHttp(String taskTrackerHttp) {
+ super.setTaskTrackerHttp(taskTrackerHttp);
}
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskController.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskController.java Sat Nov 28 20:26:01 2009
@@ -14,22 +14,23 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.hadoop.mapred;
import java.io.File;
import java.io.IOException;
-import java.util.Map;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.mapred.TaskTracker.PermissionsHandler;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.classification.InterfaceAudience;
/**
* Controls initialization, finalization and clean up of tasks, and
@@ -38,9 +39,12 @@
* This class defines the API for initializing, finalizing and cleaning
* up of tasks, as also the launching and killing task JVMs.
* Subclasses of this class will implement the logic required for
- * performing the actual actions.
+ * performing the actual actions.
+ *
+ * <br/>
*/
-abstract class TaskController implements Configurable {
+@InterfaceAudience.Private
+public abstract class TaskController implements Configurable {
private Configuration conf;
@@ -50,21 +54,21 @@
return conf;
}
- // The list of directory paths specified in the variable mapred.local.dir.
+ // The list of directory paths specified in the variable MRConfig.LOCAL_DIR
// This is used to determine which among the list of directories is picked up
// for storing data for a particular task.
protected String[] mapredLocalDirs;
public void setConf(Configuration conf) {
this.conf = conf;
- mapredLocalDirs = conf.getStrings("mapred.local.dir");
+ mapredLocalDirs = conf.getStrings(MRConfig.LOCAL_DIR);
}
/**
* Sets up the permissions of the following directories on all the configured
* disks:
* <ul>
- * <li>mapred-local directories</li>
+ * <li>mapreduce.cluster.local.directories</li>
* <li>Job cache directories</li>
* <li>Archive directories</li>
* <li>Hadoop log directories</li>
@@ -72,35 +76,14 @@
*/
void setup() {
for (String localDir : this.mapredLocalDirs) {
- // Set up the mapred-local directories.
+ // Set up the mapreduce.cluster.local.directories.
File mapredlocalDir = new File(localDir);
if (!mapredlocalDir.exists() && !mapredlocalDir.mkdirs()) {
- LOG.warn("Unable to create mapred-local directory : "
+ LOG.warn("Unable to create mapreduce.cluster.local.directory : "
+ mapredlocalDir.getPath());
} else {
- PermissionsHandler.setPermissions(mapredlocalDir,
- PermissionsHandler.sevenFiveFive);
- }
-
- // Set up the cache directory used for distributed cache files
- File distributedCacheDir =
- new File(localDir, TaskTracker.getDistributedCacheDir());
- if (!distributedCacheDir.exists() && !distributedCacheDir.mkdirs()) {
- LOG.warn("Unable to create cache directory : "
- + distributedCacheDir.getPath());
- } else {
- PermissionsHandler.setPermissions(distributedCacheDir,
- PermissionsHandler.sevenFiveFive);
- }
-
- // Set up the jobcache directory
- File jobCacheDir = new File(localDir, TaskTracker.getJobCacheSubdir());
- if (!jobCacheDir.exists() && !jobCacheDir.mkdirs()) {
- LOG.warn("Unable to create job cache directory : "
- + jobCacheDir.getPath());
- } else {
- PermissionsHandler.setPermissions(jobCacheDir,
- PermissionsHandler.sevenFiveFive);
+ Localizer.PermissionsHandler.setPermissions(mapredlocalDir,
+ Localizer.PermissionsHandler.sevenFiveFive);
}
}
@@ -109,8 +92,8 @@
if (!taskLog.exists() && !taskLog.mkdirs()) {
LOG.warn("Unable to create taskLog directory : " + taskLog.getPath());
} else {
- PermissionsHandler.setPermissions(taskLog,
- PermissionsHandler.sevenFiveFive);
+ Localizer.PermissionsHandler.setPermissions(taskLog,
+ Localizer.PermissionsHandler.sevenFiveFive);
}
}
@@ -124,6 +107,17 @@
abstract void initializeJob(JobInitializationContext context) throws IOException;
/**
+ * Take task-controller specific actions to initialize the distributed cache
+ * files. This involves setting appropriate permissions for these files so as
+ * to make them accessible only to their owners.
+ *
+ * @param context
+ * @throws IOException
+ */
+ public abstract void initializeDistributedCache(InitializationContext context)
+ throws IOException;
+
+ /**
* Launch a task JVM
*
* This method defines how a JVM will be launched to run a task. Each
@@ -178,12 +172,14 @@
abstract void initializeTask(TaskControllerContext context)
throws IOException;
+ static class TaskExecContext {
+ // task being executed
+ Task task;
+ }
/**
* Contains task information required for the task controller.
*/
- static class TaskControllerContext {
- // task being executed
- Task task;
+ static class TaskControllerContext extends TaskExecContext {
ShellCommandExecutor shExec; // the Shell executor executing the JVM for this task.
// Information used only when this context is used for launching new tasks.
@@ -194,10 +190,23 @@
long sleeptimeBeforeSigkill; // waiting time before sending SIGKILL to task JVM after sending SIGTERM
}
- static class JobInitializationContext {
+ /**
+ * NOTE: This is an internal-only class and is not intended for users!!
+ *
+ */
+ public static class InitializationContext {
+ public File workDir;
+ public String user;
+ }
+
+ static class JobInitializationContext extends InitializationContext {
JobID jobid;
+ }
+
+ static class DebugScriptContext extends TaskExecContext {
+ List<String> args;
File workDir;
- String user;
+ File stdout;
}
/**
@@ -214,4 +223,23 @@
* @param context task context
*/
abstract void killTask(TaskControllerContext context);
+
+ /**
+ * Initialize user on this TaskTracker in a TaskController-specific manner.
+ *
+ * @param context
+ * @throws IOException
+ */
+ public abstract void initializeUser(InitializationContext context)
+ throws IOException;
+
+ /**
+ * Launch the task debug script
+ *
+ * @param context
+ * @throws IOException
+ */
+ abstract void runDebugScript(DebugScriptContext context)
+ throws IOException;
+
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskInProgress.java Sat Nov 28 20:26:01 2009
@@ -32,10 +32,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobClient.RawSplit;
import org.apache.hadoop.mapred.JobInProgress.DataStatistics;
import org.apache.hadoop.mapred.SortedRanges.Range;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
import org.apache.hadoop.net.Node;
@@ -63,10 +65,11 @@
// Defines the TIP
private String jobFile = null;
- private RawSplit rawSplit;
+ private Job.RawSplit rawSplit;
private int numMaps;
private int partition;
private JobTracker jobtracker;
+ private JobHistory jobHistory;
private TaskID id;
private JobInProgress job;
private final int numSlotsRequired;
@@ -110,7 +113,7 @@
/**
* Map from taskId -> TaskStatus
*/
- private TreeMap<TaskAttemptID,TaskStatus> taskStatuses =
+ TreeMap<TaskAttemptID,TaskStatus> taskStatuses =
new TreeMap<TaskAttemptID,TaskStatus>();
// Map from taskId -> TaskTracker Id,
@@ -137,7 +140,7 @@
* Constructor for MapTask
*/
public TaskInProgress(JobID jobid, String jobFile,
- RawSplit rawSplit,
+ Job.RawSplit rawSplit,
JobTracker jobtracker, JobConf conf,
JobInProgress job, int partition,
int numSlotsRequired) {
@@ -151,6 +154,9 @@
this.numSlotsRequired = numSlotsRequired;
setMaxTaskAttempts();
init(jobid);
+ if (jobtracker != null) {
+ this.jobHistory = jobtracker.getJobHistory();
+ }
}
/**
@@ -170,6 +176,9 @@
this.numSlotsRequired = numSlotsRequired;
setMaxTaskAttempts();
init(jobid);
+ if (jobtracker != null) {
+ this.jobHistory = jobtracker.getJobHistory();
+ }
}
/**
@@ -287,7 +296,8 @@
*/
public void setExecFinishTime(long finishTime) {
execFinishTime = finishTime;
- JobHistory.Task.logUpdates(id, execFinishTime); // log the update
+ TaskUpdatedEvent tue = new TaskUpdatedEvent(id, execFinishTime);
+ jobHistory.logEvent(tue, id.getJobID());
}
/**
@@ -975,6 +985,8 @@
public Task addRunningTask(TaskAttemptID taskid,
String taskTracker,
boolean taskCleanup) {
+ // 1 slot is enough for taskCleanup task
+ int numSlotsNeeded = taskCleanup ? 1 : numSlotsRequired;
// create the task
Task t = null;
if (isMapTask()) {
@@ -989,9 +1001,9 @@
split = new BytesWritable();
}
t = new MapTask(jobFile, taskid, partition, splitClass, split,
- numSlotsRequired);
+ numSlotsNeeded);
} else {
- t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsRequired);
+ t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsNeeded);
}
if (jobCleanup) {
t.setJobCleanupTask();
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskLog.java Sat Nov 28 20:26:01 2009
@@ -39,7 +39,7 @@
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.ProcessTree;
+import org.apache.hadoop.mapreduce.util.ProcessTree;
import org.apache.hadoop.util.Shell;
import org.apache.log4j.Appender;
import org.apache.log4j.LogManager;
@@ -59,13 +59,9 @@
private static final File LOG_DIR =
new File(getBaseLogDir(), USERLOGS_DIR_NAME).getAbsoluteFile();
+ // localFS is set in (and used by) writeToIndexFile()
static LocalFileSystem localFS = null;
static {
- try {
- localFS = FileSystem.getLocal(new Configuration());
- } catch (IOException ioe) {
- LOG.warn("Getting local file system failed.");
- }
if (!LOG_DIR.exists()) {
boolean b = LOG_DIR.mkdirs();
if (!b) {
@@ -200,6 +196,10 @@
File indexFile = getIndexFile(currentTaskid.toString(), isCleanup);
Path indexFilePath = new Path(indexFile.getAbsolutePath());
Path tmpIndexFilePath = new Path(tmpIndexFile.getAbsolutePath());
+
+ if (localFS == null) {// set localFS once
+ localFS = FileSystem.getLocal(new Configuration());
+ }
localFS.rename (tmpIndexFilePath, indexFilePath);
}
private static void resetPrevLengths(TaskAttemptID firstTaskid) {
@@ -395,7 +395,7 @@
* @return the number of bytes to cap the log files at
*/
public static long getTaskLogLength(JobConf conf) {
- return conf.getLong("mapred.userlog.limit.kb", 0) * 1024;
+ return conf.getLong(JobContext.TASK_USERLOG_LIMIT, 0) * 1024;
}
/**
@@ -561,6 +561,37 @@
}
/**
+ * Construct the shell command line for running the debug script.
+ * The executable is converted to a shell path, stdin is redirected from
+ * /dev/null, and both stdout and stderr are redirected to debugout.
+ * @param cmd The command and the arguments that should be run
+ * @param debugout The file that stdout and stderr should be saved to
+ * @return the command line as a String
+ * @throws IOException if the executable cannot be converted to a shell path
+ */
+ static String buildDebugScriptCommandLine(List<String> cmd, String debugout)
+ throws IOException {
+ StringBuilder mergedCmd = new StringBuilder();
+ mergedCmd.append("exec ");
+ boolean isExecutable = true;
+ for(String s: cmd) {
+ if (isExecutable) {
+ // the executable name needs to be expressed as a shell path for the
+ // shell to find it.
+ mergedCmd.append(FileUtil.makeShellPath(new File(s)));
+ isExecutable = false;
+ } else {
+ mergedCmd.append(s);
+ }
+ mergedCmd.append(" ");
+ }
+ mergedCmd.append(" < /dev/null ");
+ mergedCmd.append(" >");
+ mergedCmd.append(debugout);
+ mergedCmd.append(" 2>&1 ");
+ return mergedCmd.toString();
+ }
+ /**
* Add quotes to each of the command strings and
* return as a single string
* @param cmd The command to be quoted
@@ -604,25 +635,7 @@
List<String> result = new ArrayList<String>(3);
result.add(bashCommand);
result.add("-c");
- StringBuffer mergedCmd = new StringBuffer();
- mergedCmd.append("exec ");
- boolean isExecutable = true;
- for(String s: cmd) {
- if (isExecutable) {
- // the executable name needs to be expressed as a shell path for the
- // shell to find it.
- mergedCmd.append(FileUtil.makeShellPath(new File(s)));
- isExecutable = false;
- } else {
- mergedCmd.append(s);
- }
- mergedCmd.append(" ");
- }
- mergedCmd.append(" < /dev/null ");
- mergedCmd.append(" >");
- mergedCmd.append(debugout);
- mergedCmd.append(" 2>&1 ");
- result.add(mergedCmd.toString());
+ result.add(buildDebugScriptCommandLine(cmd, debugout));
return result;
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Sat Nov 28 20:26:01 2009
@@ -29,8 +29,9 @@
import org.apache.hadoop.mapred.TaskTracker;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
-import org.apache.hadoop.util.ProcfsBasedProcessTree;
-import org.apache.hadoop.util.ProcessTree;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.mapreduce.util.ProcessTree;
import org.apache.hadoop.util.StringUtils;
/**
@@ -50,11 +51,14 @@
private Map<TaskAttemptID, ProcessTreeInfo> tasksToBeAdded;
private List<TaskAttemptID> tasksToBeRemoved;
+ private static final String MEMORY_USAGE_STRING =
+ "Memory usage of ProcessTree %s for task-id %s : %d bytes, " +
+ "limit : %d bytes";
+
public TaskMemoryManagerThread(TaskTracker taskTracker) {
this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L,
- taskTracker.getJobConf().getLong(
- "mapred.tasktracker.taskmemorymanager.monitoring-interval",
- 5000L));
+ taskTracker.getJobConf().getLong(
+ TTConfig.TT_MEMORY_MANAGER_MONITORING_INTERVAL, 5000L));
this.taskTracker = taskTracker;
}
@@ -179,7 +183,7 @@
taskTracker
.getJobConf()
.getLong(
- "mapred.tasktracker.tasks.sleeptime-before-sigkill",
+ TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL,
ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
// create process tree object
@@ -209,18 +213,19 @@
// are processes more than 1 iteration old.
long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1);
long limit = ptInfo.getMemLimit();
- LOG.info("Memory usage of ProcessTree " + pId + " :"
- + currentMemUsage + "bytes. Limit : " + limit + "bytes");
+ LOG.info(String.format(MEMORY_USAGE_STRING,
+ pId, tid.toString(), currentMemUsage, limit));
if (isProcessTreeOverLimit(tid.toString(), currentMemUsage,
curMemUsageOfAgedProcesses, limit)) {
// Task (the root process) is still alive and overflowing memory.
- // Clean up.
+ // Dump the process-tree and then clean it up.
String msg =
"TaskTree [pid=" + pId + ",tipID=" + tid
+ "] is running beyond memory-limits. Current usage : "
+ currentMemUsage + "bytes. Limit : " + limit
- + "bytes. Killing task.";
+ + "bytes. Killing task. \nDump of the process-tree for "
+ + tid + " : \n" + pTree.getProcessTreeDump();
LOG.warn(msg);
taskTracker.cleanUpOverMemoryTask(tid, true, msg);
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskReport.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskReport.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskReport.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskReport.java Sat Nov 28 20:26:01 2009
@@ -17,33 +17,18 @@
*/
package org.apache.hadoop.mapred;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-/** A report on the state of a task. */
-public class TaskReport implements Writable {
- private TaskID taskid;
- private float progress;
- private String state;
- private String[] diagnostics;
- private long startTime;
- private long finishTime;
- private Counters counters;
- private TIPStatus currentStatus;
-
- private Collection<TaskAttemptID> runningAttempts =
- new ArrayList<TaskAttemptID>();
- private TaskAttemptID successfulAttempt = new TaskAttemptID();
+/** A report on the state of a task.
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.TaskReport} instead
+ **/
+@Deprecated
+public class TaskReport extends org.apache.hadoop.mapreduce.TaskReport {
+
public TaskReport() {
- taskid = new TaskID();
+ super();
}
/**
@@ -80,160 +65,83 @@
String[] diagnostics, TIPStatus currentStatus,
long startTime, long finishTime,
Counters counters) {
- this.taskid = taskid;
- this.progress = progress;
- this.state = state;
- this.diagnostics = diagnostics;
- this.currentStatus = currentStatus;
- this.startTime = startTime;
- this.finishTime = finishTime;
- this.counters = counters;
- }
-
- /** @deprecated use {@link #getTaskID()} instead */
- @Deprecated
- public String getTaskId() { return taskid.toString(); }
- /** The id of the task. */
- public TaskID getTaskID() { return taskid; }
- /** The amount completed, between zero and one. */
- public float getProgress() { return progress; }
- /** The most recent state, reported by a {@link Reporter}. */
- public String getState() { return state; }
- /** A list of error messages. */
- public String[] getDiagnostics() { return diagnostics; }
- /** A table of counters. */
- public Counters getCounters() { return counters; }
- /** The current status */
- public TIPStatus getCurrentStatus() {
- return currentStatus;
+ super(taskid, progress, state, diagnostics, currentStatus, startTime,
+ finishTime, new org.apache.hadoop.mapreduce.Counters(counters));
}
- /**
- * Get finish time of task.
- * @return 0, if finish time was not set else returns finish time.
- */
- public long getFinishTime() {
- return finishTime;
- }
-
- /**
- * set finish time of task.
- * @param finishTime finish time of task.
- */
- void setFinishTime(long finishTime) {
- this.finishTime = finishTime;
+ static TaskReport downgrade(
+ org.apache.hadoop.mapreduce.TaskReport report) {
+ return new TaskReport(TaskID.downgrade(report.getTaskId()),
+ report.getProgress(), report.getState(), report.getDiagnostics(),
+ report.getCurrentStatus(), report.getStartTime(), report.getFinishTime(),
+ Counters.downgrade(report.getTaskCounters()));
}
-
- /**
- * Get start time of task.
- * @return 0 if start time was not set, else start time.
- */
- public long getStartTime() {
- return startTime;
+
+ static TaskReport[] downgradeArray(org.apache.hadoop.
+ mapreduce.TaskReport[] reports) {
+ List<TaskReport> ret = new ArrayList<TaskReport>();
+ for (org.apache.hadoop.mapreduce.TaskReport report : reports) {
+ ret.add(downgrade(report));
+ }
+ return ret.toArray(new TaskReport[0]);
}
-
- /**
- * set start time of the task.
- */
- void setStartTime(long startTime) {
- this.startTime = startTime;
+
+ /** The id of the task. */
+ public TaskID getTaskID() { return TaskID.downgrade(super.getTaskId()); }
+
+ public Counters getCounters() {
+ return Counters.downgrade(super.getTaskCounters());
}
-
+
/**
* set successful attempt ID of the task.
*/
public void setSuccessfulAttempt(TaskAttemptID t) {
- successfulAttempt = t;
+ super.setSuccessfulAttemptId(t);
}
/**
* Get the attempt ID that took this task to completion
*/
public TaskAttemptID getSuccessfulTaskAttempt() {
- return successfulAttempt;
+ return TaskAttemptID.downgrade(super.getSuccessfulTaskAttemptId());
}
/**
* set running attempt(s) of the task.
*/
public void setRunningTaskAttempts(
Collection<TaskAttemptID> runningAttempts) {
- this.runningAttempts = runningAttempts;
+ Collection<org.apache.hadoop.mapreduce.TaskAttemptID> attempts =
+ new ArrayList<org.apache.hadoop.mapreduce.TaskAttemptID>();
+ for (TaskAttemptID id : runningAttempts) {
+ attempts.add(id);
+ }
+ super.setRunningTaskAttemptIds(attempts);
}
/**
* Get the running task attempt IDs for this task
*/
public Collection<TaskAttemptID> getRunningTaskAttempts() {
- return runningAttempts;
- }
-
-
- @Override
- public boolean equals(Object o) {
- if(o == null)
- return false;
- if(o.getClass().equals(this.getClass())) {
- TaskReport report = (TaskReport) o;
- return counters.equals(report.getCounters())
- && Arrays.toString(this.diagnostics)
- .equals(Arrays.toString(report.getDiagnostics()))
- && this.finishTime == report.getFinishTime()
- && this.progress == report.getProgress()
- && this.startTime == report.getStartTime()
- && this.state.equals(report.getState())
- && this.taskid.equals(report.getTaskID());
+ Collection<TaskAttemptID> attempts = new ArrayList<TaskAttemptID>();
+ for (org.apache.hadoop.mapreduce.TaskAttemptID id :
+ super.getRunningTaskAttemptIds()) {
+ attempts.add(TaskAttemptID.downgrade(id));
}
- return false;
+ return attempts;
}
-
- @Override
- public int hashCode() {
- return (counters.toString() + Arrays.toString(this.diagnostics)
- + this.finishTime + this.progress + this.startTime + this.state
- + this.taskid.toString()).hashCode();
- }
- //////////////////////////////////////////////
- // Writable
- //////////////////////////////////////////////
- public void write(DataOutput out) throws IOException {
- taskid.write(out);
- out.writeFloat(progress);
- Text.writeString(out, state);
- out.writeLong(startTime);
- out.writeLong(finishTime);
- WritableUtils.writeStringArray(out, diagnostics);
- counters.write(out);
- WritableUtils.writeEnum(out, currentStatus);
- if (currentStatus == TIPStatus.RUNNING) {
- WritableUtils.writeVInt(out, runningAttempts.size());
- TaskAttemptID t[] = new TaskAttemptID[0];
- t = runningAttempts.toArray(t);
- for (int i = 0; i < t.length; i++) {
- t[i].write(out);
- }
- } else if (currentStatus == TIPStatus.COMPLETE) {
- successfulAttempt.write(out);
- }
+
+ /**
+ * set finish time of task.
+ * @param finishTime finish time of task.
+ */
+ protected void setFinishTime(long finishTime) {
+ super.setFinishTime(finishTime);
}
- public void readFields(DataInput in) throws IOException {
- this.taskid.readFields(in);
- this.progress = in.readFloat();
- this.state = Text.readString(in);
- this.startTime = in.readLong();
- this.finishTime = in.readLong();
-
- diagnostics = WritableUtils.readStringArray(in);
- counters = new Counters();
- counters.readFields(in);
- currentStatus = WritableUtils.readEnum(in, TIPStatus.class);
- if (currentStatus == TIPStatus.RUNNING) {
- int num = WritableUtils.readVInt(in);
- for (int i = 0; i < num; i++) {
- TaskAttemptID t = new TaskAttemptID();
- t.readFields(in);
- runningAttempts.add(t);
- }
- } else if (currentStatus == TIPStatus.COMPLETE) {
- successfulAttempt.readFields(in);
- }
+ /**
+ * set start time of the task.
+ */
+ protected void setStartTime(long startTime) {
+ super.setStartTime(startTime);
}
+
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java Sat Nov 28 20:26:01 2009
@@ -32,15 +32,18 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.TaskTracker.PermissionsHandler;
+import org.apache.hadoop.mapred.TaskController.InitializationContext;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Level;
@@ -141,6 +144,7 @@
* set via {@link JobConf#MAPRED_MAP_TASK_ENV} or
* {@link JobConf#MAPRED_REDUCE_TASK_ENV}
*/
+ @Deprecated
public String getChildEnv(JobConf jobConf) {
return jobConf.get(JobConf.MAPRED_TASK_ENV);
}
@@ -160,21 +164,26 @@
//before preparing the job localize
//all the archives
TaskAttemptID taskid = t.getTaskID();
- LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+ LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
File workDir = formWorkDir(lDirAlloc, taskid, t.isTaskCleanupTask(), conf);
// We don't create any symlinks yet, so presence/absence of workDir
// actually on the file system doesn't matter.
taskDistributedCacheManager = tracker.getTrackerDistributedCacheManager()
.newTaskDistributedCacheManager(conf);
- taskDistributedCacheManager.setup(
- lDirAlloc, workDir, TaskTracker.getDistributedCacheDir());
+ taskDistributedCacheManager.setup(lDirAlloc, workDir, TaskTracker
+ .getDistributedCacheDir(conf.getUser()));
// Set up the child task's configuration. After this call, no localization
// of files should happen in the TaskTracker's process space. Any changes to
// the conf object after this will NOT be reflected to the child.
setupChildTaskConfiguration(lDirAlloc);
+ InitializationContext context = new InitializationContext();
+ context.user = conf.getUser();
+ context.workDir = new File(conf.get(TaskTracker.JOB_LOCAL_DIR));
+ tracker.getTaskController().initializeDistributedCache(context);
+
if (!prepare()) {
return;
}
@@ -202,7 +211,8 @@
stderr);
Map<String, String> env = new HashMap<String, String>();
- errorInfo = getVMEnvironment(errorInfo, workDir, conf, env);
+ errorInfo = getVMEnvironment(errorInfo, workDir, conf, env,
+ taskid, logSize);
jvmManager.launchJvm(this,
jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,
@@ -246,7 +256,12 @@
}catch(IOException ie){
LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
}
- tip.reportTaskFinished();
+
+ // It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with
+ // *false* since the task has either
+ // a) SUCCEEDED - which means commit has been done
+ // b) FAILED - which means we do not need to commit
+ tip.reportTaskFinished(false);
}
}
@@ -265,8 +280,8 @@
if (!b) {
LOG.warn("mkdirs failed. Ignoring");
} else {
- PermissionsHandler.setPermissions(logDir,
- PermissionsHandler.sevenZeroZero);
+ Localizer.PermissionsHandler.setPermissions(logDir,
+ Localizer.PermissionsHandler.sevenZeroZero);
}
return logFiles;
}
@@ -282,9 +297,9 @@
throws IOException {
Path localTaskFile =
- lDirAlloc.getLocalPathForWrite(TaskTracker.getTaskConfFile(t
- .getJobID().toString(), t.getTaskID().toString(), t
- .isTaskCleanupTask()), conf);
+ lDirAlloc.getLocalPathForWrite(TaskTracker.getTaskConfFile(
+ t.getUser(), t.getJobID().toString(), t.getTaskID().toString(), t
+ .isTaskCleanupTask()), conf);
// write the child's task configuration file to the local disk
writeLocalTaskFile(localTaskFile.toString(), conf);
@@ -433,8 +448,8 @@
JobConf conf)
throws IOException {
- // add java.io.tmpdir given by mapred.child.tmp
- String tmp = conf.get("mapred.child.tmp", "./tmp");
+ // add java.io.tmpdir given by mapreduce.task.tmp.dir
+ String tmp = conf.get(JobContext.TASK_TEMP_DIR, "./tmp");
Path tmpDir = new Path(tmp);
// if temp directory path is not absolute, prepend it with workDir.
@@ -471,6 +486,7 @@
}
/**
+ * Sets the environment variables needed for the task JVM and its children.
* @param errorInfo
* @param workDir
* @param env
@@ -478,7 +494,7 @@
* @throws Throwable
*/
private String getVMEnvironment(String errorInfo, File workDir, JobConf conf,
- Map<String, String> env)
+ Map<String, String> env, TaskAttemptID taskid, long logSize)
throws Throwable {
StringBuffer ldLibraryPath = new StringBuffer();
ldLibraryPath.append(workDir.toString());
@@ -490,6 +506,23 @@
}
env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
+ // put jobTokenFile name into env
+ String jobTokenFile = conf.get(JobContext.JOB_TOKEN_FILE);
+ LOG.debug("putting jobToken file name into environment fn=" + jobTokenFile);
+ env.put("JOB_TOKEN_FILE", jobTokenFile);
+
+ // for the child of task jvm, set hadoop.root.logger
+ env.put("HADOOP_ROOT_LOGGER", "INFO,TLA");
+ String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
+ if (hadoopClientOpts == null) {
+ hadoopClientOpts = "";
+ } else {
+ hadoopClientOpts = hadoopClientOpts + " ";
+ }
+ hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" + taskid
+ + " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
+ env.put("HADOOP_CLIENT_OPTS", "\"" + hadoopClientOpts + "\"");
+
// add the env variables passed by the user
String mapredChildEnv = getChildEnv(conf);
if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
@@ -550,25 +583,26 @@
}
/**
- * Prepare the mapred.local.dir for the child. The child is sand-boxed now.
+ * Prepare the MRConfig.LOCAL_DIR for the child. The child is sand-boxed now.
* Whenever it uses LocalDirAllocator from now on inside the child, it will
* only see files inside the attempt-directory. This is done in the Child's
* process space.
*/
static void setupChildMapredLocalDirs(Task t, JobConf conf) {
- String[] localDirs = conf.getStrings("mapred.local.dir");
+ String[] localDirs = conf.getStrings(MRConfig.LOCAL_DIR);
String jobId = t.getJobID().toString();
String taskId = t.getTaskID().toString();
boolean isCleanup = t.isTaskCleanupTask();
+ String user = t.getUser();
StringBuffer childMapredLocalDir =
new StringBuffer(localDirs[0] + Path.SEPARATOR
- + TaskTracker.getLocalTaskDir(jobId, taskId, isCleanup));
+ + TaskTracker.getLocalTaskDir(user, jobId, taskId, isCleanup));
for (int i = 1; i < localDirs.length; i++) {
childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
- + TaskTracker.getLocalTaskDir(jobId, taskId, isCleanup));
+ + TaskTracker.getLocalTaskDir(user, jobId, taskId, isCleanup));
}
- LOG.debug("mapred.local.dir for child : " + childMapredLocalDir);
- conf.set("mapred.local.dir", childMapredLocalDir.toString());
+ LOG.debug(MRConfig.LOCAL_DIR + " for child : " + childMapredLocalDir);
+ conf.set(MRConfig.LOCAL_DIR, childMapredLocalDir.toString());
}
/** Creates the working directory pathname for a task attempt. */
@@ -576,8 +610,9 @@
TaskAttemptID task, boolean isCleanup, JobConf conf)
throws IOException {
Path workDir =
- lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
- .getJobID().toString(), task.toString(), isCleanup), conf);
+ lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
+ conf.getUser(), task.getJobID().toString(), task.toString(),
+ isCleanup), conf);
return new File(workDir.toString());
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskScheduler.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskScheduler.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskScheduler.java Sat Nov 28 20:26:01 2009
@@ -91,5 +91,60 @@
* @return
*/
public abstract Collection<JobInProgress> getJobs(String queueName);
-
+
+ /**
+ * Abstract QueueRefresher class. Schedulers can extend this and return an
+ * instance of this in the {@link #getQueueRefresher()} method. The
+ * {@link #refreshQueues(List)} method of this instance will be invoked by the
+ * {@link QueueManager} whenever it gets a request from an administrator to
+ * refresh its own queue-configuration. This method has a documented contract
+ * between the {@link QueueManager} and the {@link TaskScheduler}.
+ */
+ abstract class QueueRefresher {
+
+ /**
+ * Refresh the queue-configuration in the scheduler. This method has the
+ * following contract.
+ * <ol>
+ * <li>Before this method, {@link QueueManager} does a validation of the new
+ * queue-configuration. For example, currently addition of new queues, or
+ * removal of queues at any level in the hierarchy is not supported by
+ * {@link QueueManager} and so are not supported for schedulers too.</li>
+ * <li>Schedulers will be passed a list of {@link JobQueueInfo}s of the root
+ * queues i.e. the queues at the top level. All the descendants are properly
+ * linked from these top-level queues.</li>
+ * <li>Schedulers should use the scheduler specific queue properties from
+ * the newRootQueues, validate the properties themselves and apply them
+ * internally.</li>
+ * <li>
+ * Once the method returns successfully from the schedulers, it is assumed
+ * that the refresh of queue properties is successful throughout and will be
+ * 'committed' internally to {@link QueueManager} too. It is guaranteed that
+ * once the scheduler returns successfully, the queue refresh in
+ * QueueManager will not fail. If such an abnormality ever happens, the
+ * queue framework will be inconsistent and will need a JT restart.</li>
+ * <li>If the scheduler throws an exception during {@link #refreshQueues(List)},
+ * {@link QueueManager} throws away the newly read configuration, retains
+ * the old (consistent) configuration and informs the request issuer about
+ * the error appropriately.</li>
+ * </ol>
+ *
+ * @param newRootQueues the new top-level queues, with all descendants linked
+ */
+ abstract void refreshQueues(List<JobQueueInfo> newRootQueues)
+ throws Throwable;
+ }
+
+ /**
+ * Get the {@link QueueRefresher} for this scheduler. By default, no
+ * {@link QueueRefresher} exists for a scheduler and is set to null.
+ * Schedulers need to return an instance of {@link QueueRefresher} if they
+ * wish to refresh their queue-configuration when {@link QueueManager}
+ * refreshes its own queue-configuration via an administrator request.
+ *
+ * @return this scheduler's {@link QueueRefresher}, or null if it has none
+ */
+ QueueRefresher getQueueRefresher() {
+ return null;
+ }
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskStatus.java Sat Nov 28 20:26:01 2009
@@ -27,12 +27,13 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.StringUtils;
/**************************************************
* Describes the current status of a task. This is
* not intended to be a comprehensive piece of data.
*
**************************************************/
-abstract class TaskStatus implements Writable, Cloneable {
+public abstract class TaskStatus implements Writable, Cloneable {
static final Log LOG =
LogFactory.getLog(TaskStatus.class.getName());
@@ -132,11 +133,21 @@
}
/**
- * Sets finishTime.
+ * Sets finishTime for the task status if and only if the
+ * start time is already set and the passed finish time is
+ * greater than zero; otherwise logs an error and ignores it.
+ *
* @param finishTime finish time of task.
*/
void setFinishTime(long finishTime) {
- this.finishTime = finishTime;
+ if(this.getStartTime() > 0 && finishTime > 0) {
+ this.finishTime = finishTime;
+ } else {
+ //Using String utils to get the stack trace.
+ LOG.error("Trying to set finish time for task " + taskid +
+ " when no start time is set, stackTrace is : " +
+ StringUtils.stringifyException(new Exception()));
+ }
}
/**
* Get shuffle finish time for the task. If shuffle finish time was
@@ -201,11 +212,20 @@
}
/**
- * Set startTime of the task.
+ * Set startTime of the task if start time is greater than zero.
* @param startTime start time
*/
void setStartTime(long startTime) {
- this.startTime = startTime;
+ //Making the assumption of passed startTime to be a positive
+ //long value explicit.
+ if (startTime > 0) {
+ this.startTime = startTime;
+ } else {
+ //Using String utils to get the stack trace.
+ LOG.error("Trying to set illegal startTime for task : " + taskid +
+ ".Stack trace is : " +
+ StringUtils.stringifyException(new Exception()));
+ }
}
/**
* Get current phase of this task. Phase.Map in case of map tasks,
@@ -219,7 +239,7 @@
* Set current phase of this task.
* @param phase phase of this task
*/
- void setPhase(Phase phase){
+ public void setPhase(Phase phase){
TaskStatus.Phase oldPhase = getPhase();
if (oldPhase != phase){
// sort phase started
@@ -294,7 +314,7 @@
*
* @param mapTaskId map from which fetch failed
*/
- synchronized void addFetchFailedMap(TaskAttemptID mapTaskId) {}
+ public abstract void addFetchFailedMap(TaskAttemptID mapTaskId);
/**
* Update the status of the task.
@@ -326,11 +346,11 @@
setDiagnosticInfo(status.getDiagnosticInfo());
- if (status.getStartTime() != 0) {
- this.startTime = status.getStartTime();
+ if (status.getStartTime() > 0) {
+ this.setStartTime(status.getStartTime());
}
- if (status.getFinishTime() != 0) {
- this.finishTime = status.getFinishTime();
+ if (status.getFinishTime() > 0) {
+ this.setFinishTime(status.getFinishTime());
}
this.phase = status.getPhase();
@@ -359,8 +379,8 @@
setProgress(progress);
setStateString(state);
setPhase(phase);
- if (finishTime != 0) {
- this.finishTime = finishTime;
+ if (finishTime > 0) {
+ setFinishTime(finishTime);
}
}