You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/10/31 21:13:33 UTC

svn commit: r469635 - in /lucene/hadoop/trunk: ./ src/contrib/ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/java/org/apache/hadoop/conf/ src/java/org/apache/hadoop/filecache...

Author: cutting
Date: Tue Oct 31 12:13:31 2006
New Revision: 469635

URL: http://svn.apache.org/viewvc?view=rev&rev=469635
Log:
HADOOP-576.  Enable contrib/streaming to use the file cache.  Contributed by Mahadev.

Added:
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/build.xml
    lucene/hadoop/trunk/src/contrib/build-contrib.xml
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Oct 31 12:13:31 2006
@@ -87,6 +87,10 @@
     reduce tasks which use MapFile to still report progress while
     writing blocks to the filesystem.  (cutting)
 
+24. HADOOP-576.  Enable contrib/streaming to use the file cache.  Also
+    extend the cache to permit symbolic links to cached items, rather
+    than local file copies.  (Mahadev Konar via cutting)
+
 
 Release 0.7.2 - 2006-10-18
 

Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Tue Oct 31 12:13:31 2006
@@ -350,8 +350,8 @@
 
   <target name="test-contrib" depends="compile-core, compile-core-test">
     <subant target="test">
-        <fileset file="${basedir}/src/contrib/build.xml"/>
-    </subant>  	
+       <fileset file="${basedir}/src/contrib/build.xml"/>
+    </subant> 
   </target>   
   
   <target name="test" depends="test-core, test-contrib">

Modified: lucene/hadoop/trunk/src/contrib/build-contrib.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/build-contrib.xml?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/contrib/build-contrib.xml (original)
+++ lucene/hadoop/trunk/src/contrib/build-contrib.xml Tue Oct 31 12:13:31 2006
@@ -24,9 +24,10 @@
   <property name="build.dir" location="${hadoop.root}/build/contrib/${name}"/>
   <property name="build.classes" location="${build.dir}/classes"/>
   <property name="build.test" location="${build.dir}/test"/>
+  <property name="hadoop.log.dir" location="${build.dir}/test/logs"/>
   <!-- all jars together -->
   <property name="deploy.dir" location="${hadoop.root}/build/"/>
-
+  <property name="minimr.dir" value="${hadoop.root}/build/minimr"/>
   <property name="javac.deprecation" value="off"/>
   <property name="javac.debug" value="on"/>
 
@@ -51,6 +52,7 @@
   <path id="test.classpath">
     <pathelement location="${build.test}" />
     <pathelement location="${hadoop.root}/build/test/classes"/>
+    <pathelement location="${minimr.dir}" />
     <pathelement location="${hadoop.root}/src/test"/>
     <pathelement location="${conf.dir}"/>
     <pathelement location="${hadoop.root}/build"/>
@@ -69,7 +71,8 @@
     <mkdir dir="${build.dir}"/>
     <mkdir dir="${build.classes}"/>
     <mkdir dir="${build.test}"/>
-
+    <mkdir dir="${hadoop.log.dir}"/>
+    <mkdir dir="${minimr.dir}"/>
     <antcall target="init-contrib"/>
   </target>
 
@@ -131,8 +134,10 @@
   <!-- ================================================================== -->
   <target name="test" depends="compile-test, deploy" if="test.available">
     <echo message="contrib: ${name}"/>
+    <delete dir="${hadoop.log.dir}"/>
+    <mkdir dir="${hadoop.log.dir}"/>
     <junit
-      printsummary="withOutAndErr" haltonfailure="no" fork="yes"
+      printsummary="withOutAndErr" showoutput="no" haltonfailure="no" fork="yes"
       errorProperty="tests.failed" failureProperty="tests.failed">
       
       <sysproperty key="test.build.data" value="${build.test}/data"/>
@@ -145,7 +150,7 @@
       
       <sysproperty key="fs.default.name" value="${fs.default.name}"/>
       <sysproperty key="hadoop.test.localoutputfile" value="${hadoop.test.localoutputfile}"/>
-   
+      <sysproperty key="hadoop.log.dir" value="${hadoop.log.dir}"/> 
       <classpath refid="test.classpath"/>
       <formatter type="plain" />
       <batchtest todir="${build.test}" unless="testcase">

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java Tue Oct 31 12:13:31 2006
@@ -22,8 +22,12 @@
 import java.net.InetAddress;
 import java.util.*;
 
