Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/09/05 23:55:21 UTC

svn commit: r440503 - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/

Author: cutting
Date: Tue Sep  5 14:55:20 2006
New Revision: 440503

URL: http://svn.apache.org/viewvc?view=rev&rev=440503
Log:
HADOOP-499.  Reduce the use of Strings in contrib/streaming, replacing them with Text for better performance.  Contributed by Hairong.
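
For context on the performance claim: a Text holds raw UTF-8 bytes and is
reusable across records, so the per-record decode-to-String/allocate cycle
disappears. A minimal sketch of the pattern this commit adopts (the class
and method names here are illustrative, not part of the commit):

    import org.apache.hadoop.io.Text;

    public class TextReuseSketch {
        // One Text is allocated up front and refilled for every record,
        // instead of decoding each line into a fresh java.lang.String.
        static void emit(byte[][] lines) {
            Text key = new Text();      // allocated once
            for (int i = 0; i < lines.length; i++) {
                key.set(lines[i]);      // copies the UTF-8 bytes; no char decode
                // output.collect(key, ...) would follow in a real mapper
            }
        }
    }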

Added:
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=440503&r1=440502&r2=440503
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Sep  5 14:55:20 2006
@@ -110,6 +110,10 @@
 27. HADOOP-501.  Fix Configuration.toString() to handle URL resources.
     (Thomas Friol via cutting)
 
+28. HADOOP-499.  Reduce the use of Strings in contrib/streaming,
+    replacing them with Text for better performance.
+    (Hairong Kuang via cutting)
+
 
 Release 0.5.0 - 2006-08-04
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?view=diff&rev=440503&r1=440502&r2=440503
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Tue Sep  5 14:55:20 2006
@@ -17,7 +17,7 @@
 package org.apache.hadoop.streaming;
 
 import java.io.*;
-import java.nio.channels.*;
+import java.nio.charset.CharacterCodingException;
 import java.io.IOException;
 import java.util.Date;
 import java.util.Map;
@@ -30,17 +30,12 @@
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.util.StringUtils;
 
-import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 
 import org.apache.hadoop.fs.Path;
@@ -170,6 +165,9 @@
       if(debug_) {
         System.out.println("PipeMapRed: stream.debug=true");
       }
+      
+      joinDelay_ = job.getLong("stream.joindelay.milli", 0);
+      
       job_ = job;
 
       // Currently: null is identity reduce. REDUCE_NONE is no-map-outputs.
@@ -228,7 +226,6 @@
       clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));
       clientIn_  = new DataInputStream(new BufferedInputStream(sim.getInputStream()));
       clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));
-      doneLock_  = new Object();
       startTime_ = System.currentTimeMillis();
 
     } catch(Exception e) {
@@ -265,7 +262,7 @@
     if(log_ != null) {
       log_.println(s);
     } else {
-      System.err.println(s); // or LOG.info()
+      LOG.info(s);
     }
   }
 
@@ -341,28 +338,49 @@
 
   void startOutputThreads(OutputCollector output, Reporter reporter)
   {
-      outputDone_ = false;
-      errorDone_ = false;
       outThread_ = new MROutputThread(output, reporter);
       outThread_.start();
       errThread_ = new MRErrorThread(reporter);
       errThread_.start();
   }
+  
+  void waitOutputThreads() {
+      try {
+          sim.waitFor();
+          if(outThread_ != null) {
+              outThread_.join(joinDelay_);
+          }
+          if(errThread_ != null) {
+              errThread_.join(joinDelay_);
+          }
+      } catch(InterruptedException e) {
+          //ignore
+      }
+  }
 
-  void splitKeyVal(String line, UTF8 key, UTF8 val)
+  /**
+   * Split a line into key and value. Assumes the delimiter is a tab.
+   * @param line: a byte array containing a line of UTF-8 bytes
+   * @param key: the key of the record
+   * @param val: the value of the record
+   * @throws IOException
+   */
+  void splitKeyVal(byte [] line, Text key, Text val) throws IOException
   {
-    int pos;
-    if(keyCols_ == ALL_COLS) {
-      pos = -1;
-    } else {
-      pos = line.indexOf('\t');
+    int pos=-1;
+    if(keyCols_ != ALL_COLS) {
+        pos = UTF8ByteArrayUtils.findTab(line);
     }
-    if(pos == -1) {
-      key.set(line);
-      val.set("");
-    } else {
-      key.set(line.substring(0, pos));
-      val.set(line.substring(pos+1));
+    try {
+        if(pos == -1) {
+            key.set(line);
+            val.set("");
+        } else {
+            UTF8ByteArrayUtils.splitKeyVal(line, key, val, pos);
+        }
+    } catch (CharacterCodingException e) {
+        String warning = StringUtils.stringifyException(e);
+        LOG.warn(warning);
     }
   }
   
@@ -375,41 +393,33 @@
       this.reporter = reporter;
     }
     public void run() {
-      try {
-            try {
-              UTF8 EMPTY = new UTF8("");
-              UTF8 key = new UTF8();
-              UTF8 val = new UTF8();
-              // 3/4 Tool to Hadoop
-              while((answer = clientIn_.readLine()) != null) {
+        try {
+            Text key = new Text();
+            Text val = new Text();
+            // 3/4 Tool to Hadoop
+            while((answer=UTF8ByteArrayUtils.readLine(clientIn_))!= null) {
                 // 4/4 Hadoop out
                 if(optSideEffect_) {
-                  sideEffectOut_.write(answer.getBytes());
-                  sideEffectOut_.write('\n');
+                    sideEffectOut_.write(answer);
+                    sideEffectOut_.write('\n');
                 } else {
-                  splitKeyVal(answer, key, val);
-                  output.collect(key, val);
-                  numRecWritten_++;
-                  if(numRecWritten_ % 100 == 0) {
+                    splitKeyVal(answer, key, val);
+                    output.collect(key, val);
+                }
+                numRecWritten_++;
+                if(numRecWritten_ % 100 == 0) {
                     logprintln(numRecRead_+"/"+numRecWritten_);
                     logflush();
-                  }
                 }
-              }
-            } catch(IOException io) {
-              io.printStackTrace(log_);
             }
-            logprintln("MROutputThread done");
-      } finally {
-          outputDone_ = true;
-          synchronized(doneLock_) {
-            doneLock_.notifyAll();
-          }
-      }
+        } catch(IOException io) {
+            io.printStackTrace(log_);
+        }
+        logprintln("MROutputThread done");
     }
     OutputCollector output;
     Reporter reporter;
-    String answer;
+    byte [] answer;
   }
 
   class MRErrorThread extends Thread
@@ -421,26 +431,21 @@
     }
     public void run()
     {
-      String line;
+      byte [] line;
       try {
         long num = 0;
-        int bucket = 100;
-        while((line=clientErr_.readLine()) != null) {
+        while((line=UTF8ByteArrayUtils.readLine(clientErr_)) != null) {
           num++;
-          logprintln(line);
+          String lineStr = new String(line, "UTF-8"); 
+          logprintln(lineStr);
           if(num < 10) {
-            String hline = "MRErr: " + line;
+            String hline = "MRErr: " + lineStr;
             System.err.println(hline);
             reporter.setStatus(hline);
           }
         }
       } catch(IOException io) {
         io.printStackTrace(log_);
-      } finally {
-        errorDone_ = true;
-        synchronized(doneLock_) {
-          doneLock_.notifyAll();
-        }
       }
     }
     Reporter reporter;
@@ -448,42 +453,31 @@
 
   public void mapRedFinished()
   {
-    logprintln("mapRedFinished");
-    try {
-    if(!doPipe_) return;
-    try {
-      if(optSideEffect_) {
-        logprintln("closing " + sideEffectPath_);
-        sideEffectOut_.close();
-        logprintln("closed  " + sideEffectPath_);
-      }
-    } catch(IOException io) {
-      io.printStackTrace();
-    }
-    try {
-      if(clientOut_ != null) {
-        clientOut_.close();
-      }
-    } catch(IOException io) {
-    }
-    if(outThread_ == null) {
-      // no input records: threads were never spawned
-    } else {
+      logprintln("mapRedFinished");
+      if(!doPipe_) return;
+
       try {
-        while(!outputDone_ || !errorDone_) {
-          synchronized(doneLock_) {
-            doneLock_.wait();
+          try {
+              if(clientOut_ != null) {
+                  clientOut_.close();
+              }
+          } catch(IOException io) {
           }
-        }
-      } catch(InterruptedException ie) {
-        ie.printStackTrace();
+          waitOutputThreads();
+          try {
+              if(optSideEffect_) {
+                  logprintln("closing " + sideEffectPath_);
+                  sideEffectOut_.close();
+                  logprintln("closed  " + sideEffectPath_);
+              }
+          } catch(IOException io) {
+              io.printStackTrace();
+          }
+          sim.destroy();
+      } catch(RuntimeException e) {
+          e.printStackTrace(log_);
+          throw e;
       }
-    }
-      sim.destroy();
-    } catch(RuntimeException e) {
-      e.printStackTrace(log_);
-      throw e;
-    }
   }
 
   void maybeLogRecord()
@@ -543,7 +537,30 @@
       return msg;
   }
 
-
+  /**
+   * Write a writable value to the output stream using UTF-8 encoding
+   * @param value output value
+   * @throws IOException
+   */
+  void write(Writable value) throws IOException {
+      byte[] bval;
+      int valSize;
+      if(value instanceof BytesWritable) {
+          BytesWritable val = (BytesWritable)value;
+          bval = val.get();
+          valSize = val.getSize();
+      } else if(value instanceof Text){
+          Text val = (Text)value;
+          bval = val.getBytes();
+          valSize = val.getLength();
+      } else  {
+          String sval = value.toString();
+          bval = sval.getBytes("UTF-8");
+          valSize = bval.length;
+      }
+      clientOut_.write(bval, 0, valSize);
+  }
+  
   long startTime_;
   long numRecRead_ = 0;
   long numRecWritten_ = 0;
@@ -555,6 +572,7 @@
   int keyCols_;
   final static int ALL_COLS = Integer.MAX_VALUE;
 
+  long joinDelay_;
   JobConf job_;
 
   // generic MapRed parameters passed on by hadoopStreaming
@@ -565,16 +583,13 @@
   boolean debug_;
 
   Process sim;
-  Object doneLock_;
   MROutputThread outThread_;
+  String jobLog_;
   MRErrorThread errThread_;
-  boolean outputDone_;
-  boolean errorDone_;
   DataOutputStream clientOut_;
   DataInputStream  clientErr_;
   DataInputStream   clientIn_;
 
-  String jobLog_;
   // set in PipeMapper/PipeReducer subclasses
   String mapredKey_;
   int numExceptions_;
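
One subtlety in the new write(Writable) helper above: Text.getBytes() returns
the backing array, which can be longer than the valid data, so it must always
be paired with getLength(). A small standalone sketch of the pitfall (names
illustrative; this is not commit code):

    import org.apache.hadoop.io.Text;

    public class TextBytesSketch {
        public static void main(String[] args) throws Exception {
            Text t = new Text("a longer value");
            t.set("hi");                   // the backing array is not shrunk
            byte[] raw = t.getBytes();     // may still hold stale trailing bytes
            int len = t.getLength();       // 2: the only safe length to use
            System.out.write(raw, 0, len); // same pattern as write(bval, 0, valSize)
            System.out.flush();
        }
    }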

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?view=diff&rev=440503&r1=440502&r2=440503
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Tue Sep  5 14:55:20 2006
@@ -17,17 +17,12 @@
 package org.apache.hadoop.streaming;
 
 import java.io.*;
-import java.util.Iterator;
 
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
 
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
 
@@ -71,34 +66,20 @@
     }
     try {
       // 1/4 Hadoop in
-      if(key instanceof BytesWritable) {
-        BytesWritable bKey = (BytesWritable)key;
-        mapredKey_ = new String(bKey.get(), 0, bKey.getSize(), "UTF-8");
-      } else {
-        mapredKey_ = key.toString();        
-      }
       numRecRead_++;
-
       maybeLogRecord();
 
       // 2/4 Hadoop to Tool
       if(numExceptions_==0) {
-        String sval;
-        if(value instanceof BytesWritable) {
-          BytesWritable bVal = (BytesWritable)value;
-          sval = new String(bVal.get(), 0, bVal.getSize(), "UTF-8");
-        } else {
-          sval = value.toString();
-        }
-        if(optUseKey_) {
-          clientOut_.writeBytes(mapredKey_);
-          clientOut_.writeBytes("\t");
-        }
-        clientOut_.writeBytes(sval);
-        clientOut_.writeBytes("\n");
-        clientOut_.flush();
+          if(optUseKey_) {
+              write(key);
+              clientOut_.write('\t');
+          }
+          write(value);
+          clientOut_.write('\n');
+          clientOut_.flush();
       } else {
-        numRecSkipped_++;
+          numRecSkipped_++;
       }
     } catch(IOException io) {
       numExceptions_++;
@@ -106,6 +87,7 @@
         // terminate with failure
         String msg = logFailure(io);
         appendLogToJobLog("failure");
+        mapRedFinished();
         throw new IOException(msg);
       } else {
         // terminate with success:
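
In protocol terms the external tool still receives one key<TAB>value<NEWLINE>
line per record on stdin; it is now written as raw UTF-8 bytes rather than via
writeBytes(String). A hedged sketch of the framing (the class name and the
capturing buffer are illustrative):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.Text;

    public class FramingSketch {
        // Mirrors the write(key) / '\t' / write(value) / '\n' sequence above,
        // capturing the bytes the external tool would read on stdin.
        static byte[] frame(Text key, Text value) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.write(key.getBytes(), 0, key.getLength());
            out.write('\t');
            out.write(value.getBytes(), 0, value.getLength());
            out.write('\n');
            out.flush();
            return bytes.toByteArray();
        }
    }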

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?view=diff&rev=440503&r1=440502&r2=440503
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Tue Sep  5 14:55:20 2006
@@ -20,13 +20,10 @@
 import java.util.Iterator;
 
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
 
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
 
@@ -61,10 +58,10 @@
         numRecRead_++;
         maybeLogRecord();
         if(doPipe_) {
-          clientOut_.writeBytes(key.toString());
-          clientOut_.writeBytes("\t");
-          clientOut_.writeBytes(val.toString());
-          clientOut_.writeBytes("\n");
+          write(key);
+          clientOut_.write('\t');
+          write(val);
+          clientOut_.write('\n');
           clientOut_.flush();
         } else {
           // "identity reduce"
@@ -73,6 +70,7 @@
       }
     } catch(IOException io) {
       appendLogToJobLog("failure");
+      mapRedFinished();
       throw new IOException(getContext() + io.getMessage());    
     }
   }

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java?view=diff&rev=440503&r1=440502&r2=440503
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java Tue Sep  5 14:55:20 2006
@@ -18,7 +18,7 @@
 
 import java.io.*;
 
-import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.fs.Path;
@@ -95,11 +95,11 @@
   }
 
   public WritableComparable createKey() {
-    return new UTF8();
+    return new Text();
   }
   
   public Writable createValue() {
-    return new UTF8();
+    return new Text();
   }
   
   /// StreamBaseRecordReader API
@@ -123,12 +123,14 @@
   public abstract void seekNextRecordBoundary() throws IOException;
   
     
-  void numRecStats(CharSequence record) throws IOException
+  void numRecStats(byte[] record, int start, int len) throws IOException
   {
     numRec_++;          
     if(numRec_ == nextStatusRec_) {
+      String recordStr = new String(record, start, 
+                Math.min(len, statusMaxRecordChars_), "UTF-8");    
       nextStatusRec_ +=100;//*= 10;
-      String status = getStatus(record);
+      String status = getStatus(recordStr);
       LOG.info(status);
       reporter_.setStatus(status);
     }

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=440503&r1=440502&r2=440503
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Tue Sep  5 14:55:20 2006
@@ -28,7 +28,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 
-import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.fs.Path;
 
 import org.apache.hadoop.mapred.JobConf;
@@ -417,19 +417,19 @@
     // general MapRed job properties
     jobConf_ = new JobConf(config_);
     for(int i=0; i<inputGlobs_.size(); i++) {
-      jobConf_.addInputDir(new File((String)inputGlobs_.get(i)));
+      jobConf_.addInputPath(new Path((String)inputGlobs_.get(i)));
     }
 
     jobConf_.setInputFormat(StreamInputFormat.class);
     // for SequenceFile, input classes may be overriden in getRecordReader
-    jobConf_.setInputKeyClass(UTF8.class);
-    jobConf_.setInputValueClass(UTF8.class);
+    jobConf_.setInputKeyClass(Text.class);
+    jobConf_.setInputValueClass(Text.class);
 
-    jobConf_.setOutputKeyClass(UTF8.class);
-    jobConf_.setOutputValueClass(UTF8.class);
+    jobConf_.setOutputKeyClass(Text.class);
+    jobConf_.setOutputValueClass(Text.class);
     //jobConf_.setCombinerClass();
 
-    jobConf_.setOutputDir(new File(output_));
+    jobConf_.setOutputPath(new Path(output_));
     jobConf_.setOutputFormat(StreamOutputFormat.class);
 
     jobConf_.set("stream.addenvironment", addTaskEnvironment_);

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java?view=diff&rev=440503&r1=440502&r2=440503
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java Tue Sep  5 14:55:20 2006
@@ -17,17 +17,18 @@
 package org.apache.hadoop.streaming;
 
 import java.io.*;
+import java.nio.charset.MalformedInputException;
 import java.util.zip.GZIPInputStream; 
 
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Similar to org.apache.hadoop.mapred.TextRecordReader, 
@@ -75,68 +76,49 @@
   }
 
   public synchronized boolean next(Writable key, Writable value)
-    throws IOException {
-    if(gzipped_) {
-      // figure EOS from readLine
-    } else {
-      long pos = in_.getPos();
-      if (pos >= end_)
-        return false;
-    }
-
-    //((LongWritable)key).set(pos);      // key is position
-    //((UTF8)value).set(readLine(in));   // value is line
-    String line = readLine(din_);
-    if(line == null) {
-        return false; // for gzipped_
-    }
-
-    // key is line up to TAB, value is rest
-    final boolean NOVAL = false;
-    if(NOVAL) {
-        ((UTF8)key).set(line);
-        ((UTF8)value).set("");
-    } else {
-      int tab = line.indexOf('\t');
-      if(tab == -1) {
-        ((UTF8)key).set(line);
-        ((UTF8)value).set("");
-      } else {
-        ((UTF8)key).set(line.substring(0, tab));
-        ((UTF8)value).set(line.substring(tab+1));
-      }
+    throws IOException {  
+    if(!(key instanceof Text)) {
+        throw new IllegalArgumentException(
+                "Key should be of type Text but was: "+key.getClass().getName());
+    }
+    if(!(value instanceof Text)) {
+        throw new IllegalArgumentException(
+                "Value should be of type Text but was: "+value.getClass().getName());
     }
-    numRecStats(line);
-    return true;
-  }
-
 
-  // from TextInputFormat
-  private static String readLine(InputStream in) throws IOException {
-    StringBuffer buffer = new StringBuffer();
-    boolean over = true;
-    while (true) {
-
-      int b = in.read();
-      if (b == -1)
-        break;
-      
-      over = false;
-      char c = (char)b;              // bug: this assumes eight-bit characters.
-      if (c == '\r' || c == '\n')    // TODO || c == '\t' here
-        break;
-
-      buffer.append(c);
-    }
+    Text tKey = (Text)key;
+    Text tValue = (Text)value;
+    byte [] line;
     
-    if(over) {
-      return null;
-    } else {
-      return buffer.toString();
+    while (true) {
+        if(gzipped_) {
+            // figure EOS from readLine
+        } else {
+            long pos = in_.getPos();
+            if (pos >= end_)
+                return false;
+        }
+        
+        line = UTF8ByteArrayUtils.readLine(in_);
+        if(line==null)
+            return false;
+        try {
+            int tab=UTF8ByteArrayUtils.findTab(line);
+            if(tab == -1) {
+                tKey.set(line);
+                tValue.set("");
+            } else {
+                UTF8ByteArrayUtils.splitKeyVal(line, tKey, tValue, tab);
+            }
+            break;
+        } catch (MalformedInputException e) {
+            String warning = StringUtils.stringifyException(e);
+            LOG.warn(warning);
+        }
     }
-    
+    numRecStats( line, 0, line.length );
+    return true;
   }
-
   boolean gzipped_;
   GZIPInputStream zin_;
   DataInputStream din_; // GZIP or plain

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java?view=diff&rev=440503&r1=440502&r2=440503
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java Tue Sep  5 14:55:20 2006
@@ -68,7 +68,8 @@
         success = false;
       }
     } while(!success);
-    numRecStats("");
+    
+    numRecStats(new byte[0], 0, 0);
     return more_;
   }
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java?view=diff&rev=440503&r1=440502&r2=440503
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java Tue Sep  5 14:55:20 2006
@@ -19,14 +19,12 @@
 import java.io.*;
 import java.util.regex.*;
 
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
 
@@ -76,18 +74,22 @@
       return false;
     }
     
-    StringBuffer buf = new StringBuffer();
+    DataOutputBuffer buf = new DataOutputBuffer();
     if(!readUntilMatchBegin()) {
         return false;
     }
     if(!readUntilMatchEnd(buf)) {
         return false;
     }
-    numRecStats(buf);
     
     // There is only one elem..key/value splitting is not done here.
-    ((UTF8)key).set(buf.toString());
-    ((UTF8)value).set("");
+    byte [] record = new byte[buf.getLength()];
+    System.arraycopy(buf.getData(), 0, record, 0, record.length);
+    
+    numRecStats(record, 0, record.length); 
+
+    ((Text)key).set(record);
+    ((Text)value).set("");
     
     /*if(numNext < 5) {
         System.out.println("@@@ " + numNext + ". true next k=|" + key.toString().replaceAll("[\\r\\n]", " ")
@@ -111,7 +113,7 @@
     }
   }
   
-  boolean readUntilMatchEnd(StringBuffer buf) throws IOException
+  private boolean readUntilMatchEnd(DataOutputBuffer buf) throws IOException
   {
     if(slowMatch_) {
       return slowReadUntilMatch(endPat_, true, buf);
@@ -121,7 +123,8 @@
   }
   
   
-  boolean slowReadUntilMatch(Pattern markPattern, boolean includePat, StringBuffer outBufOrNull) 
+  private boolean slowReadUntilMatch(Pattern markPattern, boolean includePat, 
+          DataOutputBuffer outBufOrNull) 
     throws IOException   
   {
     try {
@@ -131,7 +134,10 @@
       boolean success = true;
       in_.mark(lookAhead_ + 2);
       read = in_.read(buf);
-      String sbuf = new String(buf);        
+      if( read == -1 )
+          return false;
+      
+      String sbuf = new String(buf, 0, read, "UTF-8");        
       Matcher match = markPattern.matcher(sbuf);
 
       firstMatchStart_ = NA;
@@ -176,16 +182,11 @@
       if(matched) {
         int endPos = includePat ? firstMatchEnd_ : firstMatchStart_;
         //System.out.println("firstMatchStart_=" + firstMatchStart_ + " firstMatchEnd_=" + firstMatchEnd_);
-        String snip = sbuf.substring(firstMatchStart_, firstMatchEnd_);
+        //String snip = sbuf.substring(firstMatchStart_, firstMatchEnd_);
         //System.out.println(" match snip=|" + snip + "| markPattern=" + markPattern);
         if(outBufOrNull != null) {
-          buf = new byte[endPos];
           in_.reset();      
-          read = in_.read(buf);
-          if(read != endPos) {
-              //System.out.println("@@@ BAD re-read less: " + read + " < " + endPos);
-          }          
-          outBufOrNull.append(new String(buf));
+          outBufOrNull.write(in_,endPos);  
         } else {
           //System.out.println("Skip to " + (inStart + endPos));
           in_.seek(inStart + endPos);
@@ -255,10 +256,12 @@
   
   
   
-  boolean fastReadUntilMatch(String textPat, boolean includePat, StringBuffer outBufOrNull) throws IOException 
+  boolean fastReadUntilMatch(String textPat, 
+          boolean includePat, 
+          DataOutputBuffer outBufOrNull) throws IOException 
   {
     //System.out.println("@@@BEGIN readUntilMatch inPos=" + in_.getPos());  
-    char[] cpat = textPat.toCharArray();
+    byte[] cpat = textPat.getBytes("UTF-8");
     int m = 0;
     boolean match = false;
     long markPos = -1;
@@ -273,10 +276,7 @@
       if (b == -1)
         break;
 
-      char c = (char)b; // this assumes eight-bit matching. OK with UTF-8
-      if(outBufOrNull != null) {
-        outBufOrNull.append(c);
-      }
+      byte c = (byte)b; // this assumes eight-bit matching. OK with UTF-8
       if (c == cpat[m]) {
         m++;
         if(m==msup) {
@@ -284,16 +284,20 @@
           break;
         }
       } else {
+        if(outBufOrNull != null) {
+          outBufOrNull.write(cpat, 0, m);
+          outBufOrNull.write(c);
+        }
+        
         m = 0;
       }
     }
     if(!includePat && match) {
-      if(outBufOrNull != null) {
-        outBufOrNull.setLength(outBufOrNull.length() - textPat.length());
-      }
       long pos = in_.getPos() - textPat.length();
       in_.reset();
       in_.seek(pos);
+    } else if(outBufOrNull != null){
+      outBufOrNull.write(cpat);
     }
     //System.out.println("@@@DONE  readUntilMatch inPos=" + in_.getPos() + " includePat=" + includePat + " pat=" + textPat + ", buf=|" + outBufOrNull + "|");
     return match;
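
The StringBuffer-to-DataOutputBuffer switch is what keeps XML records as raw
bytes end to end: DataOutputBuffer grows as bytes are written and exposes its
raw buffer. A small usage sketch of the same steps next() performs above
(the wrapper class is hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.Text;

    public class DataOutputBufferSketch {
        // getData() can be larger than the valid data, hence the
        // getLength()-sized copy before handing the bytes to Text.
        static Text toRecord(byte[] chunk) throws IOException {
            DataOutputBuffer buf = new DataOutputBuffer();
            buf.write(chunk);
            byte[] record = new byte[buf.getLength()];
            System.arraycopy(buf.getData(), 0, record, 0, record.length);
            Text key = new Text();
            key.set(record);
            return key;
        }
    }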

Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java?view=auto&rev=440503
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java Tue Sep  5 14:55:20 2006
@@ -0,0 +1,107 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * General utilities for byte arrays containing UTF-8 encoded strings
+ * @author hairong 
+ */
+
+public class UTF8ByteArrayUtils {
+    /**
+     * Find the first occurrence of a tab in a UTF-8 encoded string
+     * @param utf: a byte array containing a UTF-8 encoded string
+     * @return the position where the first tab occurs, otherwise -1
+     */
+    public static int findTab(byte [] utf) {
+        for(int i=0; i<utf.length; i++) {
+            if(utf[i]==(byte)'\t') {
+                return i;
+            }
+        }
+        return -1;
+    }
+    
+    /**
+     * Split a UTF-8 byte array into key and value,
+     * assuming that the delimiter is at splitPos.
+     * @param utf: a UTF-8 encoded byte array
+     * @param key: contains the key when the method returns
+     * @param val: contains the value when the method returns
+     * @param splitPos: the position of the delimiter
+     * @throws IOException
+     */
+    public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos) 
+    throws IOException {
+        if(splitPos<0 || splitPos >= utf.length)
+            throw new IllegalArgumentException(
+                    "splitPos must be in the range [0, "+(utf.length-1)+"]: "+splitPos);
+        byte [] keyBytes = new byte[splitPos];
+        System.arraycopy(utf, 0, keyBytes, 0, splitPos);
+        int valLen = utf.length-splitPos-1;
+        byte [] valBytes = new byte[valLen];
+        System.arraycopy(utf,splitPos+1, valBytes, 0, valLen );
+        key.set(keyBytes);
+        val.set(valBytes);
+    }
+    
+    /**
+     * Read a UTF-8 encoded line from a data input stream.
+     * @param in data input stream
+     * @return a byte array containing the line 
+     * @throws IOException
+     */
+    public static byte[] readLine(DataInputStream in) throws IOException {
+      byte [] buf = new byte[128];
+      byte [] lineBuffer = buf;
+      int room = 128;
+      int offset = 0;
+      boolean isEOF = false;
+      while (true) {
+        int b = in.read();
+        if (b == -1) {
+          isEOF = true;
+          break;
+        }
+
+        char c = (char)b;
+        if (c == '\r' || c == '\n')
+          break;
+
+        if (--room < 0) {
+            buf = new byte[offset + 128];
+            room = buf.length - offset - 1;
+            System.arraycopy(lineBuffer, 0, buf, 0, offset);
+            lineBuffer = buf;
+        }
+        buf[offset++] = (byte) c;
+      }
+
+      if(isEOF && offset==0) {
+          return null;
+      } else {
+          lineBuffer = new byte[offset];
+          System.arraycopy(buf, 0, lineBuffer, 0, offset);
+          return lineBuffer;
+      }
+    }
+}
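
Putting the new utility class to work, roughly as the record readers above
consume it (the stream contents here are made up for illustration):

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import org.apache.hadoop.io.Text;

    public class UTF8ByteArrayUtilsSketch {
        public static void main(String[] args) throws Exception {
            byte[] data = "key1\tvalue1\nkey2\n".getBytes("UTF-8");
            DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(data));
            Text key = new Text();
            Text val = new Text();
            byte[] line;
            while ((line = UTF8ByteArrayUtils.readLine(in)) != null) {
                int tab = UTF8ByteArrayUtils.findTab(line);
                if (tab == -1) {
                    key.set(line);     // no tab: the whole line is the key
                    val.set("");
                } else {
                    UTF8ByteArrayUtils.splitKeyVal(line, key, val, tab);
                }
                System.out.println(key + " -> " + val);
            }
        }
    }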

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java?view=diff&rev=440503&r1=440502&r2=440503
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java Tue Sep  5 14:55:20 2006
@@ -17,7 +17,6 @@
 package org.apache.hadoop.streaming;
 
 import java.io.*;
-import java.util.*;
 import org.apache.hadoop.streaming.Environment;
 
 /** A minimal Java implementation of /usr/bin/tr.
@@ -51,12 +50,12 @@
     // property names have been escaped in PipeMapRed.safeEnvVarName()
     expect("mapred_input_format_class", "org.apache.hadoop.streaming.StreamInputFormat");
     expect("mapred_job_tracker", "local");
-    expect("mapred_input_key_class", "org.apache.hadoop.io.UTF8");
-    expect("mapred_input_value_class", "org.apache.hadoop.io.UTF8");
+    expect("mapred_input_key_class", "org.apache.hadoop.io.Text");
+    expect("mapred_input_value_class", "org.apache.hadoop.io.Text");
     expect("mapred_local_dir", "build/test/mapred/local");
     expect("mapred_output_format_class", "org.apache.hadoop.streaming.StreamOutputFormat");
-    expect("mapred_output_key_class", "org.apache.hadoop.io.UTF8");
-    expect("mapred_output_value_class", "org.apache.hadoop.io.UTF8");
+    expect("mapred_output_key_class", "org.apache.hadoop.io.Text");
+    expect("mapred_output_value_class", "org.apache.hadoop.io.Text");
 
     expect("mapred_task_is_map", "true");
     expect("mapred_reduce_tasks", "1");