You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2012/03/10 02:50:47 UTC
svn commit: r1299138 - in /hadoop/common/branches/branch-1.0: ./
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/
src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/ src...
Author: acmurthy
Date: Sat Mar 10 01:50:45 2012
New Revision: 1299138
URL: http://svn.apache.org/viewvc?rev=1299138&view=rev
Log:
Merge -c 1299136 from branch-1 to branch-1.0 to fix HADOOP-1722: allow Hadoop Streaming to handle non-UTF-8 byte arrays. Contributed by Klaas Bosteels and Matthias Lehmann.
Added:
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/AutoInputFormat.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/AutoInputFormat.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/DumpTypedBytes.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/DumpTypedBytes.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/LoadTypedBytes.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/LoadTypedBytes.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/
- copied from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/IdentifierResolver.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/IdentifierResolver.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/InputWriter.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/InputWriter.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/OutputReader.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/OutputReader.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/RawBytesInputWriter.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/RawBytesInputWriter.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/RawBytesOutputReader.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/RawBytesOutputReader.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TextInputWriter.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TextInputWriter.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TextOutputReader.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TextOutputReader.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TypedBytesInputWriter.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TypedBytesInputWriter.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TypedBytesOutputReader.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TypedBytesOutputReader.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/
- copied from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordInput.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordInput.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordOutput.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordOutput.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritable.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritable.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesMapApp.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesMapApp.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesReduceApp.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesReduceApp.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestAutoInputFormat.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestAutoInputFormat.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestDumpTypedBytes.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestDumpTypedBytes.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestLoadTypedBytes.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestLoadTypedBytes.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestRawBytesStreaming.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestRawBytesStreaming.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestTypedBytesStreaming.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestTypedBytesStreaming.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesMapApp.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesMapApp.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesReduceApp.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesReduceApp.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/
- copied from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestTypedBytesWritable.java
- copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestTypedBytesWritable.java
Modified:
hadoop/common/branches/branch-1.0/CHANGES.txt
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
Modified: hadoop/common/branches/branch-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/CHANGES.txt?rev=1299138&r1=1299137&r2=1299138&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.0/CHANGES.txt Sat Mar 10 01:50:45 2012
@@ -12,6 +12,9 @@ Release 1.0.2 - unreleased
MAPREDUCE-3773. Add queue metrics with buckets for job run times. (omalley
via acmurthy)
+ HADOOP-1722. Allow hadoop streaming to handle non-utf8 byte array. (Klaas
+ Bosteels and Matthias Lehmann via acmurthy)
+
BUG FIXES
HADOOP-8050. Deadlock in metrics. (Kihwal Lee via mattf)
Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java?rev=1299138&r1=1299137&r2=1299138&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java Sat Mar 10 01:50:45 2012
@@ -18,6 +18,8 @@
package org.apache.hadoop.streaming;
+import java.util.Arrays;
+
import org.apache.hadoop.util.ToolRunner;
/** The main entrypoint. Usually invoked with the script bin/hadoopStreaming
@@ -27,11 +29,28 @@ import org.apache.hadoop.util.ToolRunner
public class HadoopStreaming {
public static void main(String[] args) throws Exception {
+ if (args.length < 1) {
+ System.err.println("No Arguments Given!");
+ System.exit(1);
+ }
int returnStatus = 0;
- StreamJob job = new StreamJob();
- returnStatus = ToolRunner.run(job, args);
+ String cmd = args[0];
+ String[] remainingArgs = Arrays.copyOfRange(args, 1, args.length);
+ if (cmd.equalsIgnoreCase("dumptb")) {
+ DumpTypedBytes dumptb = new DumpTypedBytes();
+ returnStatus = ToolRunner.run(dumptb, remainingArgs);
+ } else if (cmd.equalsIgnoreCase("loadtb")) {
+ LoadTypedBytes loadtb = new LoadTypedBytes();
+ returnStatus = ToolRunner.run(loadtb, remainingArgs);
+ } else if (cmd.equalsIgnoreCase("streamjob")) {
+ StreamJob job = new StreamJob();
+ returnStatus = ToolRunner.run(job, remainingArgs);
+ } else { // for backward compatibility
+ StreamJob job = new StreamJob();
+ returnStatus = ToolRunner.run(job, args);
+ }
if (returnStatus != 0) {
- System.err.println("Streaming Job Failed!");
+ System.err.println("Streaming Command Failed!");
System.exit(returnStatus);
}
}
Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=1299138&r1=1299137&r2=1299138&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Sat Mar 10 01:50:45 2012
@@ -19,8 +19,6 @@
package org.apache.hadoop.streaming;
import java.io.*;
-import java.nio.charset.CharacterCodingException;
-import java.io.IOException;
import java.util.Date;
import java.util.Map;
import java.util.Iterator;
@@ -30,16 +28,20 @@ import java.util.Properties;
import org.apache.commons.logging.*;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.streaming.io.InputWriter;
+import org.apache.hadoop.streaming.io.OutputReader;
+import org.apache.hadoop.streaming.io.TextInputWriter;
+import org.apache.hadoop.streaming.io.TextOutputReader;
import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.UTF8ByteArrayUtils;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.fs.FileSystem;
@@ -49,15 +51,49 @@ public abstract class PipeMapRed {
protected static final Log LOG = LogFactory.getLog(PipeMapRed.class.getName());
- /** The command to be spawned as a subprocess.
- * Mapper/Reducer operations will delegate to it
+ /**
+ * Returns the Configuration.
*/
- abstract String getPipeCommand(JobConf job);
-
- abstract byte[] getFieldSeparator();
+ public Configuration getConfiguration() {
+ return job_;
+ }
+
+ /**
+ * Returns the DataOutput to which the client input is written.
+ */
+ public DataOutput getClientOutput() {
+ return clientOut_;
+ }
+
+ /**
+ * Returns the DataInput from which the client output is read.
+ */
+ public DataInput getClientInput() {
+ return clientIn_;
+ }
+
+ /**
+ * Returns the input separator to be used.
+ */
+ public abstract byte[] getInputSeparator();
+
+ /**
+ * Returns the field separator to be used.
+ */
+ public abstract byte[] getFieldSeparator();
- abstract int getNumOfKeyFields();
+ /**
+ * Returns the number of key fields.
+ */
+ public abstract int getNumOfKeyFields();
+
+ /**
+ * Returns the command to be spawned as a subprocess.
+ * Mapper/Reducer operations will delegate to it
+ */
+ abstract String getPipeCommand(JobConf job);
+
abstract boolean getDoPipe();
final static int OUTSIDE = 1;
@@ -120,7 +156,19 @@ public abstract class PipeMapRed {
job_ = job;
fs_ = FileSystem.get(job_);
-
+
+ mapInputWriterClass_ =
+ job_.getClass("stream.map.input.writer.class",
+ TextInputWriter.class, InputWriter.class);
+ mapOutputReaderClass_ =
+ job_.getClass("stream.map.output.reader.class",
+ TextOutputReader.class, OutputReader.class);
+ reduceInputWriterClass_ =
+ job_.getClass("stream.reduce.input.writer.class",
+ TextInputWriter.class, InputWriter.class);
+ reduceOutputReaderClass_ =
+ job_.getClass("stream.reduce.output.reader.class",
+ TextOutputReader.class, OutputReader.class);
nonZeroExitIsFailure_ = job_.getBoolean("stream.non.zero.exit.is.failure", true);
doPipe_ = getDoPipe();
@@ -280,13 +328,16 @@ public abstract class PipeMapRed {
}
}
- void startOutputThreads(OutputCollector output, Reporter reporter) {
- outThread_ = new MROutputThread(output, reporter);
+ void startOutputThreads(OutputCollector output, Reporter reporter)
+ throws IOException {
+ inWriter_ = createInputWriter();
+ outReader_ = createOutputReader();
+ outThread_ = new MROutputThread(outReader_, output, reporter);
outThread_.start();
errThread_.setReporter(reporter);
}
-
- void waitOutputThreads() {
+
+ void waitOutputThreads() throws IOException {
try {
if (outThread_ == null) {
// This happens only when reducer has empty input(So reduce() is not
@@ -328,58 +379,46 @@ public abstract class PipeMapRed {
//ignore
}
}
-
- /**
- * Split a line into key and value.
- * @param line: a byte array of line containing UTF-8 bytes
- * @param key: key of a record
- * @param val: value of a record
- * @throws IOException
- */
- void splitKeyVal(byte[] line, int length, Text key, Text val)
- throws IOException {
- int numKeyFields = getNumOfKeyFields();
- byte[] separator = getFieldSeparator();
-
- // Need to find numKeyFields separators
- int pos = UTF8ByteArrayUtils.findBytes(line, 0, length, separator);
- for(int k=1; k<numKeyFields && pos!=-1; k++) {
- pos = UTF8ByteArrayUtils.findBytes(line, pos + separator.length,
- length, separator);
- }
- try {
- if (pos == -1) {
- key.set(line, 0, length);
- val.set("");
- } else {
- StreamKeyValUtil.splitKeyVal(line, 0, length, key, val, pos, separator.length);
- }
- } catch (CharacterCodingException e) {
- LOG.warn(StringUtils.stringifyException(e));
- }
+
+
+ abstract InputWriter createInputWriter() throws IOException;
+
+ InputWriter createInputWriter(Class<? extends InputWriter> inputWriterClass)
+ throws IOException {
+ InputWriter inputWriter =
+ ReflectionUtils.newInstance(inputWriterClass, job_);
+ inputWriter.initialize(this);
+ return inputWriter;
+ }
+
+ abstract OutputReader createOutputReader() throws IOException;
+
+ OutputReader createOutputReader(Class<? extends OutputReader> outputReaderClass)
+ throws IOException {
+ OutputReader outputReader =
+ ReflectionUtils.newInstance(outputReaderClass, job_);
+ outputReader.initialize(this);
+ return outputReader;
}
-
+
+
class MROutputThread extends Thread {
- MROutputThread(OutputCollector output, Reporter reporter) {
+ MROutputThread(OutputReader outReader, OutputCollector outCollector,
+ Reporter reporter) {
setDaemon(true);
- this.output = output;
+ this.outReader = outReader;
+ this.outCollector = outCollector;
this.reporter = reporter;
}
public void run() {
- LineReader lineReader = null;
try {
- Text key = new Text();
- Text val = new Text();
- Text line = new Text();
- lineReader = new LineReader((InputStream)clientIn_, job_);
// 3/4 Tool to Hadoop
- while (lineReader.readLine(line) > 0) {
- answer = line.getBytes();
- splitKeyVal(answer, line.getLength(), key, val);
- output.collect(key, val);
- line.clear();
+ while (outReader.readKeyValue()) {
+ Object key = outReader.getCurrentKey();
+ Object value = outReader.getCurrentValue();
+ outCollector.collect(key, value);
numRecWritten_++;
long now = System.currentTimeMillis();
if (now-lastStdoutReport > reporterOutDelay_) {
@@ -394,21 +433,11 @@ public abstract class PipeMapRed {
logflush();
}
}
- if (lineReader != null) {
- lineReader.close();
- }
- if (clientIn_ != null) {
- clientIn_.close();
- clientIn_ = null;
- LOG.info("MROutputThread done");
- }
} catch (Throwable th) {
outerrThreadsThrowable = th;
LOG.warn(StringUtils.stringifyException(th));
+ } finally {
try {
- if (lineReader != null) {
- lineReader.close();
- }
if (clientIn_ != null) {
clientIn_.close();
clientIn_ = null;
@@ -419,9 +448,9 @@ public abstract class PipeMapRed {
}
}
- OutputCollector output;
- Reporter reporter;
- byte[] answer;
+ OutputReader outReader = null;
+ OutputCollector outCollector = null;
+ Reporter reporter = null;
long lastStdoutReport = 0;
}
@@ -540,9 +569,10 @@ public abstract class PipeMapRed {
clientOut_.flush();
clientOut_.close();
}
+ waitOutputThreads();
} catch (IOException io) {
+ LOG.warn(StringUtils.stringifyException(io));
}
- waitOutputThreads();
if (sim != null) sim.destroy();
logprintln("mapRedFinished");
} catch (RuntimeException e) {
@@ -579,7 +609,7 @@ public abstract class PipeMapRed {
//s += envline("PWD"); // =/home/crawler/hadoop/trunk
s += "last Hadoop input: |" + mapredKey_ + "|\n";
if (outThread_ != null) {
- s += "last tool output: |" + outThread_.answer + "|\n";
+ s += "last tool output: |" + outReader_.getLastOutput() + "|\n";
}
s += "Date: " + new Date() + "\n";
// s += envline("HADOOP_HOME");
@@ -611,37 +641,12 @@ public abstract class PipeMapRed {
return msg;
}
- /**
- * Write a value to the output stream using UTF-8 encoding
- * @param value output value
- * @throws IOException
- */
- void write(Object value) throws IOException {
- byte[] bval;
- int valSize;
- if (value instanceof BytesWritable) {
- BytesWritable val = (BytesWritable) value;
- bval = val.getBytes();
- valSize = val.getLength();
- } else if (value instanceof Text) {
- Text val = (Text) value;
- bval = val.getBytes();
- valSize = val.getLength();
- } else {
- String sval = value.toString();
- bval = sval.getBytes("UTF-8");
- valSize = bval.length;
- }
- clientOut_.write(bval, 0, valSize);
- }
-
long startTime_;
long numRecRead_ = 0;
long numRecWritten_ = 0;
long numRecSkipped_ = 0;
long nextRecReadLog_ = 1;
-
long minRecWrittenToEnableSkip_ = Long.MAX_VALUE;
long reporterOutDelay_ = 10*1000L;
@@ -656,9 +661,15 @@ public abstract class PipeMapRed {
boolean debugFailDuring_;
boolean debugFailLate_;
+ Class<? extends InputWriter> mapInputWriterClass_;
+ Class<? extends OutputReader> mapOutputReaderClass_;
+ Class<? extends InputWriter> reduceInputWriterClass_;
+ Class<? extends OutputReader> reduceOutputReaderClass_;
boolean nonZeroExitIsFailure_;
Process sim;
+ InputWriter inWriter_;
+ OutputReader outReader_;
MROutputThread outThread_;
String jobLog_;
MRErrorThread errThread_;
Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=1299138&r1=1299137&r2=1299138&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Sat Mar 10 01:50:45 2012
@@ -27,6 +27,9 @@ import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.streaming.io.InputWriter;
+import org.apache.hadoop.streaming.io.OutputReader;
+import org.apache.hadoop.streaming.io.TextInputWriter;
import org.apache.hadoop.util.StringUtils;
/** A generic Mapper bridge.
@@ -66,9 +69,11 @@ public class PipeMapper extends PipeMapR
//records input.
SkipBadRecords.setAutoIncrMapperProcCount(job, false);
skipping = job.getBoolean("mapred.skip.on", false);
- String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
- ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());
-
+ if (mapInputWriterClass_.getCanonicalName().equals(TextInputWriter.class.getCanonicalName())) {
+ String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
+ ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());
+ }
+
try {
mapOutputFieldSeparator = job.get("stream.map.output.field.separator", "\t").getBytes("UTF-8");
mapInputFieldSeparator = job.get("stream.map.input.field.separator", "\t").getBytes("UTF-8");
@@ -99,11 +104,9 @@ public class PipeMapper extends PipeMapR
// 2/4 Hadoop to Tool
if (numExceptions_ == 0) {
if (!this.ignoreKey) {
- write(key);
- clientOut_.write(getInputSeparator());
+ inWriter_.writeKey(key);
}
- write(value);
- clientOut_.write('\n');
+ inWriter_.writeValue(value);
if(skipping) {
//flush the streams on every record input if running in skip mode
//so that we don't buffer other records surrounding a bad record.
@@ -132,18 +135,29 @@ public class PipeMapper extends PipeMapR
mapRedFinished();
}
- byte[] getInputSeparator() {
+ @Override
+ public byte[] getInputSeparator() {
return mapInputFieldSeparator;
}
@Override
- byte[] getFieldSeparator() {
+ public byte[] getFieldSeparator() {
return mapOutputFieldSeparator;
}
@Override
- int getNumOfKeyFields() {
+ public int getNumOfKeyFields() {
return numOfMapOutputKeyFields;
}
+ @Override
+ InputWriter createInputWriter() throws IOException {
+ return super.createInputWriter(mapInputWriterClass_);
+ }
+
+ @Override
+ OutputReader createOutputReader() throws IOException {
+ return super.createOutputReader(mapOutputReaderClass_);
+ }
+
}
Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?rev=1299138&r1=1299137&r2=1299138&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Sat Mar 10 01:50:45 2012
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.SkipBadRecords;
+import org.apache.hadoop.streaming.io.InputWriter;
+import org.apache.hadoop.streaming.io.OutputReader;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.io.Writable;
@@ -97,10 +99,8 @@ public class PipeReducer extends PipeMap
+ StringUtils.stringifyException(
outerrThreadsThrowable));
}
- write(key);
- clientOut_.write(getInputSeparator());
- write(val);
- clientOut_.write('\n');
+ inWriter_.writeKey(key);
+ inWriter_.writeValue(val);
} else {
// "identity reduce"
output.collect(key, val);
@@ -137,18 +137,29 @@ public class PipeReducer extends PipeMap
mapRedFinished();
}
- byte[] getInputSeparator() {
+ @Override
+ public byte[] getInputSeparator() {
return reduceInputFieldSeparator;
}
@Override
- byte[] getFieldSeparator() {
+ public byte[] getFieldSeparator() {
return reduceOutFieldSeparator;
}
@Override
- int getNumOfKeyFields() {
+ public int getNumOfKeyFields() {
return numOfReduceOutputKeyFields;
}
+
+ @Override
+ InputWriter createInputWriter() throws IOException {
+ return super.createInputWriter(reduceInputWriterClass_);
+ }
+
+ @Override
+ OutputReader createOutputReader() throws IOException {
+ return super.createOutputReader(reduceOutputReaderClass_);
+ }
}
Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=1299138&r1=1299137&r2=1299138&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Sat Mar 10 01:50:45 2012
@@ -47,7 +47,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
@@ -63,7 +62,11 @@ import org.apache.hadoop.mapred.TextInpu
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorCombiner;
import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer;
+import org.apache.hadoop.streaming.io.IdentifierResolver;
+import org.apache.hadoop.streaming.io.InputWriter;
+import org.apache.hadoop.streaming.io.OutputReader;
import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
@@ -284,6 +287,7 @@ public class StreamJob implements Tool {
inReaderSpec_ = (String)cmdLine.getOptionValue("inputreader");
mapDebugSpec_ = (String)cmdLine.getOptionValue("mapdebug");
reduceDebugSpec_ = (String)cmdLine.getOptionValue("reducedebug");
+ ioSpec_ = (String)cmdLine.getOptionValue("io");
String[] car = cmdLine.getOptionValues("cacheArchive");
if (null != car && car.length > 0){
@@ -408,6 +412,8 @@ public class StreamJob implements Tool {
"File name URI", "fileNameURI", Integer.MAX_VALUE, false);
Option cacheArchive = createOption("cacheArchive",
"File name URI", "fileNameURI", Integer.MAX_VALUE, false);
+ Option io = createOption("io",
+ "Optional.", "spec", 1, false);
// boolean properties
@@ -437,6 +443,7 @@ public class StreamJob implements Tool {
addOption(cmdenv).
addOption(cacheFile).
addOption(cacheArchive).
+ addOption(io).
addOption(verbose).
addOption(info).
addOption(debug).
@@ -467,6 +474,7 @@ public class StreamJob implements Tool {
"To run this script when a map task fails ");
System.out.println(" -reducedebug <path> Optional." +
" To run this script when a reduce task fails ");
+ System.out.println(" -io <identifier> Optional.");
System.out.println(" -verbose");
System.out.println();
GenericOptionsParser.printGenericCommandUsage(System.out);
@@ -689,9 +697,38 @@ public class StreamJob implements Tool {
jobConf_.setInputFormat(fmt);
- jobConf_.setOutputKeyClass(Text.class);
- jobConf_.setOutputValueClass(Text.class);
-
+ if (ioSpec_ != null) {
+ jobConf_.set("stream.map.input", ioSpec_);
+ jobConf_.set("stream.map.output", ioSpec_);
+ jobConf_.set("stream.reduce.input", ioSpec_);
+ jobConf_.set("stream.reduce.output", ioSpec_);
+ }
+
+ Class<? extends IdentifierResolver> idResolverClass =
+ jobConf_.getClass("stream.io.identifier.resolver.class",
+ IdentifierResolver.class, IdentifierResolver.class);
+ IdentifierResolver idResolver = ReflectionUtils.newInstance(idResolverClass, jobConf_);
+
+ idResolver.resolve(jobConf_.get("stream.map.input", IdentifierResolver.TEXT_ID));
+ jobConf_.setClass("stream.map.input.writer.class",
+ idResolver.getInputWriterClass(), InputWriter.class);
+
+ idResolver.resolve(jobConf_.get("stream.reduce.input", IdentifierResolver.TEXT_ID));
+ jobConf_.setClass("stream.reduce.input.writer.class",
+ idResolver.getInputWriterClass(), InputWriter.class);
+
+ idResolver.resolve(jobConf_.get("stream.map.output", IdentifierResolver.TEXT_ID));
+ jobConf_.setClass("stream.map.output.reader.class",
+ idResolver.getOutputReaderClass(), OutputReader.class);
+ jobConf_.setMapOutputKeyClass(idResolver.getOutputKeyClass());
+ jobConf_.setMapOutputValueClass(idResolver.getOutputValueClass());
+
+ idResolver.resolve(jobConf_.get("stream.reduce.output", IdentifierResolver.TEXT_ID));
+ jobConf_.setClass("stream.reduce.output.reader.class",
+ idResolver.getOutputReaderClass(), OutputReader.class);
+ jobConf_.setOutputKeyClass(idResolver.getOutputKeyClass());
+ jobConf_.setOutputValueClass(idResolver.getOutputValueClass());
+
jobConf_.set("stream.addenvironment", addTaskEnvironment_);
if (mapCmd_ != null) {
@@ -962,6 +999,7 @@ public class StreamJob implements Tool {
protected String additionalConfSpec_;
protected String mapDebugSpec_;
protected String reduceDebugSpec_;
+ protected String ioSpec_;
// Use to communicate config to the external processes (ex env.var.HADOOP_USER)
// encoding "a=b c=d"