Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/05/03 21:33:28 UTC
svn commit: r534970 - in /lucene/hadoop/trunk: ./
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
Author: cutting
Date: Thu May 3 12:33:27 2007
New Revision: 534970
URL: http://svn.apache.org/viewvc?view=rev&rev=534970
Log:
HADOOP-1315. Clean up contrib/streaming, switching it to use more core classes and removing unused classes. Contributed by Runping.
Removed:
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/CompoundDirSpec.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MuxOutputFormat.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java
lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=534970&r1=534969&r2=534970
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu May 3 12:33:27 2007
@@ -316,6 +316,9 @@
that killed the heartbeat monitoring thread.
(Dhruba Borthakur via cutting)
+94. HADOOP-1315. Clean up contrib/streaming, switching it to use core
+ classes more and removing unused code. (Runping Qi via cutting)
+
Release 0.12.3 - 2007-04-06
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?view=diff&rev=534970&r1=534969&r2=534970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Thu May 3 12:33:27 2007
@@ -19,8 +19,6 @@
package org.apache.hadoop.streaming;
import java.io.*;
-import java.net.Socket;
-import java.net.URI;
import java.nio.charset.CharacterCodingException;
import java.io.IOException;
import java.util.Date;
@@ -29,14 +27,11 @@
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Properties;
-import java.util.regex.*;
import org.apache.commons.logging.*;
import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.PhasedFileSystem;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.util.StringUtils;
@@ -45,7 +40,6 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
/** Shared functionality for PipeMapper, PipeReducer.
@@ -60,60 +54,12 @@
*/
abstract String getPipeCommand(JobConf job);
- /*
- */
- abstract String getKeyColPropName();
-
abstract char getFieldSeparator();
abstract int getNumOfKeyFields();
-
- /** Write output as side-effect files rather than as map outputs.
- This is useful to do "Map" tasks rather than "MapReduce" tasks. */
- boolean getUseSideEffect() {
- return false;
- }
abstract boolean getDoPipe();
- /**
- * @returns how many TABS before the end of the key part
- * usually: 1 or "ALL"
- * used for tool output of both Map and Reduce
- * configured via tool's argv: splitKeyVal=ALL or 1..
- * although it is interpreted here, not by tool
- */
- int getKeyColsFromPipeCommand(String cmd) {
- String key = getKeyColPropName();
- Pattern kcPat = Pattern.compile(".*" + key + "=([^\\s]*).*");
- Matcher match = kcPat.matcher(cmd);
- String kc;
- if (!match.matches()) {
- kc = null;
- } else {
- kc = match.group(1);
- }
-
- int cols;
- if (kc == null) {
- // default value is 1 and the Stream applications could instead
- // add/remove the \t separator on lines to get the same effect as value 0, 1, ALL
- cols = 1;
- } else if (kc.equals("ALL")) {
- cols = ALL_COLS;
- } else {
- try {
- cols = Integer.parseInt(kc);
- } catch (NumberFormatException nf) {
- cols = Integer.MAX_VALUE;
- }
- }
-
- System.out.println("getKeyColsFromPipeCommand:" + key + " parse:" + cols + " from cmd=" + cmd);
-
- return cols;
- }
-
final static int OUTSIDE = 1;
final static int SINGLEQ = 2;
final static int DOUBLEQ = 3;
@@ -164,54 +110,15 @@
return (String[]) argList.toArray(new String[0]);
}
- OutputStream getURIOutputStream(URI uri, boolean allowSocket) throws IOException {
- final String SOCKET = "socket";
- if (uri.getScheme().equals(SOCKET)) {
- if (!allowSocket) {
- throw new IOException(SOCKET + " not allowed on outputstream " + uri);
- }
- final Socket sock = new Socket(uri.getHost(), uri.getPort());
- OutputStream out = new FilterOutputStream(sock.getOutputStream()) {
- public void close() throws IOException {
- sock.close();
- super.close();
- }
- };
- return out;
- } else {
- // a FSDataOutputStreamm, localFS or HDFS.
- // localFS file may be set up as a FIFO.
- return sideFs_.create(new Path(uri.getSchemeSpecificPart()));
- }
- }
-
- String getSideEffectFileName() {
- FileSplit split = StreamUtil.getCurrentSplit(job_);
- return new String(split.getPath().getName() + "-" + split.getStart() +
- "-" + split.getLength());
- }
-
public void configure(JobConf job) {
try {
String argv = getPipeCommand(job);
- keyCols_ = getKeyColsFromPipeCommand(argv);
-
- debug_ = (job.get("stream.debug") != null);
- if (debug_) {
- System.out.println("PipeMapRed: stream.debug=true");
- }
-
joinDelay_ = job.getLong("stream.joindelay.milli", 0);
job_ = job;
fs_ = FileSystem.get(job_);
- if (job_.getBoolean("stream.sideoutput.localfs", false)) {
- //sideFs_ = new LocalFileSystem(job_);
- sideFs_ = FileSystem.getLocal(job_);
- } else {
- sideFs_ = fs_;
- }
+
String mapOutputFieldSeparator = job_.get("stream.map.output.field.separator", "\t");
String reduceOutputFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t");
this.mapOutputFieldSeparator = mapOutputFieldSeparator.charAt(0);
@@ -219,22 +126,12 @@
this.numOfMapOutputKeyFields = job_.getInt("stream.num.map.output.key.fields", 1);
this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1);
- if (debug_) {
- System.out.println("kind :" + this.getClass());
- System.out.println("split :" + StreamUtil.getCurrentSplit(job_));
- System.out.println("fs :" + fs_.toString());
- System.out.println("sideFs :" + sideFs_.toString());
- }
doPipe_ = getDoPipe();
if (!doPipe_) return;
setStreamJobDetails(job);
- setStreamProperties();
-
- if (debugFailEarly_) {
- throw new RuntimeException("debugFailEarly_");
- }
+
String[] argvSplit = splitArgs(argv);
String prog = argvSplit[0];
File currentDir = new File(".").getAbsoluteFile();
@@ -245,39 +142,6 @@
FileUtil.chmod(new File(jobCacheDir, prog).toString(), "a+x");
}
- if (job_.getInputValueClass().equals(BytesWritable.class)) {
- // TODO expose as separate config:
- // job or semistandard inputformat property
- optUseKey_ = false;
- }
-
- optSideEffect_ = getUseSideEffect();
-
- if (optSideEffect_) {
- // during work: use a completely unique filename to avoid HDFS namespace conflicts
- // after work: rename to a filename that depends only on the workload (the FileSplit)
- // it's a friendly name and in case of reexecution it will clobber.
- // reexecution can be due to: other job, failed task and speculative task
- // See StreamJob.setOutputSpec(): if reducerNone_ aka optSideEffect then:
- // client has renamed outputPath and saved the argv's original output path as:
- if (useSingleSideOutputURI_) {
- finalOutputURI = new URI(sideOutputURI_);
- sideEffectPathFinal_ = null; // in-place, no renaming to final
- } else {
- sideFs_ = new PhasedFileSystem(sideFs_, job);
- String sideOutputPath = job_.get("stream.sideoutput.dir"); // was: job_.getOutputPath()
- String fileName = getSideEffectFileName(); // see HADOOP-444 for rationale
- sideEffectPathFinal_ = new Path(sideOutputPath, fileName);
- finalOutputURI = new URI(sideEffectPathFinal_.toString()); // implicit dfs:
- }
- // apply default scheme
- if (finalOutputURI.getScheme() == null) {
- finalOutputURI = new URI("file", finalOutputURI.getSchemeSpecificPart(), null);
- }
- boolean allowSocket = useSingleSideOutputURI_;
- sideEffectOut_ = getURIOutputStream(finalOutputURI, allowSocket);
- }
-
//
// argvSplit[0]:
// An absolute path should be a preexisting valid path on all TaskTrackers
@@ -295,8 +159,6 @@
f = null;
}
logprintln("PipeMapRed exec " + Arrays.asList(argvSplit));
- logprintln("sideEffectURI_=" + finalOutputURI);
-
Environment childEnv = (Environment) StreamUtil.env().clone();
addJobConfToEnvironment(job_, childEnv);
addEnvironment(childEnv, job_.get("stream.addenvironment"));
@@ -327,34 +189,6 @@
logprintln("JobConf set minRecWrittenToEnableSkip_ =" + minRecWrittenToEnableSkip_);
}
taskId_ = StreamUtil.getTaskInfo(job_);
- debugFailEarly_ = isDebugFail("early");
- debugFailDuring_ = isDebugFail("during");
- debugFailLate_ = isDebugFail("late");
-
- sideOutputURI_ = job_.get("stream.sideoutput.uri");
- useSingleSideOutputURI_ = (sideOutputURI_ != null);
- }
-
- boolean isDebugFail(String kind) {
- String execidlist = job_.get("stream.debugfail.reexec." + kind);
- if (execidlist == null) {
- return false;
- }
- String[] e = execidlist.split(",");
- for (int i = 0; i < e.length; i++) {
- int ei = Integer.parseInt(e[i]);
- if (taskId_.execid == ei) {
- return true;
- }
- }
- return false;
- }
-
- void setStreamProperties() {
- String s = System.getProperty("stream.port");
- if (s != null) {
- reportPortPlusOne_ = Integer.parseInt(s);
- }
}
void logStackTrace(Exception e) {
@@ -442,7 +276,6 @@
if (log_ != null) {
StreamUtil.exec("/bin/rm " + LOGNAME, log_);
}
- // TODO socket-based aggregator (in JobTrackerInfoServer)
}
void startOutputThreads(OutputCollector output, Reporter reporter) {
@@ -474,14 +307,7 @@
* @throws IOException
*/
void splitKeyVal(byte[] line, Text key, Text val) throws IOException {
- int pos = -1;
- if (keyCols_ != ALL_COLS) {
- pos = UTF8ByteArrayUtils.findNthByte(line, (byte)this.getFieldSeparator(), this.getNumOfKeyFields());
- }
- LOG.info("FieldSeparator: " + this.getFieldSeparator());
- LOG.info("NumOfKeyFields: " + this.getNumOfKeyFields());
- LOG.info("Line: " + new String (line));
- LOG.info("Pos: " + pos);
+ int pos = UTF8ByteArrayUtils.findNthByte(line, (byte)this.getFieldSeparator(), this.getNumOfKeyFields());
try {
if (pos == -1) {
key.set(line);
@@ -508,15 +334,10 @@
Text val = new Text();
// 3/4 Tool to Hadoop
while ((answer = UTF8ByteArrayUtils.readLine((InputStream) clientIn_)) != null) {
- // 4/4 Hadoop out
- if (optSideEffect_) {
- sideEffectOut_.write(answer);
- sideEffectOut_.write('\n');
- sideEffectOut_.flush();
- } else {
- splitKeyVal(answer, key, val);
- output.collect(key, val);
- }
+
+ splitKeyVal(answer, key, val);
+ output.collect(key, val);
+
numRecWritten_++;
long now = System.currentTimeMillis();
if (now-lastStdoutReport > reporterOutDelay_) {
@@ -584,18 +405,6 @@
} catch (IOException io) {
}
waitOutputThreads();
- try {
- if (optSideEffect_) {
- logprintln("closing " + finalOutputURI);
- if (sideEffectOut_ != null) sideEffectOut_.close();
- logprintln("closed " + finalOutputURI);
- if (!useSingleSideOutputURI_) {
- ((PhasedFileSystem)sideFs_).commit();
- }
- }
- } catch (IOException io) {
- io.printStackTrace();
- }
if (sim != null) sim.destroy();
} catch (RuntimeException e) {
logStackTrace(e);
@@ -692,18 +501,11 @@
long minRecWrittenToEnableSkip_ = Long.MAX_VALUE;
- int keyCols_;
- final static int ALL_COLS = Integer.MAX_VALUE;
-
long reporterOutDelay_ = 10*1000L;
long reporterErrDelay_ = 10*1000L;
long joinDelay_;
JobConf job_;
FileSystem fs_;
- FileSystem sideFs_;
-
- // generic MapRed parameters passed on by hadoopStreaming
- int reportPortPlusOne_;
boolean doPipe_;
boolean debug_;
@@ -723,17 +525,6 @@
String mapredKey_;
int numExceptions_;
StreamUtil.TaskId taskId_;
-
- boolean optUseKey_ = true;
-
- private boolean optSideEffect_;
- private URI finalOutputURI;
- private Path sideEffectPathFinal_;
-
- private boolean useSingleSideOutputURI_;
- private String sideOutputURI_;
-
- private OutputStream sideEffectOut_;
protected volatile Throwable outerrThreadsThrowable;
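The net effect of the PipeMapRed changes is that an output line is now always split at the Nth occurrence of the configured field separator; the key-column regex parsing and the side-effect output branches are gone. The following minimal sketch (plain Java, not the Hadoop source; findNthByte is re-created here from its use in the diff, and the separator and key-field count stand in for the stream.*.output.field.separator and stream.num.*.output.key.fields JobConf settings) shows the resulting split:

    // Minimal sketch of the simplified splitKeyVal behavior above.
    public class SplitKeyValSketch {
      // Index of the n-th occurrence of sep in utf8, or -1 if absent.
      static int findNthByte(byte[] utf8, byte sep, int n) {
        int seen = 0;
        for (int i = 0; i < utf8.length; i++) {
          if (utf8[i] == sep && ++seen == n) {
            return i;
          }
        }
        return -1;
      }

      public static void main(String[] args) {
        byte[] line = "a\tb\tc".getBytes();
        int pos = findNthByte(line, (byte) '\t', 2);
        // pos == 3, so key = "a\tb" and value = "c".
        // When pos == -1 the whole line becomes the key and the value
        // is left empty, matching the pos == -1 branch in the diff.
        String key = new String(line, 0, pos);
        String val = new String(line, pos + 1, line.length - pos - 1);
        System.out.println(key + " | " + val);
      }
    }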
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?view=diff&rev=534970&r1=534969&r2=534970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Thu May 3 12:33:27 2007
@@ -25,8 +25,10 @@
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
@@ -36,6 +38,8 @@
*/
public class PipeMapper extends PipeMapRed implements Mapper {
+ private boolean ignoreKey = false;
+
String getPipeCommand(JobConf job) {
String str = job.get("stream.map.streamprocessor");
if (str == null) {
@@ -50,17 +54,15 @@
}
}
- String getKeyColPropName() {
- return "mapKeyCols";
- }
-
- boolean getUseSideEffect() {
- return StreamUtil.getUseMapSideEffect(job_);
- }
-
boolean getDoPipe() {
return true;
}
+
+ public void configure(JobConf job) {
+ super.configure(job);
+ String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
+ this.ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());
+ }
// Do NOT declare default constructor
// (MapRed creates it reflectively)
@@ -86,7 +88,7 @@
// 2/4 Hadoop to Tool
if (numExceptions_ == 0) {
- if (optUseKey_) {
+ if (!this.ignoreKey) {
write(key);
clientOut_.write('\t');
}
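With this change, what the external mapper reads on stdin depends only on the input format: under TextInputFormat (now the default) the map key is merely the line's byte offset (a LongWritable), so it is suppressed and only the value is piped; under any other input format the child still receives "key<TAB>value". A plain-Java sketch of the observable difference, using a hypothetical helper name:

    // Sketch of what the child process reads per record after this
    // change; toMapperStdin is a hypothetical name for illustration.
    public class MapperStdinSketch {
      static String toMapperStdin(String key, String value, boolean ignoreKey) {
        return ignoreKey ? value : key + "\t" + value;
      }

      public static void main(String[] args) {
        // TextInputFormat: the byte-offset key is dropped.
        System.out.println(toMapperStdin("0", "roses.are.red", true));
        // Any other input format: key and value, TAB-separated.
        System.out.println(toMapperStdin("k1", "v1", false));
      }
    }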
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?view=diff&rev=534970&r1=534969&r2=534970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Thu May 3 12:33:27 2007
@@ -57,10 +57,6 @@
return (argv != null) && !StreamJob.REDUCE_NONE.equals(argv);
}
- String getKeyColPropName() {
- return "reduceKeyCols";
- }
-
public void reduce(WritableComparable key, Iterator values, OutputCollector output,
Reporter reporter) throws IOException {
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=534970&r1=534969&r2=534970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Thu May 3 12:33:27 2007
@@ -22,7 +22,6 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
-import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
@@ -41,7 +40,6 @@
import org.apache.commons.cli2.option.PropertyOption;
import org.apache.commons.cli2.resource.ResourceConstants;
import org.apache.commons.cli2.util.HelpFormatter;
-import org.apache.commons.cli2.validation.FileValidator;
import org.apache.commons.cli2.validation.InvalidArgumentException;
import org.apache.commons.cli2.validation.Validator;
import org.apache.commons.logging.*;
@@ -61,6 +59,7 @@
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.filecache.*;
import org.apache.hadoop.util.*;
@@ -73,8 +72,7 @@
protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());
final static String REDUCE_NONE = "NONE";
- private boolean reducerNone_;
-
+
/** -----------Streaming CLI Implementation **/
private DefaultOptionBuilder builder =
new DefaultOptionBuilder("-","-", false);
@@ -214,7 +212,6 @@
inputSpecs_.addAll(cmdLine.getValues("-input"));
output_ = (String) cmdLine.getValue("-output");
- mapsideoutURI_ = (String) cmdLine.getValue("-mapsideoutput");
mapCmd_ = (String)cmdLine.getValue("-mapper");
comCmd_ = (String)cmdLine.getValue("-combiner");
@@ -450,20 +447,17 @@
System.out.println("Options:");
System.out.println(" -input <path> DFS input file(s) for the Map step");
System.out.println(" -output <path> DFS output directory for the Reduce step");
- System.out.println(" -mapper <cmd> The streaming command to run");
- System.out.println(" -combiner <cmd> The streaming command to run");
- System.out.println(" -reducer <cmd> The streaming command to run");
+ System.out.println(" -mapper <cmd|JavaClassName> The streaming command to run");
+ System.out.println(" -combiner <JavaClassName> Combiner has to be a Java class");
+ System.out.println(" -reducer <cmd|JavaClassName> The streaming command to run");
System.out.println(" -file <file> File/dir to be shipped in the Job jar file");
- //Only advertise the standard way: [--config dir] in our launcher
- //System.out.println(" -cluster <name> Default uses hadoop-default.xml and hadoop-site.xml");
- //System.out.println(" -config <file> Optional. One or more paths to xml config files");
System.out.println(" -dfs <h:p>|local Optional. Override DFS configuration");
System.out.println(" -jt <h:p>|local Optional. Override JobTracker configuration");
System.out.println(" -additionalconfspec specfile Optional.");
- System.out.println(" -inputformat KeyValueTextInputFormat(default)|SequenceFileInputFormat|XmlTextInputFormat Optional.");
- System.out.println(" -outputformat specfile Optional.");
- System.out.println(" -partitioner specfile Optional.");
- System.out.println(" -numReduceTasks specfile Optional.");
+ System.out.println(" -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.");
+ System.out.println(" -outputformat TextOutputFormat(default)|JavaClassName Optional.");
+ System.out.println(" -partitioner JavaClassName Optional.");
+ System.out.println(" -numReduceTasks <num> Optional.");
System.out.println(" -inputreader <spec> Optional.");
System.out.println(" -jobconf <n>=<v> Optional. Add or override a JobConf property");
System.out.println(" -cmdenv <n>=<v> Optional. Pass env.var to streaming commands");
@@ -478,10 +472,7 @@
System.out.println("In -input: globbing on <path> is supported and can have multiple -input");
System.out.println("Default Map input format: a line is a record in UTF-8");
System.out.println(" the key part ends at first TAB, the rest of the line is the value");
- System.out.println("Custom Map input format: -inputreader package.MyRecordReader,n=v,n=v ");
- System.out
- .println(" comma-separated name-values can be specified to configure the InputFormat");
- System.out.println(" Ex: -inputreader 'StreamXmlRecordReader,begin=<doc>,end=</doc>'");
+ System.out.println("Custom input format: -inputformat package.MyInputFormat ");
System.out.println("Map output format, reduce input/output format:");
System.out.println(" Format defined by what the mapper command outputs. Line-oriented");
System.out.println();
@@ -489,34 +480,21 @@
System.out.println(" working directory when the mapper and reducer are run.");
System.out.println(" The location of this working directory is unspecified.");
System.out.println();
- //System.out.println("Use -cluster <name> to switch between \"local\" Hadoop and one or more remote ");
- //System.out.println(" Hadoop clusters. ");
- //System.out.println(" The default is to use the normal hadoop-default.xml and hadoop-site.xml");
- //System.out.println(" Else configuration will use $HADOOP_HOME/conf/hadoop-<name>.xml");
- //System.out.println();
+ System.out.println("To set the number of reduce tasks (num. of output files):");
+ System.out.println(" -jobconf mapred.reduce.tasks=10");
System.out.println("To skip the sort/combine/shuffle/sort/reduce step:");
- System.out.println(" Use -reducer " + REDUCE_NONE);
+ System.out.println(" Use -numReduceTasks 0");
System.out
.println(" A Task's Map output then becomes a 'side-effect output' rather than a reduce input");
System.out
.println(" This speeds up processing, This also feels more like \"in-place\" processing");
System.out.println(" because the input filename and the map input order are preserved");
- System.out.println("To specify a single side-effect output file");
- System.out.println(" -mapsideoutput [file:/C:/win|file:/unix/|socket://host:port]");//-output for side-effects will be soon deprecated
- System.out.println(" If the jobtracker is local this is a local file");
- System.out.println(" This currently requires -reducer NONE");
+ System.out.println(" This equivalent -reducer NONE");
System.out.println();
- System.out.println("To set the number of reduce tasks (num. of output files):");
- System.out.println(" -jobconf mapred.reduce.tasks=10");
System.out.println("To speed up the last reduces:");
System.out.println(" -jobconf mapred.speculative.execution=true");
- System.out.println(" Do not use this along -reducer " + REDUCE_NONE);
System.out.println("To name the job (appears in the JobTracker Web UI):");
System.out.println(" -jobconf mapred.job.name='My Job' ");
- System.out.println("To specify that line-oriented input is in gzip format:");
- System.out
- .println("(at this time ALL input files must be gzipped and this is not recognized based on file extension)");
- System.out.println(" -jobconf stream.recordreader.compression=gzip ");
System.out.println("To change the local temp directory:");
System.out.println(" -jobconf dfs.data.dir=/tmp/dfs");
System.out.println(" -jobconf stream.tmpdir=/tmp/streaming");
@@ -681,8 +659,6 @@
config_.addFinalResource(new Path(pathName));
}
- testMerge_ = (-1 != userJobConfProps_.toString().indexOf("stream.testmerge"));
-
// general MapRed job properties
jobConf_ = new JobConf(config_);
@@ -695,25 +671,32 @@
// (to resolve local vs. dfs drive letter differences)
// (mapred.working.dir will be lazily initialized ONCE and depends on FS)
for (int i = 0; i < inputSpecs_.size(); i++) {
- addInputSpec((String) inputSpecs_.get(i), i);
+ jobConf_.addInputPath(new Path(((String) inputSpecs_.get(i))));
}
- jobConf_.setBoolean("stream.inputtagged", inputTagged_);
jobConf_.set("stream.numinputspecs", "" + inputSpecs_.size());
String defaultPackage = this.getClass().getPackage().getName();
Class c;
Class fmt = null;
if (inReaderSpec_ == null && inputFormatSpec_ == null) {
- fmt = KeyValueTextInputFormat.class;
+ fmt = TextInputFormat.class;
} else if (inputFormatSpec_ != null) {
- if (inputFormatSpec_.equals(KeyValueTextInputFormat.class.getName())
- || inputFormatSpec_.equals(KeyValueTextInputFormat.class.getCanonicalName())) {
- fmt = KeyValueTextInputFormat.class;
- } else if (inputFormatSpec_.equals(SequenceFileInputFormat.class.getName())
- || inputFormatSpec_.equals(SequenceFileInputFormat.class.getCanonicalName())) {
- fmt = SequenceFileInputFormat.class;
- } else if (inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class.getName())
- || inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class.getCanonicalName())) {
+ if (inputFormatSpec_.equals(TextInputFormat.class.getName())
+ || inputFormatSpec_.equals(TextInputFormat.class.getCanonicalName())) {
+ fmt = TextInputFormat.class;
+ } else if (inputFormatSpec_.equals(KeyValueTextInputFormat.class
+ .getName())
+ || inputFormatSpec_.equals(KeyValueTextInputFormat.class
+ .getCanonicalName())) {
+ } else if (inputFormatSpec_.equals(SequenceFileInputFormat.class
+ .getName())
+ || inputFormatSpec_
+ .equals(org.apache.hadoop.mapred.SequenceFileInputFormat.class
+ .getCanonicalName())) {
+ } else if (inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class
+ .getName())
+ || inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class
+ .getCanonicalName())) {
fmt = SequenceFileAsTextInputFormat.class;
} else {
c = StreamUtil.goodClassOrNull(inputFormatSpec_, defaultPackage);
@@ -725,14 +708,7 @@
}
}
if (fmt == null) {
- if (testMerge_ && false == hasSimpleInputSpecs_) {
- // this ignores -inputreader
- fmt = MergerInputFormat.class;
- } else {
- // need to keep this case to support custom -inputreader
- // and their parameters ,n=v,n=v
- fmt = StreamInputFormat.class;
- }
+ fmt = StreamInputFormat.class;
}
jobConf_.setInputFormat(fmt);
@@ -757,13 +733,10 @@
c = StreamUtil.goodClassOrNull(comCmd_, defaultPackage);
if (c != null) {
jobConf_.setCombinerClass(c);
- } else {
- jobConf_.setCombinerClass(PipeCombiner.class);
- jobConf_.set("stream.combine.streamprocessor", URLEncoder.encode(comCmd_, "UTF-8"));
- }
+ }
}
- reducerNone_ = false;
+ boolean reducerNone_ = false;
if (redCmd_ != null) {
reducerNone_ = redCmd_.equals(REDUCE_NONE);
if (redCmd_.compareToIgnoreCase("aggregate") == 0) {
@@ -801,9 +774,7 @@
}
setUserJobConfProps(false);
- // output setup is done late so we can customize for reducerNone_
- //jobConf_.setOutputDir(new File(output_));
- setOutputSpec();
+ jobConf_.setOutputPath(new Path(output_));
fmt = null;
if (outputFormatSpec_!= null) {
c = StreamUtil.goodClassOrNull(outputFormatSpec_, defaultPackage);
@@ -812,11 +783,7 @@
}
}
if (fmt == null) {
- if (testMerge_) {
- fmt = MuxOutputFormat.class;
- } else {
- fmt = TextOutputFormat.class;
- }
+ fmt = TextOutputFormat.class;
}
jobConf_.setOutputFormat(fmt);
@@ -831,6 +798,9 @@
int numReduceTasks = Integer.parseInt(numReduceTasksSpec_);
jobConf_.setNumReduceTasks(numReduceTasks);
}
+ if (reducerNone_) {
+ jobConf_.setNumReduceTasks(0);
+ }
// last, allow user to override anything
// (although typically used with properties we didn't touch)
@@ -880,78 +850,6 @@
}
msg("====");
}
-
- /** InputSpec-s encode: a glob pattern x additional column files x additional joins */
- protected void addInputSpec(String inSpec, int index) {
- if (!testMerge_) {
- jobConf_.addInputPath(new Path(inSpec));
- } else {
- CompoundDirSpec spec = new CompoundDirSpec(inSpec, true);
- msg("Parsed -input:\n" + spec.toTableString());
- if (index == 0) {
- hasSimpleInputSpecs_ = (spec.paths_.length == 0);
- msg("hasSimpleInputSpecs_=" + hasSimpleInputSpecs_);
- }
- String primary = spec.primarySpec();
- if (!seenPrimary_.add(primary)) {
- // this won't detect glob overlaps and noncanonical path variations
- fail("Primary used in multiple -input spec: " + primary);
- }
- jobConf_.addInputPath(new Path(primary));
- // during Job execution, will reparse into a CompoundDirSpec
- jobConf_.set("stream.inputspecs." + index, inSpec);
- }
- }
-
- /** uses output_ and mapsideoutURI_ */
- protected void setOutputSpec() throws IOException {
- CompoundDirSpec spec = new CompoundDirSpec(output_, false);
- msg("Parsed -output:\n" + spec.toTableString());
- String primary = spec.primarySpec();
- String channel0;
- // TODO simplify cases, encapsulate in a StreamJobConf
- if (!reducerNone_) {
- channel0 = primary;
- } else {
- if (mapsideoutURI_ != null) {
- // user can override in case this is in a difft filesystem..
- try {
- URI uri = new URI(mapsideoutURI_);
- if (uri.getScheme() == null || uri.getScheme().equals("file")) { // || uri.getScheme().equals("hdfs")
- if (!new Path(uri.getSchemeSpecificPart()).isAbsolute()) {
- fail("Must be absolute: " + mapsideoutURI_);
- }
- } else if (uri.getScheme().equals("socket")) {
- // ok
- } else {
- fail("Invalid scheme: " + uri.getScheme() + " for -mapsideoutput " + mapsideoutURI_);
- }
- } catch (URISyntaxException e) {
- throw (IOException) new IOException().initCause(e);
- }
- }
- // an empty reduce output named "part-00002" will go here and not collide.
- channel0 = primary + ".NONE";
- // the side-effect of the first split of an input named "part-00002"
- // will go in this directory
- jobConf_.set("stream.sideoutput.dir", primary);
- // oops if user overrides low-level this isn't set yet :-(
- boolean localjt = StreamUtil.isLocalJobTracker(jobConf_);
- // just a guess user may prefer remote..
- jobConf_.setBoolean("stream.sideoutput.localfs", localjt);
- }
- // a path in fs.name.default filesystem
- System.out.println(channel0);
- System.out.println(new Path(channel0));
- jobConf_.setOutputPath(new Path(channel0));
- // will reparse remotely
- jobConf_.set("stream.outputspec", output_);
- if (null != mapsideoutURI_) {
- // a path in "jobtracker's filesystem"
- // overrides sideoutput.dir
- jobConf_.set("stream.sideoutput.uri", mapsideoutURI_);
- }
- }
protected String getJobTrackerHostPort() {
return jobConf_.get("mapred.job.tracker");
@@ -1099,15 +997,12 @@
// command-line arguments
protected ArrayList inputSpecs_ = new ArrayList(); // <String>
- protected boolean inputTagged_ = false;
protected TreeSet seenPrimary_ = new TreeSet(); // <String>
protected boolean hasSimpleInputSpecs_;
protected ArrayList packageFiles_ = new ArrayList(); // <String>
protected ArrayList shippedCanonFiles_ = new ArrayList(); // <String>
- //protected ArrayList userJobConfProps_ = new ArrayList(); // <String> name=value
protected TreeMap<String, String> userJobConfProps_ = new TreeMap<String, String>();
protected String output_;
- protected String mapsideoutURI_;
protected String mapCmd_;
protected String comCmd_;
protected String redCmd_;
@@ -1124,8 +1019,6 @@
protected String partitionerSpec_;
protected String numReduceTasksSpec_;
protected String additionalConfSpec_;
-
- protected boolean testMerge_;
// Use to communicate config to the external processes (ex env.var.HADOOP_USER)
// encoding "a=b c=d"
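The updated usage text above replaces "-reducer NONE" with "-numReduceTasks 0" and makes TextInputFormat the default input format. A sketch of a map-only job under the new flags, assuming the StreamJob(String[], boolean) constructor and go() method used by the hadoopStreaming launcher (the /user/demo paths are hypothetical):

    // Sketch of a map-only streaming job using the new-style flags.
    public class MapOnlyJobSketch {
      public static void main(String[] args) throws Exception {
        String[] argv = new String[] {
          "-input",  "/user/demo/in",   // TextInputFormat is the default
          "-output", "/user/demo/out",
          "-mapper", "/bin/cat",
          "-numReduceTasks", "0"        // replaces "-reducer NONE"
        };
        new org.apache.hadoop.streaming.StreamJob(argv, false).go();
      }
    }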
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java?view=diff&rev=534970&r1=534969&r2=534970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java Thu May 3 12:33:27 2007
@@ -453,20 +453,6 @@
return res;
}
- static boolean getUseMapSideEffect(JobConf job) {
- String reduce = job.get("stream.reduce.streamprocessor");
- if (reduce == null) {
- return false;
- }
- try {
- reduce = URLDecoder.decode(reduce, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- System.err.println("stream.reduce.streamprocessor in jobconf not found");
- return false;
- }
- return StreamJob.REDUCE_NONE.equals(reduce);
- }
-
public static void touch(File file) throws IOException {
file = file.getAbsoluteFile();
FileOutputStream out = new FileOutputStream(file);
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java?view=diff&rev=534970&r1=534969&r2=534970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java Thu May 3 12:33:27 2007
@@ -19,10 +19,8 @@
package org.apache.hadoop.streaming;
import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.PushbackInputStream;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.LineRecordReader;
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?view=diff&rev=534970&r1=534969&r2=534970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Thu May 3 12:33:27 2007
@@ -38,10 +38,11 @@
protected String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
// map behaves like "/usr/bin/tr . \\n"; (split words into lines)
protected String map = StreamUtil.makeJavaCommand(TrApp.class, new String[]{".", "\\n"});
- // combine, reduce behave like /usr/bin/uniq. But also prepend lines with C, R.
+ // reduce behave like /usr/bin/uniq. But also prepend lines with R.
+ // command-line combiner does not have any effect any more.
protected String combine = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"C"});
protected String reduce = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"R"});
- protected String outputExpect = "RCare\t\nRCblue\t\nRCbunnies\t\nRCpink\t\nRCred\t\nRCroses\t\nRCviolets\t\n";
+ protected String outputExpect = "Rare\t\nRblue\t\nRbunnies\t\nRpink\t\nRred\t\nRroses\t\nRviolets\t\n";
private StreamJob job;
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java?view=diff&rev=534970&r1=534969&r2=534970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java Thu May 3 12:33:27 2007
@@ -41,7 +41,7 @@
// test that some JobConf properties are exposed as expected
// Note the dots translated to underscore:
// property names have been escaped in PipeMapRed.safeEnvVarName()
- expect("mapred_input_format_class", "org.apache.hadoop.mapred.KeyValueTextInputFormat");
+ expect("mapred_input_format_class", "org.apache.hadoop.mapred.TextInputFormat");
expect("mapred_job_tracker", "local");
//expect("mapred_local_dir", "build/test/mapred/local");
expectDefined("mapred_local_dir");
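TrApp verifies that JobConf properties reach the child process as environment variables with dots mapped to underscores. A minimal re-creation of that mangling (assumed behavior of PipeMapRed.safeEnvVarName as described in the comment above, not the Hadoop source):

    // Assumed re-creation of the env-var name mangling: any character
    // that is not an ASCII letter or digit becomes '_'.
    public class SafeEnvVarNameSketch {
      static String safeEnvVarName(String name) {
        StringBuffer safe = new StringBuffer();
        for (int i = 0; i < name.length(); i++) {
          char c = name.charAt(i);
          boolean ok = (c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z')
              || (c >= 'a' && c <= 'z');
          safe.append(ok ? c : '_');
        }
        return safe.toString();
      }

      public static void main(String[] args) {
        // Prints: mapred_input_format_class
        System.out.println(safeEnvVarName("mapred.input.format.class"));
      }
    }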