-/*
- * If we move to Java 1.5, we can get rid of this class and just use System.getenv
+/**
+ * This is a class used to get the current environment
+ * on the host machines running the map/reduce. This class
+ * assumes that setting the environment in streaming is 
+ * allowed on windows/unix/linux/freebsd/sunos/solaris/hp-ux
+ * @author michel
  */
 public class Environment extends Properties {
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java Tue Oct 31 12:13:31 2006
@@ -21,6 +21,8 @@
 import java.io.IOException;
 
 /** The main entrypoint. Usually invoked with the script bin/hadoopStreaming
+ * or bin/hadoop jar hadoop-streaming.jar args.
+ * It passes all the args to StreamJob which handles all the arguments.
  */
 public class HadoopStreaming {
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java Tue Oct 31 12:13:31 2006
@@ -23,6 +23,16 @@
 import java.util.jar.*;
 import java.util.zip.ZipException;
 
+/**
+ * This class is the main class for generating job.jar
+ * for Hadoop Streaming jobs. It includes the files specified 
+ * with the -file option and includes them in the jar. Also,
+ * hadoop-streaming is a user level application, so all the classes
+ * with hadoop-streaming that are needed in the job are also included
+ * in the job.jar.
+ * @author michel
+ *
+ */
 public class JarBuilder {
 
   public JarBuilder() {

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java Tue Oct 31 12:13:31 2006
@@ -30,7 +30,9 @@
  *
  * Note: not specifying ownerOnly maps to ownerOnly = false
  * From man chmod: If no user specs are given, the effect is as if `a' were given. 
- * 
+ * This class is mainly used to change permissions when files are unjarred from the 
+ * job.jar. The executable specified in the mapper/reducer is set to be executable 
+ * using this class.
  */
 public class MustangFile extends File {
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Tue Oct 31 12:13:31 2006
@@ -39,7 +39,8 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.RunningJob;
-
+import org.apache.hadoop.filecache.*;
+import org.apache.hadoop.util.*;
 /** All the client-side work happens here.
  * (Jar packaging, MapRed job submission and monitoring)
  * @author Michel Tourn
@@ -54,7 +55,13 @@
     argv_ = argv;
     mayExit_ = mayExit;
   }
-
+  
+  /**
+   * This is the method that actually 
+   * initializes the job conf and submits the job
+   * to the jobtracker
+   * @throws IOException
+   */
   public void go() throws IOException {
     init();
 
@@ -65,7 +72,7 @@
     setJobConf();
     submitAndMonitorJob();
   }
-
+  
   protected void init() {
     try {
       env_ = new Environment();
@@ -157,6 +164,10 @@
     return new File(getHadoopClientHome() + "/conf", hadoopAliasConf_).getAbsolutePath();
   }
 
+  /**
+   * This method parses the command line args
+   * to a hadoop streaming job
+   */
   void parseArgv() {
     if (argv_.length == 0) {
       exitUsage(false);
@@ -219,7 +230,22 @@
       } else if ((s = optionArg(argv_, i, "-inputreader", inReaderSpec_ != null)) != null) {
         i++;
         inReaderSpec_ = s;
-      } else {
+      } else if((s = optionArg(argv_, i, "-cacheArchive", false)) != null) {
+    	  i++;
+    	  if (cacheArchives == null)
+    		  cacheArchives = s;
+    	  else
+    		  cacheArchives = cacheArchives + "," + s;    	  
+      } else if((s = optionArg(argv_, i, "-cacheFile", false)) != null) {
+        i++;
+        System.out.println(" the val of s is " + s);
+        if (cacheFiles == null)
+          cacheFiles = s;
+        else
+          cacheFiles = cacheFiles + "," + s;
+        System.out.println(" the val of cachefiles is " + cacheFiles);
+      }
+      else {
         System.err.println("Unexpected argument: " + argv_[i]);
         exitUsage(false);
       }
@@ -269,6 +295,8 @@
     System.out.println("  -inputreader <spec>  Optional.");
     System.out.println("  -jobconf  <n>=<v>    Optional. Add or override a JobConf property");
     System.out.println("  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands");
+    System.out.println("  -cacheFile fileNameURI");
+    System.out.println("  -cacheArchive fileNameURI");
     System.out.println("  -verbose");
     System.out.println();
     if (!detailed) {
@@ -392,7 +420,7 @@
     // $HADOOP_HOME/bin/hadoop jar /not/first/on/classpath/custom-hadoop-streaming.jar
     // where findInClasspath() would find the version of hadoop-streaming.jar in $HADOOP_HOME
     String runtimeClasses = userJobConfProps_.get("stream.shipped.hadoopstreaming"); // jar or class dir
-System.out.println(runtimeClasses + "=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming");
+    System.out.println(runtimeClasses + "=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming");
     
     if (runtimeClasses == null) {
       runtimeClasses = StreamUtil.findInClasspath(StreamJob.class.getName());
@@ -435,6 +463,11 @@
     return jobJarName;
   }
 
+  /**
+   * This method sets the user jobconf variable specified
+   * by user using -jobconf key=value
+   * @param doEarlyProps
+   */
   protected void setUserJobConfProps(boolean doEarlyProps) {
     Iterator it = userJobConfProps_.keySet().iterator();
     while (it.hasNext()) {
@@ -448,7 +481,17 @@
       }
     }
   }
-
+  
+  /**
+   * Parse the comma separated cache archive and cache file lists into archiveURIs and fileURIs
+   */
+  protected void getURIs(String lcacheArchives, String lcacheFiles) {
+    String archives[] = StringUtils.getStrings(lcacheArchives);
+    String files[] = StringUtils.getStrings(lcacheFiles);
+    fileURIs = StringUtils.stringToURI(files);
+    archiveURIs = StringUtils.stringToURI(archives);
+  }
+  
   protected void setJobConf() throws IOException {
     msg("hadoopAliasConf_ = " + hadoopAliasConf_);
     config_ = new Configuration();
@@ -548,7 +591,8 @@
         jobConf_.set(k, v);
       }
     }
-
+    
+    setUserJobConfProps(false);
     // output setup is done late so we can customize for reducerNone_
     //jobConf_.setOutputDir(new File(output_));
     setOutputSpec();
@@ -561,20 +605,36 @@
 
     // last, allow user to override anything
     // (although typically used with properties we didn't touch)
-    setUserJobConfProps(false);
 
     jar_ = packageJobJar();
     if (jar_ != null) {
       jobConf_.setJar(jar_);
     }
-
+    
+    if ((cacheArchives != null) || (cacheFiles != null)){
+      getURIs(cacheArchives, cacheFiles);
+      boolean b = DistributedCache.checkURIs(fileURIs, archiveURIs);
+      if (!b)
+        fail(LINK_URI);
+      DistributedCache.createSymlink(jobConf_);
+    }
+    // set the jobconf for the caching parameters
+    if (cacheArchives != null)
+      DistributedCache.setCacheArchives(archiveURIs, jobConf_);
+    if (cacheFiles != null)
+      DistributedCache.setCacheFiles(fileURIs, jobConf_);
+    
     if(verbose_) {
       listJobConfProperties();
     }
-    
+   
     msg("submitting to jobconf: " + getJobTrackerHostPort());
   }
 
+  /**
+   * Prints out the jobconf properties on stdout
+   * when verbose is specified.
+   */
   protected void listJobConfProperties()
   {
     msg("==== JobConf properties:");
@@ -765,6 +825,10 @@
   protected String comCmd_;
   protected String redCmd_;
   protected String cluster_;
+  protected String cacheFiles;
+  protected String cacheArchives;
+  protected URI[] fileURIs;
+  protected URI[] archiveURIs;
   protected ArrayList configPath_ = new ArrayList(); // <String>
   protected String hadoopAliasConf_;
   protected String inReaderSpec_;
@@ -780,5 +844,6 @@
 
   protected RunningJob running_;
   protected String jobId_;
-
+  protected static String LINK_URI = "You need to specify the uris as hdfs://host:port/#linkname," +
+      "Please specify a different link name for all of your caching URIs";
 }

Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java?view=auto&rev=469635
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java Tue Oct 31 12:13:31 2006
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+/**
+ * This test case tests the symlink creation
+ * utility provided by distributed caching 
+ * @author mahadev
+ *
+ */
+public class TestSymLink extends TestCase
+{
+  String INPUT_FILE = "/testing-streaming/input.txt";
+  String OUTPUT_DIR = "/testing-streaming/out";
+  String CACHE_FILE = "/testing-streaming/cache.txt";
+  String input = "check to see if we can read this none reduce";
+  String map = "xargs cat ";
+  String reduce = "cat";
+  String mapString = "testlink\n";
+  String cacheString = "This is just the cache string";
+  StreamJob job;
+
+  public TestSymLink() throws IOException
+  {
+  }
+
+  public void testSymLink()
+  {
+    try {
+      boolean mayExit = false;
+      int jobTrackerPort = 60050;
+      MiniMRCluster mr = null;
+      MiniDFSCluster dfs = null; 
+      FileSystem fileSys = null;
+      try{
+        Configuration conf = new Configuration();
+        dfs = new MiniDFSCluster(8050, conf, false);
+        fileSys = dfs.getFileSystem();
+        String namenode = fileSys.getName();
+        mr  = new MiniMRCluster(jobTrackerPort, 60060, 1, namenode, true, 3);
+        // During tests, the default Configuration will use a local mapred
+        // So don't specify -config or -cluster
+        String strJobtracker = "mapred.job.tracker=" + "localhost:" + jobTrackerPort;
+        String strNamenode = "fs.default.name=" + namenode;
+        String argv[] = new String[] {
+            "-input", INPUT_FILE,
+            "-output", OUTPUT_DIR,
+            "-mapper", map,
+            "-reducer", reduce,
+            //"-verbose",
+            //"-jobconf", "stream.debug=set"
+            "-jobconf", strNamenode,
+            "-jobconf", strJobtracker,
+            "-cacheFile", "dfs://"+fileSys.getName()+CACHE_FILE + "#testlink"
+        };
+
+        fileSys.delete(new Path(OUTPUT_DIR));
+        fileSys.mkdirs(new Path(OUTPUT_DIR));
+        
+        DataOutputStream file = fileSys.create(new Path(INPUT_FILE));
+        file.writeBytes(mapString);
+        file.close();
+        file = fileSys.create(new Path(CACHE_FILE));
+        file.writeBytes(cacheString);
+        file.close();
+          
+        job = new StreamJob(argv, mayExit);      
+        job.go();
+        String line = null;
+        Path[] fileList = fileSys.listPaths(new Path(OUTPUT_DIR));
+        for (int i = 0; i < fileList.length; i++){
+          System.out.println(fileList[i].toString());
+          BufferedReader bread =
+            new BufferedReader(new InputStreamReader(fileSys.open(fileList[i])));
+          line = bread.readLine();
+          System.out.println(line);
+        }
+        assertEquals(cacheString + "\t", line);
+      } finally{
+        if (fileSys != null) { fileSys.close(); }
+        if (dfs != null) { dfs.shutdown(); }
+        if (mr != null) { mr.shutdown();}
+      }
+      
+    } catch(Exception e) {
+      failTrace(e);
+    }
+  }
+
+  void failTrace(Exception e)
+  {
+    StringWriter sw = new StringWriter();
+    e.printStackTrace(new PrintWriter(sw));
+    fail(sw.toString());
+  }
+
+  /** Runs this test standalone, without a JUnit runner. */
+  public static void main(String[]args) throws Exception {
+    new TestSymLink().testSymLink();
+  }
+
+}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java Tue Oct 31 12:13:31 2006
@@ -302,14 +302,7 @@
    */
   public String[] getStrings(String name) {
     String valueString = get(name);
-    if (valueString == null)
-      return null;
-    StringTokenizer tokenizer = new StringTokenizer (valueString,",");
-    List values = new ArrayList();
-    while (tokenizer.hasMoreTokens()) {
-      values.add(tokenizer.nextToken());
-    }
-    return (String[])values.toArray(new String[values.size()]);
+    return StringUtils.getStrings(valueString);
   }
 
   /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java Tue Oct 31 12:13:31 2006
@@ -43,7 +43,7 @@
   /**
    * 
    * @param cache the cache to be localized, this should be specified as 
-   * new URI(dfs://hostname:port/absoulte_path_to_file). If no schema 
+   * new URI(dfs://hostname:port/absolute_path_to_file#LINKNAME). If no scheme 
    * or hostname:port is provided the file is assumed to be in the filesystem
    * being used in the Configuration
    * @param conf The Confguration file which contains the filesystem
@@ -55,12 +55,14 @@
    * @param md5 this is a mere checksum to verufy if you are using the right cache. 
    * You need to pass the md5 of the crc file in DFS. This is matched against the one
    * calculated in this api and if it does not match, the cache is not localized.
+   * @param currentWorkDir this is the directory where you would want to create symlinks 
+   * for the locally cached files/archives
    * @return the path to directory where the archives are unjarred in case of archives,
    * the path to the file where the file is copied locally 
    * @throws IOException
    */
   public static Path getLocalCache(URI cache, Configuration conf, Path baseDir,
-      boolean isArchive, String md5) throws IOException {
+      boolean isArchive, String md5, Path currentWorkDir) throws IOException {
     String cacheId = makeRelative(cache, conf);
     CacheStatus lcacheStatus;
     Path localizedPath;
@@ -80,7 +82,7 @@
       }
     }
     synchronized (lcacheStatus) {
-      localizedPath = localizeCache(cache, lcacheStatus, conf, isArchive, md5);
+      localizedPath = localizeCache(cache, lcacheStatus, conf, isArchive, md5, currentWorkDir);
     }
     // try deleting stuff if you can
     long size = FileUtil.getDU(new File(baseDir.toString()));
@@ -157,15 +159,26 @@
 
   // the methoed which actually copies the caches locally and unjars/unzips them
   private static Path localizeCache(URI cache, CacheStatus cacheStatus,
-      Configuration conf, boolean isArchive, String md5) throws IOException {
+      Configuration conf, boolean isArchive, String md5, Path currentWorkDir) throws IOException {
     boolean b = true;
+    boolean doSymlink = getSymlink(conf);
     FileSystem dfs = getFileSystem(cache, conf);
     b = ifExistsAndFresh(cacheStatus, cache, dfs, md5, conf);
     if (b) {
-      if (isArchive)
+      if (isArchive) {
+        if (doSymlink)
+        FileUtil.symLink(cacheStatus.localLoadPath.toString(), 
+            currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment());
+        
         return cacheStatus.localLoadPath;
-      else
+      }
+      else {
+        if (doSymlink)
+          FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), 
+              currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment());
+       
         return cacheFilePath(cacheStatus.localLoadPath);
+      }
     } else {
       // remove the old archive
       // if the old archive cannot be removed since it is being used by another
@@ -179,7 +192,6 @@
       localFs.delete(cacheStatus.localLoadPath);
       Path parchive = new Path(cacheStatus.localLoadPath,
                                new Path(cacheStatus.localLoadPath.getName()));
-
       localFs.mkdirs(cacheStatus.localLoadPath);
       String cacheId = cache.getPath();
       dfs.copyToLocalFile(new Path(cacheId), parchive);
@@ -199,14 +211,23 @@
         // else will not do anyhting
         // and copy the file into the dir as it is
       }
+      // create a symlink if #NAME is specified as fragment in the
+      // cache uri
       cacheStatus.currentStatus = true;
       cacheStatus.md5 = checkSum;
     }
-    if (isArchive)
+    if (isArchive){
+      if (doSymlink)
+        FileUtil.symLink(cacheStatus.localLoadPath.toString(), 
+            currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment());
       return cacheStatus.localLoadPath;
-    else
+    }
+    else {
+      if (doSymlink)
+        FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), 
+            currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment());
       return cacheFilePath(cacheStatus.localLoadPath);
-
+    }
   }
 
   // Checks if the cache has already been localized and is fresh
@@ -451,6 +472,75 @@
     String files = conf.get("mapred.cache.files");
     conf.set("mapred.cache.files", files == null ? uri.toString() : files + ","
         + uri.toString());
+  }
+  
+  /**
+   * This method allows you to create symlinks in the current working directory
+   * of the task to all the cache files/archives
+   * @param conf the jobconf 
+   */
+  public static void createSymlink(Configuration conf){
+    conf.set("mapred.create.symlink", "yes");
+  }
+  
+  /**
+   * This method checks to see if symlinks are to be created for the 
+   * localized cache files in the current working directory 
+   * @param conf the jobconf
+   * @return true if symlinks are to be created- else return false
+   */
+  public static boolean getSymlink(Configuration conf){
+    String result = conf.get("mapred.create.symlink");
+    if ("yes".equals(result)){
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * This method checks if there is a conflict in the fragment names 
+   * of the uris. Also makes sure that each uri has a fragment. It 
+   * is only to be called if you want to create symlinks for 
+   * the various archives and files. Fragments are compared ignoring
+   * case, across files and archives alike.
+   * Both arrays are optional: passing only files or only archives is
+   * validated the same way as passing both.
+   * @param uriFiles The uri array of urifiles, may be null
+   * @param uriArchives the uri array of uri archives, may be null
+   * @return true if every uri has a fragment and no two uris share
+   * one, false otherwise
+   */
+  public static boolean checkURIs(URI[]  uriFiles, URI[] uriArchives){
+    // Pool the fragments of both arrays so that files only, archives
+    // only and mixed input all go through the same two checks.
+    int numFiles = (uriFiles == null) ? 0 : uriFiles.length;
+    int numArchives = (uriArchives == null) ? 0 : uriArchives.length;
+    String[] fragments = new String[numFiles + numArchives];
+    for (int i = 0; i < numFiles; i++){
+      fragments[i] = uriFiles[i].getFragment();
+    }
+    for (int i = 0; i < numArchives; i++){
+      fragments[numFiles + i] = uriArchives[i].getFragment();
+    }
+
+    // every uri has to name its link, otherwise there is nothing to
+    // create in the working directory
+    for (int i = 0; i < fragments.length; i++){
+      if (fragments[i] == null){
+        return false;
+      }
+    }
+
+    // no two uris may ask for the same link name; the inner index is
+    // the one that advances, so the scan always terminates
+    for (int i = 0; i < fragments.length; i++){
+      for (int j = i + 1; j < fragments.length; j++){
+        if (fragments[i].equalsIgnoreCase(fragments[j])){
+          return false;
+        }
+      }
+    }
+    return true;
   }
 
   private static class CacheStatus {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java Tue Oct 31 12:13:31 2006
@@ -301,4 +301,23 @@
       zipFile.close();
     }
   }
+  
+  /**
+   * Create a soft link between a src and destination
+   * only on a local disk. HDFS does not support this
+   * @param target the target for symlink
+   * @param linkname the symlink
+   * @return value returned by the command, -1 if the wait was interrupted
+   */
+  public static int symLink(String target, String linkname) throws IOException{
+   // pass an argument array: exec(String) would split paths on whitespace
+   Process p = Runtime.getRuntime().exec(new String[] {"ln", "-s", target, linkname}, null);
+   int returnVal = -1;
+   try{
+     returnVal = p.waitFor();
+   } catch(InterruptedException e){
+     Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+   }
+   return returnVal;
+ }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Oct 31 12:13:31 2006
@@ -80,7 +80,7 @@
       
       //before preparing the job localize 
       //all the archives
-      
+      File workDir = new File(new File(t.getJobFile()).getParentFile().getParent(), "work");
       URI[] archives = DistributedCache.getCacheArchives(conf);
       URI[] files = DistributedCache.getCacheFiles(conf);
       if ((archives != null) || (files != null)) {
@@ -88,7 +88,8 @@
           String[] md5 = DistributedCache.getArchiveMd5(conf);
           Path[] p = new Path[archives.length];
           for (int i = 0; i < archives.length;i++){
-            p[i] = DistributedCache.getLocalCache(archives[i], conf, conf.getLocalPath(TaskTracker.getCacheSubdir()), true, md5[i]);
+            p[i] = DistributedCache.getLocalCache(archives[i], conf, 
+                conf.getLocalPath(TaskTracker.getCacheSubdir()), true, md5[i], new Path(workDir.getAbsolutePath()));
           }
           DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
         }
@@ -97,7 +98,7 @@
           Path[] p = new Path[files.length];
           for (int i = 0; i < files.length;i++){
            p[i] = DistributedCache.getLocalCache(files[i], conf, conf.getLocalPath(TaskTracker
-              .getCacheSubdir()), false, md5[i]);
+              .getCacheSubdir()), false, md5[i], new Path(workDir.getAbsolutePath()));
           }
           DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
         }
@@ -123,7 +124,6 @@
       // start with same classpath as parent process
       classPath.append(System.getProperty("java.class.path"));
       classPath.append(sep);
-      File workDir = new File(new File(t.getJobFile()).getParentFile().getParent(), "work");
       workDir.mkdirs();
 	  
       String jar = conf.getJar();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java?view=diff&rev=469635&r1=469634&r2=469635
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java Tue Oct 31 12:13:31 2006
@@ -24,7 +24,10 @@
 import java.net.URISyntaxException;
 import java.text.DateFormat;
 import java.text.DecimalFormat;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
+import java.util.StringTokenizer;
 
 import org.apache.hadoop.fs.*;
 
@@ -253,4 +256,19 @@
     return buf.toString();
   }
   
-}
+  /**
+   * returns an array of strings
+   * @param str the comma separated string values
+   * @return the array of the comma separated string values, or null if str is null
+   */
+  public static String[] getStrings(String str){
+    if (str == null)
+      return null;
+    StringTokenizer tokenizer = new StringTokenizer (str,",");
+    List values = new ArrayList();
+    while (tokenizer.hasMoreTokens()) {
+      values.add(tokenizer.nextToken());
+    }
+    return (String[])values.toArray(new String[values.size()]);
+  }
+}
\ No newline at end of file