You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/06/03 15:57:56 UTC
svn commit: r662801 - in /hadoop/core/trunk: ./ bin/
src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/util/
src/test/org/apache/hadoop/mapred/ src/test/testshell/
Author: ddas
Date: Tue Jun 3 06:57:56 2008
New Revision: 662801
URL: http://svn.apache.org/viewvc?rev=662801&view=rev
Log:
HADOOP-3417. Removes the static configuration variable commandLineConfig from JobClient and moves the CLI option parsing from JobShell to GenericOptionsParser, which makes the class org.apache.hadoop.mapred.JobShell unnecessary, so it is removed. Contributed by Amareshwari Sriramadasu.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/bin/hadoop
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobShell.java
hadoop/core/trunk/src/java/org/apache/hadoop/util/GenericOptionsParser.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobShell.java
hadoop/core/trunk/src/test/testshell/ExternalMapReduce.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=662801&r1=662800&r2=662801&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Jun 3 06:57:56 2008
@@ -397,6 +397,12 @@
HADOOP-3454. Fix Text::find to search only valid byte ranges. (Chad Whipkey
via cdouglas)
+ HADOOP-3417. Removes the static configuration variable, commandLineConfig from
+ JobClient. Moves the cli parsing from JobShell to GenericOptionsParser.
+ Thus removes the class org.apache.hadoop.mapred.JobShell.
+ (Amareshwari Sriramadasu via ddas)
+
+
Release 0.17.0 - 2008-05-18
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/bin/hadoop
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/bin/hadoop?rev=662801&r1=662800&r2=662801&view=diff
==============================================================================
--- hadoop/core/trunk/bin/hadoop (original)
+++ hadoop/core/trunk/bin/hadoop Tue Jun 3 06:57:56 2008
@@ -226,7 +226,7 @@
CLASS=org.apache.hadoop.util.VersionInfo
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "jar" ] ; then
- CLASS=org.apache.hadoop.mapred.JobShell
+ CLASS=org.apache.hadoop.util.RunJar
elif [ "$COMMAND" = "distcp" ] ; then
CLASS=org.apache.hadoop.util.CopyFiles
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=662801&r1=662800&r2=662801&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Tue Jun 3 06:57:56 2008
@@ -30,7 +30,6 @@
import java.io.OutputStreamWriter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
@@ -38,8 +37,6 @@
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Random;
import javax.security.auth.login.LoginException;
@@ -60,9 +57,6 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UnixUserGroupInformation;
@@ -157,7 +151,6 @@
private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobClient");
public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
- private static Configuration commandLineConfig;
static long MAX_JOBPROFILE_AGE = 1000 * 2;
/**
@@ -357,23 +350,6 @@
}
/**
- * set the command line config in the jobclient. these are
- * parameters paassed from the command line and stored in
- * conf
- * @param conf the configuration object to set.
- */
- static void setCommandLineConfig(Configuration conf) {
- commandLineConfig = conf;
- }
-
- /**
- * return the command line configuration
- */
- public static Configuration getCommandLineConfig() {
- return commandLineConfig;
- }
-
- /**
* Connect to the default {@link JobTracker}.
* @param conf the job configuration.
* @throws IOException
@@ -499,15 +475,12 @@
throws IOException {
// get all the command line arguments into the
// jobconf passed in by the user conf
- Configuration commandConf = JobClient.getCommandLineConfig();
String files = null;
String libjars = null;
String archives = null;
- if (commandConf != null) {
- files = commandConf.get("tmpfiles");
- libjars = commandConf.get("tmpjars");
- archives = commandConf.get("tmparchives");
- }
+ files = job.get("tmpfiles");
+ libjars = job.get("tmpjars");
+ archives = job.get("tmparchives");
/*
* set this user's id in job configuration, so later job files can be
* accessed using this user's id
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobShell.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobShell.java?rev=662801&r1=662800&r2=662801&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobShell.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobShell.java Tue Jun 3 06:57:56 2008
@@ -1,223 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.util.RunJar;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.logging.*;
-
-
-/** Provide command line parsing for JobSubmission
- * job submission looks like
- * hadoop jar -libjars <comma seperated jars> -archives <comma seperated archives>
- * -files <comma seperated files> inputjar args
- */
-public class JobShell extends Configured implements Tool {
- private FileSystem localFs;
- protected static final Log LOG = LogFactory.getLog(JobShell.class.getName());
- /**
- * cli implementation for
- * job shell.
- */
- private CommandLineParser parser = new GnuParser();
- private Options opts = new Options();
-
- public JobShell() {this(null);};
-
- public JobShell(Configuration conf) {
- super(conf);
- setUpOptions();
- }
-
- /**
- * a method to create an option
- */
- @SuppressWarnings("static-access")
- private Option createOption(String name, String desc,
- String argName, int max) {
- return OptionBuilder.withArgName(argName).
- hasArg().withDescription(desc).
- create(name);
- }
-
-
- /**
- * set up options
- * specific to jobshell
- */
- private void setUpOptions() {
- Option libjar = createOption("libjars",
- "comma seperated jar files to " +
- "include in the classpath.",
- "paths",
- 1);
- Option file = createOption("files",
- "comma seperated files to be copied to the " +
- "map reduce cluster.",
- "paths",
- 1);
- Option archives = createOption("archives",
- "comma seperated archives to be unarchives" +
- " on the compute machines.",
- "paths",
- 1);
- opts.addOption(libjar);
- opts.addOption(file);
- opts.addOption(archives);
- }
-
- protected void init() throws IOException {
- getConf().setQuietMode(true);
- }
-
- /**
- * takes input as a comma separated list of files
- * and verifies if they exist. It defaults for file:///
- * if the files specified do not have a scheme.
- * it returns the paths uri converted defaulting to file:///.
- * So an input of /home/user/file1,/home/user/file2 would return
- * file:///home/user/file1,file:///home/user/file2
- * @param files
- * @return
- */
- private String validateFiles(String files) throws IOException {
- if (files == null)
- return null;
- String[] fileArr = files.split(",");
- String[] finalArr = new String[fileArr.length];
- for (int i =0; i < fileArr.length; i++) {
- String tmp = fileArr[i];
- String finalPath;
- Path path = new Path(tmp);
- URI pathURI = path.toUri();
- if (pathURI.getScheme() == null) {
- //default to the local file system
- //check if the file exists or not first
- if (!localFs.exists(path)) {
- throw new FileNotFoundException("File " + tmp + " does not exist.");
- }
- finalPath = path.makeQualified(localFs).toString();
- }
- else {
- // check if the file exists in this file system
- // we need to recreate this filesystem object to copy
- // these files to the file system jobtracker is running
- // on.
- FileSystem fs = path.getFileSystem(getConf());
- if (!fs.exists(path)) {
- throw new FileNotFoundException("File " + tmp + " does not exist.");
- }
- finalPath = path.makeQualified(fs).toString();
- try {
- fs.close();
- } catch(IOException e){};
- }
- finalArr[i] = finalPath;
- }
- return StringUtils.arrayToString(finalArr);
- }
-
- /**
- * run method from Tool
- */
- public int run(String argv[]) throws Exception {
- int exitCode = -1;
- Configuration conf = getConf();
- localFs = FileSystem.getLocal(conf);
- CommandLine cmdLine = null;
- try{
- try {
- cmdLine = parser.parse(opts, argv, true);
- } catch(Exception ie) {
- LOG.error(ie.getMessage());
- printUsage();
- }
- // get the options and set it in this
- // objects conf and then update the jobclient
- // with this config
- if (cmdLine != null) {
- String allFiles = (String) cmdLine.getOptionValue("files");
- String alllibJars = (String) cmdLine.getOptionValue("libjars");
- String allArchives = (String) cmdLine.getOptionValue("archives");
- if (allFiles != null) {
- String allFilesVal = validateFiles(allFiles);
- conf.set("tmpfiles", allFilesVal);
- }
- if (alllibJars != null) {
- String alllibJarsVal = validateFiles(alllibJars);
- conf.set("tmpjars", alllibJarsVal);
- }
- if (allArchives != null) {
- String allArchivesVal = validateFiles(allArchives);
- conf.set("tmparchives", allArchivesVal);
- }
- JobClient.setCommandLineConfig(conf);
- try {
- // pass the rest of arguments to Runjar
- String[] args = cmdLine.getArgs();
- if (args.length == 0) {
- printUsage();
- return -1;
- }
- RunJar.main(cmdLine.getArgs());
- exitCode = 0;
- } catch(Throwable th) {
- System.err.println(StringUtils.stringifyException(th));
- }
- }
-
- } catch(RuntimeException re) {
- exitCode = -1;
- System.err.println(re.getLocalizedMessage());
- }
- return exitCode;
- }
-
- private void printUsage() {
- System.out.println("Usage: $HADOOP_HOME/bin/hadoop \\");
- System.out.println(" [--config dir] jar \\");
- System.out.println(" [-libjars <comma seperated list of jars>] \\");
- System.out.println(" [-archives <comma seperated list of archives>] \\");
- System.out.println(" [-files <comma seperated list of files>] \\");
- System.out.println(" jarFile [mainClass] args");
- }
-
- public static void main(String[] argv) throws Exception {
- JobShell jshell = new JobShell();
- int res;
- res = ToolRunner.run(jshell, argv);
- System.exit(res);
- }
-}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/util/GenericOptionsParser.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/util/GenericOptionsParser.java?rev=662801&r1=662800&r2=662801&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/util/GenericOptionsParser.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/util/GenericOptionsParser.java Tue Jun 3 06:57:56 2008
@@ -17,7 +17,10 @@
*/
package org.apache.hadoop.util;
+import java.io.FileNotFoundException;
+import java.io.IOException;
import java.io.PrintStream;
+import java.net.URI;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -49,6 +52,13 @@
* -D <property=value> use value for given property
* -fs <local|namenode:port> specify a namenode
* -jt <local|jobtracker:port> specify a job tracker
+ * -files <comma separated list of files> specify comma separated
+ * files to be copied to the map reduce cluster
+ * -libjars <comma separated list of jars> specify comma separated
+ * jar files to include in the classpath.
+ * -archives <comma separated list of archives> specify comma
+ * separated archives to be unarchived on the compute machines.
+
* </pre></blockquote></p>
*
* <p>The general command line syntax is:</p>
@@ -80,6 +90,10 @@
*
* $ bin/hadoop job -jt local -submit job.xml
* submit a job to local runner
+ *
+ * $ bin/hadoop jar -libjars testlib.jar
+ * -archives test.tgz -files file.txt inputjar args
+ * job submission with libjars, files and archives
* </pre></blockquote></p>
*
* @see Tool
@@ -167,12 +181,29 @@
.withArgPattern("=", 1)
.withDescription("use value for given property")
.create('D');
+ Option libjars = OptionBuilder.withArgName("paths")
+ .hasArg()
+ .withDescription("comma separated jar files to include in the classpath.")
+ .create("libjars");
+ Option files = OptionBuilder.withArgName("paths")
+ .hasArg()
+ .withDescription("comma separated files to be copied to the " +
+ "map reduce cluster")
+ .create("files");
+ Option archives = OptionBuilder.withArgName("paths")
+ .hasArg()
+ .withDescription("comma separated archives to be unarchived" +
+ " on the compute machines.")
+ .create("archives");
opts.addOption(fs);
opts.addOption(jt);
opts.addOption(oconf);
opts.addOption(property);
-
+ opts.addOption(libjars);
+ opts.addOption(files);
+ opts.addOption(archives);
+
return opts;
}
@@ -193,6 +224,22 @@
if (line.hasOption("conf")) {
conf.addResource(new Path(line.getOptionValue("conf")));
}
+ try {
+ if (line.hasOption("libjars")) {
+ conf.set("tmpjars",
+ validateFiles(line.getOptionValue("libjars"), conf));
+ }
+ if (line.hasOption("files")) {
+ conf.set("tmpfiles",
+ validateFiles(line.getOptionValue("files"), conf));
+ }
+ if (line.hasOption("archives")) {
+ conf.set("tmparchives",
+ validateFiles(line.getOptionValue("archives"), conf));
+ }
+ } catch (IOException ioe) {
+ System.err.println(StringUtils.stringifyException(ioe));
+ }
if (line.hasOption('D')) {
String[] property = line.getOptionValues('D');
for(int i=0; i<property.length-1; i=i+2) {
@@ -203,6 +250,55 @@
}
/**
+ * takes input as a comma separated list of files
+ * and verifies if they exist. It defaults for file:///
+ * if the files specified do not have a scheme.
+ * it returns the paths uri converted defaulting to file:///.
+ * So an input of /home/user/file1,/home/user/file2 would return
+ * file:///home/user/file1,file:///home/user/file2
+ * @param files
+ * @return
+ */
+ private String validateFiles(String files, Configuration conf) throws IOException {
+ if (files == null)
+ return null;
+ String[] fileArr = files.split(",");
+ String[] finalArr = new String[fileArr.length];
+ for (int i =0; i < fileArr.length; i++) {
+ String tmp = fileArr[i];
+ String finalPath;
+ Path path = new Path(tmp);
+ URI pathURI = path.toUri();
+ FileSystem localFs = FileSystem.getLocal(conf);
+ if (pathURI.getScheme() == null) {
+ //default to the local file system
+ //check if the file exists or not first
+ if (!localFs.exists(path)) {
+ throw new FileNotFoundException("File " + tmp + " does not exist.");
+ }
+ finalPath = path.makeQualified(localFs).toString();
+ }
+ else {
+ // check if the file exists in this file system
+ // we need to recreate this filesystem object to copy
+ // these files to the file system jobtracker is running
+ // on.
+ FileSystem fs = path.getFileSystem(conf);
+ if (!fs.exists(path)) {
+ throw new FileNotFoundException("File " + tmp + " does not exist.");
+ }
+ finalPath = path.makeQualified(fs).toString();
+ try {
+ fs.close();
+ } catch(IOException e){};
+ }
+ finalArr[i] = finalPath;
+ }
+ return StringUtils.arrayToString(finalArr);
+ }
+
+
+ /**
* Parse the user-specified options, get the generic options, and modify
* configuration accordingly
* @param conf Configuration to be modified
@@ -236,7 +332,14 @@
out.println("-conf <configuration file> specify an application configuration file");
out.println("-D <property=value> use value for given property");
out.println("-fs <local|namenode:port> specify a namenod");
- out.println("-jt <local|jobtracker:port> specify a job tracker\n");
+ out.println("-jt <local|jobtracker:port> specify a job tracker");
+ out.println("-files <comma separated list of fiels> " +
+ "specify comma separated files to be copied to the map reduce cluster");
+ out.println("-libjars <comma seperated list of jars> " +
+ "specify comma separated jar files to include in the classpath.");
+ out.println("-archives <comma separated list of archives> " +
+ "specify comma separated archives to be unarchived" +
+ " on the compute machines.\n");
out.println("The general command line syntax is");
out.println("bin/hadoop command [genericOptions] [commandOptions]\n");
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobShell.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobShell.java?rev=662801&r1=662800&r2=662801&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobShell.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobShell.java Tue Jun 3 06:57:56 2008
@@ -1,80 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.dfs.MiniDFSCluster;
-
-/**
- * check for the jobshell options of
- * -libjars -files -archives
- */
-
-public class TestJobShell extends TestCase {
- // Input output paths for this..
- // these are all dummy and does not test
- // much in map reduce except for the command line
- // params
- static final Path input = new Path("/test/input/");
- static final Path output = new Path("/test/output");
- public void testJobShell() throws Exception {
- MiniDFSCluster dfs = null;
- MiniMRCluster mr = null;
- FileSystem fs = null;
- Path testFile = new Path(input, "testfile");
- try {
- Configuration conf = new Configuration();
- //start the mini mr and dfs cluster.
- dfs = new MiniDFSCluster(conf, 2 , true, null);
- fs = dfs.getFileSystem();
- FSDataOutputStream stream = fs.create(testFile);
- stream.write("teststring".getBytes());
- stream.close();
- mr = new MiniMRCluster(2, fs.getUri().toString(), 1);
- JobConf jconf = mr.createJobConf();
- JobShell jshell = new JobShell();
- File f = new File("files_tmp");
- FileOutputStream fstream = new FileOutputStream(f);
- fstream.write("somestrings".getBytes());
- fstream.close();
- String[] args = new String[8];
- args[0] = "-files";
- args[1] = "files_tmp";
- args[2] = "-libjars";
- /// the testjob.jar as a temporary jar file
- // rather than creating its own
- args[3] = "build/test/testjar/testjob.jar";
- args[4] = "build/test/testshell/testshell.jar";
- args[5] = "testshell.ExternalMapReduce";
- args[6] = input.toString();
- args[7] = output.toString();
- int ret = ToolRunner.run(jconf, jshell, args);
- assertTrue("not failed ", ret != -1);
- } finally {
- if (dfs != null) {dfs.shutdown();};
- if (mr != null) {mr.shutdown();};
- }
- }
-}
\ No newline at end of file
Modified: hadoop/core/trunk/src/test/testshell/ExternalMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/testshell/ExternalMapReduce.java?rev=662801&r1=662800&r2=662801&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/testshell/ExternalMapReduce.java (original)
+++ hadoop/core/trunk/src/test/testshell/ExternalMapReduce.java Tue Jun 3 06:57:56 2008
@@ -18,11 +18,11 @@
package testshell;
-import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
@@ -31,20 +31,19 @@
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
/**
* will be in an external jar and used for
* test in TestJobShell.java.
*/
-public class ExternalMapReduce
- implements Mapper<WritableComparable, Writable,
- WritableComparable, IntWritable>,
- Reducer<WritableComparable, Writable,
- WritableComparable, IntWritable> {
+public class ExternalMapReduce extends Configured implements Tool {
public void configure(JobConf job) {
// do nothing
@@ -55,58 +54,69 @@
}
- public void map(WritableComparable key, Writable value,
- OutputCollector<WritableComparable, IntWritable> output,
- Reporter reporter)
- throws IOException {
- //check for classpath
- String classpath = System.getProperty("java.class.path");
- if (classpath.indexOf("testjob.jar") == -1) {
- throw new IOException("failed to find in the library " + classpath);
- }
- //fork off ls to see if the file exists.
- // java file.exists() will not work on
- // cygwin since it is a symlink
- String[] argv = new String[2];
- argv[0] = "ls";
- argv[1] = "files_tmp";
- Process p = Runtime.getRuntime().exec(argv);
- int ret = -1;
- try {
- ret = p.waitFor();
- } catch(InterruptedException ie) {
- //do nothing here.
+ public static class MapClass extends MapReduceBase
+ implements Mapper<WritableComparable, Writable,
+ WritableComparable, IntWritable> {
+ public void map(WritableComparable key, Writable value,
+ OutputCollector<WritableComparable, IntWritable> output,
+ Reporter reporter)
+ throws IOException {
+ //check for classpath
+ String classpath = System.getProperty("java.class.path");
+ if (classpath.indexOf("testjob.jar") == -1) {
+ throw new IOException("failed to find in the library " + classpath);
+ }
+ //fork off ls to see if the file exists.
+ // java file.exists() will not work on
+ // cygwin since it is a symlink
+ String[] argv = new String[2];
+ argv[0] = "ls";
+ argv[1] = "files_tmp";
+ Process p = Runtime.getRuntime().exec(argv);
+ int ret = -1;
+ try {
+ ret = p.waitFor();
+ } catch(InterruptedException ie) {
+ //do nothing here.
+ }
+ if (ret != 0) {
+ throw new IOException("files_tmp does not exist");
+ }
}
- if (ret != 0) {
- throw new IOException("files_tmp does not exist");
- }
-
- //check for files
}
- public void reduce(WritableComparable key, Iterator<Writable> values,
- OutputCollector<WritableComparable, IntWritable> output,
- Reporter reporter)
- throws IOException {
- //do nothing
+ public static class Reduce extends MapReduceBase
+ implements Reducer<WritableComparable, Writable,
+ WritableComparable, IntWritable> {
+ public void reduce(WritableComparable key, Iterator<Writable> values,
+ OutputCollector<WritableComparable, IntWritable> output,
+ Reporter reporter)
+ throws IOException {
+ //do nothing
+ }
}
- public static int main(String[] argv) throws IOException {
+ public int run(String[] argv) throws IOException {
if (argv.length < 2) {
System.out.println("ExternalMapReduce <input> <output>");
return -1;
}
Path outDir = new Path(argv[1]);
Path input = new Path(argv[0]);
- Configuration commandConf = JobClient.getCommandLineConfig();
- JobConf testConf = new JobConf(commandConf, ExternalMapReduce.class);
+ JobConf testConf = new JobConf(getConf(), ExternalMapReduce.class);
testConf.setJobName("external job");
FileInputFormat.setInputPaths(testConf, input);
FileOutputFormat.setOutputPath(testConf, outDir);
- testConf.setMapperClass(ExternalMapReduce.class);
- testConf.setReducerClass(ExternalMapReduce.class);
+ testConf.setMapperClass(MapClass.class);
+ testConf.setReducerClass(Reduce.class);
testConf.setNumReduceTasks(1);
JobClient.runJob(testConf);
return 0;
}
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(),
+ new ExternalMapReduce(), args);
+ System.exit(res);
+ }
}