You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/03/20 12:19:39 UTC
svn commit: r639247 [3/3] - in /hadoop/core/trunk: ./ docs/
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/docs/src/documentation/content/xdocs/
src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Thu Mar 20 04:19:34 2008
@@ -137,11 +137,10 @@
String[] argvSplit = splitArgs(argv);
String prog = argvSplit[0];
File currentDir = new File(".").getAbsoluteFile();
- File jobCacheDir = new File(currentDir.getParentFile().getParent(), "work");
if (new File(prog).isAbsolute()) {
// we don't own it. Hope it is executable
} else {
- FileUtil.chmod(new File(jobCacheDir, prog).toString(), "a+x");
+ FileUtil.chmod(new File(currentDir, prog).toString(), "a+x");
}
//
@@ -153,7 +152,7 @@
//
if (!new File(argvSplit[0]).isAbsolute()) {
PathFinder finder = new PathFinder("PATH");
- finder.prependPathComponent(jobCacheDir.toString());
+ finder.prependPathComponent(currentDir.toString());
File f = finder.getAbsolutePath(argvSplit[0]);
if (f != null) {
argvSplit[0] = f.getAbsolutePath();
Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Thu Mar 20 04:19:34 2008
@@ -1062,6 +1062,33 @@
<code></property></code>
</p>
+ <p>When the job starts, the localized job directory
+ <code> ${mapred.local.dir}/taskTracker/jobcache/$jobid/</code>
+ has the following directories: </p>
+ <ul>
+ <li> A job-specific shared directory, created at location
+ <code>${mapred.local.dir}/taskTracker/jobcache/$jobid/work/ </code>.
+ This directory is exposed to the users through
+ <code>job.local.dir </code>. The tasks can use this space as scratch
+ space and share files among them. The directory can be accessed through
+ the API <a href="ext:api/org/apache/hadoop/mapred/jobconf/getjoblocaldir">
+ JobConf.getJobLocalDir()</a>. It is also available as a System property,
+ so users can call <code>System.getProperty("job.local.dir")</code>.
+ </li>
+ <li>A jars directory, which has the job jar file and expanded jar </li>
+ <li>A job.xml file, the generic job configuration </li>
+ <li>Each task has a directory <code>task-id</code>, which in turn has the
+ following structure
+ <ul>
+ <li>A job.xml file, task localized job configuration </li>
+ <li>A directory for intermediate output files</li>
+ <li>The working directory of the task.
+ The work directory contains a temporary directory
+ for creating temporary files</li>
+ </ul>
+ </li>
+ </ul>
+
<p>The <a href="#DistributedCache">DistributedCache</a> can also be used
as a rudimentary software distribution mechanism for use in the map
and/or reduce tasks. It can be used to distribute both jars and
Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/site.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/site.xml?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/site.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/site.xml Thu Mar 20 04:19:34 2008
@@ -154,6 +154,7 @@
<setcompressmapoutput href="#setCompressMapOutput(boolean)" />
<setmapoutputcompressiontype href="#setMapOutputCompressionType(org.apache.hadoop.io.SequenceFile.CompressionType)" />
<setmapoutputcompressorclass href="#setMapOutputCompressorClass(java.lang.Class)" />
+ <getjoblocaldir href="#getJobLocalDir()" />
</jobconf>
<jobconfigurable href="JobConfigurable.html">
<configure href="#configure(org.apache.hadoop.mapred.JobConf)" />
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Thu Mar 20 04:19:34 2008
@@ -28,6 +28,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
@@ -115,13 +116,14 @@
* @param conf the jobconf
* @throws IOException if something goes wrong writing
*/
- private static void fillInMissingMapOutputs(FileSystem fs,
+ private static void fillInMissingMapOutputs(FileSystem fs,
+ String jobId,
String taskId,
int numMaps,
JobConf conf) throws IOException {
Class keyClass = conf.getMapOutputKeyClass();
Class valueClass = conf.getMapOutputValueClass();
- MapOutputFile namer = new MapOutputFile();
+ MapOutputFile namer = new MapOutputFile(jobId);
namer.setConf(conf);
for(int i=0; i<numMaps; i++) {
Path f = namer.getInputFile(i, taskId);
@@ -156,8 +158,13 @@
// setup the local and user working directories
FileSystem local = FileSystem.getLocal(conf);
- File taskDir = new File(jobFilename.getParent());
- File workDirName = new File(taskDir.getParent(), "work");
+ LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+ File workDirName = new File(lDirAlloc.getLocalPathToRead(
+ TaskTracker.getJobCacheSubdir()
+ + Path.SEPARATOR + jobId
+ + Path.SEPARATOR + taskId
+ + Path.SEPARATOR + "work",
+ conf). toString());
local.setWorkingDirectory(new Path(workDirName.toString()));
FileSystem.get(conf).setWorkingDirectory(conf.getWorkingDirectory());
@@ -179,7 +186,7 @@
taskId, partition, splitClass, split);
} else {
int numMaps = conf.getNumMapTasks();
- fillInMissingMapOutputs(local, taskId, numMaps, conf);
+ fillInMissingMapOutputs(local, jobId, taskId, numMaps, conf);
task = new ReduceTask(jobId, jobFilename.toString(), conf.get("mapred.tip.id"), taskId,
partition, numMaps);
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Thu Mar 20 04:19:34 2008
@@ -1334,6 +1334,25 @@
public void setJobEndNotificationURI(String uri) {
set("job.end.notification.url", uri);
}
+
+ /**
+ * Get job-specific shared directory for use as scratch space
+ *
+ * <p>
+ * When a job starts, a shared directory is created at location
+ * <code>
+ * ${mapred.local.dir}/taskTracker/jobcache/$jobid/work/ </code>.
+ * This directory is exposed to the users through
+ * <code>job.local.dir </code>.
+ * So, the tasks can use this space
+ * as scratch space and share files among them. </p>
+ * This value is available as System property also.
+ *
+ * @return The localized job specific shared directory
+ */
+ public String getJobLocalDir() {
+ return get("job.local.dir");
+ }
/**
* Find a jar that contains a class of the same name, if any.
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Mar 20 04:19:34 2008
@@ -78,7 +78,7 @@
public Job(String jobid, JobConf conf) throws IOException {
this.file = new Path(conf.getSystemDir(), jobid + "/job.xml");
this.id = jobid;
- this.mapoutputFile = new MapOutputFile();
+ this.mapoutputFile = new MapOutputFile(jobid);
this.mapoutputFile.setConf(conf);
this.localFile = new JobConf(conf).getLocalPath("localRunner/"+id+".xml");
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java Thu Mar 20 04:19:34 2008
@@ -30,6 +30,15 @@
class MapOutputFile {
private JobConf conf;
+ private String jobDir;
+
+ MapOutputFile() {
+ }
+
+ MapOutputFile(String jobId) {
+ this.jobDir = TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + jobId;
+ }
+
private LocalDirAllocator lDirAlloc =
new LocalDirAllocator("mapred.local.dir");
@@ -38,7 +47,9 @@
*/
public Path getOutputFile(String mapTaskId)
throws IOException {
- return lDirAlloc.getLocalPathToRead(mapTaskId+"/file.out", conf);
+ return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
+ mapTaskId + Path.SEPARATOR +
+ "output" + "/file.out", conf);
}
/** Create a local map output file name.
@@ -47,7 +58,9 @@
*/
public Path getOutputFileForWrite(String mapTaskId, long size)
throws IOException {
- return lDirAlloc.getLocalPathForWrite(mapTaskId+"/file.out", size, conf);
+ return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
+ mapTaskId + Path.SEPARATOR +
+ "output" + "/file.out", size, conf);
}
/** Return the path to a local map output index file created earlier
@@ -55,7 +68,9 @@
*/
public Path getOutputIndexFile(String mapTaskId)
throws IOException {
- return lDirAlloc.getLocalPathToRead(mapTaskId + "/file.out.index", conf);
+ return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
+ mapTaskId + Path.SEPARATOR +
+ "output" + "/file.out.index", conf);
}
/** Create a local map output index file name.
@@ -64,7 +79,9 @@
*/
public Path getOutputIndexFileForWrite(String mapTaskId, long size)
throws IOException {
- return lDirAlloc.getLocalPathForWrite(mapTaskId + "/file.out.index",
+ return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
+ mapTaskId + Path.SEPARATOR +
+ "output" + "/file.out.index",
size, conf);
}
@@ -74,8 +91,10 @@
*/
public Path getSpillFile(String mapTaskId, int spillNumber)
throws IOException {
- return lDirAlloc.getLocalPathToRead(mapTaskId+"/spill" +spillNumber+".out",
- conf);
+ return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
+ mapTaskId + Path.SEPARATOR +
+ "output" + "/spill"
+ + spillNumber + ".out", conf);
}
/** Create a local map spill file name.
@@ -85,9 +104,10 @@
*/
public Path getSpillFileForWrite(String mapTaskId, int spillNumber,
long size) throws IOException {
- return lDirAlloc.getLocalPathForWrite(mapTaskId+
- "/spill" +spillNumber+".out",
- size, conf);
+ return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
+ mapTaskId + Path.SEPARATOR +
+ "output" + "/spill" +
+ spillNumber + ".out", size, conf);
}
/** Return a local map spill index file created earlier
@@ -96,8 +116,10 @@
*/
public Path getSpillIndexFile(String mapTaskId, int spillNumber)
throws IOException {
- return lDirAlloc.getLocalPathToRead(
- mapTaskId+"/spill" +spillNumber+".out.index", conf);
+ return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
+ mapTaskId + Path.SEPARATOR +
+ "output" + "/spill" +
+ spillNumber + ".out.index", conf);
}
/** Create a local map spill index file name.
@@ -107,8 +129,10 @@
*/
public Path getSpillIndexFileForWrite(String mapTaskId, int spillNumber,
long size) throws IOException {
- return lDirAlloc.getLocalPathForWrite(
- mapTaskId+"/spill" +spillNumber+".out.index", size, conf);
+ return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
+ mapTaskId + Path.SEPARATOR +
+ "output" + "/spill" + spillNumber +
+ ".out.index", size, conf);
}
/** Return a local reduce input file created earlier
@@ -118,7 +142,9 @@
public Path getInputFile(int mapId, String reduceTaskId)
throws IOException {
// TODO *oom* should use a format here
- return lDirAlloc.getLocalPathToRead(reduceTaskId + "/map_"+mapId+".out",
+ return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
+ reduceTaskId + Path.SEPARATOR +
+ "output" + "/map_" + mapId + ".out",
conf);
}
@@ -130,21 +156,16 @@
public Path getInputFileForWrite(int mapId, String reduceTaskId, long size)
throws IOException {
// TODO *oom* should use a format here
- return lDirAlloc.getLocalPathForWrite(reduceTaskId + "/map_"+mapId+".out",
+ return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
+ reduceTaskId + Path.SEPARATOR +
+ "output" + "/map_" + mapId + ".out",
size, conf);
}
/** Removes all of the files related to a task. */
public void removeAll(String taskId) throws IOException {
- conf.deleteLocalFiles(taskId);
- }
-
- /**
- * Removes all contents of temporary storage. Called upon
- * startup, to remove any leftovers from previous run.
- */
- public void cleanupStorage() throws IOException {
- conf.deleteLocalFiles();
+ conf.deleteLocalFiles(jobDir + Path.SEPARATOR +
+ taskId + Path.SEPARATOR + "output");
}
public void setConf(Configuration conf) {
@@ -154,4 +175,9 @@
this.conf = new JobConf(conf);
}
}
+
+ public void setJobId(String jobId) {
+ this.jobDir = TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + jobId;
+ }
+
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Thu Mar 20 04:19:34 2008
@@ -48,14 +48,16 @@
private int mapId;
private String host;
private int port;
+ private String jobId;
/** RPC constructor **/
public MapOutputLocation() {
}
/** Construct a location. */
- public MapOutputLocation(String mapTaskId, int mapId,
+ public MapOutputLocation(String jobId, String mapTaskId, int mapId,
String host, int port) {
+ this.jobId = jobId;
this.mapTaskId = mapTaskId;
this.mapId = mapId;
this.host = host;
@@ -80,22 +82,24 @@
public int getPort() { return port; }
public void write(DataOutput out) throws IOException {
- UTF8.writeString(out, mapTaskId);
+ out.writeUTF(jobId);
+ out.writeUTF(mapTaskId);
out.writeInt(mapId);
- UTF8.writeString(out, host);
+ out.writeUTF(host);
out.writeInt(port);
}
public void readFields(DataInput in) throws IOException {
- this.mapTaskId = UTF8.readString(in);
+ this.jobId = in.readUTF();
+ this.mapTaskId = in.readUTF();
this.mapId = in.readInt();
- this.host = UTF8.readString(in);
+ this.host = in.readUTF();
this.port = in.readInt();
}
public String toString() {
- return "http://" + host + ":" + port + "/mapOutput?map=" +
- mapTaskId;
+ return "http://" + host + ":" + port + "/mapOutput?job=" + jobId +
+ "&map=" + mapTaskId;
}
/**
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Mar 20 04:19:34 2008
@@ -46,7 +46,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InMemoryFileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
@@ -807,7 +806,10 @@
// a temp filename. If this file gets created in ramfs, we're fine,
// else, we will check the localFS to find a suitable final location
// for this path
- Path filename = new Path("/" + reduceId + "/map_" +
+ Path filename = new Path("/" + TaskTracker.getJobCacheSubdir() +
+ Path.SEPARATOR + getJobId() +
+ Path.SEPARATOR + reduceId +
+ Path.SEPARATOR + "output" + "/map_" +
loc.getMapId() + ".out");
// a working filename that will be unique to this attempt
Path tmpFilename = new Path(filename + "-" + id);
@@ -903,13 +905,7 @@
// add the jars and directories to the classpath
String jar = conf.getJar();
if (jar != null) {
- LocalDirAllocator lDirAlloc =
- new LocalDirAllocator("mapred.local.dir");
- File jobCacheDir = new File(lDirAlloc.getLocalPathToRead(
- TaskTracker.getJobCacheSubdir()
- + Path.SEPARATOR + getJobId()
- + Path.SEPARATOR
- + "work", conf).toString());
+ File jobCacheDir = new File(new Path(jar).getParent().toString());
File[] libs = new File(jobCacheDir, "lib").listFiles();
if (libs != null) {
@@ -1484,7 +1480,8 @@
maxFetchRetriesPerMap =
getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1);
}
- knownOutputs.add(new MapOutputLocation(taskId, mId, host, port));
+ knownOutputs.add(new MapOutputLocation(reduceTask.getJobId(),
+ taskId, mId, host, port));
}
break;
case FAILED:
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java Thu Mar 20 04:19:34 2008
@@ -114,6 +114,7 @@
TaskStatus.Phase.MAP :
TaskStatus.Phase.SHUFFLE,
counters);
+ this.mapOutputFile.setJobId(jobId);
}
////////////////////////////////////////////
@@ -186,6 +187,7 @@
taskOutputPath = null;
}
taskStatus.readFields(in);
+ this.mapOutputFile.setJobId(jobId);
}
public String toString() { return taskId; }
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Thu Mar 20 04:19:34 2008
@@ -54,7 +54,7 @@
this.t = t;
this.tracker = tracker;
this.conf = conf;
- this.mapOutputFile = new MapOutputFile();
+ this.mapOutputFile = new MapOutputFile(t.getJobId());
this.mapOutputFile.setConf(conf);
}
@@ -91,19 +91,20 @@
//before preparing the job localize
//all the archives
- File workDir = new File(t.getJobFile()).getParentFile();
String taskid = t.getTaskId();
LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
File jobCacheDir = null;
- try {
- jobCacheDir = new File(lDirAlloc.getLocalPathToRead(
- TaskTracker.getJobCacheSubdir()
- + Path.SEPARATOR + t.getJobId()
- + Path.SEPARATOR
- + "work", conf).toString());
- } catch (IOException ioe) {
- LOG.warn("work directory doesnt exist");
+ if (conf.getJar() != null) {
+ jobCacheDir = new File(
+ new Path(conf.getJar()).getParent().toString());
}
+ File workDir = new File(lDirAlloc.getLocalPathToRead(
+ TaskTracker.getJobCacheSubdir()
+ + Path.SEPARATOR + t.getJobId()
+ + Path.SEPARATOR + t.getTaskId()
+ + Path.SEPARATOR + "work",
+ conf). toString());
+
URI[] archives = DistributedCache.getCacheArchives(conf);
URI[] files = DistributedCache.getCacheFiles(conf);
FileStatus fileStatus;
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Mar 20 04:19:34 2008
@@ -152,7 +152,6 @@
private static final String JOBCACHE = "jobcache";
private JobConf originalConf;
private JobConf fConf;
- private MapOutputFile mapOutputFile;
private int maxCurrentMapTasks;
private int maxCurrentReduceTasks;
private int failures;
@@ -448,7 +447,7 @@
// Clear out temporary files that might be lying around
DistributedCache.purgeCache(this.fConf);
- this.mapOutputFile.cleanupStorage();
+ cleanupStorage();
this.justStarted = true;
this.jobClient = (InterTrackerProtocol)
@@ -465,7 +464,15 @@
taskTrackerName);
mapEventsFetcher.start();
}
-
+
+ /**
+ * Removes all contents of temporary storage. Called upon
+ * startup, to remove any leftovers from previous run.
+ */
+ public void cleanupStorage() throws IOException {
+ this.fConf.deleteLocalFiles();
+ }
+
// Object on wait which MapEventsFetcherThread is going to wait.
private Object waitingOn = new Object();
@@ -638,13 +645,11 @@
jarFileSize = -1;
}
}
- // Here we check for double the size of jobfile to accommodate for
- // localize task file and we check four times the size of jarFileSize to
- // accommodate for unjarring the jar file in work directory
+
Path localJobFile = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
+ Path.SEPARATOR + jobId
+ Path.SEPARATOR + "job.xml"),
- 2 * jobFileSize + 5 * jarFileSize, fConf);
+ jobFileSize, fConf);
RunningJob rjob = addTaskToJob(jobId, localJobFile, tip);
synchronized (rjob) {
if (!rjob.localized) {
@@ -667,18 +672,30 @@
JobConf localJobConf = new JobConf(localJobFile);
// create the 'work' directory
- File workDir = new File(new File(localJobFile.toString()).getParent(),
- "work");
- if (!workDir.mkdirs()) {
- if (!workDir.isDirectory()) {
- throw new IOException("Mkdirs failed to create " + workDir.toString());
- }
+ // job-specific shared directory for use as scratch space
+ Path workDir = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
+ + Path.SEPARATOR + jobId
+ + Path.SEPARATOR + "work"), fConf);
+ if (!localFs.mkdirs(workDir)) {
+ throw new IOException("Mkdirs failed to create "
+ + workDir.toString());
}
+ System.setProperty("job.local.dir", workDir.toString());
+ localJobConf.set("job.local.dir", workDir.toString());
- // unjar the job.jar files in workdir
+ // copy Jar file to the local FS and unjar it.
String jarFile = localJobConf.getJar();
if (jarFile != null) {
- localJarFile = new Path(jobDir,"job.jar");
+ // Here we check for five times the size of jarFileSize to accommodate
+ // for unjarring the jar file in the jars directory
+ localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
+ getJobCacheSubdir()
+ + Path.SEPARATOR + jobId
+ + Path.SEPARATOR + "jars",
+ 5 * jarFileSize, fConf), "job.jar");
+ if (!localFs.mkdirs(localJarFile.getParent())) {
+ throw new IOException("Mkdirs failed to create jars directory ");
+ }
fs.copyToLocalFile(new Path(jarFile), localJarFile);
localJobConf.setJar(localJarFile.toString());
OutputStream out = localFs.create(localJobFile);
@@ -687,8 +704,9 @@
} finally {
out.close();
}
-
- RunJar.unJar(new File(localJarFile.toString()), workDir);
+ // also unjar the job.jar files
+ RunJar.unJar(new File(localJarFile.toString()),
+ new File(localJarFile.getParent().toString()));
}
rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
localJobConf.getKeepFailedTaskFiles());
@@ -763,7 +781,7 @@
this.running = false;
// Clear local storage
- this.mapOutputFile.cleanupStorage();
+ cleanupStorage();
// Shutdown the fetcher thread
this.mapEventsFetcher.interrupt();
@@ -782,8 +800,6 @@
maxCurrentReduceTasks = conf.getInt(
"mapred.tasktracker.reduce.tasks.maximum", 2);
this.jobTrackAddr = JobTracker.getAddress(conf);
- this.mapOutputFile = new MapOutputFile();
- this.mapOutputFile.setConf(conf);
String infoAddr =
NetUtils.getServerAddress(conf,
"tasktracker.http.bindAddress",
@@ -1370,7 +1386,11 @@
Path.SEPARATOR + task.getJobId() + Path.SEPARATOR +
task.getTaskId()), defaultJobConf );
FileSystem localFs = FileSystem.getLocal(fConf);
-
+ if (!localFs.mkdirs(localTaskDir)) {
+ throw new IOException("Mkdirs failed to create "
+ + localTaskDir.toString());
+ }
+
// create symlink for ../work if it already doesnt exist
String workDir = lDirAlloc.getLocalPathToRead(
TaskTracker.getJobCacheSubdir()
@@ -1384,9 +1404,17 @@
FileUtil.symLink(workDir, link);
// create the working-directory of the task
- if (!localFs.mkdirs(localTaskDir)) {
- throw new IOException("Mkdirs failed to create " + localTaskDir.toString());
+ Path cwd = lDirAlloc.getLocalPathForWrite(
+ TaskTracker.getJobCacheSubdir()
+ + Path.SEPARATOR + task.getJobId()
+ + Path.SEPARATOR + task.getTaskId()
+ + Path.SEPARATOR + "work",
+ defaultJobConf);
+ if (!localFs.mkdirs(cwd)) {
+ throw new IOException("Mkdirs failed to create "
+ + cwd.toString());
}
+
Path localTaskFile = new Path(localTaskDir, "job.xml");
task.setJobFile(localTaskFile.toString());
localJobConf.set("mapred.local.dir",
@@ -1598,7 +1626,19 @@
} catch(IOException e){
LOG.warn("Exception finding task's stdout/err/syslog files");
}
- File workDir = new File(task.getJobFile()).getParentFile();
+ File workDir = null;
+ try {
+ workDir = new File(lDirAlloc.getLocalPathToRead(
+ TaskTracker.getJobCacheSubdir()
+ + Path.SEPARATOR + task.getJobId()
+ + Path.SEPARATOR + task.getTaskId()
+ + Path.SEPARATOR + "work",
+ localJobConf). toString());
+ } catch (IOException e) {
+ LOG.warn("Working Directory of the task " + task.getTaskId() +
+ "doesnt exist. Throws expetion " +
+ StringUtils.stringifyException(e));
+ }
// Build the command
File stdout = TaskLog.getTaskLogFile(task.getTaskId(),
TaskLog.LogName.DEBUGOUT);
@@ -2216,6 +2256,12 @@
) throws ServletException, IOException {
String mapId = request.getParameter("map");
String reduceId = request.getParameter("reduce");
+ String jobId = request.getParameter("job");
+
+ if (jobId == null) {
+ throw new IOException("job parameter is required");
+ }
+
if (mapId == null || reduceId == null) {
throw new IOException("map and reduce parameters are required");
}
@@ -2241,11 +2287,15 @@
// Index file
Path indexFileName = lDirAlloc.getLocalPathToRead(
- mapId+"/file.out.index", conf);
+ TaskTracker.getJobCacheSubdir() + Path.SEPARATOR +
+ jobId + Path.SEPARATOR +
+ mapId + "/output" + "/file.out.index", conf);
// Map-output file
Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
- mapId+"/file.out", conf);
+ TaskTracker.getJobCacheSubdir() + Path.SEPARATOR +
+ jobId + Path.SEPARATOR +
+ mapId + "/output" + "/file.out", conf);
/**
* Read the index file to get the information about where
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java Thu Mar 20 04:19:34 2008
@@ -256,12 +256,14 @@
private JobConf conf;
private boolean compressInput;
private String taskId;
+ private String jobId;
private boolean first = true;
public void configure(JobConf conf) {
this.conf = conf;
compressInput = conf.getCompressMapOutput();
taskId = conf.get("mapred.task.id");
+ jobId = conf.get("mapred.job.id");
}
public void reduce(WritableComparable key, Iterator values,
@@ -269,7 +271,9 @@
) throws IOException {
if (first) {
first = false;
- Path input = conf.getLocalPath(taskId+"/map_0.out");
+ MapOutputFile mapOutputFile = new MapOutputFile(jobId);
+ mapOutputFile.setConf(conf);
+ Path input = mapOutputFile.getInputFile(0, taskId);
FileSystem fs = FileSystem.get(conf);
assertTrue("reduce input exists " + input, fs.exists(input));
SequenceFile.Reader rdr =
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=639247&r1=639246&r2=639247&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Thu Mar 20 04:19:34 2008
@@ -144,12 +144,14 @@
String name = contents[fileIdx];
if (!("taskTracker".equals(contents[fileIdx]))) {
LOG.debug("Looking at " + name);
- int idx = neededDirs.indexOf(name);
assertTrue("Spurious directory " + name + " found in " +
- localDir, idx != -1);
- assertTrue("Matching output directory not found " + name +
- " in " + trackerDir,
- new File(new File(new File(trackerDir, "jobcache"), jobIds[idx]), name).isDirectory());
+ localDir, false);
+ }
+ }
+ for (int idx = 0; idx < neededDirs.size(); ++idx) {
+ String name = neededDirs.get(idx);
+ if (new File(new File(new File(trackerDir, "jobcache"),
+ jobIds[idx]), name).isDirectory()) {
found[idx] = true;
numNotDel++;
}