You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/02/13 05:12:12 UTC
svn commit: r743975 [1/2] - in /hadoop/core/trunk: ./
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/
src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/
src/contrib/st...
Author: ddas
Date: Fri Feb 13 04:12:11 2009
New Revision: 743975
URL: http://svn.apache.org/viewvc?rev=743975&view=rev
Log:
HADOOP-1722. Adds support for TypedBytes and RawBytes in Streaming. Contributed by Klaas Bosteels.
Added:
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/AutoInputFormat.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/DumpTypedBytes.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/LoadTypedBytes.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/IdentifierResolver.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/InputWriter.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/OutputReader.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/RawBytesInputWriter.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/RawBytesOutputReader.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TextInputWriter.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TextOutputReader.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TypedBytesInputWriter.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TypedBytesOutputReader.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordInput.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordOutput.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritable.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html
hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesMapApp.java
hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesReduceApp.java
hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestAutoInputFormat.java
hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestDumpTypedBytes.java
hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestLoadTypedBytes.java
hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestRawBytesStreaming.java
hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestTypedBytesStreaming.java
hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesMapApp.java
hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesReduceApp.java
hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/
hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java
hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestTypedBytesWritable.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=743975&r1=743974&r2=743975&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Feb 13 04:12:11 2009
@@ -98,6 +98,9 @@
HADOOP-4868. Splits the hadoop script into three parts - bin/hadoop,
bin/mapred and bin/hdfs. (Sharad Agarwal via ddas)
+ HADOOP-1722. Adds support for TypedBytes and RawBytes in Streaming.
+ (Klaas Bosteels via ddas)
+
OPTIMIZATIONS
BUG FIXES
Added: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/AutoInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/AutoInputFormat.java?rev=743975&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/AutoInputFormat.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/AutoInputFormat.java Fri Feb 13 04:12:11 2009
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+/**
+ * An {@link InputFormat} that tries to deduce the types of the input files
+ * automatically. It can currently handle text and sequence files.
+ */
+public class AutoInputFormat extends FileInputFormat {
+
+ private TextInputFormat textInputFormat = new TextInputFormat();
+
+ private SequenceFileInputFormat seqFileInputFormat =
+ new SequenceFileInputFormat();
+
+ public void configure(JobConf job) {
+ textInputFormat.configure(job);
+ // SequenceFileInputFormat has no configure() method
+ }
+
+ public RecordReader getRecordReader(InputSplit split, JobConf job,
+ Reporter reporter) throws IOException {
+ FileSplit fileSplit = (FileSplit) split;
+ FileSystem fs = FileSystem.get(job);
+ FSDataInputStream is = fs.open(fileSplit.getPath());
+ byte[] header = new byte[3];
+ RecordReader reader = null;
+ try {
+ is.readFully(header);
+ } catch (EOFException eof) {
+ reader = textInputFormat.getRecordReader(split, job, reporter);
+ } finally {
+ is.close();
+ }
+ if (header[0] == 'S' && header[1] == 'E' && header[2] == 'Q') {
+ reader = seqFileInputFormat.getRecordReader(split, job, reporter);
+ } else {
+ reader = textInputFormat.getRecordReader(split, job, reporter);
+ }
+ return reader;
+ }
+
+}
Added: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/DumpTypedBytes.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/DumpTypedBytes.java?rev=743975&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/DumpTypedBytes.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/DumpTypedBytes.java Fri Feb 13 04:12:11 2009
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.typedbytes.TypedBytesOutput;
+import org.apache.hadoop.typedbytes.TypedBytesWritableOutput;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Utility program that fetches all files that match a given pattern and dumps
+ * their content to stdout as typed bytes. This works for all files that can be
+ * handled by {@link org.apache.hadoop.streaming.AutoInputFormat}.
+ */
+public class DumpTypedBytes implements Tool {
+
+ private Configuration conf;
+
+ public DumpTypedBytes(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public DumpTypedBytes() {
+ this(new Configuration());
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * The main driver for <code>DumpTypedBytes</code>.
+ */
+ public int run(String[] args) throws Exception {
+ Path pattern = new Path(args[0]);
+ FileSystem fs = pattern.getFileSystem(getConf());
+ fs.setVerifyChecksum(true);
+ for (Path p : FileUtil.stat2Paths(fs.globStatus(pattern), pattern)) {
+ List<FileStatus> inputFiles = new ArrayList<FileStatus>();
+ FileStatus status = fs.getFileStatus(p);
+ if (status.isDir()) {
+ FileStatus[] files = fs.listStatus(p);
+ Collections.addAll(inputFiles, files);
+ } else {
+ inputFiles.add(status);
+ }
+ return dumpTypedBytes(inputFiles);
+ }
+ return -1;
+ }
+
+ /**
+ * Dump given list of files to standard output as typed bytes.
+ */
+ @SuppressWarnings("unchecked")
+ private int dumpTypedBytes(List<FileStatus> files) throws IOException {
+ JobConf job = new JobConf(getConf());
+ DataOutputStream dout = new DataOutputStream(System.out);
+ AutoInputFormat autoInputFormat = new AutoInputFormat();
+ for (FileStatus fileStatus : files) {
+ FileSplit split = new FileSplit(fileStatus.getPath(), 0,
+ fileStatus.getLen() * fileStatus.getBlockSize(),
+ (String[]) null);
+ RecordReader recReader = null;
+ try {
+ recReader = autoInputFormat.getRecordReader(split, job, Reporter.NULL);
+ Object key = recReader.createKey();
+ Object value = recReader.createValue();
+ while (recReader.next(key, value)) {
+ if (key instanceof Writable) {
+ TypedBytesWritableOutput.get(dout).write((Writable) key);
+ } else {
+ TypedBytesOutput.get(dout).write(key);
+ }
+ if (value instanceof Writable) {
+ TypedBytesWritableOutput.get(dout).write((Writable) value);
+ } else {
+ TypedBytesOutput.get(dout).write(value);
+ }
+ }
+ } finally {
+ if (recReader != null) {
+ recReader.close();
+ }
+ }
+ }
+ dout.flush();
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ DumpTypedBytes dumptb = new DumpTypedBytes();
+ int res = ToolRunner.run(dumptb, args);
+ System.exit(res);
+ }
+
+}
Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java?rev=743975&r1=743974&r2=743975&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java Fri Feb 13 04:12:11 2009
@@ -18,6 +18,8 @@
package org.apache.hadoop.streaming;
+import java.util.Arrays;
+
import org.apache.hadoop.util.ToolRunner;
/** The main entrypoint. Usually invoked with the script bin/hadoopStreaming
@@ -27,11 +29,28 @@
public class HadoopStreaming {
public static void main(String[] args) throws Exception {
+ if (args.length < 1) {
+ System.err.println("No Arguments Given!");
+ System.exit(1);
+ }
int returnStatus = 0;
- StreamJob job = new StreamJob();
- returnStatus = ToolRunner.run(job, args);
+ String cmd = args[0];
+ String[] remainingArgs = Arrays.copyOfRange(args, 1, args.length);
+ if (cmd.equalsIgnoreCase("dumptb")) {
+ DumpTypedBytes dumptb = new DumpTypedBytes();
+ returnStatus = ToolRunner.run(dumptb, remainingArgs);
+ } else if (cmd.equalsIgnoreCase("loadtb")) {
+ LoadTypedBytes loadtb = new LoadTypedBytes();
+ returnStatus = ToolRunner.run(loadtb, remainingArgs);
+ } else if (cmd.equalsIgnoreCase("streamjob")) {
+ StreamJob job = new StreamJob();
+ returnStatus = ToolRunner.run(job, remainingArgs);
+ } else { // for backward compatibility
+ StreamJob job = new StreamJob();
+ returnStatus = ToolRunner.run(job, args);
+ }
if (returnStatus != 0) {
- System.err.println("Streaming Job Failed!");
+ System.err.println("Streaming Command Failed!");
System.exit(returnStatus);
}
}
Added: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/LoadTypedBytes.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/LoadTypedBytes.java?rev=743975&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/LoadTypedBytes.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/LoadTypedBytes.java Fri Feb 13 04:12:11 2009
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.DataInputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.typedbytes.TypedBytesInput;
+import org.apache.hadoop.typedbytes.TypedBytesWritable;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Utility program that reads typed bytes from standard input and stores them in
+ * a sequence file for which the path is given as an argument.
+ */
+public class LoadTypedBytes implements Tool {
+
+ private Configuration conf;
+
+ public LoadTypedBytes(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public LoadTypedBytes() {
+ this(new Configuration());
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * The main driver for <code>LoadTypedBytes</code>.
+ */
+ public int run(String[] args) throws Exception {
+ Path path = new Path(args[0]);
+ FileSystem fs = path.getFileSystem(getConf());
+ if (fs.exists(path)) {
+ System.err.println("given path exists already!");
+ return -1;
+ }
+ TypedBytesInput tbinput = new TypedBytesInput(new DataInputStream(System.in));
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
+ TypedBytesWritable.class, TypedBytesWritable.class);
+ try {
+ TypedBytesWritable key = new TypedBytesWritable();
+ TypedBytesWritable value = new TypedBytesWritable();
+ byte[] rawKey = tbinput.readRaw();
+ while (rawKey != null) {
+ byte[] rawValue = tbinput.readRaw();
+ key.set(rawKey, 0, rawKey.length);
+ value.set(rawValue, 0, rawValue.length);
+ writer.append(key, value);
+ rawKey = tbinput.readRaw();
+ }
+ } finally {
+ writer.close();
+ }
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ LoadTypedBytes loadtb = new LoadTypedBytes();
+ int res = ToolRunner.run(loadtb, args);
+ System.exit(res);
+ }
+
+}
Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=743975&r1=743974&r2=743975&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Fri Feb 13 04:12:11 2009
@@ -19,8 +19,6 @@
package org.apache.hadoop.streaming;
import java.io.*;
-import java.nio.charset.CharacterCodingException;
-import java.io.IOException;
import java.util.Date;
import java.util.Map;
import java.util.Iterator;
@@ -30,16 +28,20 @@
import org.apache.commons.logging.*;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.streaming.io.InputWriter;
+import org.apache.hadoop.streaming.io.OutputReader;
+import org.apache.hadoop.streaming.io.TextInputWriter;
+import org.apache.hadoop.streaming.io.TextOutputReader;
import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.UTF8ByteArrayUtils;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.fs.FileSystem;
@@ -49,15 +51,49 @@
protected static final Log LOG = LogFactory.getLog(PipeMapRed.class.getName());
- /** The command to be spawned as a subprocess.
- * Mapper/Reducer operations will delegate to it
+ /**
+ * Returns the Configuration.
*/
- abstract String getPipeCommand(JobConf job);
-
- abstract byte[] getFieldSeparator();
+ public Configuration getConfiguration() {
+ return job_;
+ }
+
+ /**
+ * Returns the DataOutput to which the client input is written.
+ */
+ public DataOutput getClientOutput() {
+ return clientOut_;
+ }
+
+ /**
+ * Returns the DataInput from which the client output is read.
+ */
+ public DataInput getClientInput() {
+ return clientIn_;
+ }
+
+ /**
+ * Returns the input separator to be used.
+ */
+ public abstract byte[] getInputSeparator();
+
+ /**
+ * Returns the field separator to be used.
+ */
+ public abstract byte[] getFieldSeparator();
- abstract int getNumOfKeyFields();
+ /**
+ * Returns the number of key fields.
+ */
+ public abstract int getNumOfKeyFields();
+
+ /**
+ * Returns the command to be spawned as a subprocess.
+ * Mapper/Reducer operations will delegate to it
+ */
+ abstract String getPipeCommand(JobConf job);
+
abstract boolean getDoPipe();
final static int OUTSIDE = 1;
@@ -120,7 +156,19 @@
job_ = job;
fs_ = FileSystem.get(job_);
-
+
+ mapInputWriterClass_ =
+ job_.getClass("stream.map.input.writer.class",
+ TextInputWriter.class, InputWriter.class);
+ mapOutputReaderClass_ =
+ job_.getClass("stream.map.output.reader.class",
+ TextOutputReader.class, OutputReader.class);
+ reduceInputWriterClass_ =
+ job_.getClass("stream.reduce.input.writer.class",
+ TextInputWriter.class, InputWriter.class);
+ reduceOutputReaderClass_ =
+ job_.getClass("stream.reduce.output.reader.class",
+ TextOutputReader.class, OutputReader.class);
nonZeroExitIsFailure_ = job_.getBoolean("stream.non.zero.exit.is.failure", true);
doPipe_ = getDoPipe();
@@ -280,13 +328,16 @@
}
}
- void startOutputThreads(OutputCollector output, Reporter reporter) {
- outThread_ = new MROutputThread(output, reporter);
+ void startOutputThreads(OutputCollector output, Reporter reporter)
+ throws IOException {
+ inWriter_ = createInputWriter();
+ outReader_ = createOutputReader();
+ outThread_ = new MROutputThread(outReader_, output, reporter);
outThread_.start();
errThread_.setReporter(reporter);
}
-
- void waitOutputThreads() {
+
+ void waitOutputThreads() throws IOException {
try {
if (outThread_ == null) {
// This happens only when reducer has empty input(So reduce() is not
@@ -325,58 +376,46 @@
//ignore
}
}
-
- /**
- * Split a line into key and value.
- * @param line: a byte array of line containing UTF-8 bytes
- * @param key: key of a record
- * @param val: value of a record
- * @throws IOException
- */
- void splitKeyVal(byte[] line, int length, Text key, Text val)
- throws IOException {
- int numKeyFields = getNumOfKeyFields();
- byte[] separator = getFieldSeparator();
-
- // Need to find numKeyFields separators
- int pos = UTF8ByteArrayUtils.findBytes(line, 0, length, separator);
- for(int k=1; k<numKeyFields && pos!=-1; k++) {
- pos = UTF8ByteArrayUtils.findBytes(line, pos + separator.length,
- length, separator);
- }
- try {
- if (pos == -1) {
- key.set(line, 0, length);
- val.set("");
- } else {
- StreamKeyValUtil.splitKeyVal(line, 0, length, key, val, pos, separator.length);
- }
- } catch (CharacterCodingException e) {
- LOG.warn(StringUtils.stringifyException(e));
- }
+
+
+ abstract InputWriter createInputWriter() throws IOException;
+
+ InputWriter createInputWriter(Class<? extends InputWriter> inputWriterClass)
+ throws IOException {
+ InputWriter inputWriter =
+ ReflectionUtils.newInstance(inputWriterClass, job_);
+ inputWriter.initialize(this);
+ return inputWriter;
+ }
+
+ abstract OutputReader createOutputReader() throws IOException;
+
+ OutputReader createOutputReader(Class<? extends OutputReader> outputReaderClass)
+ throws IOException {
+ OutputReader outputReader =
+ ReflectionUtils.newInstance(outputReaderClass, job_);
+ outputReader.initialize(this);
+ return outputReader;
}
-
+
+
class MROutputThread extends Thread {
- MROutputThread(OutputCollector output, Reporter reporter) {
+ MROutputThread(OutputReader outReader, OutputCollector outCollector,
+ Reporter reporter) {
setDaemon(true);
- this.output = output;
+ this.outReader = outReader;
+ this.outCollector = outCollector;
this.reporter = reporter;
}
public void run() {
- LineReader lineReader = null;
try {
- Text key = new Text();
- Text val = new Text();
- Text line = new Text();
- lineReader = new LineReader((InputStream)clientIn_, job_);
// 3/4 Tool to Hadoop
- while (lineReader.readLine(line) > 0) {
- answer = line.getBytes();
- splitKeyVal(answer, line.getLength(), key, val);
- output.collect(key, val);
- line.clear();
+ while (outReader.readKeyValue()) {
+ Object key = outReader.getCurrentKey();
+ Object value = outReader.getCurrentValue();
+ outCollector.collect(key, value);
numRecWritten_++;
long now = System.currentTimeMillis();
if (now-lastStdoutReport > reporterOutDelay_) {
@@ -387,21 +426,11 @@
logflush();
}
}
- if (lineReader != null) {
- lineReader.close();
- }
- if (clientIn_ != null) {
- clientIn_.close();
- clientIn_ = null;
- LOG.info("MROutputThread done");
- }
} catch (Throwable th) {
outerrThreadsThrowable = th;
LOG.warn(StringUtils.stringifyException(th));
+ } finally {
try {
- if (lineReader != null) {
- lineReader.close();
- }
if (clientIn_ != null) {
clientIn_.close();
clientIn_ = null;
@@ -412,9 +441,9 @@
}
}
- OutputCollector output;
- Reporter reporter;
- byte[] answer;
+ OutputReader outReader = null;
+ OutputCollector outCollector = null;
+ Reporter reporter = null;
long lastStdoutReport = 0;
}
@@ -532,9 +561,10 @@
clientOut_.flush();
clientOut_.close();
}
+ waitOutputThreads();
} catch (IOException io) {
+ LOG.warn(StringUtils.stringifyException(io));
}
- waitOutputThreads();
if (sim != null) sim.destroy();
logprintln("mapRedFinished");
} catch (RuntimeException e) {
@@ -570,7 +600,7 @@
//s += envline("PWD"); // =/home/crawler/hadoop/trunk
s += "last Hadoop input: |" + mapredKey_ + "|\n";
if (outThread_ != null) {
- s += "last tool output: |" + outThread_.answer + "|\n";
+ s += "last tool output: |" + outReader_.getLastOutput() + "|\n";
}
s += "Date: " + new Date() + "\n";
// s += envline("HADOOP_HOME");
@@ -602,37 +632,12 @@
return msg;
}
- /**
- * Write a value to the output stream using UTF-8 encoding
- * @param value output value
- * @throws IOException
- */
- void write(Object value) throws IOException {
- byte[] bval;
- int valSize;
- if (value instanceof BytesWritable) {
- BytesWritable val = (BytesWritable) value;
- bval = val.getBytes();
- valSize = val.getLength();
- } else if (value instanceof Text) {
- Text val = (Text) value;
- bval = val.getBytes();
- valSize = val.getLength();
- } else {
- String sval = value.toString();
- bval = sval.getBytes("UTF-8");
- valSize = bval.length;
- }
- clientOut_.write(bval, 0, valSize);
- }
-
long startTime_;
long numRecRead_ = 0;
long numRecWritten_ = 0;
long numRecSkipped_ = 0;
long nextRecReadLog_ = 1;
-
long minRecWrittenToEnableSkip_ = Long.MAX_VALUE;
long reporterOutDelay_ = 10*1000L;
@@ -647,9 +652,15 @@
boolean debugFailDuring_;
boolean debugFailLate_;
+ Class<? extends InputWriter> mapInputWriterClass_;
+ Class<? extends OutputReader> mapOutputReaderClass_;
+ Class<? extends InputWriter> reduceInputWriterClass_;
+ Class<? extends OutputReader> reduceOutputReaderClass_;
boolean nonZeroExitIsFailure_;
Process sim;
+ InputWriter inWriter_;
+ OutputReader outReader_;
MROutputThread outThread_;
String jobLog_;
MRErrorThread errThread_;
Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=743975&r1=743974&r2=743975&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Fri Feb 13 04:12:11 2009
@@ -27,6 +27,9 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.streaming.io.InputWriter;
+import org.apache.hadoop.streaming.io.OutputReader;
+import org.apache.hadoop.streaming.io.TextInputWriter;
import org.apache.hadoop.util.StringUtils;
/** A generic Mapper bridge.
@@ -66,9 +69,11 @@
//records input.
SkipBadRecords.setAutoIncrMapperProcCount(job, false);
skipping = job.getBoolean("mapred.skip.on", false);
- String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
- ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());
-
+ if (mapInputWriterClass_.getCanonicalName().equals(TextInputWriter.class.getCanonicalName())) {
+ String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
+ ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());
+ }
+
try {
mapOutputFieldSeparator = job.get("stream.map.output.field.separator", "\t").getBytes("UTF-8");
mapInputFieldSeparator = job.get("stream.map.input.field.separator", "\t").getBytes("UTF-8");
@@ -99,11 +104,9 @@
// 2/4 Hadoop to Tool
if (numExceptions_ == 0) {
if (!this.ignoreKey) {
- write(key);
- clientOut_.write(getInputSeparator());
+ inWriter_.writeKey(key);
}
- write(value);
- clientOut_.write('\n');
+ inWriter_.writeValue(value);
if(skipping) {
//flush the streams on every record input if running in skip mode
//so that we don't buffer other records surrounding a bad record.
@@ -132,18 +135,29 @@
mapRedFinished();
}
- byte[] getInputSeparator() {
+ @Override
+ public byte[] getInputSeparator() {
return mapInputFieldSeparator;
}
@Override
- byte[] getFieldSeparator() {
+ public byte[] getFieldSeparator() {
return mapOutputFieldSeparator;
}
@Override
- int getNumOfKeyFields() {
+ public int getNumOfKeyFields() {
return numOfMapOutputKeyFields;
}
+ @Override
+ InputWriter createInputWriter() throws IOException {
+ return super.createInputWriter(mapInputWriterClass_);
+ }
+
+ @Override
+ OutputReader createOutputReader() throws IOException {
+ return super.createOutputReader(mapOutputReaderClass_);
+ }
+
}
Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?rev=743975&r1=743974&r2=743975&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Fri Feb 13 04:12:11 2009
@@ -28,6 +28,8 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.SkipBadRecords;
+import org.apache.hadoop.streaming.io.InputWriter;
+import org.apache.hadoop.streaming.io.OutputReader;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.io.Writable;
@@ -97,10 +99,8 @@
+ StringUtils.stringifyException(
outerrThreadsThrowable));
}
- write(key);
- clientOut_.write(getInputSeparator());
- write(val);
- clientOut_.write('\n');
+ inWriter_.writeKey(key);
+ inWriter_.writeValue(val);
} else {
// "identity reduce"
output.collect(key, val);
@@ -137,18 +137,29 @@
mapRedFinished();
}
- byte[] getInputSeparator() {
+ @Override
+ public byte[] getInputSeparator() {
return reduceInputFieldSeparator;
}
@Override
- byte[] getFieldSeparator() {
+ public byte[] getFieldSeparator() {
return reduceOutFieldSeparator;
}
@Override
- int getNumOfKeyFields() {
+ public int getNumOfKeyFields() {
return numOfReduceOutputKeyFields;
}
+
+ @Override
+ InputWriter createInputWriter() throws IOException {
+ return super.createInputWriter(reduceInputWriterClass_);
+ }
+
+ @Override
+ OutputReader createOutputReader() throws IOException {
+ return super.createOutputReader(reduceOutputReaderClass_);
+ }
}
Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=743975&r1=743974&r2=743975&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Fri Feb 13 04:12:11 2009
@@ -52,7 +52,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
@@ -68,7 +67,11 @@
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorCombiner;
import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer;
+import org.apache.hadoop.streaming.io.IdentifierResolver;
+import org.apache.hadoop.streaming.io.InputWriter;
+import org.apache.hadoop.streaming.io.OutputReader;
import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
@@ -286,6 +289,7 @@
inReaderSpec_ = (String)cmdLine.getValue("-inputreader");
mapDebugSpec_ = (String)cmdLine.getValue("-mapdebug");
reduceDebugSpec_ = (String)cmdLine.getValue("-reducedebug");
+ ioSpec_ = (String)cmdLine.getValue("-io");
List<String> car = cmdLine.getValues("-cacheArchive");
if (null != car && !car.isEmpty()){
@@ -454,6 +458,8 @@
"File name URI", "fileNameURI", Integer.MAX_VALUE, false);
Option cacheArchive = createOption("cacheArchive",
"File name URI", "fileNameURI", Integer.MAX_VALUE, false);
+ Option io = createOption("io",
+ "Optional.", "spec", 1, false);
// boolean properties
@@ -484,6 +490,7 @@
withOption(cmdenv).
withOption(cacheFile).
withOption(cacheArchive).
+ withOption(io).
withOption(verbose).
withOption(info).
withOption(debug).
@@ -517,6 +524,7 @@
"To run this script when a map task fails ");
System.out.println(" -reducedebug <path> Optional." +
" To run this script when a reduce task fails ");
+ System.out.println(" -io <identifier> Optional.");
System.out.println(" -verbose");
System.out.println();
GenericOptionsParser.printGenericCommandUsage(System.out);
@@ -739,9 +747,38 @@
jobConf_.setInputFormat(fmt);
- jobConf_.setOutputKeyClass(Text.class);
- jobConf_.setOutputValueClass(Text.class);
-
+ if (ioSpec_ != null) {
+ jobConf_.set("stream.map.input", ioSpec_);
+ jobConf_.set("stream.map.output", ioSpec_);
+ jobConf_.set("stream.reduce.input", ioSpec_);
+ jobConf_.set("stream.reduce.output", ioSpec_);
+ }
+
+ Class<? extends IdentifierResolver> idResolverClass =
+ jobConf_.getClass("stream.io.identifier.resolver.class",
+ IdentifierResolver.class, IdentifierResolver.class);
+ IdentifierResolver idResolver = ReflectionUtils.newInstance(idResolverClass, jobConf_);
+
+ idResolver.resolve(jobConf_.get("stream.map.input", IdentifierResolver.TEXT_ID));
+ jobConf_.setClass("stream.map.input.writer.class",
+ idResolver.getInputWriterClass(), InputWriter.class);
+
+ idResolver.resolve(jobConf_.get("stream.reduce.input", IdentifierResolver.TEXT_ID));
+ jobConf_.setClass("stream.reduce.input.writer.class",
+ idResolver.getInputWriterClass(), InputWriter.class);
+
+ idResolver.resolve(jobConf_.get("stream.map.output", IdentifierResolver.TEXT_ID));
+ jobConf_.setClass("stream.map.output.reader.class",
+ idResolver.getOutputReaderClass(), OutputReader.class);
+ jobConf_.setMapOutputKeyClass(idResolver.getOutputKeyClass());
+ jobConf_.setMapOutputValueClass(idResolver.getOutputValueClass());
+
+ idResolver.resolve(jobConf_.get("stream.reduce.output", IdentifierResolver.TEXT_ID));
+ jobConf_.setClass("stream.reduce.output.reader.class",
+ idResolver.getOutputReaderClass(), OutputReader.class);
+ jobConf_.setOutputKeyClass(idResolver.getOutputKeyClass());
+ jobConf_.setOutputValueClass(idResolver.getOutputValueClass());
+
jobConf_.set("stream.addenvironment", addTaskEnvironment_);
if (mapCmd_ != null) {
@@ -1062,6 +1099,7 @@
protected String additionalConfSpec_;
protected String mapDebugSpec_;
protected String reduceDebugSpec_;
+ protected String ioSpec_;
// Use to communicate config to the external processes (ex env.var.HADOOP_USER)
// encoding "a=b c=d"
Added: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/IdentifierResolver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/IdentifierResolver.java?rev=743975&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/IdentifierResolver.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/IdentifierResolver.java Fri Feb 13 04:12:11 2009
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming.io;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.typedbytes.TypedBytesWritable;
+
+/**
+ * This class is used to resolve a string identifier into the required IO
+ * classes. By extending this class and pointing the property
+ * <tt>stream.io.identifier.resolver.class</tt> to this extension, additional
+ * IO classes can be added by external code.
+ */
+public class IdentifierResolver {
+
+ // note that the identifiers are case-insensitive
+ public static final String TEXT_ID = "text";
+ public static final String RAW_BYTES_ID = "rawbytes";
+ public static final String TYPED_BYTES_ID = "typedbytes";
+
+ private Class<? extends InputWriter> inputWriterClass = null;
+ private Class<? extends OutputReader> outputReaderClass = null;
+ private Class outputKeyClass = null;
+ private Class outputValueClass = null;
+
+ /**
+ * Resolves a given identifier. This method has to be called before calling
+ * any of the getters.
+ */
+ public void resolve(String identifier) {
+ if (identifier.equalsIgnoreCase(RAW_BYTES_ID)) {
+ setInputWriterClass(RawBytesInputWriter.class);
+ setOutputReaderClass(RawBytesOutputReader.class);
+ setOutputKeyClass(BytesWritable.class);
+ setOutputValueClass(BytesWritable.class);
+ } else if (identifier.equalsIgnoreCase(TYPED_BYTES_ID)) {
+ setInputWriterClass(TypedBytesInputWriter.class);
+ setOutputReaderClass(TypedBytesOutputReader.class);
+ setOutputKeyClass(TypedBytesWritable.class);
+ setOutputValueClass(TypedBytesWritable.class);
+ } else { // assume TEXT_ID
+ setInputWriterClass(TextInputWriter.class);
+ setOutputReaderClass(TextOutputReader.class);
+ setOutputKeyClass(Text.class);
+ setOutputValueClass(Text.class);
+ }
+ }
+
+ /**
+ * Returns the resolved {@link InputWriter} class.
+ */
+ public Class<? extends InputWriter> getInputWriterClass() {
+ return inputWriterClass;
+ }
+
+ /**
+ * Returns the resolved {@link OutputReader} class.
+ */
+ public Class<? extends OutputReader> getOutputReaderClass() {
+ return outputReaderClass;
+ }
+
+ /**
+ * Returns the resolved output key class.
+ */
+ public Class getOutputKeyClass() {
+ return outputKeyClass;
+ }
+
+ /**
+ * Returns the resolved output value class.
+ */
+ public Class getOutputValueClass() {
+ return outputValueClass;
+ }
+
+
+ /**
+ * Sets the {@link InputWriter} class.
+ */
+ protected void setInputWriterClass(Class<? extends InputWriter>
+ inputWriterClass) {
+ this.inputWriterClass = inputWriterClass;
+ }
+
+ /**
+ * Sets the {@link OutputReader} class.
+ */
+ protected void setOutputReaderClass(Class<? extends OutputReader>
+ outputReaderClass) {
+ this.outputReaderClass = outputReaderClass;
+ }
+
+ /**
+ * Sets the output key class.
+ */
+ protected void setOutputKeyClass(Class outputKeyClass) {
+ this.outputKeyClass = outputKeyClass;
+ }
+
+ /**
+ * Sets the output value class.
+ */
+ protected void setOutputValueClass(Class outputValueClass) {
+ this.outputValueClass = outputValueClass;
+ }
+
+}
Added: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/InputWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/InputWriter.java?rev=743975&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/InputWriter.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/InputWriter.java Fri Feb 13 04:12:11 2009
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.streaming.PipeMapRed;
+
+/**
+ * Abstract base for classes that write the client's input.
+ */
+public abstract class InputWriter<K, V> {
+
+ /**
+ * Initializes the InputWriter. This method has to be called before calling
+ * any of the other methods.
+ */
+ public void initialize(PipeMapRed pipeMapRed) throws IOException {
+ // nothing here yet, but that might change in the future
+ }
+
+ /**
+ * Writes an input key.
+ */
+ public abstract void writeKey(K key) throws IOException;
+
+ /**
+ * Writes an input value.
+ */
+ public abstract void writeValue(V value) throws IOException;
+
+}
\ No newline at end of file
Added: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/OutputReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/OutputReader.java?rev=743975&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/OutputReader.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/OutputReader.java Fri Feb 13 04:12:11 2009
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.streaming.PipeMapRed;
+
+/**
+ * Abstract base for classes that read the client's output.
+ */
+public abstract class OutputReader<K, V> {
+
+ /**
+ * Initializes the OutputReader. This method has to be called before
+ * calling any of the other methods.
+ */
+ public void initialize(PipeMapRed pipeMapRed) throws IOException {
+ // nothing here yet, but that might change in the future
+ }
+
+ /**
+ * Reads the next key/value pair output by the client.
+ * @return true iff a key/value pair was read
+ */
+ public abstract boolean readKeyValue() throws IOException;
+
+ /**
+ * Returns the current key.
+ */
+ public abstract K getCurrentKey() throws IOException;
+
+ /**
+ * Returns the current value.
+ */
+ public abstract V getCurrentValue() throws IOException;
+
+ /**
+ * Returns the last output from the client as a String.
+ */
+ public abstract String getLastOutput();
+
+}
Added: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/RawBytesInputWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/RawBytesInputWriter.java?rev=743975&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/RawBytesInputWriter.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/RawBytesInputWriter.java Fri Feb 13 04:12:11 2009
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.streaming.PipeMapRed;
+
+/**
+ * InputWriter that writes the client's input as raw bytes.
+ */
+public class RawBytesInputWriter extends InputWriter<Writable, Writable> {
+
+ private DataOutput clientOut;
+ private ByteArrayOutputStream bufferOut;
+ private DataOutputStream bufferDataOut;
+
+ @Override
+ public void initialize(PipeMapRed pipeMapRed) throws IOException {
+ super.initialize(pipeMapRed);
+ clientOut = pipeMapRed.getClientOutput();
+ bufferOut = new ByteArrayOutputStream();
+ bufferDataOut = new DataOutputStream(bufferOut);
+ }
+
+ @Override
+ public void writeKey(Writable key) throws IOException {
+ writeRawBytes(key);
+ }
+
+ @Override
+ public void writeValue(Writable value) throws IOException {
+ writeRawBytes(value);
+ }
+
+ private void writeRawBytes(Writable writable) throws IOException {
+ if (writable instanceof BytesWritable) {
+ BytesWritable bw = (BytesWritable) writable;
+ byte[] bytes = bw.getBytes();
+ int length = bw.getLength();
+ clientOut.writeInt(length);
+ clientOut.write(bytes, 0, length);
+ } else {
+ bufferOut.reset();
+ writable.write(bufferDataOut);
+ byte[] bytes = bufferOut.toByteArray();
+ clientOut.writeInt(bytes.length);
+ clientOut.write(bytes);
+ }
+ }
+
+}
Added: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/RawBytesOutputReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/RawBytesOutputReader.java?rev=743975&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/RawBytesOutputReader.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/RawBytesOutputReader.java Fri Feb 13 04:12:11 2009
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming.io;
+
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.streaming.PipeMapRed;
+
+/**
+ * OutputReader that reads the client's output as raw bytes.
+ */
+public class RawBytesOutputReader
+ extends OutputReader<BytesWritable, BytesWritable> {
+
+ private DataInput clientIn;
+ private byte[] bytes;
+ private BytesWritable key;
+ private BytesWritable value;
+
+ @Override
+ public void initialize(PipeMapRed pipeMapRed) throws IOException {
+ super.initialize(pipeMapRed);
+ clientIn = pipeMapRed.getClientInput();
+ key = new BytesWritable();
+ value = new BytesWritable();
+ }
+
+ @Override
+ public boolean readKeyValue() throws IOException {
+ int length = readLength();
+ if (length < 0) {
+ return false;
+ }
+ key.set(readBytes(length), 0, length);
+ length = readLength();
+ value.set(readBytes(length), 0, length);
+ return true;
+ }
+
+ @Override
+ public BytesWritable getCurrentKey() throws IOException {
+ return key;
+ }
+
+ @Override
+ public BytesWritable getCurrentValue() throws IOException {
+ return value;
+ }
+
+ @Override
+ public String getLastOutput() {
+ return new BytesWritable(bytes).toString();
+ }
+
+ private int readLength() throws IOException {
+ try {
+ return clientIn.readInt();
+ } catch (EOFException eof) {
+ return -1;
+ }
+ }
+
+ private byte[] readBytes(int length) throws IOException {
+ bytes = new byte[length];
+ clientIn.readFully(bytes);
+ return bytes;
+ }
+
+}
Added: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TextInputWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TextInputWriter.java?rev=743975&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TextInputWriter.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TextInputWriter.java Fri Feb 13 04:12:11 2009
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming.io;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.streaming.PipeMapRed;
+
+/**
+ * InputWriter that writes the client's input as text.
+ */
+public class TextInputWriter extends InputWriter<Object, Object> {
+
+ private DataOutput clientOut;
+ private byte[] inputSeparator;
+
+ @Override
+ public void initialize(PipeMapRed pipeMapRed) throws IOException {
+ super.initialize(pipeMapRed);
+ clientOut = pipeMapRed.getClientOutput();
+ inputSeparator = pipeMapRed.getInputSeparator();
+ }
+
+ @Override
+ public void writeKey(Object key) throws IOException {
+ writeUTF8(key);
+ clientOut.write(inputSeparator);
+ }
+
+ @Override
+ public void writeValue(Object value) throws IOException {
+ writeUTF8(value);
+ clientOut.write('\n');
+ }
+
+ // Write an object to the output stream using UTF-8 encoding
+ private void writeUTF8(Object object) throws IOException {
+ byte[] bval;
+ int valSize;
+ if (object instanceof BytesWritable) {
+ BytesWritable val = (BytesWritable) object;
+ bval = val.getBytes();
+ valSize = val.getLength();
+ } else if (object instanceof Text) {
+ Text val = (Text) object;
+ bval = val.getBytes();
+ valSize = val.getLength();
+ } else {
+ String sval = object.toString();
+ bval = sval.getBytes("UTF-8");
+ valSize = bval.length;
+ }
+ clientOut.write(bval, 0, valSize);
+ }
+
+}
Added: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TextOutputReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TextOutputReader.java?rev=743975&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TextOutputReader.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TextOutputReader.java Fri Feb 13 04:12:11 2009
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming.io;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.CharacterCodingException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.streaming.PipeMapRed;
+import org.apache.hadoop.streaming.StreamKeyValUtil;
+import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.UTF8ByteArrayUtils;
+
+/**
+ * OutputReader that reads the client's output as text.
+ */
+public class TextOutputReader extends OutputReader<Text, Text> {
+
+ private LineReader lineReader;
+ private byte[] bytes;
+ private DataInput clientIn;
+ private Configuration conf;
+ private int numKeyFields;
+ private byte[] separator;
+ private Text key;
+ private Text value;
+ private Text line;
+
+ @Override
+ public void initialize(PipeMapRed pipeMapRed) throws IOException {
+ super.initialize(pipeMapRed);
+ clientIn = pipeMapRed.getClientInput();
+ conf = pipeMapRed.getConfiguration();
+ numKeyFields = pipeMapRed.getNumOfKeyFields();
+ separator = pipeMapRed.getFieldSeparator();
+ lineReader = new LineReader((InputStream)clientIn, conf);
+ key = new Text();
+ value = new Text();
+ line = new Text();
+ }
+
+ @Override
+ public boolean readKeyValue() throws IOException {
+ if (lineReader.readLine(line) <= 0) {
+ return false;
+ }
+ bytes = line.getBytes();
+ splitKeyVal(bytes, line.getLength(), key, value);
+ line.clear();
+ return true;
+ }
+
+ @Override
+ public Text getCurrentKey() throws IOException {
+ return key;
+ }
+
+ @Override
+ public Text getCurrentValue() throws IOException {
+ return value;
+ }
+
+ @Override
+ public String getLastOutput() {
+ try {
+ return new String(bytes, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ return "<undecodable>";
+ }
+ }
+
+ // split a UTF-8 line into key and value
+ private void splitKeyVal(byte[] line, int length, Text key, Text val)
+ throws IOException {
+ // Need to find numKeyFields separators
+ int pos = UTF8ByteArrayUtils.findBytes(line, 0, length, separator);
+ for(int k=1; k<numKeyFields && pos!=-1; k++) {
+ pos = UTF8ByteArrayUtils.findBytes(line, pos + separator.length,
+ length, separator);
+ }
+ try {
+ if (pos == -1) {
+ key.set(line, 0, length);
+ val.set("");
+ } else {
+ StreamKeyValUtil.splitKeyVal(line, 0, length, key, val, pos,
+ separator.length);
+ }
+ } catch (CharacterCodingException e) {
+ throw new IOException(StringUtils.stringifyException(e));
+ }
+ }
+
+}
Added: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TypedBytesInputWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TypedBytesInputWriter.java?rev=743975&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TypedBytesInputWriter.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TypedBytesInputWriter.java Fri Feb 13 04:12:11 2009
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming.io;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.streaming.PipeMapRed;
+import org.apache.hadoop.typedbytes.TypedBytesOutput;
+import org.apache.hadoop.typedbytes.TypedBytesWritableOutput;
+
+/**
+ * InputWriter that writes the client's input as typed bytes.
+ */
+public class TypedBytesInputWriter extends InputWriter<Object, Object> {
+
+ private TypedBytesOutput tbOut;
+ private TypedBytesWritableOutput tbwOut;
+
+ @Override
+ public void initialize(PipeMapRed pipeMapRed) throws IOException {
+ super.initialize(pipeMapRed);
+ DataOutput clientOut = pipeMapRed.getClientOutput();
+ tbOut = new TypedBytesOutput(clientOut);
+ tbwOut = new TypedBytesWritableOutput(clientOut);
+ }
+
+ @Override
+ public void writeKey(Object key) throws IOException {
+ writeTypedBytes(key);
+ }
+
+ @Override
+ public void writeValue(Object value) throws IOException {
+ writeTypedBytes(value);
+ }
+
+ private void writeTypedBytes(Object value) throws IOException {
+ if (value instanceof Writable) {
+ tbwOut.write((Writable) value);
+ } else {
+ tbOut.write(value);
+ }
+ }
+
+}
Added: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TypedBytesOutputReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TypedBytesOutputReader.java?rev=743975&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TypedBytesOutputReader.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TypedBytesOutputReader.java Fri Feb 13 04:12:11 2009
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming.io;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.hadoop.streaming.PipeMapRed;
+import org.apache.hadoop.typedbytes.TypedBytesInput;
+import org.apache.hadoop.typedbytes.TypedBytesWritable;
+
+/**
+ * OutputReader that reads the client's output as typed bytes.
+ */
+public class TypedBytesOutputReader extends
+ OutputReader<TypedBytesWritable, TypedBytesWritable> {
+
+ private byte[] bytes;
+ private DataInput clientIn;
+ private TypedBytesWritable key;
+ private TypedBytesWritable value;
+ private TypedBytesInput in;
+
+ @Override
+ public void initialize(PipeMapRed pipeMapRed) throws IOException {
+ super.initialize(pipeMapRed);
+ clientIn = pipeMapRed.getClientInput();
+ key = new TypedBytesWritable();
+ value = new TypedBytesWritable();
+ in = new TypedBytesInput(clientIn);
+ }
+
+ @Override
+ public boolean readKeyValue() throws IOException {
+ bytes = in.readRaw();
+ if (bytes == null) {
+ return false;
+ }
+ key.set(bytes, 0, bytes.length);
+ bytes = in.readRaw();
+ value.set(bytes, 0, bytes.length);
+ return true;
+ }
+
+ @Override
+ public TypedBytesWritable getCurrentKey() throws IOException {
+ return key;
+ }
+
+ @Override
+ public TypedBytesWritable getCurrentValue() throws IOException {
+ return value;
+ }
+
+ @Override
+ public String getLastOutput() {
+ return new TypedBytesWritable(bytes).toString();
+ }
+
+}
Added: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java?rev=743975&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java Fri Feb 13 04:12:11 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.typedbytes;
+
+/**
+ * The possible type codes.
+ */
/**
 * The possible type codes.
 *
 * Each serialized typed bytes sequence begins with one of these codes
 * (written as a single unsigned byte) identifying how the following
 * payload bytes are to be interpreted.
 */
public enum Type {

  BYTES(0),
  BYTE(1),
  BOOL(2),
  INT(3),
  LONG(4),
  FLOAT(5),
  DOUBLE(6),
  STRING(7),
  VECTOR(8),
  LIST(9),
  // Sentinel code: terminates a LIST and is reported as null by readers.
  MAP(10),
  MARKER(255);

  // The numeric wire code for this type (package-visible for readers/writers).
  final int code;

  Type(int code) {
    this.code = code;
  }
}
\ No newline at end of file
Added: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java?rev=743975&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java Fri Feb 13 04:12:11 2009
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.typedbytes;
+
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.record.Buffer;
+
+/**
+ * Provides functionality for reading typed bytes.
+ */
+public class TypedBytesInput {
+
+ private DataInput in;
+
+ private TypedBytesInput() {}
+
+ private void setDataInput(DataInput in) {
+ this.in = in;
+ }
+
+ private static ThreadLocal tbIn = new ThreadLocal() {
+ protected synchronized Object initialValue() {
+ return new TypedBytesInput();
+ }
+ };
+
+ /**
+ * Get a thread-local typed bytes input for the supplied {@link DataInput}.
+ * @param in data input object
+ * @return typed bytes input corresponding to the supplied {@link DataInput}.
+ */
+ public static TypedBytesInput get(DataInput in) {
+ TypedBytesInput bin = (TypedBytesInput) tbIn.get();
+ bin.setDataInput(in);
+ return bin;
+ }
+
+ /** Creates a new instance of TypedBytesInput. */
+ public TypedBytesInput(DataInput in) {
+ this.in = in;
+ }
+
+ /**
+ * Reads a typed bytes sequence and converts it to a Java object. The first
+ * byte is interpreted as a type code, and then the right number of
+ * subsequent bytes are read depending on the obtained type.
+ * @return the obtained object or null when the end of the file is reached
+ * @throws IOException
+ */
+ public Object read() throws IOException {
+ int code = 1;
+ try {
+ code = in.readUnsignedByte();
+ } catch (EOFException eof) {
+ return null;
+ }
+ if (code == Type.BYTES.code) {
+ return new Buffer(readBytes());
+ } else if (code == Type.BYTE.code) {
+ return readByte();
+ } else if (code == Type.BOOL.code) {
+ return readBool();
+ } else if (code == Type.INT.code) {
+ return readInt();
+ } else if (code == Type.LONG.code) {
+ return readLong();
+ } else if (code == Type.FLOAT.code) {
+ return readFloat();
+ } else if (code == Type.DOUBLE.code) {
+ return readDouble();
+ } else if (code == Type.STRING.code) {
+ return readString();
+ } else if (code == Type.VECTOR.code) {
+ return readVector();
+ } else if (code == Type.LIST.code) {
+ return readList();
+ } else if (code == Type.MAP.code) {
+ return readMap();
+ } else if (code == Type.MARKER.code) {
+ return null;
+ } else {
+ throw new RuntimeException("unknown type");
+ }
+ }
+
+ /**
+ * Reads a typed bytes sequence. The first byte is interpreted as a type code,
+ * and then the right number of subsequent bytes are read depending on the
+ * obtained type.
+ *
+ * @return the obtained typed bytes sequence or null when the end of the file
+ * is reached
+ * @throws IOException
+ */
+ public byte[] readRaw() throws IOException {
+ int code = -1;
+ try {
+ code = in.readUnsignedByte();
+ } catch (EOFException eof) {
+ return null;
+ }
+ if (code == Type.BYTES.code) {
+ return readRawBytes();
+ } else if (code == Type.BYTE.code) {
+ return readRawByte();
+ } else if (code == Type.BOOL.code) {
+ return readRawBool();
+ } else if (code == Type.INT.code) {
+ return readRawInt();
+ } else if (code == Type.LONG.code) {
+ return readRawLong();
+ } else if (code == Type.FLOAT.code) {
+ return readRawFloat();
+ } else if (code == Type.DOUBLE.code) {
+ return readRawDouble();
+ } else if (code == Type.STRING.code) {
+ return readRawString();
+ } else if (code == Type.VECTOR.code) {
+ return readRawVector();
+ } else if (code == Type.LIST.code) {
+ return readRawList();
+ } else if (code == Type.MAP.code) {
+ return readRawMap();
+ } else if (code == Type.MARKER.code) {
+ return null;
+ } else {
+ throw new RuntimeException("unknown type");
+ }
+ }
+
+ /**
+ * Reads a type byte and returns the corresponding {@link Type}.
+ * @return the obtained Type or null when the end of the file is reached
+ * @throws IOException
+ */
+ public Type readType() throws IOException {
+ int code = -1;
+ try {
+ code = in.readUnsignedByte();
+ } catch (EOFException eof) {
+ return null;
+ }
+ for (Type type : Type.values()) {
+ if (type.code == code) {
+ return type;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Skips a type byte.
+ * @return true iff the end of the file was not reached
+ * @throws IOException
+ */
+ public boolean skipType() throws IOException {
+ try {
+ in.readByte();
+ return true;
+ } catch (EOFException eof) {
+ return false;
+ }
+ }
+
+ /**
+ * Reads the bytes following a <code>Type.BYTES</code> code.
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readBytes() throws IOException {
+ int length = in.readInt();
+ byte[] bytes = new byte[length];
+ in.readFully(bytes);
+ return bytes;
+ }
+
+ /**
+ * Reads the raw bytes following a <code>Type.BYTES</code> code.
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readRawBytes() throws IOException {
+ int length = in.readInt();
+ byte[] bytes = new byte[5 + length];
+ bytes[0] = (byte) Type.BYTES.code;
+ bytes[1] = (byte) (0xff & (length >> 24));
+ bytes[2] = (byte) (0xff & (length >> 16));
+ bytes[3] = (byte) (0xff & (length >> 8));
+ bytes[4] = (byte) (0xff & length);
+ in.readFully(bytes, 5, length);
+ return bytes;
+ }
+
+ /**
+ * Reads the byte following a <code>Type.BYTE</code> code.
+ * @return the obtained byte
+ * @throws IOException
+ */
+ public byte readByte() throws IOException {
+ return in.readByte();
+ }
+
+ /**
+ * Reads the raw byte following a <code>Type.BYTE</code> code.
+ * @return the obtained byte
+ * @throws IOException
+ */
+ public byte[] readRawByte() throws IOException {
+ byte[] bytes = new byte[2];
+ bytes[0] = (byte) Type.BYTE.code;
+ in.readFully(bytes, 1, 1);
+ return bytes;
+ }
+
+ /**
+ * Reads the boolean following a <code>Type.BOOL</code> code.
+ * @return the obtained boolean
+ * @throws IOException
+ */
+ public boolean readBool() throws IOException {
+ return in.readBoolean();
+ }
+
+ /**
+ * Reads the raw bytes following a <code>Type.BOOL</code> code.
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readRawBool() throws IOException {
+ byte[] bytes = new byte[2];
+ bytes[0] = (byte) Type.BOOL.code;
+ in.readFully(bytes, 1, 1);
+ return bytes;
+ }
+
+ /**
+ * Reads the integer following a <code>Type.INT</code> code.
+ * @return the obtained integer
+ * @throws IOException
+ */
+ public int readInt() throws IOException {
+ return in.readInt();
+ }
+
+ /**
+ * Reads the raw bytes following a <code>Type.INT</code> code.
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readRawInt() throws IOException {
+ byte[] bytes = new byte[5];
+ bytes[0] = (byte) Type.INT.code;
+ in.readFully(bytes, 1, 4);
+ return bytes;
+ }
+
+ /**
+ * Reads the long following a <code>Type.LONG</code> code.
+ * @return the obtained long
+ * @throws IOException
+ */
+ public long readLong() throws IOException {
+ return in.readLong();
+ }
+
+ /**
+ * Reads the raw bytes following a <code>Type.LONG</code> code.
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readRawLong() throws IOException {
+ byte[] bytes = new byte[9];
+ bytes[0] = (byte) Type.LONG.code;
+ in.readFully(bytes, 1, 8);
+ return bytes;
+ }
+
+ /**
+ * Reads the float following a <code>Type.FLOAT</code> code.
+ * @return the obtained float
+ * @throws IOException
+ */
+ public float readFloat() throws IOException {
+ return in.readFloat();
+ }
+
+ /**
+ * Reads the raw bytes following a <code>Type.FLOAT</code> code.
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readRawFloat() throws IOException {
+ byte[] bytes = new byte[5];
+ bytes[0] = (byte) Type.FLOAT.code;
+ in.readFully(bytes, 1, 4);
+ return bytes;
+ }
+
+ /**
+ * Reads the double following a <code>Type.DOUBLE</code> code.
+ * @return the obtained double
+ * @throws IOException
+ */
+ public double readDouble() throws IOException {
+ return in.readDouble();
+ }
+
+ /**
+ * Reads the raw bytes following a <code>Type.DOUBLE</code> code.
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readRawDouble() throws IOException {
+ byte[] bytes = new byte[9];
+ bytes[0] = (byte) Type.DOUBLE.code;
+ in.readFully(bytes, 1, 8);
+ return bytes;
+ }
+
+ /**
+ * Reads the string following a <code>Type.STRING</code> code.
+ * @return the obtained string
+ * @throws IOException
+ */
+ public String readString() throws IOException {
+ return WritableUtils.readString(in);
+ }
+
+ /**
+ * Reads the raw bytes following a <code>Type.STRING</code> code.
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readRawString() throws IOException {
+ int length = in.readInt();
+ byte[] bytes = new byte[5 + length];
+ bytes[0] = (byte) Type.STRING.code;
+ bytes[1] = (byte) (0xff & (length >> 24));
+ bytes[2] = (byte) (0xff & (length >> 16));
+ bytes[3] = (byte) (0xff & (length >> 8));
+ bytes[4] = (byte) (0xff & length);
+ in.readFully(bytes, 5, length);
+ return bytes;
+ }
+
+ /**
+ * Reads the vector following a <code>Type.VECTOR</code> code.
+ * @return the obtained vector
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public ArrayList readVector() throws IOException {
+ int length = readVectorHeader();
+ ArrayList result = new ArrayList(length);
+ for (int i = 0; i < length; i++) {
+ result.add(read());
+ }
+ return result;
+ }
+
+ /**
+ * Reads the raw bytes following a <code>Type.VECTOR</code> code.
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readRawVector() throws IOException {
+ Buffer buffer = new Buffer();
+ int length = readVectorHeader();
+ buffer.append(new byte[] {
+ (byte) Type.VECTOR.code,
+ (byte) (0xff & (length >> 24)), (byte) (0xff & (length >> 16)),
+ (byte) (0xff & (length >> 8)), (byte) (0xff & length)
+ });
+ for (int i = 0; i < length; i++) {
+ buffer.append(readRaw());
+ }
+ return buffer.get();
+ }
+
+ /**
+ * Reads the header following a <code>Type.VECTOR</code> code.
+ * @return the number of elements in the vector
+ * @throws IOException
+ */
+ public int readVectorHeader() throws IOException {
+ return in.readInt();
+ }
+
+ /**
+ * Reads the list following a <code>Type.LIST</code> code.
+ * @return the obtained list
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public List readList() throws IOException {
+ List list = new ArrayList();
+ Object obj = read();
+ while (obj != null) {
+ list.add(obj);
+ obj = read();
+ }
+ return list;
+ }
+
+ /**
+ * Reads the raw bytes following a <code>Type.LIST</code> code.
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readRawList() throws IOException {
+ Buffer buffer = new Buffer(new byte[] { (byte) Type.LIST.code });
+ byte[] bytes = readRaw();
+ while (bytes != null) {
+ buffer.append(bytes);
+ bytes = readRaw();
+ }
+ buffer.append(new byte[] { (byte) Type.MARKER.code });
+ return buffer.get();
+ }
+
+ /**
+ * Reads the map following a <code>Type.MAP</code> code.
+ * @return the obtained map
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public TreeMap readMap() throws IOException {
+ int length = readMapHeader();
+ TreeMap result = new TreeMap();
+ for (int i = 0; i < length; i++) {
+ Object key = read();
+ Object value = read();
+ result.put(key, value);
+ }
+ return result;
+ }
+
+ /**
+ * Reads the raw bytes following a <code>Type.MAP</code> code.
+ * @return the obtained bytes sequence
+ * @throws IOException
+ */
+ public byte[] readRawMap() throws IOException {
+ Buffer buffer = new Buffer();
+ int length = readMapHeader();
+ buffer.append(new byte[] {
+ (byte) Type.MAP.code,
+ (byte) (0xff & (length >> 24)), (byte) (0xff & (length >> 16)),
+ (byte) (0xff & (length >> 8)), (byte) (0xff & length)
+ });
+ for (int i = 0; i < length; i++) {
+ buffer.append(readRaw());
+ buffer.append(readRaw());
+ }
+ return buffer.get();
+ }
+
+ /**
+ * Reads the header following a <code>Type.MAP</code> code.
+ * @return the number of key-value pairs in the map
+ * @throws IOException
+ */
+ public int readMapHeader() throws IOException {
+ return in.readInt();
+ }
+
+}