Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2012/03/10 02:50:47 UTC

svn commit: r1299138 - in /hadoop/common/branches/branch-1.0: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/ src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/ src...

Author: acmurthy
Date: Sat Mar 10 01:50:45 2012
New Revision: 1299138

URL: http://svn.apache.org/viewvc?rev=1299138&view=rev
Log:
Merge -c 1299136 from branch-1 to branch-1.0 to fix HADOOP-1722. Allow hadoop streaming to handle non-utf8 byte array. Contributed by Klaas Bosteels and Matthias Lehmann.
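
In practical terms, this lets a streaming job exchange raw or typed bytes with its subprocess instead of UTF-8 text lines. A minimal sketch of how a client might opt in through the four configuration keys wired up in the StreamJob.java hunk below; the "typedbytes" identifier string is an assumption based on the typedbytes package copied in by this merge (only IdentifierResolver.TEXT_ID appears verbatim in the diff):

    import org.apache.hadoop.mapred.JobConf;

    public class TypedBytesJobSetup {
      public static void main(String[] args) {
        // Minimal sketch, assuming "typedbytes" is a registered identifier.
        // Each key selects the wire format for one side of one pipe.
        JobConf conf = new JobConf();
        conf.set("stream.map.input", "typedbytes");
        conf.set("stream.map.output", "typedbytes");
        conf.set("stream.reduce.input", "typedbytes");
        conf.set("stream.reduce.output", "typedbytes");
      }
    }

The new -io <identifier> option added to StreamJob sets the same four keys in one step.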

Added:
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/AutoInputFormat.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/AutoInputFormat.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/DumpTypedBytes.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/DumpTypedBytes.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/LoadTypedBytes.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/LoadTypedBytes.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/
      - copied from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/IdentifierResolver.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/IdentifierResolver.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/InputWriter.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/InputWriter.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/OutputReader.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/OutputReader.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/RawBytesInputWriter.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/RawBytesInputWriter.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/RawBytesOutputReader.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/RawBytesOutputReader.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TextInputWriter.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TextInputWriter.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TextOutputReader.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TextOutputReader.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TypedBytesInputWriter.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TypedBytesInputWriter.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TypedBytesOutputReader.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/streaming/io/TypedBytesOutputReader.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/
      - copied from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/Type.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesInput.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesOutput.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordInput.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordInput.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordOutput.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesRecordOutput.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritable.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritable.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableInput.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/TypedBytesWritableOutput.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/java/org/apache/hadoop/typedbytes/package.html
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesMapApp.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesMapApp.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesReduceApp.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/RawBytesReduceApp.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestAutoInputFormat.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestAutoInputFormat.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestDumpTypedBytes.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestDumpTypedBytes.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestLoadTypedBytes.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestLoadTypedBytes.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestRawBytesStreaming.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestRawBytesStreaming.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestTypedBytesStreaming.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestTypedBytesStreaming.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesMapApp.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesMapApp.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesReduceApp.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TypedBytesReduceApp.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/
      - copied from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestIO.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestTypedBytesWritable.java
      - copied unchanged from r1299136, hadoop/common/branches/branch-1/src/contrib/streaming/src/test/org/apache/hadoop/typedbytes/TestTypedBytesWritable.java
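
The typedbytes package copied in above supplies the serialization format that makes the non-UTF-8 handling possible: each value is written as a one-byte type code followed by its body, so arbitrary binary data never has to survive a UTF-8 round trip. A minimal sketch; setValue/getValue are assumptions about TypedBytesWritable's API, since none of its methods appear in this diff:

    import org.apache.hadoop.typedbytes.TypedBytesWritable;

    public class TypedBytesDemo {
      public static void main(String[] args) {
        // Wrap an arbitrary value as typed bytes; no UTF-8 assumption is
        // made when it travels through SequenceFiles or the streaming pipe.
        TypedBytesWritable w = new TypedBytesWritable();
        w.setValue(42);                    // assumed setValue(Object)
        System.out.println(w.getValue());  // assumed getValue() -> Object
      }
    }
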
Modified:
    hadoop/common/branches/branch-1.0/CHANGES.txt
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
    hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

Modified: hadoop/common/branches/branch-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/CHANGES.txt?rev=1299138&r1=1299137&r2=1299138&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.0/CHANGES.txt Sat Mar 10 01:50:45 2012
@@ -12,6 +12,9 @@ Release 1.0.2 - unreleased
     MAPREDUCE-3773. Add queue metrics with buckets for job run times. (omalley
     via acmurthy)
 
+    HADOOP-1722. Allow hadoop streaming to handle non-utf8 byte array. (Klaas
+    Bosteels and Matthias Lehmann via acmurthy)
+
   BUG FIXES
 
     HADOOP-8050. Deadlock in metrics. (Kihwal Lee via mattf)

Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java?rev=1299138&r1=1299137&r2=1299138&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java Sat Mar 10 01:50:45 2012
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.streaming;
 
+import java.util.Arrays;
+
 import org.apache.hadoop.util.ToolRunner;
 
 /** The main entrypoint. Usually invoked with the script bin/hadoopStreaming
@@ -27,11 +29,28 @@ import org.apache.hadoop.util.ToolRunner
 public class HadoopStreaming {
 
   public static void main(String[] args) throws Exception {
+    if (args.length < 1) {
+      System.err.println("No Arguments Given!");
+      System.exit(1);
+    }
     int returnStatus = 0;
-    StreamJob job = new StreamJob();
-    returnStatus = ToolRunner.run(job, args);
+    String cmd = args[0];
+    String[] remainingArgs = Arrays.copyOfRange(args, 1, args.length);
+    if (cmd.equalsIgnoreCase("dumptb")) {
+      DumpTypedBytes dumptb = new DumpTypedBytes();
+      returnStatus = ToolRunner.run(dumptb, remainingArgs);
+    } else if (cmd.equalsIgnoreCase("loadtb")) {
+      LoadTypedBytes loadtb = new LoadTypedBytes();
+      returnStatus = ToolRunner.run(loadtb, remainingArgs);
+    } else if (cmd.equalsIgnoreCase("streamjob")) {
+      StreamJob job = new StreamJob();
+      returnStatus = ToolRunner.run(job, remainingArgs);
+    } else { // for backward compatibility
+      StreamJob job = new StreamJob();
+      returnStatus = ToolRunner.run(job, args);
+    }
     if (returnStatus != 0) {
-      System.err.println("Streaming Job Failed!");
+      System.err.println("Streaming Command Failed!");
       System.exit(returnStatus);
     }
   }
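
Since the new dumptb and loadtb entry points are run through ToolRunner above, they are ordinary Tool implementations and can be driven programmatically as well. A minimal sketch with a hypothetical HDFS path, mirroring what main() now does for a leading "dumptb" argument:

    import org.apache.hadoop.streaming.DumpTypedBytes;
    import org.apache.hadoop.util.ToolRunner;

    public class DumpExample {
      public static void main(String[] args) throws Exception {
        // Dump the contents of a (hypothetical) HDFS directory to stdout
        // as typed bytes, as the new dispatch does for "dumptb <dir>".
        int rc = ToolRunner.run(new DumpTypedBytes(),
                                new String[] { "/user/me/seqfiles" });
        System.exit(rc);
      }
    }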

Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=1299138&r1=1299137&r2=1299138&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Sat Mar 10 01:50:45 2012
@@ -19,8 +19,6 @@
 package org.apache.hadoop.streaming;
 
 import java.io.*;
-import java.nio.charset.CharacterCodingException;
-import java.io.IOException;
 import java.util.Date;
 import java.util.Map;
 import java.util.Iterator;
@@ -30,16 +28,20 @@ import java.util.Properties;
 
 import org.apache.commons.logging.*;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.streaming.io.InputWriter;
+import org.apache.hadoop.streaming.io.OutputReader;
+import org.apache.hadoop.streaming.io.TextInputWriter;
+import org.apache.hadoop.streaming.io.TextOutputReader;
 import org.apache.hadoop.util.LineReader;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.UTF8ByteArrayUtils;
 
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.BytesWritable;
 
 import org.apache.hadoop.fs.FileSystem;
 
@@ -49,15 +51,49 @@ public abstract class PipeMapRed {
 
   protected static final Log LOG = LogFactory.getLog(PipeMapRed.class.getName());
 
-  /** The command to be spawned as a subprocess.
-   * Mapper/Reducer operations will delegate to it
+  /**
+   * Returns the Configuration.
    */
-  abstract String getPipeCommand(JobConf job);
-
-  abstract byte[] getFieldSeparator();
+  public Configuration getConfiguration() {
+    return job_;
+  }
+  
+  /**
+   * Returns the DataOutput to which the client input is written.
+   */
+  public DataOutput getClientOutput() {
+    return clientOut_;
+  }
+  
+  /**
+   * Returns the DataInput from which the client output is read.
+   */
+  public DataInput getClientInput() {
+    return clientIn_;
+  }
+  
+  /**
+   * Returns the input separator to be used.
+   */
+  public abstract byte[] getInputSeparator();
+  
+  /**
+   * Returns the field separator to be used.
+   */
+  public abstract byte[] getFieldSeparator();
 
-  abstract int getNumOfKeyFields();
+  /**
+   * Returns the number of key fields.
+   */
+  public abstract int getNumOfKeyFields();
 
+  
+  /** 
+   * Returns the command to be spawned as a subprocess.
+   * Mapper/Reducer operations will delegate to it
+   */
+  abstract String getPipeCommand(JobConf job);
+  
   abstract boolean getDoPipe();
 
   final static int OUTSIDE = 1;
@@ -120,7 +156,19 @@ public abstract class PipeMapRed {
 
       job_ = job;
       fs_ = FileSystem.get(job_);
-
+      
+      mapInputWriterClass_ = 
+        job_.getClass("stream.map.input.writer.class", 
+          TextInputWriter.class, InputWriter.class);
+      mapOutputReaderClass_ = 
+        job_.getClass("stream.map.output.reader.class",
+          TextOutputReader.class, OutputReader.class);
+      reduceInputWriterClass_ = 
+        job_.getClass("stream.reduce.input.writer.class",
+          TextInputWriter.class, InputWriter.class);
+      reduceOutputReaderClass_ = 
+        job_.getClass("stream.reduce.output.reader.class",
+          TextOutputReader.class, OutputReader.class);
       nonZeroExitIsFailure_ = job_.getBoolean("stream.non.zero.exit.is.failure", true);
       
       doPipe_ = getDoPipe();
@@ -280,13 +328,16 @@ public abstract class PipeMapRed {
     }
   }
 
-  void startOutputThreads(OutputCollector output, Reporter reporter) {
-    outThread_ = new MROutputThread(output, reporter);
+  void startOutputThreads(OutputCollector output, Reporter reporter) 
+    throws IOException {
+    inWriter_ = createInputWriter();
+    outReader_ = createOutputReader();
+    outThread_ = new MROutputThread(outReader_, output, reporter);
     outThread_.start();
     errThread_.setReporter(reporter);
   }
-
-  void waitOutputThreads() {
+  
+  void waitOutputThreads() throws IOException {
     try {
       if (outThread_ == null) {
         // This happens only when reducer has empty input(So reduce() is not
@@ -328,58 +379,46 @@ public abstract class PipeMapRed {
       //ignore
     }
   }
-
-  /**
-   * Split a line into key and value.
-   * @param line: a byte array of line containing UTF-8 bytes
-   * @param key: key of a record
-   * @param val: value of a record
-   * @throws IOException
-   */
-  void splitKeyVal(byte[] line, int length, Text key, Text val)
-  throws IOException {
-    int numKeyFields = getNumOfKeyFields();
-    byte[] separator = getFieldSeparator();
-    
-    // Need to find numKeyFields separators
-    int pos = UTF8ByteArrayUtils.findBytes(line, 0, length, separator);
-    for(int k=1; k<numKeyFields && pos!=-1; k++) {
-      pos = UTF8ByteArrayUtils.findBytes(line, pos + separator.length, 
-          length, separator);
-    }
-    try {
-      if (pos == -1) {
-        key.set(line, 0, length);
-        val.set("");
-      } else {
-        StreamKeyValUtil.splitKeyVal(line, 0, length, key, val, pos, separator.length);
-      }
-    } catch (CharacterCodingException e) {
-      LOG.warn(StringUtils.stringifyException(e));
-    }
+  
+  
+  abstract InputWriter createInputWriter() throws IOException;
+  
+  InputWriter createInputWriter(Class<? extends InputWriter> inputWriterClass) 
+    throws IOException {
+    InputWriter inputWriter =
+      ReflectionUtils.newInstance(inputWriterClass, job_);
+    inputWriter.initialize(this);
+    return inputWriter;
+  }
+
+  abstract OutputReader createOutputReader() throws IOException;
+
+  OutputReader createOutputReader(Class<? extends OutputReader> outputReaderClass) 
+    throws IOException {
+    OutputReader outputReader =
+      ReflectionUtils.newInstance(outputReaderClass, job_);
+    outputReader.initialize(this);
+    return outputReader;
   }
-
+  
+  
   class MROutputThread extends Thread {
 
-    MROutputThread(OutputCollector output, Reporter reporter) {
+    MROutputThread(OutputReader outReader, OutputCollector outCollector,
+      Reporter reporter) {
       setDaemon(true);
-      this.output = output;
+      this.outReader = outReader;
+      this.outCollector = outCollector;
       this.reporter = reporter;
     }
 
     public void run() {
-      LineReader lineReader = null;
       try {
-        Text key = new Text();
-        Text val = new Text();
-        Text line = new Text();
-        lineReader = new LineReader((InputStream)clientIn_, job_);
         // 3/4 Tool to Hadoop
-        while (lineReader.readLine(line) > 0) {
-          answer = line.getBytes();
-          splitKeyVal(answer, line.getLength(), key, val);
-          output.collect(key, val);
-          line.clear();
+        while (outReader.readKeyValue()) {
+          Object key = outReader.getCurrentKey();
+          Object value = outReader.getCurrentValue();
+          outCollector.collect(key, value);
           numRecWritten_++;
           long now = System.currentTimeMillis();
           if (now-lastStdoutReport > reporterOutDelay_) {
@@ -394,21 +433,11 @@ public abstract class PipeMapRed {
             logflush();
           }
         }
-        if (lineReader != null) {
-          lineReader.close();
-        }
-        if (clientIn_ != null) {
-          clientIn_.close();
-          clientIn_ = null;
-          LOG.info("MROutputThread done");
-        }
       } catch (Throwable th) {
         outerrThreadsThrowable = th;
         LOG.warn(StringUtils.stringifyException(th));
+      } finally {
         try {
-          if (lineReader != null) {
-            lineReader.close();
-          }
           if (clientIn_ != null) {
             clientIn_.close();
             clientIn_ = null;
@@ -419,9 +448,9 @@ public abstract class PipeMapRed {
       }
     }
 
-    OutputCollector output;
-    Reporter reporter;
-    byte[] answer;
+    OutputReader outReader = null;
+    OutputCollector outCollector = null;
+    Reporter reporter = null;
     long lastStdoutReport = 0;
     
   }
@@ -540,9 +569,10 @@ public abstract class PipeMapRed {
           clientOut_.flush();
           clientOut_.close();
         }
+        waitOutputThreads();
       } catch (IOException io) {
+        LOG.warn(StringUtils.stringifyException(io));
       }
-      waitOutputThreads();
       if (sim != null) sim.destroy();
       logprintln("mapRedFinished");
     } catch (RuntimeException e) {
@@ -579,7 +609,7 @@ public abstract class PipeMapRed {
     //s += envline("PWD"); // =/home/crawler/hadoop/trunk
     s += "last Hadoop input: |" + mapredKey_ + "|\n";
     if (outThread_ != null) {
-      s += "last tool output: |" + outThread_.answer + "|\n";
+      s += "last tool output: |" + outReader_.getLastOutput() + "|\n";
     }
     s += "Date: " + new Date() + "\n";
     // s += envline("HADOOP_HOME");
@@ -611,37 +641,12 @@ public abstract class PipeMapRed {
     return msg;
   }
 
-  /**
-   * Write a value to the output stream using UTF-8 encoding
-   * @param value output value
-   * @throws IOException
-   */
-  void write(Object value) throws IOException {
-    byte[] bval;
-    int valSize;
-    if (value instanceof BytesWritable) {
-      BytesWritable val = (BytesWritable) value;
-      bval = val.getBytes();
-      valSize = val.getLength();
-    } else if (value instanceof Text) {
-      Text val = (Text) value;
-      bval = val.getBytes();
-      valSize = val.getLength();
-    } else {
-      String sval = value.toString();
-      bval = sval.getBytes("UTF-8");
-      valSize = bval.length;
-    }
-    clientOut_.write(bval, 0, valSize);
-  }
-
   long startTime_;
   long numRecRead_ = 0;
   long numRecWritten_ = 0;
   long numRecSkipped_ = 0;
   long nextRecReadLog_ = 1;
 
-  
   long minRecWrittenToEnableSkip_ = Long.MAX_VALUE;
 
   long reporterOutDelay_ = 10*1000L; 
@@ -656,9 +661,15 @@ public abstract class PipeMapRed {
   boolean debugFailDuring_;
   boolean debugFailLate_;
 
+  Class<? extends InputWriter> mapInputWriterClass_;
+  Class<? extends OutputReader> mapOutputReaderClass_;
+  Class<? extends InputWriter> reduceInputWriterClass_;
+  Class<? extends OutputReader> reduceOutputReaderClass_;
   boolean nonZeroExitIsFailure_;
   
   Process sim;
+  InputWriter inWriter_;
+  OutputReader outReader_;
   MROutputThread outThread_;
   String jobLog_;
   MRErrorThread errThread_;
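
The MROutputThread loop above now drives an OutputReader instead of parsing UTF-8 lines itself, which is what makes non-text subprocess output possible. A minimal sketch of that contract as it appears in this diff (initialize(PipeMapRed), readKeyValue(), getCurrentKey/getCurrentValue, getLastOutput); the generic OutputReader<K, V> signature is an assumption, and the fixed-width framing is hypothetical, purely to keep the example short:

    import java.io.DataInput;
    import java.io.EOFException;
    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.streaming.PipeMapRed;
    import org.apache.hadoop.streaming.io.OutputReader;

    public class FixedWidthOutputReader extends OutputReader<Text, Text> {
      private DataInput clientOut;
      private final byte[] buf = new byte[16];
      private final Text key = new Text();
      private final Text value = new Text();

      @Override
      public void initialize(PipeMapRed pipeMapRed) throws IOException {
        super.initialize(pipeMapRed);
        clientOut = pipeMapRed.getClientInput();  // subprocess stdout
      }

      @Override
      public boolean readKeyValue() throws IOException {
        try {
          clientOut.readFully(buf, 0, 8);  // hypothetical 8-byte key...
          clientOut.readFully(buf, 8, 8);  // ...followed by an 8-byte value
        } catch (EOFException eof) {
          return false;  // subprocess closed its stdout; no more records
        }
        key.set(buf, 0, 8);
        value.set(buf, 8, 8);
        return true;
      }

      @Override
      public Text getCurrentKey() {
        return key;
      }

      @Override
      public Text getCurrentValue() {
        return value;
      }

      @Override
      public String getLastOutput() {
        return key + "\t" + value;  // surfaces in the failure context above
      }
    }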

Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=1299138&r1=1299137&r2=1299138&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Sat Mar 10 01:50:45 2012
@@ -27,6 +27,9 @@ import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.SkipBadRecords;
 import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.streaming.io.InputWriter;
+import org.apache.hadoop.streaming.io.OutputReader;
+import org.apache.hadoop.streaming.io.TextInputWriter;
 import org.apache.hadoop.util.StringUtils;
 
 /** A generic Mapper bridge.
@@ -66,9 +69,11 @@ public class PipeMapper extends PipeMapR
     //records input.
     SkipBadRecords.setAutoIncrMapperProcCount(job, false);
     skipping = job.getBoolean("mapred.skip.on", false);
-    String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
-    ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());
-
+    if (mapInputWriterClass_.getCanonicalName().equals(TextInputWriter.class.getCanonicalName())) {
+      String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
+      ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());
+    }
+    
     try {
       mapOutputFieldSeparator = job.get("stream.map.output.field.separator", "\t").getBytes("UTF-8");
       mapInputFieldSeparator = job.get("stream.map.input.field.separator", "\t").getBytes("UTF-8");
@@ -99,11 +104,9 @@ public class PipeMapper extends PipeMapR
       // 2/4 Hadoop to Tool
       if (numExceptions_ == 0) {
         if (!this.ignoreKey) {
-          write(key);
-          clientOut_.write(getInputSeparator());
+          inWriter_.writeKey(key);
         }
-        write(value);
-        clientOut_.write('\n');
+        inWriter_.writeValue(value);
         if(skipping) {
           //flush the streams on every record input if running in skip mode
           //so that we don't buffer other records surrounding a bad record. 
@@ -132,18 +135,29 @@ public class PipeMapper extends PipeMapR
     mapRedFinished();
   }
 
-  byte[] getInputSeparator() {
+  @Override
+  public byte[] getInputSeparator() {
     return mapInputFieldSeparator;
   }
 
   @Override
-  byte[] getFieldSeparator() {
+  public byte[] getFieldSeparator() {
     return mapOutputFieldSeparator;
   }
 
   @Override
-  int getNumOfKeyFields() {
+  public int getNumOfKeyFields() {
     return numOfMapOutputKeyFields;
   }
 
+  @Override
+  InputWriter createInputWriter() throws IOException {
+    return super.createInputWriter(mapInputWriterClass_);
+  }
+
+  @Override
+  OutputReader createOutputReader() throws IOException {
+    return super.createOutputReader(mapOutputReaderClass_);
+  }
+
 }
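
PipeMapper now hands each record to an InputWriter rather than writing separator-delimited UTF-8 itself (the removed write()/clientOut_ calls above). A minimal sketch of a custom writer built on the hooks this merge adds to PipeMapRed; the generic InputWriter<K, V> signature is an assumption, and the upper-casing is purely illustrative:

    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.streaming.PipeMapRed;
    import org.apache.hadoop.streaming.io.InputWriter;

    public class UpperCaseInputWriter extends InputWriter<Object, Object> {
      private DataOutput clientIn;
      private byte[] inputSeparator;

      @Override
      public void initialize(PipeMapRed pipeMapRed) throws IOException {
        super.initialize(pipeMapRed);
        clientIn = pipeMapRed.getClientOutput();  // subprocess stdin
        inputSeparator = pipeMapRed.getInputSeparator();
      }

      @Override
      public void writeKey(Object key) throws IOException {
        clientIn.write(key.toString().toUpperCase().getBytes("UTF-8"));
        clientIn.write(inputSeparator);
      }

      @Override
      public void writeValue(Object value) throws IOException {
        clientIn.write(value.toString().toUpperCase().getBytes("UTF-8"));
        clientIn.write('\n');
      }
    }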

Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?rev=1299138&r1=1299137&r2=1299138&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Sat Mar 10 01:50:45 2012
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.SkipBadRecords;
+import org.apache.hadoop.streaming.io.InputWriter;
+import org.apache.hadoop.streaming.io.OutputReader;
 import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hadoop.io.Writable;
@@ -97,10 +99,8 @@ public class PipeReducer extends PipeMap
                                    + StringUtils.stringifyException(
                                                                     outerrThreadsThrowable));
           }
-          write(key);
-          clientOut_.write(getInputSeparator());
-          write(val);
-          clientOut_.write('\n');
+          inWriter_.writeKey(key);
+          inWriter_.writeValue(val);
         } else {
           // "identity reduce"
           output.collect(key, val);
@@ -137,18 +137,29 @@ public class PipeReducer extends PipeMap
     mapRedFinished();
   }
 
-  byte[] getInputSeparator() {
+  @Override
+  public byte[] getInputSeparator() {
     return reduceInputFieldSeparator;
   }
 
   @Override
-  byte[] getFieldSeparator() {
+  public byte[] getFieldSeparator() {
     return reduceOutFieldSeparator;
   }
   
   @Override
-  int getNumOfKeyFields() {
+  public int getNumOfKeyFields() {
     return numOfReduceOutputKeyFields;
   }
+  
+  @Override
+  InputWriter createInputWriter() throws IOException {
+    return super.createInputWriter(reduceInputWriterClass_);
+  }
+
+  @Override
+  OutputReader createOutputReader() throws IOException {
+    return super.createOutputReader(reduceOutputReaderClass_);
+  }
 
 }

Modified: hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=1299138&r1=1299137&r2=1299138&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ hadoop/common/branches/branch-1.0/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Sat Mar 10 01:50:45 2012
@@ -47,7 +47,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
@@ -63,7 +62,11 @@ import org.apache.hadoop.mapred.TextInpu
 import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorCombiner;
 import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer;
+import org.apache.hadoop.streaming.io.IdentifierResolver;
+import org.apache.hadoop.streaming.io.InputWriter;
+import org.apache.hadoop.streaming.io.OutputReader;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 
@@ -284,6 +287,7 @@ public class StreamJob implements Tool {
       inReaderSpec_ = (String)cmdLine.getOptionValue("inputreader"); 
       mapDebugSpec_ = (String)cmdLine.getOptionValue("mapdebug");    
       reduceDebugSpec_ = (String)cmdLine.getOptionValue("reducedebug");
+      ioSpec_ = (String)cmdLine.getOptionValue("io");
       
       String[] car = cmdLine.getOptionValues("cacheArchive");
       if (null != car && car.length > 0){
@@ -408,6 +412,8 @@ public class StreamJob implements Tool {
                                     "File name URI", "fileNameURI", Integer.MAX_VALUE, false);
     Option cacheArchive = createOption("cacheArchive", 
                                        "File name URI", "fileNameURI", Integer.MAX_VALUE, false);
+    Option io = createOption("io",
+                             "Optional.", "spec", 1, false);
     
     // boolean properties
     
@@ -437,6 +443,7 @@ public class StreamJob implements Tool {
       addOption(cmdenv).
       addOption(cacheFile).
       addOption(cacheArchive).
+      addOption(io).
       addOption(verbose).
       addOption(info).
       addOption(debug).
@@ -467,6 +474,7 @@ public class StreamJob implements Tool {
     "To run this script when a map task fails ");
     System.out.println("  -reducedebug <path>  Optional." +
     " To run this script when a reduce task fails ");
+    System.out.println("  -io <identifier>  Optional.");
     System.out.println("  -verbose");
     System.out.println();
     GenericOptionsParser.printGenericCommandUsage(System.out);
@@ -689,9 +697,38 @@ public class StreamJob implements Tool {
 
     jobConf_.setInputFormat(fmt);
 
-    jobConf_.setOutputKeyClass(Text.class);
-    jobConf_.setOutputValueClass(Text.class);
-
+    if (ioSpec_ != null) {
+      jobConf_.set("stream.map.input", ioSpec_);
+      jobConf_.set("stream.map.output", ioSpec_);
+      jobConf_.set("stream.reduce.input", ioSpec_);
+      jobConf_.set("stream.reduce.output", ioSpec_);
+    }
+    
+    Class<? extends IdentifierResolver> idResolverClass = 
+      jobConf_.getClass("stream.io.identifier.resolver.class",
+        IdentifierResolver.class, IdentifierResolver.class);
+    IdentifierResolver idResolver = ReflectionUtils.newInstance(idResolverClass, jobConf_);
+    
+    idResolver.resolve(jobConf_.get("stream.map.input", IdentifierResolver.TEXT_ID));
+    jobConf_.setClass("stream.map.input.writer.class",
+      idResolver.getInputWriterClass(), InputWriter.class);
+    
+    idResolver.resolve(jobConf_.get("stream.reduce.input", IdentifierResolver.TEXT_ID));
+    jobConf_.setClass("stream.reduce.input.writer.class",
+      idResolver.getInputWriterClass(), InputWriter.class);
+    
+    idResolver.resolve(jobConf_.get("stream.map.output", IdentifierResolver.TEXT_ID));
+    jobConf_.setClass("stream.map.output.reader.class",
+      idResolver.getOutputReaderClass(), OutputReader.class);
+    jobConf_.setMapOutputKeyClass(idResolver.getOutputKeyClass());
+    jobConf_.setMapOutputValueClass(idResolver.getOutputValueClass());
+    
+    idResolver.resolve(jobConf_.get("stream.reduce.output", IdentifierResolver.TEXT_ID));
+    jobConf_.setClass("stream.reduce.output.reader.class",
+      idResolver.getOutputReaderClass(), OutputReader.class);
+    jobConf_.setOutputKeyClass(idResolver.getOutputKeyClass());
+    jobConf_.setOutputValueClass(idResolver.getOutputValueClass());
+    
     jobConf_.set("stream.addenvironment", addTaskEnvironment_);
 
     if (mapCmd_ != null) {
@@ -962,6 +999,7 @@ public class StreamJob implements Tool {
   protected String additionalConfSpec_;
   protected String mapDebugSpec_;
   protected String reduceDebugSpec_;
+  protected String ioSpec_;
 
   // Use to communicate config to the external processes (ex env.var.HADOOP_USER)
   // encoding "a=b c=d"
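
Finally, because StreamJob reads stream.io.identifier.resolver.class above, the identifier-to-class mapping itself is pluggable. A minimal sketch of a custom resolver; the protected set*Class methods are assumptions about the IdentifierResolver added in this merge (only the matching getters appear verbatim in the hunk above):

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.streaming.io.IdentifierResolver;
    import org.apache.hadoop.streaming.io.TextInputWriter;
    import org.apache.hadoop.streaming.io.TextOutputReader;

    public class MyResolver extends IdentifierResolver {
      @Override
      public void resolve(String identifier) {
        if ("myformat".equalsIgnoreCase(identifier)) {
          // Assumed protected setters; here they just reuse the stock
          // text classes, where a real resolver would plug in its own.
          setInputWriterClass(TextInputWriter.class);
          setOutputReaderClass(TextOutputReader.class);
          setOutputKeyClass(Text.class);
          setOutputValueClass(Text.class);
        } else {
          super.resolve(identifier);  // keep the built-in identifiers
        }
      }
    }

Setting stream.io.identifier.resolver.class to such a class would then make "-io myformat" resolvable alongside the built-in identifiers.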