You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/12/12 23:24:11 UTC
svn commit: r486372 - in /lucene/hadoop/trunk: ./
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/java/org/apache/hadoop/filecache/ src/java/org/apache/hadoop/fs/
src/java/org/apache/hadoop/mapred/
Author: cutting
Date: Tue Dec 12 14:24:09 2006
New Revision: 486372
URL: http://svn.apache.org/viewvc?view=rev&rev=486372
Log:
HADOOP-673. Give each task its own working directory again. Contributed by Mahadev.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=486372&r1=486371&r2=486372
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Dec 12 14:24:09 2006
@@ -71,6 +71,9 @@
21. HADOOP-792. Fix 'dfs -mv' to return correct status.
(Dhruba Borthakur via cutting)
+22. HADOOP-673. Give each task its own working directory again.
+ (Mahadev Konar via cutting)
+
Release 0.9.1 - 2006-12-06
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?view=diff&rev=486372&r1=486371&r2=486372
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Tue Dec 12 14:24:09 2006
@@ -234,10 +234,12 @@
String[] argvSplit = splitArgs(argv);
String prog = argvSplit[0];
String userdir = System.getProperty("user.dir");
+ File currentDir = new File(".").getAbsoluteFile();
+ File jobCacheDir = new File(currentDir.getParentFile().getParent(), "work");
if (new File(prog).isAbsolute()) {
// we don't own it. Hope it is executable
} else {
- new MustangFile(prog).setExecutable(true, true);
+ new MustangFile(new File(jobCacheDir, prog).toString()).setExecutable(true, true);
}
if (job_.getInputValueClass().equals(BytesWritable.class)) {
@@ -282,7 +284,7 @@
//
if (!new File(argvSplit[0]).isAbsolute()) {
PathFinder finder = new PathFinder("PATH");
- finder.prependPathComponent(".");
+ finder.prependPathComponent(jobCacheDir.toString());
File f = finder.getAbsolutePath(argvSplit[0]);
if (f != null) {
argvSplit[0] = f.getAbsolutePath();
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=486372&r1=486371&r2=486372
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Tue Dec 12 14:24:09 2006
@@ -620,8 +620,8 @@
boolean b = DistributedCache.checkURIs(fileURIs, archiveURIs);
if (!b)
fail(LINK_URI);
- DistributedCache.createSymlink(jobConf_);
}
+ DistributedCache.createSymlink(jobConf_);
// set the jobconf for the caching parameters
if (cacheArchives != null)
DistributedCache.setCacheArchives(archiveURIs, jobConf_);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java?view=diff&rev=486372&r1=486371&r2=486372
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java Tue Dec 12 14:24:09 2006
@@ -24,6 +24,7 @@
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.*;
+
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.net.URI;
@@ -108,6 +109,8 @@
String cacheId = makeRelative(cache, conf);
synchronized (cachedArchives) {
CacheStatus lcacheStatus = (CacheStatus) cachedArchives.get(cacheId);
+ if (lcacheStatus == null)
+ return;
synchronized (lcacheStatus) {
lcacheStatus.refcount--;
}
@@ -320,7 +323,29 @@
return digest;
}
-
+
+ /**
+ * This method creates symlinks for all files in a given dir in another directory
+ * @param conf the configuration
+ * @param jobCacheDir the target directory for creating symlinks
+ * @param workDir the directory in which the symlinks are created
+ * @throws IOException
+ */
+ public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir)
+ throws IOException{
+ if ((!jobCacheDir.isDirectory()) || (!workDir.isDirectory())){
+ return;
+ }
+ boolean createSymlink = getSymlink(conf);
+ if (createSymlink){
+ File[] list = jobCacheDir.listFiles();
+ for (int i=0; i < list.length; i++){
+ FileUtil.symLink(list[i].getAbsolutePath(),
+ new File(workDir, list[i].getName()).toString());
+ }
+ }
+ }
+
private static String getFileSysName(URI url) {
String fsname = url.getScheme();
if ("dfs".equals(fsname)) {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java?view=diff&rev=486372&r1=486371&r2=486372
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java Tue Dec 12 14:24:09 2006
@@ -43,6 +43,16 @@
return false;
}
} else {
+ //try deleting the directory
+ // this might be a symlink
+ boolean b = false;
+ b = contents[i].delete();
+ if (b){
+ //this was indeed a symlink or an empty directory
+ continue;
+ }
+ // if not an empty directory or symlink let
+ // fullydelete handle it.
if (! fullyDelete(contents[i])) {
return false;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=486372&r1=486371&r2=486372
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Dec 12 14:24:09 2006
@@ -21,6 +21,7 @@
import org.apache.hadoop.fs.*;
import org.apache.hadoop.filecache.*;
+import org.apache.hadoop.util.*;
import java.io.*;
import java.util.Vector;
import java.net.URI;
@@ -82,7 +83,8 @@
//before preparing the job localize
//all the archives
- File workDir = new File(new File(t.getJobFile()).getParentFile().getParent(), "work");
+ File workDir = new File(t.getJobFile()).getParentFile();
+ File jobCacheDir = new File(workDir.getParent(), "work");
URI[] archives = DistributedCache.getCacheArchives(conf);
URI[] files = DistributedCache.getCacheFiles(conf);
if ((archives != null) || (files != null)) {
@@ -104,8 +106,6 @@
}
DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
}
-
- // sets the paths to local archives and paths
Path localTaskFile = new Path(t.getJobFile());
FileSystem localFs = FileSystem.getNamed("local", conf);
localFs.delete(localTaskFile);
@@ -116,6 +116,16 @@
out.close();
}
}
+
+ // create symlinks for all the files in job cache dir in current
+ // working dir for streaming
+ try{
+ DistributedCache.createAllSymlink(conf, jobCacheDir,
+ workDir);
+ } catch(IOException ie){
+ // Do not exit even if symlinks have not been created.
+ LOG.warn(StringUtils.stringifyException(ie));
+ }
if (! prepare()) {
return;
@@ -135,7 +145,7 @@
String jar = conf.getJar();
if (jar != null) {
// if jar exists, copy it into workDir
- File[] libs = new File(workDir, "lib").listFiles();
+ File[] libs = new File(jobCacheDir, "lib").listFiles();
if (libs != null) {
for (int i = 0; i < libs.length; i++) {
classPath.append(sep); // add libs from jar to classpath
@@ -143,11 +153,13 @@
}
}
classPath.append(sep);
- classPath.append(new File(workDir, "classes"));
+ classPath.append(new File(jobCacheDir, "classes"));
classPath.append(sep);
- classPath.append(workDir);
+ classPath.append(jobCacheDir);
+
}
-
+ classPath.append(sep);
+ classPath.append(workDir);
// Build exec child jmv args.
Vector vargs = new Vector(8);
File jvm = // use same jvm as parent