You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/03/26 22:10:19 UTC
svn commit: r641577 - in /hadoop/core/trunk: ./ bin/
src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
src/test/testshell/
Author: dhruba
Date: Wed Mar 26 14:10:17 2008
New Revision: 641577
URL: http://svn.apache.org/viewvc?rev=641577&view=rev
Log:
HADOOP-1622. Allow multiple jar files for map reduce.
(Mahadev Konar via dhruba)
Added:
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobShell.java (with props)
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobShell.java (with props)
hadoop/core/trunk/src/test/testshell/
hadoop/core/trunk/src/test/testshell/ExternalMapReduce.java (with props)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/bin/hadoop
hadoop/core/trunk/build.xml
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=641577&r1=641576&r2=641577&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Mar 26 14:10:17 2008
@@ -78,6 +78,9 @@
HADOOP-2951. Add a contrib module that provides a utility to
build or update Lucene indexes using Map/Reduce. (Ning Li via cutting)
+ HADOOP-1622. Allow multiple jar files for map reduce.
+ (Mahadev Konar via dhruba)
+
IMPROVEMENTS
HADOOP-2655. Copy on write for data and metadata files in the
Modified: hadoop/core/trunk/bin/hadoop
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/bin/hadoop?rev=641577&r1=641576&r2=641577&view=diff
==============================================================================
--- hadoop/core/trunk/bin/hadoop (original)
+++ hadoop/core/trunk/bin/hadoop Wed Mar 26 14:10:17 2008
@@ -212,7 +212,7 @@
elif [ "$COMMAND" = "version" ] ; then
CLASS=org.apache.hadoop.util.VersionInfo
elif [ "$COMMAND" = "jar" ] ; then
- CLASS=org.apache.hadoop.util.RunJar
+ CLASS=org.apache.hadoop.mapred.JobShell
elif [ "$COMMAND" = "distcp" ] ; then
CLASS=org.apache.hadoop.util.CopyFiles
elif [ "$COMMAND" = "daemonlog" ] ; then
Modified: hadoop/core/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/build.xml?rev=641577&r1=641576&r2=641577&view=diff
==============================================================================
--- hadoop/core/trunk/build.xml (original)
+++ hadoop/core/trunk/build.xml Wed Mar 26 14:10:17 2008
@@ -83,6 +83,7 @@
<property name="test.log.dir" value="${test.build.dir}/logs"/>
<property name="test.build.classes" value="${test.build.dir}/classes"/>
<property name="test.build.testjar" value="${test.build.dir}/testjar"/>
+ <property name="test.build.testshell" value="${test.build.dir}/testshell"/>
<property name="test.build.javadoc" value="${test.build.dir}/docs/api"/>
<property name="test.include" value="Test*"/>
<property name="test.classpath.id" value="test.classpath"/>
@@ -192,6 +193,7 @@
<mkdir dir="${test.build.dir}"/>
<mkdir dir="${test.build.classes}"/>
<mkdir dir="${test.build.testjar}"/>
+ <mkdir dir="${test.build.testshell}"/>
<tempfile property="touch.temp.file" destDir="${java.io.tmpdir}"/>
<touch millis="0" file="${touch.temp.file}">
<fileset dir="${conf.dir}" includes="**/*.template"/>
@@ -498,6 +500,23 @@
<jar jarfile="${test.build.testjar}/testjob.jar"
basedir="${test.build.testjar}">
</jar>
+ <javac
+ encoding="${build.encoding}"
+ srcdir="${test.src.dir}/testshell"
+ includes="*.java"
+ destdir="${test.build.testshell}"
+ debug="${javac.debug}"
+ optimize="${javac.optimize}"
+ target="${javac.version}"
+ source="${javac.version}"
+ deprecation="${javac.deprecation}">
+ <compilerarg line="${javac.args} ${javac.args.warnings}"/>
+ <classpath refid="test.classpath"/>
+ </javac>
+ <delete file="${test.build.testshell}/testshell.jar"/>
+ <jar jarfile="${test.build.testshell}/testshell.jar"
+ basedir="${test.build.testshell}">
+ </jar>
<delete dir="${test.cache.data}"/>
<mkdir dir="${test.cache.data}"/>
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=641577&r1=641576&r2=641577&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Wed Mar 26 14:10:17 2008
@@ -28,11 +28,14 @@
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLConnection;
+import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
@@ -48,6 +51,7 @@
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.BytesWritable;
@@ -155,7 +159,7 @@
private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobClient");
public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
-
+ private static Configuration commandLineConfig;
static long MAX_JOBPROFILE_AGE = 1000 * 2;
/**
@@ -341,7 +345,24 @@
setConf(conf);
init(conf);
}
-
+
+ /**
+ * Set the command line configuration for the JobClient. The given
+ * configuration holds the parameters passed from the command line; it is
+ * kept in a static field and handed back by {@link #getCommandLineConfig()}.
+ * @param conf the configuration object to set.
+ */
+ static void setCommandLineConfig(Configuration conf) {
+ commandLineConfig = conf;
+ }
+
+ /**
+ * Return the command line configuration.
+ * @return the configuration last passed to
+ * {@link #setCommandLineConfig(Configuration)}, or <code>null</code> if
+ * none has been set.
+ */
+ public static Configuration getCommandLineConfig() {
+ return commandLineConfig;
+ }
+
/**
* Connect to the default {@link JobTracker}.
* @param conf the job configuration.
@@ -417,50 +438,87 @@
}
return fs;
}
-
- /**
- * Submit a job to the MR system.
- *
- * This returns a handle to the {@link RunningJob} which can be used to track
- * the running-job.
- *
- * @param jobFile the job configuration.
- * @return a handle to the {@link RunningJob} which can be used to track the
- * running-job.
- * @throws FileNotFoundException
- * @throws InvalidJobConfException
- * @throws IOException
+
+ /* see if two file systems are the same or not
+ *
*/
- public RunningJob submitJob(String jobFile) throws FileNotFoundException,
- InvalidJobConfException,
- IOException {
- // Load in the submitted job details
- JobConf job = new JobConf(jobFile);
- return submitJob(job);
+ private boolean compareFs(FileSystem srcFs, FileSystem destFs) {
+   // Two file systems are considered the same when their URIs agree on
+   // scheme, (canonical) host and port.
+   URI srcUri = srcFs.getUri();
+   URI dstUri = destFs.getUri();
+
+   String srcScheme = srcUri.getScheme();
+   if (srcScheme == null || !srcScheme.equals(dstUri.getScheme())) {
+     return false;
+   }
+
+   String srcHost = srcUri.getHost();
+   String dstHost = dstUri.getHost();
+   if ((srcHost == null) != (dstHost == null)) {
+     // exactly one of the two URIs names a host
+     return false;
+   }
+   if (srcHost != null) {
+     // both hosts are present: compare their canonical names so that
+     // aliases of the same machine are treated as equal
+     try {
+       srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
+       dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
+     } catch (UnknownHostException ue) {
+       return false;
+     }
+     if (!srcHost.equals(dstHost)) {
+       return false;
+     }
+   }
+
+   // finally the ports have to match as well
+   return srcUri.getPort() == dstUri.getPort();
+ }
-
- // job files are world-wide readable and owner writable
- final private static FsPermission JOB_FILE_PERMISSION =
- FsPermission.createImmutable((short) 0644); // rw-r--r--
- // system directories are world-wide readable and owner readable
- final static FsPermission SYSTEM_DIR_PERMISSION =
- FsPermission.createImmutable((short) 0733); // rwx-wx-wx
-
+ /**
+  * Copies a file to the jobtracker filesystem and returns the path where it
+  * was copied to. If the file already lives on the jobtracker's filesystem
+  * (as decided by compareFs) nothing is copied and the original path is
+  * returned unchanged.
+  *
+  * @param jtFs the jobtracker's filesystem.
+  * @param parentDir directory on <code>jtFs</code> that receives the copy.
+  * @param originalPath the file to make available on <code>jtFs</code>.
+  * @param job configuration used to resolve the filesystem of
+  *            <code>originalPath</code> and to perform the copy.
+  * @param replication replication factor to set on the copied file.
+  * @return <code>originalPath</code> if no copy was needed, otherwise the
+  *         path of the copy under <code>parentDir</code>.
+  * @throws IOException if resolving the filesystem, copying, or setting the
+  *         replication fails.
+  */
+ private Path copyRemoteFiles(FileSystem jtFs, Path parentDir, Path originalPath,
+     JobConf job, short replication) throws IOException {
+   // Check whether the copy can be skipped because the jobtracker uses the
+   // same file system. Note that compareFs canonicalizes host names via
+   // InetAddress, so this check may involve name resolution.
+   FileSystem remoteFs = originalPath.getFileSystem(job);
+   if (compareFs(remoteFs, jtFs)) {
+     return originalPath;
+   }
+   // The copy keeps only the file name, so two sources with the same name
+   // collide in parentDir.
+   // NOTE(review): the original comment expects copy to throw in that
+   // case -- confirm against FileUtil.copy.
+   Path newPath = new Path(parentDir, originalPath.getName());
+   FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, job);
+   jtFs.setReplication(newPath, replication);
+   return newPath;
+ }
+
/**
- * Submit a job to the MR system.
- * This returns a handle to the {@link RunningJob} which can be used to track
- * the running-job.
- *
- * @param job the job configuration.
- * @return a handle to the {@link RunningJob} which can be used to track the
- * running-job.
- * @throws FileNotFoundException
- * @throws InvalidJobConfException
+ * configure the jobconf of the user with the command line options of
+ * -libjars, -files, -archives
+ * @param conf
* @throws IOException
*/
- public RunningJob submitJob(JobConf job) throws FileNotFoundException,
- InvalidJobConfException, IOException {
+ private void configureCommandLineOptions(JobConf job, Path submitJobDir, Path submitJarFile)
+ throws IOException {
+ // get all the command line arguments into the
+ // jobconf passed in by the user conf
+ Configuration commandConf = JobClient.getCommandLineConfig();
+ String files = null;
+ String libjars = null;
+ String archives = null;
+ if (commandConf != null) {
+ files = commandConf.get("tmpfiles");
+ libjars = commandConf.get("tmpjars");
+ archives = commandConf.get("tmparchives");
+ }
/*
* set this user's id in job configuration, so later job files can be
* accessed using this user's id
@@ -482,17 +540,66 @@
//
// Create a number of filenames in the JobTracker's fs namespace
- String jobId = jobSubmitClient.getNewJobId();
- Path submitJobDir = new Path(job.getSystemDir(), jobId);
FileSystem fs = getFs();
LOG.debug("default FileSystem: " + fs.getUri());
- fs.delete(submitJobDir, true);
- FileSystem.mkdirs(fs, submitJobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
- Path submitJobFile = new Path(submitJobDir, "job.xml");
- Path submitJarFile = new Path(submitJobDir, "job.jar");
- Path submitSplitFile = new Path(submitJobDir, "job.split");
-
- // set the timestamps of the archives and files
+ fs.delete(submitJobDir, true);
+ submitJobDir = fs.makeQualified(submitJobDir);
+ submitJobDir = new Path(submitJobDir.toUri().getPath());
+ FsPermission mapredSysPerms = new FsPermission(SYSTEM_DIR_PERMISSION);
+ FileSystem.mkdirs(fs, submitJobDir, mapredSysPerms);
+ Path filesDir = new Path(submitJobDir, "files");
+ Path archivesDir = new Path(submitJobDir, "archives");
+ Path libjarsDir = new Path(submitJobDir, "libjars");
+ short replication = (short)job.getInt("mapred.submit.replication", 10);
+ // add all the command line files/ jars and archive
+ // first copy them to jobtrackers filesystem
+
+ if (files != null) {
+ FileSystem.mkdirs(fs, filesDir, mapredSysPerms);
+ String[] fileArr = files.split(",");
+ for (String tmpFile: fileArr) {
+ Path tmp = new Path(tmpFile);
+ Path newPath = copyRemoteFiles(fs,filesDir, tmp, job, replication);
+ try {
+ URI pathURI = new URI(newPath.toUri().toString() + "#" + newPath.getName());
+ DistributedCache.addCacheFile(pathURI, job);
+ } catch(URISyntaxException ue) {
+ //should not throw a uri exception
+ throw new IOException("Failed to create uri for " + tmpFile);
+ }
+ DistributedCache.createSymlink(job);
+ }
+ }
+
+ if (libjars != null) {
+ FileSystem.mkdirs(fs, libjarsDir, mapredSysPerms);
+ String[] libjarsArr = libjars.split(",");
+ for (String tmpjars: libjarsArr) {
+ Path tmp = new Path(tmpjars);
+ Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication);
+ DistributedCache.addArchiveToClassPath(newPath, job);
+ }
+ }
+
+
+ if (archives != null) {
+ FileSystem.mkdirs(fs, archivesDir, mapredSysPerms);
+ String[] archivesArr = archives.split(",");
+ for (String tmpArchives: archivesArr) {
+ Path tmp = new Path(tmpArchives);
+ Path newPath = copyRemoteFiles(fs, archivesDir, tmp, job, replication);
+ try {
+ URI pathURI = new URI(newPath.toUri().toString() + "#" + newPath.getName());
+ DistributedCache.addCacheArchive(pathURI, job);
+ } catch(URISyntaxException ue) {
+ //should not throw an uri excpetion
+ throw new IOException("Failed to create uri for " + tmpArchives);
+ }
+ DistributedCache.createSymlink(job);
+ }
+ }
+
+ // set the timestamps of the archives and files
URI[] tarchives = DistributedCache.getCacheArchives(job);
if (tarchives != null) {
StringBuffer archiveTimestamps =
@@ -516,7 +623,6 @@
}
String originalJarPath = job.getJar();
- short replication = (short)job.getInt("mapred.submit.replication", 10);
if (originalJarPath != null) { // copy jar to JobTracker's fs
// use jar name if job is not named.
@@ -538,6 +644,63 @@
job.setWorkingDirectory(fs.getWorkingDirectory());
}
+ }
+
+ /**
+  * Submit a job, described by a job configuration file, to the MR system.
+  *
+  * The configuration is loaded from <code>jobFile</code> and handed to
+  * {@link #submitJob(JobConf)}; the returned {@link RunningJob} handle can
+  * be used to track the running-job.
+  *
+  * @param jobFile the job configuration.
+  * @return a handle to the {@link RunningJob} which can be used to track the
+  *         running-job.
+  * @throws FileNotFoundException
+  * @throws InvalidJobConfException
+  * @throws IOException
+  */
+ public RunningJob submitJob(String jobFile)
+     throws FileNotFoundException, InvalidJobConfException, IOException {
+   // Load in the submitted job details and delegate
+   return submitJob(new JobConf(jobFile));
+ }
+
+ // job files are world-wide readable and owner writable
+ final private static FsPermission JOB_FILE_PERMISSION =
+ FsPermission.createImmutable((short) 0644); // rw-r--r--
+
+ // system directories: full access for the owner; group and others get
+ // write and execute only (no read)
+ final static FsPermission SYSTEM_DIR_PERMISSION =
+ FsPermission.createImmutable((short) 0733); // rwx-wx-wx
+
+ /**
+ * Submit a job to the MR system.
+ * This returns a handle to the {@link RunningJob} which can be used to track
+ * the running-job.
+ *
+ * @param job the job configuration.
+ * @return a handle to the {@link RunningJob} which can be used to track the
+ * running-job.
+ * @throws FileNotFoundException
+ * @throws InvalidJobConfException
+ * @throws IOException
+ */
+ public RunningJob submitJob(JobConf job) throws FileNotFoundException,
+ InvalidJobConfException, IOException {
+ /*
+ * configure the command line options correctly on the submitting dfs
+ */
+
+ String jobId = jobSubmitClient.getNewJobId();
+ Path submitJobDir = new Path(job.getSystemDir(), jobId);
+ Path submitJarFile = new Path(submitJobDir, "job.jar");
+ Path submitSplitFile = new Path(submitJobDir, "job.split");
+ configureCommandLineOptions(job, submitJobDir, submitJarFile);
+ Path submitJobFile = new Path(submitJobDir, "job.xml");
+
+
// Check the input specification
job.getInputFormat().validateInput(job);
Added: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobShell.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobShell.java?rev=641577&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobShell.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobShell.java Wed Mar 26 14:10:17 2008
@@ -0,0 +1,223 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.RunJar;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.*;
+
+
+/** Provide command line parsing for JobSubmission
+ * job submission looks like
+ * hadoop jar -libjars <comma seperated jars> -archives <comma seperated archives>
+ * -files <comma seperated files> inputjar args
+ */
+public class JobShell extends Configured implements Tool {
+ private FileSystem localFs;
+ protected static final Log LOG = LogFactory.getLog(JobShell.class.getName());
+ /**
+ * cli implementation for
+ * job shell.
+ */
+ private CommandLineParser parser = new GnuParser();
+ private Options opts = new Options();
+
+ public JobShell() {this(null);};
+
+ public JobShell(Configuration conf) {
+ super(conf);
+ setUpOptions();
+ }
+
+ /**
+ * a method to create an option
+ */
+ @SuppressWarnings("static-access")
+ private Option createOption(String name, String desc,
+ String argName, int max) {
+ return OptionBuilder.withArgName(argName).
+ hasArg().withDescription(desc).
+ create(name);
+ }
+
+
+ /**
+ * set up options
+ * specific to jobshell
+ */
+ private void setUpOptions() {
+ Option libjar = createOption("libjars",
+ "comma seperated jar files to " +
+ "include in the classpath.",
+ "paths",
+ 1);
+ Option file = createOption("files",
+ "comma seperated files to be copied to the " +
+ "map reduce cluster.",
+ "paths",
+ 1);
+ Option archives = createOption("archives",
+ "comma seperated archives to be unarchives" +
+ " on the compute machines.",
+ "paths",
+ 1);
+ opts.addOption(libjar);
+ opts.addOption(file);
+ opts.addOption(archives);
+ }
+
+ protected void init() throws IOException {
+ getConf().setQuietMode(true);
+ }
+
+ /**
+ * takes input as a comma separated list of files
+ * and verifies if they exist. It defaults for file:///
+ * if the files specified do not have a scheme.
+ * it returns the paths uri converted defaulting to file:///.
+ * So an input of /home/user/file1,/home/user/file2 would return
+ * file:///home/user/file1,file:///home/user/file2
+ * @param files
+ * @return
+ */
+ private String validateFiles(String files) throws IOException {
+ if (files == null)
+ return null;
+ String[] fileArr = files.split(",");
+ String[] finalArr = new String[fileArr.length];
+ for (int i =0; i < fileArr.length; i++) {
+ String tmp = fileArr[i];
+ String finalPath;
+ Path path = new Path(tmp);
+ URI pathURI = path.toUri();
+ if (pathURI.getScheme() == null) {
+ //default to the local file system
+ //check if the file exists or not first
+ if (!localFs.exists(path)) {
+ throw new FileNotFoundException("File " + tmp + " does not exist.");
+ }
+ finalPath = path.makeQualified(localFs).toString();
+ }
+ else {
+ // check if the file exists in this file system
+ // we need to recreate this filesystem object to copy
+ // these files to the file system jobtracker is running
+ // on.
+ FileSystem fs = path.getFileSystem(getConf());
+ if (!fs.exists(path)) {
+ throw new FileNotFoundException("File " + tmp + " does not exist.");
+ }
+ finalPath = path.makeQualified(fs).toString();
+ try {
+ fs.close();
+ } catch(IOException e){};
+ }
+ finalArr[i] = finalPath;
+ }
+ return StringUtils.arrayToString(finalArr);
+ }
+
+ /**
+ * run method from Tool
+ */
+ public int run(String argv[]) throws Exception {
+ int exitCode = -1;
+ Configuration conf = getConf();
+ localFs = FileSystem.getLocal(conf);
+ CommandLine cmdLine = null;
+ try{
+ try {
+ cmdLine = parser.parse(opts, argv, true);
+ } catch(Exception ie) {
+ LOG.error(ie.getMessage());
+ printUsage();
+ }
+ // get the options and set it in this
+ // objects conf and then update the jobclient
+ // with this config
+ if (cmdLine != null) {
+ String allFiles = (String) cmdLine.getOptionValue("files");
+ String alllibJars = (String) cmdLine.getOptionValue("libjars");
+ String allArchives = (String) cmdLine.getOptionValue("archives");
+ if (allFiles != null) {
+ String allFilesVal = validateFiles(allFiles);
+ conf.set("tmpfiles", allFilesVal);
+ }
+ if (alllibJars != null) {
+ String alllibJarsVal = validateFiles(alllibJars);
+ conf.set("tmpjars", alllibJarsVal);
+ }
+ if (allArchives != null) {
+ String allArchivesVal = validateFiles(allArchives);
+ conf.set("tmparchives", allArchivesVal);
+ }
+ JobClient.setCommandLineConfig(conf);
+ try {
+ // pass the rest of arguments to Runjar
+ String[] args = cmdLine.getArgs();
+ if (args.length == 0) {
+ printUsage();
+ return -1;
+ }
+ RunJar.main(cmdLine.getArgs());
+ exitCode = 0;
+ } catch(Throwable th) {
+ System.err.println(StringUtils.stringifyException(th));
+ }
+ }
+
+ } catch(RuntimeException re) {
+ exitCode = -1;
+ System.err.println(re.getLocalizedMessage());
+ }
+ return exitCode;
+ }
+
+ private void printUsage() {
+ System.out.println("Usage: $HADOOP_HOME/bin/hadoop \\");
+ System.out.println(" [--config dir] jar \\");
+ System.out.println(" [-libjars <comma seperated list of jars>] \\");
+ System.out.println(" [-archives <comma seperated list of archives>] \\");
+ System.out.println(" [-files <comma seperated list of files>] \\");
+ System.out.println(" jarFile [mainClass] args");
+ }
+
+ public static void main(String[] argv) throws Exception {
+ JobShell jshell = new JobShell();
+ int res;
+ res = ToolRunner.run(jshell, argv);
+ System.exit(res);
+ }
+}
Propchange: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobShell.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobShell.java
------------------------------------------------------------------------------
svn:keywords = Id Revision HeadURL
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobShell.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobShell.java?rev=641577&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobShell.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobShell.java Wed Mar 26 14:10:17 2008
@@ -0,0 +1,80 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+
+/**
+ * Checks that a job can be submitted through {@link JobShell} with the
+ * <code>-files</code> and <code>-libjars</code> options against a mini
+ * DFS / map reduce cluster.
+ */
+
+public class TestJobShell extends TestCase {
+  // Input output paths for this..
+  // these are all dummy and do not test
+  // much in map reduce except for the command line
+  // params
+  static final Path input = new Path("/test/input/");
+  static final Path output = new Path("/test/output");
+
+  public void testJobShell() throws Exception {
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fs = null;
+    Path testFile = new Path(input, "testfile");
+    // local file handed to the job via -files; removed again in finally
+    File f = new File("files_tmp");
+    try {
+      Configuration conf = new Configuration();
+      //start the mini mr and dfs cluster.
+      dfs = new MiniDFSCluster(conf, 2 , true, null);
+      fs = dfs.getFileSystem();
+      FSDataOutputStream stream = fs.create(testFile);
+      stream.write("teststring".getBytes());
+      stream.close();
+      mr = new MiniMRCluster(2, fs.getUri().toString(), 1);
+      JobConf jconf = mr.createJobConf();
+      JobShell jshell = new JobShell();
+      FileOutputStream fstream = new FileOutputStream(f);
+      fstream.write("somestrings".getBytes());
+      fstream.close();
+      String[] args = new String[8];
+      args[0] = "-files";
+      args[1] = "files_tmp";
+      args[2] = "-libjars";
+      // use the testjob.jar as a temporary jar file
+      // rather than creating its own
+      args[3] = "build/test/testjar/testjob.jar";
+      args[4] = "build/test/testshell/testshell.jar";
+      args[5] = "testshell.ExternalMapReduce";
+      args[6] = input.toString();
+      args[7] = output.toString();
+      int ret = ToolRunner.run(jconf, jshell, args);
+      assertTrue("not failed ", ret != -1);
+    } finally {
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+      if (mr != null) {
+        mr.shutdown();
+      }
+      // do not leave the temporary file behind in the working directory
+      f.delete();
+    }
+  }
+}
\ No newline at end of file
Propchange: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobShell.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobShell.java
------------------------------------------------------------------------------
svn:keywords = Id Revision HeadURL
Added: hadoop/core/trunk/src/test/testshell/ExternalMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/testshell/ExternalMapReduce.java?rev=641577&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/testshell/ExternalMapReduce.java (added)
+++ hadoop/core/trunk/src/test/testshell/ExternalMapReduce.java Wed Mar 26 14:10:17 2008
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package testshell;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Map/reduce job that will be in an external jar and is used for the
+ * test in TestJobShell.java.
+ *
+ * The map step only validates the task environment: it requires that
+ * testjob.jar appears in java.class.path and that a file named
+ * files_tmp exists relative to the task's working directory
+ * (presumably both are shipped by the test through the job shell's
+ * -libjars and -files options -- confirm against TestJobShell).
+ * Neither map nor reduce emits any output.
+ */
+public class ExternalMapReduce
+  implements Mapper<WritableComparable, Writable,
+                    WritableComparable, IntWritable>,
+             Reducer<WritableComparable, Writable,
+                     WritableComparable, IntWritable> {
+
+  /** Jar name that must appear in the task's java.class.path. */
+  private static final String EXPECTED_JAR = "testjob.jar";
+
+  /** File that must exist relative to the task's working directory. */
+  private static final String EXPECTED_FILE = "files_tmp";
+
+  /** No per-job configuration is needed. */
+  public void configure(JobConf job) {
+    // do nothing
+  }
+
+  /** Holds no resources, so there is nothing to release. */
+  public void close()
+    throws IOException {
+
+  }
+
+  /**
+   * Validates the task environment; the key and value are ignored and
+   * nothing is collected.
+   *
+   * @throws IOException if testjob.jar is not on java.class.path or if
+   *         files_tmp does not exist
+   */
+  public void map(WritableComparable key, Writable value,
+                  OutputCollector<WritableComparable, IntWritable> output,
+                  Reporter reporter)
+    throws IOException {
+    //check for classpath
+    String classpath = System.getProperty("java.class.path");
+    if (classpath == null || !classpath.contains(EXPECTED_JAR)) {
+      // name the missing jar so a failed task log is self-explanatory
+      throw new IOException("failed to find " + EXPECTED_JAR
+                            + " in the classpath " + classpath);
+    }
+    //check for files
+    File f = new File(EXPECTED_FILE);
+    if (!f.exists()) {
+      // report the name that was actually checked
+      throw new IOException("file " + EXPECTED_FILE + " not found");
+    }
+  }
+
+  /** Ignores its input and emits nothing. */
+  public void reduce(WritableComparable key, Iterator<Writable> values,
+                     OutputCollector<WritableComparable, IntWritable> output,
+                     Reporter reporter)
+    throws IOException {
+    //do nothing
+  }
+
+  /**
+   * Configures and runs the job with this class as both mapper and
+   * reducer, using a single reduce task.
+   *
+   * NOTE(review): this returns int rather than void, so it is not a
+   * standard JVM entry point; presumably it is invoked reflectively by
+   * the job shell -- confirm against the caller.
+   *
+   * @param argv argv[0] is the input path, argv[1] the output path
+   * @return -1 if fewer than two arguments are given, 0 once
+   *         JobClient.runJob has returned
+   * @throws IOException propagated from JobClient.runJob
+   */
+  public static int main(String[] argv) throws IOException {
+    if (argv.length < 2) {
+      System.out.println("ExternalMapReduce <input> <output>");
+      return -1;
+    }
+    Path outDir = new Path(argv[1]);
+    Path input = new Path(argv[0]);
+    // start from the command line configuration held by JobClient
+    Configuration commandConf = JobClient.getCommandLineConfig();
+    JobConf testConf = new JobConf(commandConf, ExternalMapReduce.class);
+    testConf.setJobName("external job");
+    testConf.setInputPath(input);
+    testConf.setOutputPath(outDir);
+    testConf.setMapperClass(ExternalMapReduce.class);
+    testConf.setReducerClass(ExternalMapReduce.class);
+    testConf.setNumReduceTasks(1);
+    JobClient.runJob(testConf);
+    return 0;
+  }
+}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package testshell;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Map/reduce job that will be in an external jar and is used for the
+ * test in TestJobShell.java.
+ *
+ * The map step only validates the task environment: it requires that
+ * testjob.jar appears in java.class.path and that a file named
+ * files_tmp exists relative to the task's working directory
+ * (presumably both are shipped by the test through the job shell's
+ * -libjars and -files options -- confirm against TestJobShell).
+ * Neither map nor reduce emits any output.
+ */
+public class ExternalMapReduce
+  implements Mapper<WritableComparable, Writable,
+                    WritableComparable, IntWritable>,
+             Reducer<WritableComparable, Writable,
+                     WritableComparable, IntWritable> {
+
+  /** Jar name that must appear in the task's java.class.path. */
+  private static final String EXPECTED_JAR = "testjob.jar";
+
+  /** File that must exist relative to the task's working directory. */
+  private static final String EXPECTED_FILE = "files_tmp";
+
+  /** No per-job configuration is needed. */
+  public void configure(JobConf job) {
+    // do nothing
+  }
+
+  /** Holds no resources, so there is nothing to release. */
+  public void close()
+    throws IOException {
+
+  }
+
+  /**
+   * Validates the task environment; the key and value are ignored and
+   * nothing is collected.
+   *
+   * @throws IOException if testjob.jar is not on java.class.path or if
+   *         files_tmp does not exist
+   */
+  public void map(WritableComparable key, Writable value,
+                  OutputCollector<WritableComparable, IntWritable> output,
+                  Reporter reporter)
+    throws IOException {
+    //check for classpath
+    String classpath = System.getProperty("java.class.path");
+    if (classpath == null || !classpath.contains(EXPECTED_JAR)) {
+      // name the missing jar so a failed task log is self-explanatory
+      throw new IOException("failed to find " + EXPECTED_JAR
+                            + " in the classpath " + classpath);
+    }
+    //check for files
+    File f = new File(EXPECTED_FILE);
+    if (!f.exists()) {
+      // report the name that was actually checked
+      throw new IOException("file " + EXPECTED_FILE + " not found");
+    }
+  }
+
+  /** Ignores its input and emits nothing. */
+  public void reduce(WritableComparable key, Iterator<Writable> values,
+                     OutputCollector<WritableComparable, IntWritable> output,
+                     Reporter reporter)
+    throws IOException {
+    //do nothing
+  }
+
+  /**
+   * Configures and runs the job with this class as both mapper and
+   * reducer, using a single reduce task.
+   *
+   * NOTE(review): this returns int rather than void, so it is not a
+   * standard JVM entry point; presumably it is invoked reflectively by
+   * the job shell -- confirm against the caller.
+   *
+   * @param argv argv[0] is the input path, argv[1] the output path
+   * @return -1 if fewer than two arguments are given, 0 once
+   *         JobClient.runJob has returned
+   * @throws IOException propagated from JobClient.runJob
+   */
+  public static int main(String[] argv) throws IOException {
+    if (argv.length < 2) {
+      System.out.println("ExternalMapReduce <input> <output>");
+      return -1;
+    }
+    Path outDir = new Path(argv[1]);
+    Path input = new Path(argv[0]);
+    // start from the command line configuration held by JobClient
+    Configuration commandConf = JobClient.getCommandLineConfig();
+    JobConf testConf = new JobConf(commandConf, ExternalMapReduce.class);
+    testConf.setJobName("external job");
+    testConf.setInputPath(input);
+    testConf.setOutputPath(outDir);
+    testConf.setMapperClass(ExternalMapReduce.class);
+    testConf.setReducerClass(ExternalMapReduce.class);
+    testConf.setNumReduceTasks(1);
+    JobClient.runJob(testConf);
+    return 0;
+  }
+}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package testshell;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Map/reduce job that will be in an external jar and is used for the
+ * test in TestJobShell.java.
+ *
+ * The map step only validates the task environment: it requires that
+ * testjob.jar appears in java.class.path and that a file named
+ * files_tmp exists relative to the task's working directory
+ * (presumably both are shipped by the test through the job shell's
+ * -libjars and -files options -- confirm against TestJobShell).
+ * Neither map nor reduce emits any output.
+ */
+public class ExternalMapReduce
+  implements Mapper<WritableComparable, Writable,
+                    WritableComparable, IntWritable>,
+             Reducer<WritableComparable, Writable,
+                     WritableComparable, IntWritable> {
+
+  /** Jar name that must appear in the task's java.class.path. */
+  private static final String EXPECTED_JAR = "testjob.jar";
+
+  /** File that must exist relative to the task's working directory. */
+  private static final String EXPECTED_FILE = "files_tmp";
+
+  /** No per-job configuration is needed. */
+  public void configure(JobConf job) {
+    // do nothing
+  }
+
+  /** Holds no resources, so there is nothing to release. */
+  public void close()
+    throws IOException {
+
+  }
+
+  /**
+   * Validates the task environment; the key and value are ignored and
+   * nothing is collected.
+   *
+   * @throws IOException if testjob.jar is not on java.class.path or if
+   *         files_tmp does not exist
+   */
+  public void map(WritableComparable key, Writable value,
+                  OutputCollector<WritableComparable, IntWritable> output,
+                  Reporter reporter)
+    throws IOException {
+    //check for classpath
+    String classpath = System.getProperty("java.class.path");
+    if (classpath == null || !classpath.contains(EXPECTED_JAR)) {
+      // name the missing jar so a failed task log is self-explanatory
+      throw new IOException("failed to find " + EXPECTED_JAR
+                            + " in the classpath " + classpath);
+    }
+    //check for files
+    File f = new File(EXPECTED_FILE);
+    if (!f.exists()) {
+      // report the name that was actually checked
+      throw new IOException("file " + EXPECTED_FILE + " not found");
+    }
+  }
+
+  /** Ignores its input and emits nothing. */
+  public void reduce(WritableComparable key, Iterator<Writable> values,
+                     OutputCollector<WritableComparable, IntWritable> output,
+                     Reporter reporter)
+    throws IOException {
+    //do nothing
+  }
+
+  /**
+   * Configures and runs the job with this class as both mapper and
+   * reducer, using a single reduce task.
+   *
+   * NOTE(review): this returns int rather than void, so it is not a
+   * standard JVM entry point; presumably it is invoked reflectively by
+   * the job shell -- confirm against the caller.
+   *
+   * @param argv argv[0] is the input path, argv[1] the output path
+   * @return -1 if fewer than two arguments are given, 0 once
+   *         JobClient.runJob has returned
+   * @throws IOException propagated from JobClient.runJob
+   */
+  public static int main(String[] argv) throws IOException {
+    if (argv.length < 2) {
+      System.out.println("ExternalMapReduce <input> <output>");
+      return -1;
+    }
+    Path outDir = new Path(argv[1]);
+    Path input = new Path(argv[0]);
+    // start from the command line configuration held by JobClient
+    Configuration commandConf = JobClient.getCommandLineConfig();
+    JobConf testConf = new JobConf(commandConf, ExternalMapReduce.class);
+    testConf.setJobName("external job");
+    testConf.setInputPath(input);
+    testConf.setOutputPath(outDir);
+    testConf.setMapperClass(ExternalMapReduce.class);
+    testConf.setReducerClass(ExternalMapReduce.class);
+    testConf.setNumReduceTasks(1);
+    JobClient.runJob(testConf);
+    return 0;
+  }
+}
Propchange: hadoop/core/trunk/src/test/testshell/ExternalMapReduce.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/core/trunk/src/test/testshell/ExternalMapReduce.java
------------------------------------------------------------------------------
svn:keywords = Id Revision HeadURL