You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by yh...@apache.org on 2009/08/12 18:17:49 UTC
svn commit: r803583 [2/3] - in /hadoop/mapreduce/trunk: ./ conf/
src/c++/task-controller/ src/c++/task-controller/tests/ src/contrib/
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
src/docs/src/documentation/content/xdocs/ src/java/org/apa...
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Wed Aug 12 16:17:47 2009
@@ -22,12 +22,12 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
@@ -90,7 +90,7 @@
private JobStatus status;
private ArrayList<TaskAttemptID> mapIds = new ArrayList<TaskAttemptID>();
- private MapOutputFile mapoutputFile;
+
private JobProfile profile;
private Path localFile;
private FileSystem localFs;
@@ -110,8 +110,6 @@
public Job(JobID jobid, JobConf conf) throws IOException {
this.file = new Path(getSystemDir(), jobid + "/job.xml");
this.id = jobid;
- this.mapoutputFile = new MapOutputFile(jobid);
- this.mapoutputFile.setConf(conf);
this.localFile = new JobConf(conf).getLocalPath(jobDir+id+".xml");
this.localFs = FileSystem.getLocal(conf);
@@ -168,7 +166,9 @@
}
outputCommitter.setupJob(jContext);
status.setSetupProgress(1.0f);
-
+
+ Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
+ new HashMap<TaskAttemptID, MapOutputFile>();
for (int i = 0; i < rawSplits.length; i++) {
if (!this.isInterrupted()) {
TaskAttemptID mapId = new TaskAttemptID(
@@ -179,6 +179,12 @@
rawSplits[i].getClassName(),
rawSplits[i].getBytes(), 1);
JobConf localConf = new JobConf(job);
+ TaskRunner.setupChildMapredLocalDirs(map, localConf);
+
+ MapOutputFile mapOutput = new MapOutputFile();
+ mapOutput.setConf(localConf);
+ mapOutputFiles.put(mapId, mapOutput);
+
map.setJobFile(localFile.toString());
map.localizeConfiguration(localConf);
map.setConf(localConf);
@@ -196,14 +202,20 @@
new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0);
try {
if (numReduceTasks > 0) {
+ ReduceTask reduce = new ReduceTask(file.toString(),
+ reduceId, 0, mapIds.size(), 1);
+ JobConf localConf = new JobConf(job);
+ TaskRunner.setupChildMapredLocalDirs(reduce, localConf);
// move map output to reduce input
for (int i = 0; i < mapIds.size(); i++) {
if (!this.isInterrupted()) {
TaskAttemptID mapId = mapIds.get(i);
- Path mapOut = this.mapoutputFile.getOutputFile(mapId);
- Path reduceIn = this.mapoutputFile.getInputFileForWrite(
- mapId.getTaskID(),reduceId,
- localFs.getFileStatus(mapOut).getLen());
+ Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
+ MapOutputFile localOutputFile = new MapOutputFile();
+ localOutputFile.setConf(localConf);
+ Path reduceIn =
+ localOutputFile.getInputFileForWrite(mapId.getTaskID(),
+ localFs.getFileStatus(mapOut).getLen());
if (!localFs.mkdirs(reduceIn.getParent())) {
throw new IOException("Mkdirs failed to create "
+ reduceIn.getParent().toString());
@@ -215,9 +227,6 @@
}
}
if (!this.isInterrupted()) {
- ReduceTask reduce = new ReduceTask(file.toString(),
- reduceId, 0, mapIds.size(), 1);
- JobConf localConf = new JobConf(job);
reduce.setJobFile(localFile.toString());
reduce.localizeConfiguration(localConf);
reduce.setConf(localConf);
@@ -232,11 +241,8 @@
}
}
} finally {
- for (TaskAttemptID mapId: mapIds) {
- this.mapoutputFile.removeAll(mapId);
- }
- if (numReduceTasks == 1) {
- this.mapoutputFile.removeAll(reduceId);
+ for (MapOutputFile output : mapOutputFiles.values()) {
+ output.removeAll();
}
}
// delete the temporary directory in output directory
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java Wed Aug 12 16:17:47 2009
@@ -30,144 +30,152 @@
class MapOutputFile {
private JobConf conf;
- private JobID jobId;
-
- MapOutputFile() {
- }
- MapOutputFile(JobID jobId) {
- this.jobId = jobId;
+ static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out";
+
+ MapOutputFile() {
}
private LocalDirAllocator lDirAlloc =
new LocalDirAllocator("mapred.local.dir");
- /** Return the path to local map output file created earlier
- * @param mapTaskId a map task id
- */
- public Path getOutputFile(TaskAttemptID mapTaskId)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/file.out", conf);
+ /**
+ * Return the path to local map output file created earlier
+ *
+ * @return path
+ * @throws IOException
+ */
+ public Path getOutputFile()
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
+ + "file.out", conf);
}
- /** Create a local map output file name.
- * @param mapTaskId a map task id
+ /**
+ * Create a local map output file name.
+ *
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getOutputFileForWrite(TaskAttemptID mapTaskId, long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/file.out", size, conf);
- }
-
- /** Return the path to a local map output index file created earlier
- * @param mapTaskId a map task id
- */
- public Path getOutputIndexFile(TaskAttemptID mapTaskId)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/file.out.index", conf);
- }
-
- /** Create a local map output index file name.
- * @param mapTaskId a map task id
+ public Path getOutputFileForWrite(long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
+ + "file.out", size, conf);
+ }
+
+ /**
+ * Return the path to a local map output index file created earlier
+ *
+ * @return path
+ * @throws IOException
+ */
+ public Path getOutputIndexFile()
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
+ + "file.out.index", conf);
+ }
+
+ /**
+ * Create a local map output index file name.
+ *
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getOutputIndexFileForWrite(TaskAttemptID mapTaskId, long size)
- throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/file.out.index",
- size, conf);
+ public Path getOutputIndexFileForWrite(long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
+ + "file.out.index", size, conf);
}
- /** Return a local map spill file created earlier.
- * @param mapTaskId a map task id
+ /**
+ * Return a local map spill file created earlier.
+ *
* @param spillNumber the number
+ * @return path
+ * @throws IOException
*/
- public Path getSpillFile(TaskAttemptID mapTaskId, int spillNumber)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/spill"
- + spillNumber + ".out", conf);
+ public Path getSpillFile(int spillNumber)
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
+ + spillNumber + ".out", conf);
}
- /** Create a local map spill file name.
- * @param mapTaskId a map task id
+ /**
+ * Create a local map spill file name.
+ *
* @param spillNumber the number
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getSpillFileForWrite(TaskAttemptID mapTaskId, int spillNumber,
- long size) throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/spill" +
- spillNumber + ".out", size, conf);
+ public Path getSpillFileForWrite(int spillNumber, long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
+ + spillNumber + ".out", size, conf);
}
- /** Return a local map spill index file created earlier
- * @param mapTaskId a map task id
+ /**
+ * Return a local map spill index file created earlier
+ *
* @param spillNumber the number
+ * @return path
+ * @throws IOException
*/
- public Path getSpillIndexFile(TaskAttemptID mapTaskId, int spillNumber)
- throws IOException {
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/spill" +
- spillNumber + ".out.index", conf);
+ public Path getSpillIndexFile(int spillNumber)
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
+ + spillNumber + ".out.index", conf);
}
- /** Create a local map spill index file name.
- * @param mapTaskId a map task id
+ /**
+ * Create a local map spill index file name.
+ *
* @param spillNumber the number
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getSpillIndexFileForWrite(TaskAttemptID mapTaskId, int spillNumber,
- long size) throws IOException {
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), mapTaskId.toString())
- + "/spill" + spillNumber +
- ".out.index", size, conf);
- }
-
- /** Return a local reduce input file created earlier
- * @param mapTaskId a map task id
- * @param reduceTaskId a reduce task id
- */
- public Path getInputFile(int mapId, TaskAttemptID reduceTaskId)
- throws IOException {
- // TODO *oom* should use a format here
- return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), reduceTaskId.toString())
- + "/map_" + mapId + ".out",
- conf);
- }
-
- /** Create a local reduce input file name.
- * @param mapTaskId a map task id
- * @param reduceTaskId a reduce task id
+ public Path getSpillIndexFileForWrite(int spillNumber, long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
+ + spillNumber + ".out.index", size, conf);
+ }
+
+ /**
+ * Return a local reduce input file created earlier
+ *
+ * @param mapId a map task id
+ * @return path
+ * @throws IOException
+ */
+ public Path getInputFile(int mapId)
+ throws IOException {
+ return lDirAlloc.getLocalPathToRead(String.format(
+ REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, Integer
+ .valueOf(mapId)), conf);
+ }
+
+ /**
+ * Create a local reduce input file name.
+ *
+ * @param mapId a map task id
* @param size the size of the file
+ * @return path
+ * @throws IOException
*/
- public Path getInputFileForWrite(TaskID mapId, TaskAttemptID reduceTaskId,
- long size)
- throws IOException {
- // TODO *oom* should use a format here
- return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), reduceTaskId.toString())
- + "/map_" + mapId.getId() + ".out",
- size, conf);
+ public Path getInputFileForWrite(TaskID mapId, long size)
+ throws IOException {
+ return lDirAlloc.getLocalPathForWrite(String.format(
+ REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, mapId.getId()),
+ size, conf);
}
/** Removes all of the files related to a task. */
- public void removeAll(TaskAttemptID taskId) throws IOException {
- conf.deleteLocalFiles(TaskTracker.getIntermediateOutputDir(
- jobId.toString(), taskId.toString())
-);
+ public void removeAll()
+ throws IOException {
+ conf.deleteLocalFiles(TaskTracker.OUTPUT);
}
public void setConf(Configuration conf) {
@@ -177,9 +185,4 @@
this.conf = new JobConf(conf);
}
}
-
- public void setJobId(JobID jobId) {
- this.jobId = jobId;
- }
-
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Wed Aug 12 16:17:47 2009
@@ -35,6 +35,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
@@ -66,7 +67,6 @@
* The size of each record in the index file for the map-outputs.
*/
public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
-
private BytesWritable split = new BytesWritable();
private String splitClass;
@@ -101,11 +101,21 @@
}
@Override
- public void localizeConfiguration(JobConf conf) throws IOException {
+ public void localizeConfiguration(JobConf conf)
+ throws IOException {
super.localizeConfiguration(conf);
- if (isMapOrReduce()) {
- Path localSplit = new Path(new Path(getJobFile()).getParent(),
- "split.dta");
+ // split.dta file is used only by IsolationRunner.
+ // Write the split file to the local disk if it is a normal map task (not a
+ // job-setup or a job-cleanup task) and if the user wishes to run
+ // IsolationRunner either by setting keep.failed.tasks.files to true or by
+ // using keep.tasks.files.pattern
+ if (isMapOrReduce()
+ && (conf.getKeepTaskFilesPattern() != null || conf
+ .getKeepFailedTaskFiles())) {
+ Path localSplit =
+ new LocalDirAllocator("mapred.local.dir").getLocalPathForWrite(
+ TaskTracker.getLocalSplitFile(getJobID().toString(), getTaskID()
+ .toString()), conf);
LOG.debug("Writing local split to " + localSplit);
DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
Text.writeString(out, splitClass);
@@ -1220,8 +1230,8 @@
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
- final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
- numSpills, size);
+ final Path filename =
+ mapOutputFile.getSpillFileForWrite(numSpills, size);
out = rfs.create(filename);
final int endPosition = (kvend > kvstart)
@@ -1285,9 +1295,9 @@
if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
// create spill index file
- Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
- getTaskID(), numSpills,
- partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ Path indexFilename =
+ mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+ * MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job);
} else {
indexCacheList.add(spillRec);
@@ -1313,8 +1323,8 @@
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
- final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
- numSpills, size);
+ final Path filename =
+ mapOutputFile.getSpillFileForWrite(numSpills, size);
out = rfs.create(filename);
// we don't run the combiner for a single record
@@ -1350,9 +1360,9 @@
}
if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
// create spill index file
- Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
- getTaskID(), numSpills,
- partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ Path indexFilename =
+ mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+ * MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job);
} else {
indexCacheList.add(spillRec);
@@ -1442,14 +1452,14 @@
final TaskAttemptID mapId = getTaskID();
for(int i = 0; i < numSpills; i++) {
- filename[i] = mapOutputFile.getSpillFile(mapId, i);
+ filename[i] = mapOutputFile.getSpillFile(i);
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
}
if (numSpills == 1) { //the spill is the final output
rfs.rename(filename[0],
new Path(filename[0].getParent(), "file.out"));
if (indexCacheList.size() == 0) {
- rfs.rename(mapOutputFile.getSpillIndexFile(mapId, 0),
+ rfs.rename(mapOutputFile.getSpillIndexFile(0),
new Path(filename[0].getParent(),"file.out.index"));
} else {
indexCacheList.get(0).writeToFile(
@@ -1460,7 +1470,7 @@
// read in paged indices
for (int i = indexCacheList.size(); i < numSpills; ++i) {
- Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, i);
+ Path indexFileName = mapOutputFile.getSpillIndexFile(i);
indexCacheList.add(new SpillRecord(indexFileName, job));
}
@@ -1468,10 +1478,10 @@
//lengths for each partition
finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
- Path finalOutputFile = mapOutputFile.getOutputFileForWrite(mapId,
- finalOutFileSize);
- Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(
- mapId, finalIndexFileSize);
+ Path finalOutputFile =
+ mapOutputFile.getOutputFileForWrite(finalOutFileSize);
+ Path finalIndexFile =
+ mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
//The output stream for the final single output file
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java Wed Aug 12 16:17:47 2009
@@ -34,13 +34,13 @@
return false;
}
- mapOutputFile.removeAll(getTask().getTaskID());
+ mapOutputFile.removeAll();
return true;
}
/** Delete all of the temporary map output files. */
public void close() throws IOException {
LOG.info(getTask()+" done; removing files.");
- mapOutputFile.removeAll(getTask().getTaskID());
+ mapOutputFile.removeAll();
}
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Aug 12 16:17:47 2009
@@ -208,7 +208,7 @@
if (isLocal) {
// for local jobs
for(int i = 0; i < numMaps; ++i) {
- fileList.add(mapOutputFile.getInputFile(i, getTaskID()));
+ fileList.add(mapOutputFile.getInputFile(i));
}
} else {
// for non local jobs
@@ -1283,12 +1283,11 @@
// else, we will check the localFS to find a suitable final location
// for this path
TaskAttemptID reduceId = reduceTask.getTaskID();
- Path filename = new Path("/" + TaskTracker.getIntermediateOutputDir(
- reduceId.getJobID().toString(),
- reduceId.toString())
- + "/map_" +
- loc.getTaskId().getId() + ".out");
-
+ Path filename =
+ new Path(String.format(
+ MapOutputFile.REDUCE_INPUT_FILE_FORMAT_STRING,
+ TaskTracker.OUTPUT, loc.getTaskId().getId()));
+
// Copy the map output to a temp file whose name is unique to this attempt
Path tmpMapOutput = new Path(filename+"-"+id);
@@ -2325,8 +2324,8 @@
sortPhaseFinished = true;
// must spill to disk, but can't retain in-mem for intermediate merge
- final Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
- reduceTask.getTaskID(), inMemToDiskBytes);
+ final Path outputPath =
+ mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes);
final RawKeyValueIterator rIter = Merger.merge(job, fs,
keyClass, valueClass, memDiskSegments, numMemDiskSegments,
tmpDir, comparator, reporter, spilledRecordsCounter, null,
@@ -2620,8 +2619,8 @@
long mergeOutputSize = createInMemorySegments(inMemorySegments, 0);
int noInMemorySegments = inMemorySegments.size();
- Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
- reduceTask.getTaskID(), mergeOutputSize);
+ Path outputPath =
+ mapOutputFile.getInputFileForWrite(mapId, mergeOutputSize);
Writer writer =
new Writer(conf, rfs, outputPath,
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Wed Aug 12 16:17:47 2009
@@ -37,7 +37,7 @@
}
// cleanup from failures
- mapOutputFile.removeAll(getTask().getTaskID());
+ mapOutputFile.removeAll();
return true;
}
@@ -46,6 +46,6 @@
public void close() throws IOException {
LOG.info(getTask()+" done; removing files.");
getTask().getProgress().setStatus("closed");
- mapOutputFile.removeAll(getTask().getTaskID());
+ mapOutputFile.removeAll();
}
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Wed Aug 12 16:17:47 2009
@@ -150,7 +150,6 @@
TaskStatus.Phase.MAP :
TaskStatus.Phase.SHUFFLE,
counters);
- this.mapOutputFile.setJobId(taskId.getJobID());
spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
}
@@ -309,7 +308,6 @@
partition = in.readInt();
numSlotsRequired = in.readInt();
taskStatus.readFields(in);
- this.mapOutputFile.setJobId(taskId.getJobID());
skipRanges.readFields(in);
currentRecIndexIterator = skipRanges.skipRangeIterator();
currentRecStartIndex = currentRecIndexIterator.next();
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskController.java Wed Aug 12 16:17:47 2009
@@ -17,13 +17,17 @@
*/
package org.apache.hadoop.mapred;
+import java.io.File;
import java.io.IOException;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.mapred.TaskTracker.PermissionsHandler;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -45,27 +49,95 @@
public Configuration getConf() {
return conf;
}
-
+
+ // The list of directory paths specified in the variable mapred.local.dir.
+ // This is used to determine which among the list of directories is picked up
+ // for storing data for a particular task.
+ protected String[] mapredLocalDirs;
+
public void setConf(Configuration conf) {
this.conf = conf;
+ mapredLocalDirs = conf.getStrings("mapred.local.dir");
}
-
+
+ /**
+ * Sets up the permissions of the following directories on all the configured
+ * disks:
+ * <ul>
+ * <li>mapred-local directories</li>
+ * <li>Job cache directories</li>
+ * <li>Archive directories</li>
+ * <li>Hadoop log directories</li>
+ * </ul>
+ */
+ void setup() {
+ for (String localDir : this.mapredLocalDirs) {
+ // Set up the mapred-local directories.
+ File mapredlocalDir = new File(localDir);
+ if (!mapredlocalDir.exists() && !mapredlocalDir.mkdirs()) {
+ LOG.warn("Unable to create mapred-local directory : "
+ + mapredlocalDir.getPath());
+ } else {
+ PermissionsHandler.setPermissions(mapredlocalDir,
+ PermissionsHandler.sevenFiveFive);
+ }
+
+ // Set up the cache directory used for distributed cache files
+ File distributedCacheDir =
+ new File(localDir, TaskTracker.getDistributedCacheDir());
+ if (!distributedCacheDir.exists() && !distributedCacheDir.mkdirs()) {
+ LOG.warn("Unable to create cache directory : "
+ + distributedCacheDir.getPath());
+ } else {
+ PermissionsHandler.setPermissions(distributedCacheDir,
+ PermissionsHandler.sevenFiveFive);
+ }
+
+ // Set up the jobcache directory
+ File jobCacheDir = new File(localDir, TaskTracker.getJobCacheSubdir());
+ if (!jobCacheDir.exists() && !jobCacheDir.mkdirs()) {
+ LOG.warn("Unable to create job cache directory : "
+ + jobCacheDir.getPath());
+ } else {
+ PermissionsHandler.setPermissions(jobCacheDir,
+ PermissionsHandler.sevenFiveFive);
+ }
+ }
+
+ // Set up the user log directory
+ File taskLog = TaskLog.getUserLogDir();
+ if (!taskLog.exists() && !taskLog.mkdirs()) {
+ LOG.warn("Unable to create taskLog directory : " + taskLog.getPath());
+ } else {
+ PermissionsHandler.setPermissions(taskLog,
+ PermissionsHandler.sevenFiveFive);
+ }
+ }
+
/**
- * Setup task controller component.
+ * Take task-controller specific actions to initialize job. This involves
+ * setting appropriate permissions to job-files so as to secure the files to
+ * be accessible only by the user's tasks.
*
+ * @throws IOException
*/
- abstract void setup();
-
-
+ abstract void initializeJob(JobInitializationContext context) throws IOException;
+
/**
* Launch a task JVM
*
- * This method defines how a JVM will be launched to run a task.
+ * This method defines how a JVM will be launched to run a task. Each
+ * task-controller should also do an
+ * {@link #initializeTask(TaskControllerContext)} inside this method so as to
+ * initialize the task before launching it. This is for reasons of
+ * task-controller specific optimizations w.r.t combining initialization and
+ * launching of tasks.
+ *
* @param context the context associated to the task
*/
abstract void launchTaskJVM(TaskControllerContext context)
throws IOException;
-
+
/**
* Top level cleanup a task JVM method.
*
@@ -90,47 +162,44 @@
}
killTask(context);
}
-
- /**
- * Perform initializing actions required before a task can run.
- *
- * For instance, this method can be used to setup appropriate
- * access permissions for files and directories that will be
- * used by tasks. Tasks use the job cache, log, PID and distributed cache
- * directories and files as part of their functioning. Typically,
- * these files are shared between the daemon and the tasks
- * themselves. So, a TaskController that is launching tasks
- * as different users can implement this method to setup
- * appropriate ownership and permissions for these directories
- * and files.
- */
- abstract void initializeTask(TaskControllerContext context);
-
-
+
+ /** Perform initializing actions required before a task can run.
+ *
+ * For instance, this method can be used to setup appropriate
+ * access permissions for files and directories that will be
+ * used by tasks. Tasks use the job cache, log, and distributed cache
+ * directories and files as part of their functioning. Typically,
+ * these files are shared between the daemon and the tasks
+ * themselves. So, a TaskController that is launching tasks
+ * as different users can implement this method to setup
+ * appropriate ownership and permissions for these directories
+ * and files.
+ */
+ abstract void initializeTask(TaskControllerContext context)
+ throws IOException;
+
/**
* Contains task information required for the task controller.
*/
static class TaskControllerContext {
// task being executed
- Task task;
- // the JVM environment for the task
- JvmEnv env;
- // the Shell executor executing the JVM for this task
- ShellCommandExecutor shExec;
- // process handle of task JVM
- String pid;
- // waiting time before sending SIGKILL to task JVM after sending SIGTERM
- long sleeptimeBeforeSigkill;
+ Task task;
+ ShellCommandExecutor shExec; // the Shell executor executing the JVM for this task.
+
+ // Information used only when this context is used for launching new tasks.
+ JvmEnv env; // the JVM environment for the task.
+
+ // Information used only when this context is used for destroying a task jvm.
+ String pid; // process handle of task JVM.
+ long sleeptimeBeforeSigkill; // waiting time before sending SIGKILL to task JVM after sending SIGTERM
+ }
+
+ static class JobInitializationContext {
+ JobID jobid;
+ File workDir;
+ String user;
}
- /**
- * Method which is called after the job is localized so that task controllers
- * can implement their own job localization logic.
- *
- * @param tip Task of job for which localization happens.
- */
- abstract void initializeJob(JobID jobId);
-
/**
* Sends a graceful terminate signal to taskJVM and it sub-processes.
*
@@ -144,6 +213,5 @@
*
* @param context task context
*/
-
abstract void killTask(TaskControllerContext context);
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java Wed Aug 12 16:17:47 2009
@@ -54,9 +54,10 @@
private static final Log LOG =
LogFactory.getLog(TaskLog.class);
+ static final String USERLOGS_DIR_NAME = "userlogs";
+
private static final File LOG_DIR =
- new File(System.getProperty("hadoop.log.dir"),
- "userlogs").getAbsoluteFile();
+ new File(getBaseLogDir(), USERLOGS_DIR_NAME).getAbsoluteFile();
static LocalFileSystem localFS = null;
static {
@@ -156,8 +157,12 @@
return new File(getBaseDir(taskid), "log.index");
}
}
-
- private static File getBaseDir(String taskid) {
+
+ static String getBaseLogDir() {
+ return System.getProperty("hadoop.log.dir");
+ }
+
+ static File getBaseDir(String taskid) {
return new File(LOG_DIR, taskid);
}
private static long prevOutLength;
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Wed Aug 12 16:17:47 2009
@@ -39,6 +39,7 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskTracker.PermissionsHandler;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
@@ -77,7 +78,7 @@
this.t = tip.getTask();
this.tracker = tracker;
this.conf = conf;
- this.mapOutputFile = new MapOutputFile(t.getJobID());
+ this.mapOutputFile = new MapOutputFile();
this.mapOutputFile.setConf(conf);
this.jvmManager = tracker.getJvmManagerInstance();
}
@@ -121,213 +122,45 @@
TaskAttemptID taskid = t.getTaskID();
LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
File workDir = formWorkDir(lDirAlloc, taskid, t.isTaskCleanupTask(), conf);
-
+
URI[] archives = DistributedCache.getCacheArchives(conf);
URI[] files = DistributedCache.getCacheFiles(conf);
+ // We don't create any symlinks yet, so presence/absence of workDir
+ // actually on the file system doesn't matter.
setupDistributedCache(lDirAlloc, workDir, archives, files);
-
+
+ // Set up the child task's configuration. After this call, no localization
+ // of files should happen in the TaskTracker's process space. Any changes to
+ // the conf object after this will NOT be reflected to the child.
+ setupChildTaskConfiguration(lDirAlloc);
+
if (!prepare()) {
return;
}
- // Accumulates class paths for child.
- List<String> classPaths = new ArrayList<String>();
- // start with same classpath as parent process
- appendSystemClasspaths(classPaths);
-
- if (!workDir.mkdirs()) {
- if (!workDir.isDirectory()) {
- LOG.fatal("Mkdirs failed to create " + workDir.toString());
- }
- }
-
- // include the user specified classpath
- appendJobJarClasspaths(conf.getJar(), classPaths);
-
- // Distributed cache paths
- appendDistributedCacheClasspaths(conf, archives, files, classPaths);
-
- // Include the working dir too
- classPaths.add(workDir.toString());
-
// Build classpath
-
-
- // Build exec child JVM args.
- Vector<String> vargs = new Vector<String>(8);
- File jvm = // use same jvm as parent
- new File(new File(System.getProperty("java.home"), "bin"), "java");
-
- vargs.add(jvm.toString());
-
- // Add child (task) java-vm options.
- //
- // The following symbols if present in mapred.child.java.opts value are
- // replaced:
- // + @taskid@ is interpolated with value of TaskID.
- // Other occurrences of @ will not be altered.
- //
- // Example with multiple arguments and substitutions, showing
- // jvm GC logging, and start of a passwordless JVM JMX agent so can
- // connect with jconsole and the likes to watch child memory, threads
- // and get thread dumps.
- //
- // <property>
- // <name>mapred.child.java.opts</name>
- // <value>-verbose:gc -Xloggc:/tmp/@taskid@.gc \
- // -Dcom.sun.management.jmxremote.authenticate=false \
- // -Dcom.sun.management.jmxremote.ssl=false \
- // </value>
- // </property>
- //
- String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
- javaOpts = javaOpts.replace("@taskid@", taskid.toString());
- String [] javaOptsSplit = javaOpts.split(" ");
-
- // Add java.library.path; necessary for loading native libraries.
- //
- // 1. To support native-hadoop library i.e. libhadoop.so, we add the
- // parent processes' java.library.path to the child.
- // 2. We also add the 'cwd' of the task to it's java.library.path to help
- // users distribute native libraries via the DistributedCache.
- // 3. The user can also specify extra paths to be added to the
- // java.library.path via mapred.child.java.opts.
- //
- String libraryPath = System.getProperty("java.library.path");
- if (libraryPath == null) {
- libraryPath = workDir.getAbsolutePath();
- } else {
- libraryPath += SYSTEM_PATH_SEPARATOR + workDir;
- }
- boolean hasUserLDPath = false;
- for(int i=0; i<javaOptsSplit.length ;i++) {
- if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
- javaOptsSplit[i] += SYSTEM_PATH_SEPARATOR + libraryPath;
- hasUserLDPath = true;
- break;
- }
- }
- if(!hasUserLDPath) {
- vargs.add("-Djava.library.path=" + libraryPath);
- }
- for (int i = 0; i < javaOptsSplit.length; i++) {
- vargs.add(javaOptsSplit[i]);
- }
-
- // add java.io.tmpdir given by mapred.child.tmp
- String tmp = conf.get("mapred.child.tmp", "./tmp");
- Path tmpDir = new Path(tmp);
-
- // if temp directory path is not absolute
- // prepend it with workDir.
- if (!tmpDir.isAbsolute()) {
- tmpDir = new Path(workDir.toString(), tmp);
- }
- FileSystem localFs = FileSystem.getLocal(conf);
- if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
- throw new IOException("Mkdirs failed to create " + tmpDir.toString());
- }
- vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());
-
- // Add classpath.
- vargs.add("-classpath");
- String classPath = StringUtils.join(SYSTEM_PATH_SEPARATOR, classPaths);
- vargs.add(classPath);
+ List<String> classPaths = getClassPaths(conf, workDir, archives, files);
- // Setup the log4j prop
long logSize = TaskLog.getTaskLogLength(conf);
- vargs.add("-Dhadoop.log.dir=" +
- new File(System.getProperty("hadoop.log.dir")
- ).getAbsolutePath());
- vargs.add("-Dhadoop.root.logger=INFO,TLA");
- vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
- vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
-
- if (conf.getProfileEnabled()) {
- if (conf.getProfileTaskRange(t.isMapTask()
- ).isIncluded(t.getPartition())) {
- File prof = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE);
- vargs.add(String.format(conf.getProfileParams(), prof.toString()));
- }
- }
- // Add main class and its arguments
- vargs.add(Child.class.getName()); // main of Child
- // pass umbilical address
- InetSocketAddress address = tracker.getTaskTrackerReportAddress();
- vargs.add(address.getAddress().getHostAddress());
- vargs.add(Integer.toString(address.getPort()));
- vargs.add(taskid.toString()); // pass task identifier
+ // Build exec child JVM args.
+ Vector<String> vargs =
+ getVMArgs(taskid, workDir, classPaths, logSize);
tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
// set memory limit using ulimit if feasible and necessary ...
- String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
- List<String> setup = null;
- if (ulimitCmd != null) {
- setup = new ArrayList<String>();
- for (String arg : ulimitCmd) {
- setup.add(arg);
- }
- }
+ List<String> setup = getVMSetupCmd();
// Set up the redirection of the task's stdout and stderr streams
- File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
- File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
- boolean b = stdout.getParentFile().mkdirs();
- if (!b) {
- LOG.warn("mkdirs failed. Ignoring");
- }
- tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout, stderr);
+ File[] logFiles = prepareLogFiles(taskid);
+ File stdout = logFiles[0];
+ File stderr = logFiles[1];
+ tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,
+ stderr);
Map<String, String> env = new HashMap<String, String>();
- StringBuffer ldLibraryPath = new StringBuffer();
- ldLibraryPath.append(workDir.toString());
- String oldLdLibraryPath = null;
- oldLdLibraryPath = System.getenv("LD_LIBRARY_PATH");
- if (oldLdLibraryPath != null) {
- ldLibraryPath.append(SYSTEM_PATH_SEPARATOR);
- ldLibraryPath.append(oldLdLibraryPath);
- }
- env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
-
- // add the env variables passed by the user
- String mapredChildEnv = conf.get("mapred.child.env");
- if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
- String childEnvs[] = mapredChildEnv.split(",");
- for (String cEnv : childEnvs) {
- try {
- String[] parts = cEnv.split("="); // split on '='
- String value = env.get(parts[0]);
- if (value != null) {
- // replace $env with the child's env constructed by tt's
- // example LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
- value = parts[1].replace("$" + parts[0], value);
- } else {
- // this key is not configured by the tt for the child .. get it
- // from the tt's env
- // example PATH=$PATH:/tmp
- value = System.getenv(parts[0]);
- if (value != null) {
- // the env key is present in the tt's env
- value = parts[1].replace("$" + parts[0], value);
- } else {
- // the env key is note present anywhere .. simply set it
- // example X=$X:/tmp or X=/tmp
- value = parts[1].replace("$" + parts[0], "");
- }
- }
- env.put(parts[0], value);
- } catch (Throwable t) {
- // set the error msg
- errorInfo = "Invalid User environment settings : " + mapredChildEnv
- + ". Failed to parse user-passed environment param."
- + " Expecting : env1=value1,env2=value2...";
- LOG.warn(errorInfo);
- throw t;
- }
- }
- }
+ errorInfo = getVMEnvironment(errorInfo, workDir, conf, env);
jvmManager.launchJvm(this,
jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,
@@ -355,7 +188,7 @@
LOG.fatal(t.getTaskID()+" reporting FSError", ie);
}
} catch (Throwable throwable) {
- LOG.warn(t.getTaskID() + errorInfo, throwable);
+ LOG.warn(t.getTaskID() + " : " + errorInfo, throwable);
Throwable causeThrowable = new Throwable(errorInfo, throwable);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
causeThrowable.printStackTrace(new PrintStream(baos));
@@ -385,15 +218,327 @@
}
}
+ /**
+ * Prepare the log files for the task
+ *
+ * @param taskid
+ * @return an array of files. The first file is stdout, the second is stderr.
+ */
+ static File[] prepareLogFiles(TaskAttemptID taskid) {
+ File[] logFiles = new File[2];
+ logFiles[0] = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
+ logFiles[1] = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
+ File logDir = logFiles[0].getParentFile();
+ boolean b = logDir.mkdirs();
+ if (!b) {
+ LOG.warn("mkdirs failed. Ignoring");
+ } else {
+ PermissionsHandler.setPermissions(logDir,
+ PermissionsHandler.sevenZeroZero);
+ }
+ return logFiles;
+ }
+
+ /**
+ * Write the child's configuration to the disk and set it in configuration so
+ * that the child can pick it up from there.
+ *
+ * @param lDirAlloc
+ * @throws IOException
+ */
+ void setupChildTaskConfiguration(LocalDirAllocator lDirAlloc)
+ throws IOException {
+
+ Path localTaskFile =
+ lDirAlloc.getLocalPathForWrite(TaskTracker.getTaskConfFile(t
+ .getJobID().toString(), t.getTaskID().toString(), t
+ .isTaskCleanupTask()), conf);
+
+ // write the child's task configuration file to the local disk
+ writeLocalTaskFile(localTaskFile.toString(), conf);
+
+ // Set the final job file in the task. The child needs to know the correct
+ // path to job.xml. So set this path accordingly.
+ t.setJobFile(localTaskFile.toString());
+ }
+
+ /**
+ * @return
+ */
+ private List<String> getVMSetupCmd() {
+ String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
+ List<String> setup = null;
+ if (ulimitCmd != null) {
+ setup = new ArrayList<String>();
+ for (String arg : ulimitCmd) {
+ setup.add(arg);
+ }
+ }
+ return setup;
+ }
+
+ /**
+ * @param taskid
+ * @param workDir
+ * @param classPaths
+ * @param logSize
+ * @return
+ * @throws IOException
+ */
+ private Vector<String> getVMArgs(TaskAttemptID taskid, File workDir,
+ List<String> classPaths, long logSize)
+ throws IOException {
+ Vector<String> vargs = new Vector<String>(8);
+ File jvm = // use same jvm as parent
+ new File(new File(System.getProperty("java.home"), "bin"), "java");
+
+ vargs.add(jvm.toString());
+
+ // Add child (task) java-vm options.
+ //
+ // The following symbols if present in mapred.child.java.opts value are
+ // replaced:
+ // + @taskid@ is interpolated with value of TaskID.
+ // Other occurrences of @ will not be altered.
+ //
+ // Example with multiple arguments and substitutions, showing
+ // jvm GC logging, and start of a passwordless JVM JMX agent so can
+ // connect with jconsole and the likes to watch child memory, threads
+ // and get thread dumps.
+ //
+ // <property>
+ // <name>mapred.child.java.opts</name>
+ // <value>-verbose:gc -Xloggc:/tmp/@taskid@.gc \
+ // -Dcom.sun.management.jmxremote.authenticate=false \
+ // -Dcom.sun.management.jmxremote.ssl=false \
+ // </value>
+ // </property>
+ //
+ String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
+ javaOpts = javaOpts.replace("@taskid@", taskid.toString());
+ String [] javaOptsSplit = javaOpts.split(" ");
+
+ // Add java.library.path; necessary for loading native libraries.
+ //
+ // 1. To support native-hadoop library i.e. libhadoop.so, we add the
+ // parent processes' java.library.path to the child.
+ // 2. We also add the 'cwd' of the task to its java.library.path to help
+ // users distribute native libraries via the DistributedCache.
+ // 3. The user can also specify extra paths to be added to the
+ // java.library.path via mapred.child.java.opts.
+ //
+ String libraryPath = System.getProperty("java.library.path");
+ if (libraryPath == null) {
+ libraryPath = workDir.getAbsolutePath();
+ } else {
+ libraryPath += SYSTEM_PATH_SEPARATOR + workDir;
+ }
+ boolean hasUserLDPath = false;
+ for(int i=0; i<javaOptsSplit.length ;i++) {
+ if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
+ javaOptsSplit[i] += SYSTEM_PATH_SEPARATOR + libraryPath;
+ hasUserLDPath = true;
+ break;
+ }
+ }
+ if(!hasUserLDPath) {
+ vargs.add("-Djava.library.path=" + libraryPath);
+ }
+ for (int i = 0; i < javaOptsSplit.length; i++) {
+ vargs.add(javaOptsSplit[i]);
+ }
+
+ Path childTmpDir = createChildTmpDir(workDir, conf);
+ vargs.add("-Djava.io.tmpdir=" + childTmpDir);
+
+ // Add classpath.
+ vargs.add("-classpath");
+ String classPath = StringUtils.join(SYSTEM_PATH_SEPARATOR, classPaths);
+ vargs.add(classPath);
+
+ // Setup the log4j prop
+ vargs.add("-Dhadoop.log.dir=" +
+ new File(System.getProperty("hadoop.log.dir")
+ ).getAbsolutePath());
+ vargs.add("-Dhadoop.root.logger=INFO,TLA");
+ vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
+ vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
+
+ if (conf.getProfileEnabled()) {
+ if (conf.getProfileTaskRange(t.isMapTask()
+ ).isIncluded(t.getPartition())) {
+ File prof = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE);
+ vargs.add(String.format(conf.getProfileParams(), prof.toString()));
+ }
+ }
+
+ // Add main class and its arguments
+ vargs.add(Child.class.getName()); // main of Child
+ // pass umbilical address
+ InetSocketAddress address = tracker.getTaskTrackerReportAddress();
+ vargs.add(address.getAddress().getHostAddress());
+ vargs.add(Integer.toString(address.getPort()));
+ vargs.add(taskid.toString()); // pass task identifier
+ return vargs;
+ }
+
+ /**
+ * @param workDir the task's working directory
+ * @param conf job configuration from which mapred.child.tmp is read
+ * @return
+ * @throws IOException
+ */
+ static Path createChildTmpDir(File workDir,
+ JobConf conf)
+ throws IOException {
+
+ // add java.io.tmpdir given by mapred.child.tmp
+ String tmp = conf.get("mapred.child.tmp", "./tmp");
+ Path tmpDir = new Path(tmp);
+
+ // if temp directory path is not absolute, prepend it with workDir.
+ if (!tmpDir.isAbsolute()) {
+ tmpDir = new Path(workDir.toString(), tmp);
+
+ FileSystem localFs = FileSystem.getLocal(conf);
+ if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
+ throw new IOException("Mkdirs failed to create " + tmpDir.toString());
+ }
+ }
+ return tmpDir;
+ }
+
+ /**
+ */
+ private static List<String> getClassPaths(JobConf conf, File workDir,
+ URI[] archives, URI[] files)
+ throws IOException {
+ // Accumulates class paths for child.
+ List<String> classPaths = new ArrayList<String>();
+ // start with same classpath as parent process
+ appendSystemClasspaths(classPaths);
+
+ // include the user specified classpath
+ appendJobJarClasspaths(conf.getJar(), classPaths);
+
+ // Distributed cache paths
+ appendDistributedCacheClasspaths(conf, archives, files, classPaths);
+
+ // Include the working dir too
+ classPaths.add(workDir.toString());
+ return classPaths;
+ }
+
+ /**
+ * @param errorInfo
+ * @param workDir
+ * @param env
+ * @return
+ * @throws Throwable
+ */
+ private static String getVMEnvironment(String errorInfo, File workDir, JobConf conf,
+ Map<String, String> env)
+ throws Throwable {
+ StringBuffer ldLibraryPath = new StringBuffer();
+ ldLibraryPath.append(workDir.toString());
+ String oldLdLibraryPath = null;
+ oldLdLibraryPath = System.getenv("LD_LIBRARY_PATH");
+ if (oldLdLibraryPath != null) {
+ ldLibraryPath.append(SYSTEM_PATH_SEPARATOR);
+ ldLibraryPath.append(oldLdLibraryPath);
+ }
+ env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
+
+ // add the env variables passed by the user
+ String mapredChildEnv = conf.get("mapred.child.env");
+ if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
+ String childEnvs[] = mapredChildEnv.split(",");
+ for (String cEnv : childEnvs) {
+ try {
+ String[] parts = cEnv.split("="); // split on '='
+ String value = env.get(parts[0]);
+ if (value != null) {
+ // replace $env with the child's env constructed by tt's
+ // example LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
+ value = parts[1].replace("$" + parts[0], value);
+ } else {
+ // this key is not configured by the tt for the child .. get it
+ // from the tt's env
+ // example PATH=$PATH:/tmp
+ value = System.getenv(parts[0]);
+ if (value != null) {
+ // the env key is present in the tt's env
+ value = parts[1].replace("$" + parts[0], value);
+ } else {
+ // the env key is not present anywhere .. simply set it
+ // example X=$X:/tmp or X=/tmp
+ value = parts[1].replace("$" + parts[0], "");
+ }
+ }
+ env.put(parts[0], value);
+ } catch (Throwable t) {
+ // set the error msg
+ errorInfo = "Invalid User environment settings : " + mapredChildEnv
+ + ". Failed to parse user-passed environment param."
+ + " Expecting : env1=value1,env2=value2...";
+ LOG.warn(errorInfo);
+ throw t;
+ }
+ }
+ }
+ return errorInfo;
+ }
+
+ /**
+ * Write the task specific job-configuration file.
+ *
+ * @param jobFile path of the local file to which conf is written
+ * @throws IOException
+ */
+ private static void writeLocalTaskFile(String jobFile, JobConf conf)
+ throws IOException {
+ Path localTaskFile = new Path(jobFile);
+ FileSystem localFs = FileSystem.getLocal(conf);
+ localFs.delete(localTaskFile, true);
+ OutputStream out = localFs.create(localTaskFile);
+ try {
+ conf.writeXml(out);
+ } finally {
+ out.close();
+ }
+ }
+
+ /**
+ * Prepare the mapred.local.dir for the child. The child is sand-boxed now.
+ * Whenever it uses LocalDirAllocator from now on inside the child, it will
+ * only see files inside the attempt-directory. This is done in the Child's
+ * process space.
+ */
+ static void setupChildMapredLocalDirs(Task t, JobConf conf) {
+ String[] localDirs = conf.getStrings("mapred.local.dir");
+ String jobId = t.getJobID().toString();
+ String taskId = t.getTaskID().toString();
+ boolean isCleanup = t.isTaskCleanupTask();
+ StringBuffer childMapredLocalDir =
+ new StringBuffer(localDirs[0] + Path.SEPARATOR
+ + TaskTracker.getLocalTaskDir(jobId, taskId, isCleanup));
+ for (int i = 1; i < localDirs.length; i++) {
+ childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
+ + TaskTracker.getLocalTaskDir(jobId, taskId, isCleanup));
+ }
+ LOG.debug("mapred.local.dir for child : " + childMapredLocalDir);
+ conf.set("mapred.local.dir", childMapredLocalDir.toString());
+ }
+
/** Creates the working directory pathname for a task attempt. */
static File formWorkDir(LocalDirAllocator lDirAlloc,
TaskAttemptID task, boolean isCleanup, JobConf conf)
throws IOException {
- File workDir = new File(lDirAlloc.getLocalPathToRead(
- TaskTracker.getLocalTaskDir(task.getJobID().toString(),
- task.toString(), isCleanup)
- + Path.SEPARATOR + MRConstants.WORKDIR, conf).toString());
- return workDir;
+ Path workDir =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
+ .getJobID().toString(), task.toString(), isCleanup), conf);
+
+ return new File(workDir.toString());
}
private void setupDistributedCache(LocalDirAllocator lDirAlloc, File workDir,
@@ -412,7 +557,7 @@
fileStatus = fileSystem.getFileStatus(
new Path(archives[i].getPath()));
String cacheId = DistributedCache.makeRelative(archives[i],conf);
- String cachePath = TaskTracker.getCacheSubdir() +
+ String cachePath = TaskTracker.getDistributedCacheDir() +
Path.SEPARATOR + cacheId;
localPath = lDirAlloc.getLocalPathForWrite(cachePath,
@@ -438,7 +583,7 @@
fileStatus = fileSystem.getFileStatus(
new Path(files[i].getPath()));
String cacheId = DistributedCache.makeRelative(files[i], conf);
- String cachePath = TaskTracker.getCacheSubdir() +
+ String cachePath = TaskTracker.getDistributedCacheDir() +
Path.SEPARATOR + cacheId;
localPath = lDirAlloc.getLocalPathForWrite(cachePath,
@@ -455,20 +600,12 @@
}
DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
}
- Path localTaskFile = new Path(t.getJobFile());
- FileSystem localFs = FileSystem.getLocal(conf);
- localFs.delete(localTaskFile, true);
- OutputStream out = localFs.create(localTaskFile);
- try {
- conf.writeXml(out);
- } finally {
- out.close();
- }
}
}
- private void appendDistributedCacheClasspaths(JobConf conf, URI[] archives,
- URI[] files, List<String> classPaths) throws IOException {
+ private static void appendDistributedCacheClasspaths(JobConf conf,
+ URI[] archives, URI[] files, List<String> classPaths)
+ throws IOException {
// Archive paths
Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
if (archiveClasspaths != null && archives != null) {
@@ -503,8 +640,9 @@
}
}
- private void appendSystemClasspaths(List<String> classPaths) {
- for (String c : System.getProperty("java.class.path").split(SYSTEM_PATH_SEPARATOR)) {
+ private static void appendSystemClasspaths(List<String> classPaths) {
+ for (String c : System.getProperty("java.class.path").split(
+ SYSTEM_PATH_SEPARATOR)) {
classPaths.add(c);
}
}
@@ -586,19 +724,8 @@
// Do not exit even if symlinks have not been created.
LOG.warn(StringUtils.stringifyException(ie));
}
- // add java.io.tmpdir given by mapred.child.tmp
- String tmp = conf.get("mapred.child.tmp", "./tmp");
- Path tmpDir = new Path(tmp);
- // if temp directory path is not absolute
- // prepend it with workDir.
- if (!tmpDir.isAbsolute()) {
- tmpDir = new Path(workDir.toString(), tmp);
- FileSystem localFs = FileSystem.getLocal(conf);
- if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()){
- throw new IOException("Mkdirs failed to create " + tmpDir.toString());
- }
- }
+ createChildTmpDir(workDir, conf);
}
/**
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=803583&r1=803582&r2=803583&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Aug 12 16:17:47 2009
@@ -65,6 +65,7 @@
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
import org.apache.hadoop.mapred.TaskStatus.Phase;
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
import org.apache.hadoop.mapred.pipes.Submitter;
@@ -197,10 +198,16 @@
//for serving map output to the other nodes
static Random r = new Random();
- private static final String SUBDIR = "taskTracker";
- private static final String CACHEDIR = "archive";
- private static final String JOBCACHE = "jobcache";
- private static final String OUTPUT = "output";
+ static final String SUBDIR = "taskTracker";
+ private static final String DISTCACHEDIR = "distcache";
+ static final String JOBCACHE = "jobcache";
+ static final String OUTPUT = "output";
+ private static final String JARSDIR = "jars";
+ static final String LOCAL_SPLIT_FILE = "split.dta";
+ static final String JOBFILE = "job.xml";
+
+ static final String JOB_LOCAL_DIR = "job.local.dir";
+
private JobConf fConf;
private FileSystem localFs;
private int maxMapSlots;
@@ -388,25 +395,52 @@
}
}
- static String getCacheSubdir() {
- return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
+ static String getDistributedCacheDir() {
+ return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
}
static String getJobCacheSubdir() {
return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
}
-
+
static String getLocalJobDir(String jobid) {
- return getJobCacheSubdir() + Path.SEPARATOR + jobid;
+ return getJobCacheSubdir() + Path.SEPARATOR + jobid;
}
- static String getLocalTaskDir(String jobid, String taskid) {
- return getLocalTaskDir(jobid, taskid, false) ;
+ static String getLocalJobConfFile(String jobid) {
+ return getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
+ }
+
+ static String getTaskConfFile(String jobid, String taskid,
+ boolean isCleanupAttempt) {
+ return getLocalTaskDir(jobid, taskid, isCleanupAttempt) + Path.SEPARATOR
+ + TaskTracker.JOBFILE;
+ }
+
+ static String getJobJarsDir(String jobid) {
+ return getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.JARSDIR;
+ }
+
+ static String getJobJarFile(String jobid) {
+ return getJobJarsDir(jobid) + Path.SEPARATOR + "job.jar";
+ }
+
+ static String getJobWorkDir(String jobid) {
+ return getLocalJobDir(jobid) + Path.SEPARATOR + MRConstants.WORKDIR;
+ }
+
+ static String getLocalSplitFile(String jobid, String taskid) {
+ return TaskTracker.getLocalTaskDir(jobid, taskid) + Path.SEPARATOR
+ + TaskTracker.LOCAL_SPLIT_FILE;
}
static String getIntermediateOutputDir(String jobid, String taskid) {
- return getLocalTaskDir(jobid, taskid)
- + Path.SEPARATOR + TaskTracker.OUTPUT ;
+ return getLocalTaskDir(jobid, taskid) + Path.SEPARATOR
+ + TaskTracker.OUTPUT;
+ }
+
+ static String getLocalTaskDir(String jobid, String taskid) {
+ return getLocalTaskDir(jobid, taskid, false);
}
static String getLocalTaskDir(String jobid,
@@ -418,7 +452,17 @@
}
return taskDir;
}
-
+
+ static String getTaskWorkDir(String jobid, String taskid,
+ boolean isCleanupAttempt) {
+ String dir =
+ getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
+ if (isCleanupAttempt) {
+ dir = dir + TASK_CLEANUP_SUFFIX;
+ }
+ return dir + Path.SEPARATOR + MRConstants.WORKDIR;
+ }
+
String getPid(TaskAttemptID tid) {
TaskInProgress tip = tasks.get(tid);
if (tip != null) {
@@ -762,10 +806,233 @@
// initialize the job directory
private void localizeJob(TaskInProgress tip) throws IOException {
- Path localJarFile = null;
Task t = tip.getTask();
JobID jobId = t.getJobID();
- Path jobFile = new Path(t.getJobFile());
+ RunningJob rjob = addTaskToJob(jobId, tip);
+
+ synchronized (rjob) {
+ if (!rjob.localized) {
+
+ JobConf localJobConf = localizeJobFiles(t);
+
+ // Now initialize the job via task-controller so as to set
+ // ownership/permissions of jars, job-work-dir. Note that initializeJob
+ // should be the last call after every other directory/file to be
+ // directly under the job directory is created.
+ JobInitializationContext context = new JobInitializationContext();
+ context.jobid = jobId;
+ context.user = localJobConf.getUser();
+ context.workDir = new File(localJobConf.get(JOB_LOCAL_DIR));
+ taskController.initializeJob(context);
+
+ rjob.jobConf = localJobConf;
+ rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
+ localJobConf.getKeepFailedTaskFiles());
+ rjob.localized = true;
+ }
+ }
+ launchTaskForJob(tip, new JobConf(rjob.jobConf));
+ }
+
+ /**
+ * Localize the job on this tasktracker. Specifically
+ * <ul>
+ * <li>Cleanup and create job directories on all disks</li>
+ * <li>Download the job config file job.xml from the FS</li>
+ * <li>Create the job work directory and set {@link TaskTracker#JOB_LOCAL_DIR}
+ * in the configuration.</li>
+ * <li>Download the job jar file job.jar from the FS, unjar it and set jar
+ * file in the configuration.</li>
+ * </ul>
+ *
+ * @param t task whose job has to be localized on this TT
+ * @return the modified job configuration to be used for all the tasks of this
+ * job as a starting point.
+ * @throws IOException
+ */
+ JobConf localizeJobFiles(Task t)
+ throws IOException {
+ JobID jobId = t.getJobID();
+
+ // Initialize the job directories first
+ FileSystem localFs = FileSystem.getLocal(fConf);
+ initializeJobDirs(jobId, localFs, fConf.getStrings("mapred.local.dir"));
+
+ // Download the job.xml for this job from the system FS
+ Path localJobFile = localizeJobConfFile(new Path(t.getJobFile()), jobId);
+
+ JobConf localJobConf = new JobConf(localJobFile);
+
+ // create the 'job-work' directory: job-specific shared directory for use as
+ // scratch space by all tasks of the same job running on this TaskTracker.
+ Path workDir =
+ lDirAlloc.getLocalPathForWrite(getJobWorkDir(jobId.toString()),
+ fConf);
+ if (!localFs.mkdirs(workDir)) {
+ throw new IOException("Mkdirs failed to create "
+ + workDir.toString());
+ }
+ System.setProperty(JOB_LOCAL_DIR, workDir.toUri().getPath());
+ localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath());
+
+ // Download the job.jar for this job from the system FS
+ localizeJobJarFile(jobId, localFs, localJobConf);
+ return localJobConf;
+ }
+
+ static class PermissionsHandler {
+ /**
+ * Permission information useful for setting permissions for a given path.
+ * Using this, one can set all possible combinations of permissions for the
+ * owner of the file. But permissions for the group and all others can only
+ * be set together, i.e. permissions for group cannot be set different from
+ * those for others and vice versa.
+ */
+ static class PermissionsInfo {
+ public boolean readPermissions;
+ public boolean writePermissions;
+ public boolean executablePermissions;
+ public boolean readPermsOwnerOnly;
+ public boolean writePermsOwnerOnly;
+ public boolean executePermsOwnerOnly;
+
+ /**
+ * Create a permissions-info object with the given attributes
+ *
+ * @param readPerms
+ * @param writePerms
+ * @param executePerms
+ * @param readOwnerOnly
+ * @param writeOwnerOnly
+ * @param executeOwnerOnly
+ */
+ public PermissionsInfo(boolean readPerms, boolean writePerms,
+ boolean executePerms, boolean readOwnerOnly, boolean writeOwnerOnly,
+ boolean executeOwnerOnly) {
+ readPermissions = readPerms;
+ writePermissions = writePerms;
+ executablePermissions = executePerms;
+ readPermsOwnerOnly = readOwnerOnly;
+ writePermsOwnerOnly = writeOwnerOnly;
+ executePermsOwnerOnly = executeOwnerOnly;
+ }
+ }
+
+ /**
+ * Set permission on the given file path using the specified permissions
+ * information. We use java api to set permission instead of spawning chmod
+ * processes. This saves a lot of time. Using this, one can set all possible
+ * combinations of permissions for the owner of the file. But permissions
+ * for the group and all others can only be set together, i.e. permissions
+ * for group cannot be set different from those for others and vice versa.
+ *
+ * This method should satisfy the needs of most of the applications. For
+ * those it doesn't, {@link FileUtil#chmod} can be used.
+ *
+ * @param f file path
+ * @param pInfo permissions information
+ * @return true if success, false otherwise
+ */
+ static boolean setPermissions(File f, PermissionsInfo pInfo) {
+ if (pInfo == null) {
+ LOG.debug(" PermissionsInfo is null, returning.");
+ return true;
+ }
+
+ LOG.debug("Setting permission for " + f.getAbsolutePath());
+
+ boolean ret = true;
+
+ // Clear all the flags
+ ret = f.setReadable(false, false) && ret;
+ ret = f.setWritable(false, false) && ret;
+ ret = f.setExecutable(false, false) && ret;
+
+ ret = f.setReadable(pInfo.readPermissions, pInfo.readPermsOwnerOnly);
+ LOG.debug("Readable status for " + f + " set to " + ret);
+ ret =
+ f.setWritable(pInfo.writePermissions, pInfo.writePermsOwnerOnly)
+ && ret;
+ LOG.debug("Writable status for " + f + " set to " + ret);
+ ret =
+ f.setExecutable(pInfo.executablePermissions,
+ pInfo.executePermsOwnerOnly)
+ && ret;
+
+ LOG.debug("Executable status for " + f + " set to " + ret);
+ return ret;
+ }
+
+ /**
+ * Permissions rwxr-xr-x
+ */
+ static PermissionsInfo sevenFiveFive =
+ new PermissionsInfo(true, true, true, false, true, false);
+ /**
+ * Completely private permissions
+ */
+ static PermissionsInfo sevenZeroZero =
+ new PermissionsInfo(true, true, true, true, true, true);
+ }
+
+ /**
+ * Prepare the job directories for a given job. To be called by the job
+ * localization code, only if the job is not already localized.
+ *
+ * <br>
+ * Here, we set 700 permissions on the job directories created on all disks.
+ * This prevents misuse by other users until
+ * {@link TaskController#initializeJob(JobInitializationContext)} later sets
+ * the proper private permissions on the job directories. <br>
+ *
+ * @param jobId
+ * @param fs
+ * @param localDirs
+ * @throws IOException
+ */
+ private static void initializeJobDirs(JobID jobId, FileSystem fs,
+ String[] localDirs)
+ throws IOException {
+ boolean initJobDirStatus = false;
+ String jobDirPath = getLocalJobDir(jobId.toString());
+ for (String localDir : localDirs) {
+ Path jobDir = new Path(localDir, jobDirPath);
+ if (fs.exists(jobDir)) {
+ // this will happen on a partial execution of localizeJob. Sometimes
+ // copying job.xml to the local disk succeeds but copying job.jar might
+ // throw out an exception. We should clean up and then try again.
+ fs.delete(jobDir, true);
+ }
+
+ boolean jobDirStatus = fs.mkdirs(jobDir);
+ if (!jobDirStatus) {
+ LOG.warn("Not able to create job directory " + jobDir.toString());
+ }
+
+ initJobDirStatus = initJobDirStatus || jobDirStatus;
+
+ // job-dir has to be private to the TT
+ PermissionsHandler.setPermissions(new File(jobDir.toUri().getPath()),
+ PermissionsHandler.sevenZeroZero);
+ }
+
+ if (!initJobDirStatus) {
+ throw new IOException("Not able to initialize job directories "
+ + "in any of the configured local directories for job "
+ + jobId.toString());
+ }
+ }
+
+ /**
+ * Download the job configuration file from the FS.
+ *
+ * @param jobFile path of the job configuration file to be downloaded
+ * @param jobId jobid of the task
+ * @return the local file system path of the downloaded file.
+ * @throws IOException
+ */
+ private Path localizeJobConfFile(Path jobFile, JobID jobId)
+ throws IOException {
// Get sizes of JobFile and JarFile
// sizes are -1 if they are not present.
FileStatus status = null;
@@ -776,82 +1043,95 @@
} catch(FileNotFoundException fe) {
jobFileSize = -1;
}
- Path localJobFile = lDirAlloc.getLocalPathForWrite(
- getLocalJobDir(jobId.toString())
- + Path.SEPARATOR + "job.xml",
- jobFileSize, fConf);
- RunningJob rjob = addTaskToJob(jobId, tip);
- synchronized (rjob) {
- if (!rjob.localized) {
-
- FileSystem localFs = FileSystem.getLocal(fConf);
- // this will happen on a partial execution of localizeJob.
- // Sometimes the job.xml gets copied but copying job.jar
- // might throw out an exception
- // we should clean up and then try again
- Path jobDir = localJobFile.getParent();
- if (localFs.exists(jobDir)){
- localFs.delete(jobDir, true);
- boolean b = localFs.mkdirs(jobDir);
- if (!b)
- throw new IOException("Not able to create job directory "
- + jobDir.toString());
- }
- systemFS.copyToLocalFile(jobFile, localJobFile);
- JobConf localJobConf = new JobConf(localJobFile);
-
- // create the 'work' directory
- // job-specific shared directory for use as scratch space
- Path workDir = lDirAlloc.getLocalPathForWrite(
- (getLocalJobDir(jobId.toString())
- + Path.SEPARATOR + MRConstants.WORKDIR), fConf);
- if (!localFs.mkdirs(workDir)) {
- throw new IOException("Mkdirs failed to create "
- + workDir.toString());
- }
- System.setProperty("job.local.dir", workDir.toString());
- localJobConf.set("job.local.dir", workDir.toString());
-
- // copy Jar file to the local FS and unjar it.
- String jarFile = localJobConf.getJar();
- long jarFileSize = -1;
- if (jarFile != null) {
- Path jarFilePath = new Path(jarFile);
- try {
- status = systemFS.getFileStatus(jarFilePath);
- jarFileSize = status.getLen();
- } catch(FileNotFoundException fe) {
- jarFileSize = -1;
- }
- // Here we check for and we check five times the size of jarFileSize
- // to accommodate for unjarring the jar file in work directory
- localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
- getLocalJobDir(jobId.toString())
- + Path.SEPARATOR + "jars",
- 5 * jarFileSize, fConf), "job.jar");
- if (!localFs.mkdirs(localJarFile.getParent())) {
- throw new IOException("Mkdirs failed to create jars directory ");
- }
- systemFS.copyToLocalFile(jarFilePath, localJarFile);
- localJobConf.setJar(localJarFile.toString());
- OutputStream out = localFs.create(localJobFile);
- try {
- localJobConf.writeXml(out);
- } finally {
- out.close();
- }
- // also unjar the job.jar files
- RunJar.unJar(new File(localJarFile.toString()),
- new File(localJarFile.getParent().toString()));
- }
- rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
- localJobConf.getKeepFailedTaskFiles());
- rjob.localized = true;
- rjob.jobConf = localJobConf;
- taskController.initializeJob(jobId);
+
+ Path localJobFile =
+ lDirAlloc.getLocalPathForWrite(getLocalJobConfFile(jobId.toString()),
+ jobFileSize, fConf);
+
+ // Download job.xml
+ systemFS.copyToLocalFile(jobFile, localJobFile);
+ return localJobFile;
+ }
+
+ /**
+  * Fetch the job jar from the system FS onto a local disk and unpack it in
+  * the directory that holds the downloaded copy. The jar location recorded
+  * in the passed configuration is switched to the local copy. Nothing is
+  * done when the configuration names no jar.
+  *
+  * @param jobId id of the job whose jar is localized
+  * @param localFs local file system (not referenced by this method)
+  * @param localJobConf job configuration that is read for the jar path and
+  *          updated to point at the local jar
+  * @throws IOException propagated from the path allocation, download or
+  *           un-jar steps
+  */
+ private void localizeJobJarFile(JobID jobId, FileSystem localFs,
+     JobConf localJobConf)
+     throws IOException {
+   String remoteJarName = localJobConf.getJar();
+   if (remoteJarName == null) {
+     // No jar is set in the configuration; nothing to localize.
+     return;
+   }
+   Path remoteJar = new Path(remoteJarName);
+
+   // Length of the jar on the system FS, or -1 if it is not present.
+   long jarLength;
+   try {
+     jarLength = systemFS.getFileStatus(remoteJar).getLen();
+   } catch (FileNotFoundException fnfe) {
+     jarLength = -1;
+   }
+
+   // Ask for five times the jar size so that there is room to unpack the
+   // jar next to the downloaded copy.
+   Path localJar =
+       lDirAlloc.getLocalPathForWrite(getJobJarFile(jobId.toString()),
+           5 * jarLength, fConf);
+
+   // Download job.jar
+   systemFS.copyToLocalFile(remoteJar, localJar);
+   localJobConf.setJar(localJar.toString());
+
+   // Also un-jar the job.jar files. We un-jar it so that classes inside
+   // sub-directories, for e.g., lib/, classes/ are available on class-path
+   File jarOnDisk = new File(localJar.toString());
+   File unpackDir = new File(localJar.getParent().toString());
+   RunJar.unJar(jarOnDisk, unpackDir);
+ }
+
+ /**
+ * Create taskDirs on all the disks. Otherwise, in some cases, like when
+ * LinuxTaskController is in use, child might wish to balance load across
+ * disks but cannot itself create attempt directory because of the fact that
+ * job directory is writable only by the TT.
+ *
+ * A failure to create the directory under one local directory is only
+ * logged as a warning; the call succeeds as long as the directory could be
+ * created under at least one of them.
+ *
+ * @param jobId job id, passed to getLocalTaskDir to build the relative
+ *          attempt directory path
+ * @param attemptId attempt id, passed to getLocalTaskDir and reported in
+ *          the failure message
+ * @param isCleanupAttempt passed through to getLocalTaskDir when building
+ *          the attempt directory path
+ * @param fs file system on which the attempt directories are created
+ * @param localDirs local directories under each of which the attempt
+ *          directory is created
+ * @throws IOException if the attempt directory could not be created under
+ *           any of the given local directories
+ */
+ private static void initializeAttemptDirs(String jobId, String attemptId,
+ boolean isCleanupAttempt, FileSystem fs, String[] localDirs)
+ throws IOException {
+
+ boolean initStatus = false;
+ String attemptDirPath =
+ getLocalTaskDir(jobId, attemptId, isCleanupAttempt);
+
+ for (String localDir : localDirs) {
+ Path localAttemptDir = new Path(localDir, attemptDirPath);
+
+ boolean attemptDirStatus = fs.mkdirs(localAttemptDir);
+ if (!attemptDirStatus) {
+ LOG.warn("localAttemptDir " + localAttemptDir.toString()
+ + " couldn't be created.");
}
+ initStatus = initStatus || attemptDirStatus;
+ }
+
+ if (!initStatus) {
+ throw new IOException("Not able to initialize attempt directories "
+ + "in any of the configured local directories for the attempt "
+ + attemptId.toString());
}
- launchTaskForJob(tip, new JobConf(rjob.jobConf));
}
private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
@@ -890,7 +1170,7 @@
for (TaskInProgress tip : tasksToClose.values()) {
tip.jobHasFinished(false);
}
-
+
this.running = false;
// Clear local storage
@@ -929,6 +1209,17 @@
}
/**
+  * Constructor for testing. Leaves the {@code server} field set to
+  * {@code null}.
+  */
+ TaskTracker() {
+   this.server = null;
+ }
+
+ /**
+  * Replace the configuration held in {@code fConf}.
+  *
+  * @param conf configuration to store
+  */
+ void setConf(JobConf conf) {
+   this.fConf = conf;
+ }
+
+ /**
* Start with the local machine name, and the default JobTracker
*/
public TaskTracker(JobConf conf) throws IOException {
@@ -1568,10 +1859,9 @@
}
MapOutputFile mapOutputFile = new MapOutputFile();
- mapOutputFile.setJobId(taskId.getJobID());
mapOutputFile.setConf(conf);
- Path tmp_output = mapOutputFile.getOutputFile(taskId);
+ Path tmp_output = mapOutputFile.getOutputFile();
if(tmp_output == null)
return 0;
FileSystem localFS = FileSystem.getLocal(conf);
@@ -1847,54 +2137,36 @@
taskTimeout = (10 * 60 * 1000);
}
- private void localizeTask(Task task) throws IOException{
+ void localizeTask(Task task) throws IOException{
- Path localTaskDir =
- lDirAlloc.getLocalPathForWrite(
- TaskTracker.getLocalTaskDir(task.getJobID().toString(),
- task.getTaskID().toString(), task.isTaskCleanupTask()),
- defaultJobConf );
-
FileSystem localFs = FileSystem.getLocal(fConf);
- if (!localFs.mkdirs(localTaskDir)) {
- throw new IOException("Mkdirs failed to create "
- + localTaskDir.toString());
- }
- // create symlink for ../work if it already doesnt exist
- String workDir = lDirAlloc.getLocalPathToRead(
- TaskTracker.getLocalJobDir(task.getJobID().toString())
- + Path.SEPARATOR
- + "work", defaultJobConf).toString();
- String link = localTaskDir.getParent().toString()
- + Path.SEPARATOR + "work";
- File flink = new File(link);
- if (!flink.exists())
- FileUtil.symLink(workDir, link);
-
+ // create taskDirs on all the disks.
+ initializeAttemptDirs(task.getJobID().toString(), task.getTaskID()
+ .toString(), task.isTaskCleanupTask(), localFs, fConf
+ .getStrings("mapred.local.dir"));
+
// create the working-directory of the task
- Path cwd = lDirAlloc.getLocalPathForWrite(
- getLocalTaskDir(task.getJobID().toString(),
- task.getTaskID().toString(), task.isTaskCleanupTask())
- + Path.SEPARATOR + MRConstants.WORKDIR,
- defaultJobConf);
+ Path cwd =
+ lDirAlloc.getLocalPathForWrite(getTaskWorkDir(task.getJobID()
+ .toString(), task.getTaskID().toString(), task
+ .isTaskCleanupTask()), defaultJobConf);
if (!localFs.mkdirs(cwd)) {
throw new IOException("Mkdirs failed to create "
+ cwd.toString());
}
- Path localTaskFile = new Path(localTaskDir, "job.xml");
- task.setJobFile(localTaskFile.toString());
localJobConf.set("mapred.local.dir",
fConf.get("mapred.local.dir"));
+
if (fConf.get("slave.host.name") != null) {
localJobConf.set("slave.host.name",
fConf.get("slave.host.name"));
}
- localJobConf.set("mapred.task.id", task.getTaskID().toString());
keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+ // Do the task-type specific localization
task.localizeConfiguration(localJobConf);
List<String[]> staticResolutions = NetUtils.getAllStaticResolutions();
@@ -1927,12 +2199,6 @@
//disable jvm reuse
localJobConf.setNumTasksToExecutePerJvm(1);
}
- OutputStream out = localFs.create(localTaskFile);
- try {
- localJobConf.writeXml(out);
- } finally {
- out.close();
- }
task.setConf(localJobConf);
}
@@ -2188,7 +2454,7 @@
localJobConf). toString());
} catch (IOException e) {
LOG.warn("Working Directory of the task " + task.getTaskID() +
- "doesnt exist. Caught exception " +
+ " doesnt exist. Caught exception " +
StringUtils.stringifyException(e));
}
// Build the command
@@ -2463,34 +2729,39 @@
if (localJobConf == null) {
return;
}
- String taskDir = getLocalTaskDir(task.getJobID().toString(),
- taskId.toString(), task.isTaskCleanupTask());
+ String localTaskDir =
+ getLocalTaskDir(task.getJobID().toString(), taskId.toString(),
+ task.isTaskCleanupTask());
+ String taskWorkDir =
+ getTaskWorkDir(task.getJobID().toString(), taskId.toString(),
+ task.isTaskCleanupTask());
if (needCleanup) {
if (runner != null) {
//cleans up the output directory of the task (where map outputs
//and reduce inputs get stored)
runner.close();
}
- //We don't delete the workdir
- //since some other task (running in the same JVM)
- //might be using the dir. The JVM running the tasks would clean
- //the workdir per a task in the task process itself.
+
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+ // No jvm reuse, remove everything
directoryCleanupThread.addToQueue(localFs,
getLocalFiles(defaultJobConf,
- taskDir));
+ localTaskDir));
}
-
else {
- directoryCleanupThread.addToQueue(localFs,
- getLocalFiles(defaultJobConf,
- taskDir+"/job.xml"));
+ // Jvm reuse. We don't delete the workdir since some other task
+ // (running in the same JVM) might be using the dir. The JVM
+ // running the tasks would clean the workdir per a task in the
+ // task process itself.
+ directoryCleanupThread.addToQueue(localFs, getLocalFiles(
+ defaultJobConf, localTaskDir + Path.SEPARATOR
+ + TaskTracker.JOBFILE));
}
} else {
if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
directoryCleanupThread.addToQueue(localFs,
getLocalFiles(defaultJobConf,
- taskDir+"/work"));
+ taskWorkDir));
}
}
} catch (Throwable ie) {