Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/03/26 22:10:19 UTC

svn commit: r641577 - in /hadoop/core/trunk: ./ bin/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ src/test/testshell/

Author: dhruba
Date: Wed Mar 26 14:10:17 2008
New Revision: 641577

URL: http://svn.apache.org/viewvc?rev=641577&view=rev
Log:
HADOOP-1622.  Allow multiple jar files for map reduce.
(Mahadev Konar via dhruba)
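
For reference, the submission syntax this enables (mirroring the usage
string printed by the new JobShell below; the jar names and option values
here are illustrative):

    bin/hadoop jar -libjars lib1.jar,lib2.jar -files lookup.txt \
        -archives data.zip myjob.jar my.MainClass <args>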


Added:
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobShell.java   (with props)
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobShell.java   (with props)
    hadoop/core/trunk/src/test/testshell/
    hadoop/core/trunk/src/test/testshell/ExternalMapReduce.java   (with props)
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/bin/hadoop
    hadoop/core/trunk/build.xml
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=641577&r1=641576&r2=641577&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Mar 26 14:10:17 2008
@@ -78,6 +78,9 @@
     HADOOP-2951.  Add a contrib module that provides a utility to
     build or update Lucene indexes using Map/Reduce.  (Ning Li via cutting)
 
+    HADOOP-1622.  Allow multiple jar files for map reduce.
+    (Mahadev Konar via dhruba)
+
   IMPROVEMENTS
 
     HADOOP-2655. Copy on write for data and metadata files in the 

Modified: hadoop/core/trunk/bin/hadoop
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/bin/hadoop?rev=641577&r1=641576&r2=641577&view=diff
==============================================================================
--- hadoop/core/trunk/bin/hadoop (original)
+++ hadoop/core/trunk/bin/hadoop Wed Mar 26 14:10:17 2008
@@ -212,7 +212,7 @@
 elif [ "$COMMAND" = "version" ] ; then
   CLASS=org.apache.hadoop.util.VersionInfo
 elif [ "$COMMAND" = "jar" ] ; then
-  CLASS=org.apache.hadoop.util.RunJar
+  CLASS=org.apache.hadoop.mapred.JobShell
 elif [ "$COMMAND" = "distcp" ] ; then
   CLASS=org.apache.hadoop.util.CopyFiles
 elif [ "$COMMAND" = "daemonlog" ] ; then

Modified: hadoop/core/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/build.xml?rev=641577&r1=641576&r2=641577&view=diff
==============================================================================
--- hadoop/core/trunk/build.xml (original)
+++ hadoop/core/trunk/build.xml Wed Mar 26 14:10:17 2008
@@ -83,6 +83,7 @@
   <property name="test.log.dir" value="${test.build.dir}/logs"/>
   <property name="test.build.classes" value="${test.build.dir}/classes"/>
   <property name="test.build.testjar" value="${test.build.dir}/testjar"/>
+  <property name="test.build.testshell" value="${test.build.dir}/testshell"/>
   <property name="test.build.javadoc" value="${test.build.dir}/docs/api"/>
   <property name="test.include" value="Test*"/>
   <property name="test.classpath.id" value="test.classpath"/>
@@ -192,6 +193,7 @@
     <mkdir dir="${test.build.dir}"/>
     <mkdir dir="${test.build.classes}"/>
     <mkdir dir="${test.build.testjar}"/>
+    <mkdir dir="${test.build.testshell}"/>
     <tempfile property="touch.temp.file" destDir="${java.io.tmpdir}"/>
     <touch millis="0" file="${touch.temp.file}">
       <fileset dir="${conf.dir}" includes="**/*.template"/>
@@ -498,6 +500,23 @@
     <jar jarfile="${test.build.testjar}/testjob.jar"
      basedir="${test.build.testjar}">
     </jar>
+    <javac 
+     encoding="${build.encoding}"
+     srcdir="${test.src.dir}/testshell"
+     includes="*.java"
+     destdir="${test.build.testshell}"
+     debug="${javac.debug}"
+     optimize="${javac.optimize}"
+     target="${javac.version}"
+     source="${javac.version}"
+     deprecation="${javac.deprecation}">
+      <compilerarg line="${javac.args} ${javac.args.warnings}"/>
+      <classpath refid="test.classpath"/>
+    </javac>
+    <delete file="${test.build.testshell}/testshell.jar"/>
+    <jar jarfile="${test.build.testshell}/testshell.jar"
+     basedir="${test.build.testshell}">
+    </jar>
                                                               
     <delete dir="${test.cache.data}"/>
     <mkdir dir="${test.cache.data}"/>

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=641577&r1=641576&r2=641577&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Wed Mar 26 14:10:17 2008
@@ -28,11 +28,14 @@
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLConnection;
+import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -48,6 +51,7 @@
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.BytesWritable;
@@ -155,7 +159,7 @@
   private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobClient");
   public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
   private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; 
-
+  private static Configuration commandLineConfig;
   static long MAX_JOBPROFILE_AGE = 1000 * 2;
 
   /**
@@ -341,7 +345,24 @@
     setConf(conf);
     init(conf);
   }
-    
+  
+  /**
+   * Set the command line config in the JobClient. These are
+   * parameters passed from the command line and stored in
+   * conf.
+   * @param conf the configuration object to set.
+   */
+  static void setCommandLineConfig(Configuration conf) {
+    commandLineConfig = conf;
+  }
+  
+  /**
+   * Return the command line configuration, or null if none was set.
+   */
+  public static Configuration getCommandLineConfig() {
+    return commandLineConfig;
+  }
+  
   /**
    * Connect to the default {@link JobTracker}.
    * @param conf the job configuration.
@@ -417,50 +438,87 @@
     }
     return fs;
   }
-
-  /**
-   * Submit a job to the MR system.
-   * 
-   * This returns a handle to the {@link RunningJob} which can be used to track
-   * the running-job.
-   * 
-   * @param jobFile the job configuration.
-   * @return a handle to the {@link RunningJob} which can be used to track the
-   *         running-job.
-   * @throws FileNotFoundException
-   * @throws InvalidJobConfException
-   * @throws IOException
+  
+  /* Check whether two file systems are the same
+   * (same scheme, host and port).
    */
-  public RunningJob submitJob(String jobFile) throws FileNotFoundException, 
-                                                     InvalidJobConfException, 
-                                                     IOException {
-    // Load in the submitted job details
-    JobConf job = new JobConf(jobFile);
-    return submitJob(job);
+  private boolean compareFs(FileSystem srcFs, FileSystem destFs) {
+    URI srcUri = srcFs.getUri();
+    URI dstUri = destFs.getUri();
+    if (srcUri.getScheme() == null) {
+      return false;
+    }
+    if (!srcUri.getScheme().equals(dstUri.getScheme())) {
+      return false;
+    }
+    String srcHost = srcUri.getHost();    
+    String dstHost = dstUri.getHost();
+    if ((srcHost != null) && (dstHost != null)) {
+      try {
+        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
+        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
+      } catch(UnknownHostException ue) {
+        return false;
+      }
+      if (!srcHost.equals(dstHost)) {
+        return false;
+      }
+    }
+    else if (srcHost == null && dstHost != null) {
+      return false;
+    }
+    else if (srcHost != null && dstHost == null) {
+      return false;
+    }
+    //check for ports
+    if (srcUri.getPort() != dstUri.getPort()) {
+      return false;
+    }
+    return true;
   }
-    
-  // job files are world-wide readable and owner writable
-  final private static FsPermission JOB_FILE_PERMISSION = 
-    FsPermission.createImmutable((short) 0644); // rw-r--r--
 
-  // system directories are world-wide readable and owner readable
-  final static FsPermission SYSTEM_DIR_PERMISSION =
-    FsPermission.createImmutable((short) 0733); // rwx-wx-wx
-   
+  // copies a file to the jobtracker filesystem and returns the path where it
+  // was copied to
+  private Path copyRemoteFiles(FileSystem jtFs, Path parentDir, Path originalPath, 
+                               JobConf job, short replication) throws IOException {
+    // Check whether the copy can be skipped: if the JobTracker
+    // uses the same file system as the source (same scheme, the
+    // same host after resolving both hostnames to canonical form,
+    // and the same port), the original path is usable as-is and
+    // no data needs to be moved.
+    
+    FileSystem remoteFs = originalPath.getFileSystem(job);
+
+    if (compareFs(remoteFs, jtFs)) {
+      return originalPath;
+    }
+    // Name collisions under parentDir are possible; in that case the
+    // copy throws an exception. Build the new path from the file name.
+    Path newPath = new Path(parentDir, originalPath.getName());
+    FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, job);
+    jtFs.setReplication(newPath, replication);
+    return newPath;
+  }
+ 
   /**
-   * Submit a job to the MR system.
-   * This returns a handle to the {@link RunningJob} which can be used to track
-   * the running-job.
-   * 
-   * @param job the job configuration.
-   * @return a handle to the {@link RunningJob} which can be used to track the
-   *         running-job.
-   * @throws FileNotFoundException
-   * @throws InvalidJobConfException
+   * Configure the user's JobConf with the command line options
+   * -libjars, -files, -archives.
+   * @param job the job configuration to update
    * @throws IOException
    */
-  public RunningJob submitJob(JobConf job) throws FileNotFoundException, 
-                                                  InvalidJobConfException, IOException {
+  private void configureCommandLineOptions(JobConf job, Path submitJobDir, Path submitJarFile) 
+    throws IOException {
+    // Pull the command line arguments out of the config set by the
+    // job shell so they can be applied to the user's JobConf.
+    Configuration commandConf = JobClient.getCommandLineConfig();
+    String files = null;
+    String libjars = null;
+    String archives = null;
+    if (commandConf != null) {
+      files = commandConf.get("tmpfiles");
+      libjars = commandConf.get("tmpjars");
+      archives = commandConf.get("tmparchives");
+    }
     /*
      * set this user's id in job configuration, so later job files can be
      * accessed using this user's id
@@ -482,17 +540,66 @@
     //
 
     // Create a number of filenames in the JobTracker's fs namespace
-    String jobId = jobSubmitClient.getNewJobId();
-    Path submitJobDir = new Path(job.getSystemDir(), jobId);
     FileSystem fs = getFs();
     LOG.debug("default FileSystem: " + fs.getUri());
-    fs.delete(submitJobDir, true);    
-    FileSystem.mkdirs(fs, submitJobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
-    Path submitJobFile = new Path(submitJobDir, "job.xml");
-    Path submitJarFile = new Path(submitJobDir, "job.jar");
-    Path submitSplitFile = new Path(submitJobDir, "job.split");
-        
-    // set the timestamps of the archives and files
+    fs.delete(submitJobDir, true);
+    submitJobDir = fs.makeQualified(submitJobDir);
+    submitJobDir = new Path(submitJobDir.toUri().getPath());
+    FsPermission mapredSysPerms = new FsPermission(SYSTEM_DIR_PERMISSION);
+    FileSystem.mkdirs(fs, submitJobDir, mapredSysPerms);
+    Path filesDir = new Path(submitJobDir, "files");
+    Path archivesDir = new Path(submitJobDir, "archives");
+    Path libjarsDir = new Path(submitJobDir, "libjars");
+    short replication = (short)job.getInt("mapred.submit.replication", 10);
+    // Add all the command line files, jars and archives:
+    // first copy them to the JobTracker's filesystem.
+    
+    if (files != null) {
+      FileSystem.mkdirs(fs, filesDir, mapredSysPerms);
+      String[] fileArr = files.split(",");
+      for (String tmpFile: fileArr) {
+        Path tmp = new Path(tmpFile);
+        Path newPath = copyRemoteFiles(fs,filesDir, tmp, job, replication);
+        try {
+          URI pathURI = new URI(newPath.toUri().toString() + "#" + newPath.getName());
+          DistributedCache.addCacheFile(pathURI, job);
+        } catch(URISyntaxException ue) {
+          // should not happen; the URI was validated earlier
+          throw new IOException("Failed to create uri for " + tmpFile);
+        }
+        DistributedCache.createSymlink(job);
+      }
+    }
+    
+    if (libjars != null) {
+      FileSystem.mkdirs(fs, libjarsDir, mapredSysPerms);
+      String[] libjarsArr = libjars.split(",");
+      for (String tmpjars: libjarsArr) {
+        Path tmp = new Path(tmpjars);
+        Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication);
+        DistributedCache.addArchiveToClassPath(newPath, job);
+      }
+    }
+    
+    
+    if (archives != null) {
+     FileSystem.mkdirs(fs, archivesDir, mapredSysPerms); 
+     String[] archivesArr = archives.split(",");
+     for (String tmpArchives: archivesArr) {
+       Path tmp = new Path(tmpArchives);
+       Path newPath = copyRemoteFiles(fs, archivesDir, tmp, job, replication);
+       try {
+         URI pathURI = new URI(newPath.toUri().toString() + "#" + newPath.getName());
+         DistributedCache.addCacheArchive(pathURI, job);
+       } catch(URISyntaxException ue) {
+         // should not happen; the URI was validated earlier
+         throw new IOException("Failed to create uri for " + tmpArchives);
+       }
+       DistributedCache.createSymlink(job);
+     }
+    }
+    
+    //  set the timestamps of the archives and files
     URI[] tarchives = DistributedCache.getCacheArchives(job);
     if (tarchives != null) {
       StringBuffer archiveTimestamps = 
@@ -516,7 +623,6 @@
     }
        
     String originalJarPath = job.getJar();
-    short replication = (short)job.getInt("mapred.submit.replication", 10);
 
     if (originalJarPath != null) {           // copy jar to JobTracker's fs
       // use jar name if job is not named. 
@@ -538,6 +644,63 @@
       job.setWorkingDirectory(fs.getWorkingDirectory());          
     }
 
+  }
+  
+  /**
+   * Submit a job to the MR system.
+   * 
+   * This returns a handle to the {@link RunningJob} which can be used to track
+   * the running-job.
+   * 
+   * @param jobFile the job configuration.
+   * @return a handle to the {@link RunningJob} which can be used to track the
+   *         running-job.
+   * @throws FileNotFoundException
+   * @throws InvalidJobConfException
+   * @throws IOException
+   */
+  public RunningJob submitJob(String jobFile) throws FileNotFoundException, 
+                                                     InvalidJobConfException, 
+                                                     IOException {
+    // Load in the submitted job details
+    JobConf job = new JobConf(jobFile);
+    return submitJob(job);
+  }
+    
+  // job files are world-wide readable and owner writable
+  final private static FsPermission JOB_FILE_PERMISSION = 
+    FsPermission.createImmutable((short) 0644); // rw-r--r--
+
+  // system directories are writable and executable by all, readable only by owner
+  final static FsPermission SYSTEM_DIR_PERMISSION =
+    FsPermission.createImmutable((short) 0733); // rwx-wx-wx
+   
+  /**
+   * Submit a job to the MR system.
+   * This returns a handle to the {@link RunningJob} which can be used to track
+   * the running-job.
+   * 
+   * @param job the job configuration.
+   * @return a handle to the {@link RunningJob} which can be used to track the
+   *         running-job.
+   * @throws FileNotFoundException
+   * @throws InvalidJobConfException
+   * @throws IOException
+   */
+  public RunningJob submitJob(JobConf job) throws FileNotFoundException, 
+                                  InvalidJobConfException, IOException {
+    /*
+     * Stage the command line files, jars and archives on the submitting DFS.
+     */
+    
+    String jobId = jobSubmitClient.getNewJobId();
+    Path submitJobDir = new Path(job.getSystemDir(), jobId);
+    Path submitJarFile = new Path(submitJobDir, "job.jar");
+    Path submitSplitFile = new Path(submitJobDir, "job.split");
+    configureCommandLineOptions(job, submitJobDir, submitJarFile);
+    Path submitJobFile = new Path(submitJobDir, "job.xml");
+    
+    
     // Check the input specification 
     job.getInputFormat().validateInput(job);
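
On the consumer side, a submitted job picks the staged options back up
through the new static accessor on JobClient. A minimal sketch, assuming
this patched JobClient is on the classpath and the job was launched
through the new job shell (so the command line config is non-null); the
class and job names are illustrative, mirroring what
ExternalMapReduce.main() below does:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class CommandConfSketch {
      public static void main(String[] args) {
        // JobShell validates -files/-libjars/-archives and stores them
        // under "tmpfiles", "tmpjars" and "tmparchives" before invoking
        // the user's main class through RunJar.
        Configuration commandConf = JobClient.getCommandLineConfig();
        // Fold the command line config into the job's configuration so
        // submitJob() can copy the resources to the JobTracker's fs.
        JobConf job = new JobConf(commandConf, CommandConfSketch.class);
        job.setJobName("command-conf-sketch");
        System.out.println("tmpjars = " + job.get("tmpjars"));
      }
    }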
 

Added: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobShell.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobShell.java?rev=641577&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobShell.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobShell.java Wed Mar 26 14:10:17 2008
@@ -0,0 +1,223 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.hadoop.mapred;
+
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.RunJar;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.*;
+
+
+/** Provides command line parsing for job submission.
+ *  A job submission looks like:
+ *  hadoop jar -libjars <comma separated jars> -archives <comma separated archives>
+ *  -files <comma separated files> inputjar args
+ */
+public class JobShell extends Configured implements Tool {
+  private FileSystem localFs;
+  protected static final Log LOG = LogFactory.getLog(JobShell.class.getName());
+  /**
+   * CLI parser implementation
+   * for the job shell.
+   */
+  private CommandLineParser parser = new GnuParser();
+  private Options opts = new Options();
+ 
+  public JobShell() { this(null); }
+  
+  public JobShell(Configuration conf) {
+    super(conf);
+    setUpOptions();
+  }
+  
+  /**
+   * Helper to create a single-argument option; the max parameter is unused.
+   */
+  @SuppressWarnings("static-access")
+  private Option createOption(String name, String desc, 
+                              String argName, int max) {
+    return OptionBuilder.withArgName(argName).
+                         hasArg().withDescription(desc).
+                         create(name);
+  }
+  
+  
+  /**
+   * Set up the options
+   * specific to the job shell.
+   */
+  private void setUpOptions() {
+    Option libjar = createOption("libjars",
+                                "comma separated jar files to " +
+                                "include in the classpath.",
+                                "paths",
+                                1);
+    Option file = createOption("files",
+                               "comma separated files to be copied to the " +
+                               "map reduce cluster.",
+                               "paths",
+                               1);
+    Option archives = createOption("archives",
+                                   "comma separated archives to be unarchived" +
+                                   " on the compute machines.",
+                                   "paths",
+                                   1);
+    opts.addOption(libjar);
+    opts.addOption(file);
+    opts.addOption(archives);
+  }
+  
+  protected void init() throws IOException {
+    getConf().setQuietMode(true);
+  }
+  
+  /**
+   * Takes a comma separated list of files and verifies that
+   * each one exists. Paths without a scheme default to the
+   * local file system (file:///), and every path is returned
+   * fully qualified. So an input of /home/user/file1,/home/user/file2
+   * would return file:///home/user/file1,file:///home/user/file2
+   * @param files comma separated list of files
+   * @return comma separated list of qualified file URIs, or
+   *         null if files is null
+   */
+  private String validateFiles(String files) throws IOException {
+    if (files == null) 
+      return null;
+    String[] fileArr = files.split(",");
+    String[] finalArr = new String[fileArr.length];
+    for (int i =0; i < fileArr.length; i++) {
+      String tmp = fileArr[i];
+      String finalPath;
+      Path path = new Path(tmp);
+      URI pathURI =  path.toUri();
+      if (pathURI.getScheme() == null) {
+        //default to the local file system
+        //check if the file exists or not first
+        if (!localFs.exists(path)) {
+          throw new FileNotFoundException("File " + tmp + " does not exist.");
+        }
+        finalPath = path.makeQualified(localFs).toString();
+      }
+      else {
+        // check if the file exists in this file system
+        // we need to recreate this filesystem object to copy
+        // these files to the file system jobtracker is running
+        // on.
+        FileSystem fs = path.getFileSystem(getConf());
+        if (!fs.exists(path)) {
+          throw new FileNotFoundException("File " + tmp + " does not exist.");
+        }
+        finalPath = path.makeQualified(fs).toString();
+        try {
+          fs.close();
+        } catch(IOException e) { /* ignore failure to close */ }
+      }
+      finalArr[i] = finalPath;
+    }
+    return StringUtils.arrayToString(finalArr);
+  }
+  
+  /**
+   * The run method from Tool: parses the shell options and hands off to RunJar.
+   */
+  public int run(String argv[]) throws Exception {
+    int exitCode = -1;
+    Configuration conf = getConf();
+    localFs = FileSystem.getLocal(conf);
+    CommandLine cmdLine = null;
+    try{
+      try {
+        cmdLine = parser.parse(opts, argv, true);
+      } catch(Exception ie) {
+        LOG.error(ie.getMessage());
+        printUsage();
+      }
+      // Get the option values, set them in this object's conf,
+      // and then hand that config to the JobClient so submission
+      // can pick the values up.
+      if (cmdLine != null) {
+        String allFiles = (String) cmdLine.getOptionValue("files");
+        String alllibJars = (String) cmdLine.getOptionValue("libjars");
+        String allArchives = (String) cmdLine.getOptionValue("archives");
+        if (allFiles != null) {
+          String allFilesVal = validateFiles(allFiles);
+          conf.set("tmpfiles", allFilesVal);
+        }
+        if (alllibJars != null) {
+          String alllibJarsVal = validateFiles(alllibJars);
+          conf.set("tmpjars", alllibJarsVal);
+        }
+        if (allArchives != null) {
+          String allArchivesVal = validateFiles(allArchives);
+          conf.set("tmparchives", allArchivesVal);
+        }
+        JobClient.setCommandLineConfig(conf);
+        try {
+          // pass the rest of the arguments to RunJar
+          String[] args = cmdLine.getArgs();
+          if (args.length == 0) {
+           printUsage();
+           return -1;
+          }
+          RunJar.main(args);
+          exitCode = 0;
+        } catch(Throwable th) {
+          System.err.println(StringUtils.stringifyException(th));
+        }
+      }
+      
+    } catch(RuntimeException re) {
+      exitCode = -1;
+      System.err.println(re.getLocalizedMessage());
+    }
+    return exitCode;
+  }
+  
+  private void printUsage() {
+    System.out.println("Usage: $HADOOP_HOME/bin/hadoop \\");
+    System.out.println("       [--config dir] jar \\");
+    System.out.println("       [-libjars <comma seperated list of jars>] \\");
+    System.out.println("       [-archives <comma seperated list of archives>] \\");
+    System.out.println("       [-files <comma seperated list of files>] \\");
+    System.out.println("       jarFile [mainClass] args");
+  }
+  
+  public static void main(String[] argv) throws Exception {
+    JobShell jshell = new JobShell();
+    int res = ToolRunner.run(jshell, argv);
+
+    System.exit(res);
+  }
+}

Propchange: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobShell.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobShell.java
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL
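
The qualification step in JobShell.validateFiles() boils down to
Path.makeQualified() against the local file system. A minimal sketch
(the path is illustrative and assumed to exist; the printed path comes
back with the file: scheme filled in):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class QualifySketch {
      public static void main(String[] args) throws Exception {
        FileSystem localFs = FileSystem.getLocal(new Configuration());
        Path p = new Path("/home/user/file1");  // no scheme given
        // makeQualified resolves the path against localFs, adding the
        // scheme and authority so later code sees a fully qualified URI
        System.out.println(p.makeQualified(localFs));
      }
    }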

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobShell.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobShell.java?rev=641577&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobShell.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobShell.java Wed Mar 26 14:10:17 2008
@@ -0,0 +1,80 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+
+/**
+ * Checks the job shell options
+ * -libjars, -files and -archives.
+ */
+
+public class TestJobShell extends TestCase {
+  // Input and output paths for this test.
+  // These are dummies and do not test
+  // much in map reduce except for the command
+  // line params.
+  static final Path input = new Path("/test/input/");
+  static final Path output = new Path("/test/output");
+  public void testJobShell() throws Exception {
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fs = null;
+    Path testFile = new Path(input, "testfile");
+    try {
+      Configuration conf = new Configuration();
+      //start the mini mr and dfs cluster.
+      dfs = new MiniDFSCluster(conf, 2 , true, null);
+      fs = dfs.getFileSystem();
+      FSDataOutputStream stream = fs.create(testFile);
+      stream.write("teststring".getBytes());
+      stream.close();
+      mr = new MiniMRCluster(2, fs.getUri().toString(), 1);
+      JobConf jconf = mr.createJobConf();
+      JobShell jshell = new JobShell();
+      File f = new File("files_tmp");
+      FileOutputStream fstream = new FileOutputStream(f);
+      fstream.write("somestrings".getBytes());
+      fstream.close();
+      String[] args = new String[8];
+      args[0] = "-files";
+      args[1] = "files_tmp";
+      args[2] = "-libjars";
+      // reuse the existing testjob.jar as the -libjars value
+      // rather than creating a jar of its own
+      args[3] = "build/test/testjar/testjob.jar";
+      args[4] = "build/test/testshell/testshell.jar";
+      args[5] = "testshell.ExternalMapReduce";
+      args[6] = input.toString();
+      args[7] = output.toString();
+      int ret = ToolRunner.run(jconf, jshell, args);
+      assertTrue("not failed ", ret != -1);
+    } finally {
+      if (dfs != null) { dfs.shutdown(); }
+      if (mr != null) { mr.shutdown(); }
+    }
+  }
+}
\ No newline at end of file

Propchange: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobShell.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobShell.java
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL
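
To exercise the new shell end to end, the test above should be runnable
through the standard test target (the testcase property is an assumption
about this era's build.xml conventions):

    ant test -Dtestcase=TestJobShell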

Added: hadoop/core/trunk/src/test/testshell/ExternalMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/testshell/ExternalMapReduce.java?rev=641577&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/testshell/ExternalMapReduce.java (added)
+++ hadoop/core/trunk/src/test/testshell/ExternalMapReduce.java Wed Mar 26 14:10:17 2008
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package testshell;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This class lives in an external jar and is used by
+ * the test in TestJobShell.java.
+ */
+public class ExternalMapReduce
+  implements Mapper<WritableComparable, Writable,
+                    WritableComparable, IntWritable>,
+             Reducer<WritableComparable, Writable,
+                     WritableComparable, IntWritable> {
+
+  public void configure(JobConf job) {
+    // do nothing
+  }
+
+  public void close()
+    throws IOException {
+
+  }
+
+  public void map(WritableComparable key, Writable value,
+                  OutputCollector<WritableComparable, IntWritable> output,
+                  Reporter reporter)
+    throws IOException {
+    //check for classpath
+    String classpath = System.getProperty("java.class.path");
+    if (classpath.indexOf("testjob.jar") == -1) {
+      throw new IOException("failed to find in the library " + classpath);
+    }
+    File f = new File("files_tmp");
+    //check for files 
+    if (!f.exists()) {
+      throw new IOException("file file_tmpfile not found");
+    }
+  }
+
+  public void reduce(WritableComparable key, Iterator<Writable> values,
+                     OutputCollector<WritableComparable, IntWritable> output,
+                     Reporter reporter)
+    throws IOException {
+   //do nothing
+  }
+  
+  public static int main(String[] argv) throws IOException {
+    if (argv.length < 2) {
+      System.out.println("ExternalMapReduce <input> <output>");
+      return -1;
+    }
+    Path outDir = new Path(argv[1]);
+    Path input = new Path(argv[0]);
+    Configuration commandConf = JobClient.getCommandLineConfig();
+    JobConf testConf = new JobConf(commandConf, ExternalMapReduce.class);
+    testConf.setJobName("external job");
+    testConf.setInputPath(input);
+    testConf.setOutputPath(outDir);
+    testConf.setMapperClass(ExternalMapReduce.class);
+    testConf.setReducerClass(ExternalMapReduce.class);
+    testConf.setNumReduceTasks(1);
+    JobClient.runJob(testConf);
+    return 0;
+  }
+}

Propchange: hadoop/core/trunk/src/test/testshell/ExternalMapReduce.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/core/trunk/src/test/testshell/ExternalMapReduce.java
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL