You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2007/04/16 21:35:18 UTC

svn commit: r529378 - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: tomwhite
Date: Mon Apr 16 12:35:15 2007
New Revision: 529378

URL: http://svn.apache.org/viewvc?view=rev&rev=529378
Log:
HADOOP-1214.  Replace streaming classes with new counterparts from Hadoop core.  Contributed by Runping Qi.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsTextInputFormat.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=529378&r1=529377&r2=529378
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Apr 16 12:35:15 2007
@@ -201,6 +201,9 @@
 60. HADOOP-1256.  Fix NameNode so that multiple DataNodeDescriptors
     can no longer be created on startup.  (Hairong Kuang via cutting)
 
+61. HADOOP-1214.  Replace streaming classes with new counterparts 
+    from Hadoop core.  (Runping Qi via tomwhite)
+
 
 Release 0.12.3 - 2007-04-06
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?view=diff&rev=529378&r1=529377&r2=529378
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Mon Apr 16 12:35:15 2007
@@ -20,85 +20,39 @@
 
 import java.io.*;
 import java.lang.reflect.*;
-import java.util.ArrayList;
 
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.compress.GzipCodec;
 
 import org.apache.hadoop.mapred.*;
 
-/** An input format that performs globbing on DFS paths and
- * selects a RecordReader based on a JobConf property.
- * @author Michel Tourn
+/** An input format that selects a RecordReader based on a JobConf property.
+ *  This should be used only for non-standard record readers such as
+ *  StreamXmlRecordReader. For all standard record readers, the
+ *  appropriate input format classes should be used instead.
  */
-public class StreamInputFormat extends TextInputFormat {
-
-  // an InputFormat should be public with the synthetic public default constructor
-  // JobTracker's JobInProgress will instantiate with clazz.newInstance() (and a custom ClassLoader)
-
-  protected static final Log LOG = LogFactory.getLog(StreamInputFormat.class.getName());
-
-  static boolean isGzippedInput(JobConf job) {
-    String val = job.get(StreamBaseRecordReader.CONF_NS + "compression");
-    return "gzip".equals(val);
-  }
+public class StreamInputFormat extends KeyValueTextInputFormat {
 
-  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-
-    if (isGzippedInput(job)) {
-      return getFullFileSplits(job);
-    } else {
-      return super.getSplits(job, numSplits);
-    }
-  }
-  /** For the compressed-files case: override InputFormatBase to produce one split. */
-  FileSplit[] getFullFileSplits(JobConf job) throws IOException {
-    Path[] files = listPaths(job);
-    int numSplits = files.length;
-    ArrayList splits = new ArrayList(numSplits);
-    for (int i = 0; i < files.length; i++) {
-      Path file = files[i];
-      long splitSize = file.getFileSystem(job).getLength(file);
-      splits.add(new FileSplit(file, 0, splitSize, job));
+  public RecordReader getRecordReader(final InputSplit genericSplit,
+      JobConf job, Reporter reporter) throws IOException {
+    String c = job.get("stream.recordreader.class");
+    if (c == null || c.indexOf("LineRecordReader") >= 0) {
+      return super.getRecordReader(genericSplit, job, reporter);
     }
-    return (FileSplit[]) splits.toArray(new FileSplit[splits.size()]);
-  }
 
-  public RecordReader getRecordReader(final InputSplit genericSplit, 
-                                      JobConf job,
-                                      Reporter reporter) throws IOException {
+    // handling non-standard record reader (likely StreamXmlRecordReader) 
     FileSplit split = (FileSplit) genericSplit;
     LOG.info("getRecordReader start.....split=" + split);
     reporter.setStatus(split.toString());
 
-    long start = split.getStart();
-    long length  = split.getLength();
-    
     // Open the file and seek to the start of the split
     FileSystem fs = split.getPath().getFileSystem(job);
     FSDataInputStream in = fs.open(split.getPath());
-    if (isGzippedInput(job)) {
-      length = Long.MAX_VALUE;
-    } else if (start != 0) {
-      in.seek(start-1);
-      LineRecordReader.readLine(in, null);
-      long oldStart = start;
-      start = in.getPos();
-      length -= (start - oldStart); 
-    }
-    // Ugly hack! 
-    split = new FileSplit(split.getPath(), start, length, job);
 
     // Factory dispatch based on available params..
     Class readerClass;
-    String c = job.get("stream.recordreader.class");
-    if (c == null) {
-      readerClass = StreamLineRecordReader.class;
-    } else {
+
+    {
       readerClass = StreamUtil.goodClassOrNull(c, null);
       if (readerClass == null) {
         throw new RuntimeException("Class not found: " + c);
@@ -107,27 +61,19 @@
 
     Constructor ctor;
     try {
-      ctor = readerClass.getConstructor(new Class[] { FSDataInputStream.class, FileSplit.class,
-          Reporter.class, JobConf.class, FileSystem.class });
+      ctor = readerClass.getConstructor(new Class[] { FSDataInputStream.class,
+          FileSplit.class, Reporter.class, JobConf.class, FileSystem.class });
     } catch (NoSuchMethodException nsm) {
       throw new RuntimeException(nsm);
     }
 
     RecordReader reader;
     try {
-      reader = (RecordReader) ctor.newInstance(new Object[] { in, split, reporter, job,
-          fs });
+      reader = (RecordReader) ctor.newInstance(new Object[] { in, split,
+          reporter, job, fs });
     } catch (Exception nsm) {
       throw new RuntimeException(nsm);
     }
-
-    if (reader instanceof StreamSequenceRecordReader) {
-      // override k/v class types with types stored in SequenceFile
-      StreamSequenceRecordReader ss = (StreamSequenceRecordReader) reader;
-      job.setInputKeyClass(ss.rin_.getKeyClass());
-      job.setInputValueClass(ss.rin_.getValueClass());
-    }
-
     return reader;
   }
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=529378&r1=529377&r2=529378
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Mon Apr 16 12:35:15 2007
@@ -59,7 +59,11 @@
 import org.apache.hadoop.mapred.InvalidJobConfException;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
 import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.util.*;
 import org.apache.log4j.helpers.OptionConverter;
@@ -235,6 +239,10 @@
         userJobConfProps_.put("fs.default.name", jt);        
       }
       
+      additionalConfSpec_ = (String)cmdLine.getValue("-additionalconfspec"); 
+      inputFormatSpec_ = (String)cmdLine.getValue("-inputformat"); 
+      outputFormatSpec_ = (String)cmdLine.getValue("-outputformat"); 
+      partitionerSpec_ = (String)cmdLine.getValue("-partitioner");
       inReaderSpec_ = (String)cmdLine.getValue("-inputreader"); 
       
       List<String> car = cmdLine.getValues("-cacheArchive"); 
@@ -381,6 +389,14 @@
         "Optional. Override DFS configuration", "<h:p>|local", 1, false); 
     Option jt = createOption("jt", 
         "Optional. Override JobTracker configuration", "<h:p>|local",1, false);
+    Option additionalconfspec = createOption("additionalconfspec", 
+        "Optional.", "spec",1, false );
+    Option inputformat = createOption("inputformat", 
+        "Optional.", "spec",1, false );
+    Option outputformat = createOption("outputformat", 
+        "Optional.", "spec",1, false );
+    Option partitioner = createOption("partitioner", 
+        "Optional.", "spec",1, false );
     Option inputreader = createOption("inputreader", 
         "Optional.", "spec",1, false );
     Option cacheFile = createOption("cacheFile", 
@@ -405,6 +421,10 @@
                           withOption(file).
                           withOption(dfs).
                           withOption(jt).
+                          withOption(additionalconfspec).
+                          withOption(inputformat).
+                          withOption(outputformat).
+                          withOption(partitioner).
                           withOption(inputreader).
                           withOption(jobconf).
                           withOption(cmdenv).
@@ -438,6 +458,10 @@
       //System.out.println("  -config   <file>     Optional. One or more paths to xml config files");
       System.out.println("  -dfs    <h:p>|local  Optional. Override DFS configuration");
       System.out.println("  -jt     <h:p>|local  Optional. Override JobTracker configuration");
+      System.out.println("  -additionalconfspec specfile  Optional.");
+      System.out.println("  -inputformat KeyValueTextInputFormat(default)|SequenceFileInputFormat|XmlTextInputFormat  Optional.");
+      System.out.println("  -outputformat specfile  Optional.");
+      System.out.println("  -partitioner specfile  Optional.");
       System.out.println("  -inputreader <spec>  Optional.");
       System.out.println("  -jobconf  <n>=<v>    Optional. Add or override a JobConf property");
       System.out.println("  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands");
@@ -645,6 +669,10 @@
     } else {
       // use only defaults: hadoop-default.xml and hadoop-site.xml
     }
+    System.out.println("additionalConfSpec_:" + additionalConfSpec_);
+    if (additionalConfSpec_ != null) {
+      config_.addDefaultResource(new Path(additionalConfSpec_));
+    }
     Iterator it = configPath_.iterator();
     while (it.hasNext()) {
       String pathName = (String) it.next();
@@ -670,29 +698,53 @@
     jobConf_.setBoolean("stream.inputtagged", inputTagged_);
     jobConf_.set("stream.numinputspecs", "" + inputSpecs_.size());
 
-    Class fmt;
-    if (testMerge_ && false == hasSimpleInputSpecs_) {
-      // this ignores -inputreader
-      fmt = MergerInputFormat.class;
-    } else {
-      // need to keep this case to support custom -inputreader 
-      // and their parameters ,n=v,n=v
-      fmt = StreamInputFormat.class;
+    String defaultPackage = this.getClass().getPackage().getName();
+    Class c;
+    Class fmt = null;
+    if (inReaderSpec_ == null && inputFormatSpec_ == null) {
+      fmt = KeyValueTextInputFormat.class;
+    } else if (inputFormatSpec_ != null) {
+      if ((inputFormatSpec_.compareToIgnoreCase("KeyValueTextInputFormat") == 0)
+          || (inputFormatSpec_
+              .compareToIgnoreCase("org.apache.hadoop.mapred.KeyValueTextInputFormat") == 0)) {
+        fmt = KeyValueTextInputFormat.class;
+      } else if ((inputFormatSpec_
+          .compareToIgnoreCase("SequenceFileInputFormat") == 0)
+          || (inputFormatSpec_
+              .compareToIgnoreCase("org.apache.hadoop.mapred.SequenceFileInputFormat") == 0)) {
+        fmt = SequenceFileInputFormat.class;
+      } else if ((inputFormatSpec_
+          .compareToIgnoreCase("SequenceFileToLineInputFormat") == 0)
+          || (inputFormatSpec_
+              .compareToIgnoreCase("org.apache.hadoop.mapred.SequenceFileToLineInputFormat") == 0)) {
+        fmt = SequenceFileAsTextInputFormat.class;
+      } else {
+        c = StreamUtil.goodClassOrNull(inputFormatSpec_, defaultPackage);
+        if (c != null) {
+          fmt = c;
+        } else {
+
+        }
+      }
+    } 
+    if (fmt == null) {
+      if (testMerge_ && false == hasSimpleInputSpecs_) {
+        // this ignores -inputreader
+        fmt = MergerInputFormat.class;
+      } else {
+        // need to keep this case to support custom -inputreader
+        // and their parameters ,n=v,n=v
+        fmt = StreamInputFormat.class;
+      }
     }
-    jobConf_.setInputFormat(fmt);
 
-    // for SequenceFile, input classes may be overriden in getRecordReader
-    jobConf_.setInputKeyClass(Text.class);
-    jobConf_.setInputValueClass(Text.class);
+    jobConf_.setInputFormat(fmt);
 
     jobConf_.setOutputKeyClass(Text.class);
     jobConf_.setOutputValueClass(Text.class);
 
     jobConf_.set("stream.addenvironment", addTaskEnvironment_);
 
-    String defaultPackage = this.getClass().getPackage().getName();
-
-    Class c;
     if (mapCmd_ != null) {
       c = StreamUtil.goodClassOrNull(mapCmd_, defaultPackage);
       if (c != null) {
@@ -748,13 +800,29 @@
     // output setup is done late so we can customize for reducerNone_
     //jobConf_.setOutputDir(new File(output_));
     setOutputSpec();
-    if (testMerge_) {
-      fmt = MuxOutputFormat.class;
-    } else {
-      fmt = StreamOutputFormat.class;
+    fmt = null;
+    if (outputFormatSpec_!= null) {
+      c = StreamUtil.goodClassOrNull(outputFormatSpec_, defaultPackage);
+      if (c != null) {
+        fmt = c;
+      } 
+    }
+    if (fmt == null) {
+      if (testMerge_) {
+        fmt = MuxOutputFormat.class;
+      } else {
+        fmt = TextOutputFormat.class;
+      }
     }
     jobConf_.setOutputFormat(fmt);
 
+    if (partitionerSpec_!= null) {
+      c = StreamUtil.goodClassOrNull(partitionerSpec_, defaultPackage);
+      if (c != null) {
+        jobConf_.setPartitionerClass(c);
+      } 
+    }
+    
     // last, allow user to override anything
     // (although typically used with properties we didn't touch)
 
@@ -1042,6 +1110,10 @@
   protected ArrayList configPath_ = new ArrayList(); // <String>
   protected String hadoopAliasConf_;
   protected String inReaderSpec_;
+  protected String inputFormatSpec_;
+  protected String outputFormatSpec_;
+  protected String partitionerSpec_;
+  protected String additionalConfSpec_;
 
   protected boolean testMerge_;
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java?view=diff&rev=529378&r1=529377&r2=529378
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java Mon Apr 16 12:35:15 2007
@@ -18,128 +18,21 @@
 
 package org.apache.hadoop.streaming;
 
-import java.io.*;
+import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.mapred.LineRecordReader;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.KeyValueLineRecordReader;
 import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.JobConf;
 
 /**
- * Similar to org.apache.hadoop.mapred.TextRecordReader, 
- * but delimits key and value with a TAB.
- * @author Michel Tourn
+ * Same as org.apache.hadoop.mapred.KeyValueLineRecordReader.
+ * 
+ * @deprecated
  */
-public class StreamLineRecordReader extends LineRecordReader {
-  
-  private String splitName;
-  private Reporter reporter;
-  private FileSplit split;  
-  private int numRec = 0;
-  private int nextStatusRec = 1;
-  private int statusMaxRecordChars;
-  protected static final Log LOG = LogFactory.getLog(StreamLineRecordReader.class);
-  // base class uses LongWritable as key, use this. 
-  private WritableComparable dummyKey = super.createKey(); 
-  private Text innerValue = (Text)super.createValue(); 
+public class StreamLineRecordReader extends KeyValueLineRecordReader {
 
-  public StreamLineRecordReader(FSDataInputStream in, FileSplit split, 
-      Reporter reporter,
-      JobConf job, FileSystem fs) throws IOException {
-    super(createStream(in, job), split.getStart(), 
-            (split.getStart() + split.getLength()));
-    this.split = split ; 
-    this.reporter = reporter ; 
-  }
-  
-  private static InputStream createStream(FSDataInputStream in, JobConf job) 
-    throws IOException{
-    InputStream finalStream = in ;
-    boolean gzipped = StreamInputFormat.isGzippedInput(job);
-    if ( gzipped ) {
-      GzipCodec codec = new GzipCodec();
-      codec.setConf(job);
-      finalStream = codec.createInputStream(in);
-    } 
-    return finalStream; 
-  }
-  
-  public WritableComparable createKey() {
-    return new Text();
-  }  
-  
-  public Writable createValue() {
-    return new Text();
-  }
-
-  public synchronized boolean next(Writable key, Writable value) throws IOException {
-    if (!(key instanceof Text)) {
-      throw new IllegalArgumentException("Key should be of type Text but: "
-          + key.getClass().getName());
-    }
-    if (!(value instanceof Text)) {
-      throw new IllegalArgumentException("Value should be of type Text but: "
-          + value.getClass().getName());
-    }
-
-    Text tKey = (Text) key;
-    Text tValue = (Text) value;
-    byte[] line = null ;
-    int lineLen = -1;
-    if( super.next(dummyKey, innerValue) ){
-      line = innerValue.getBytes();
-      lineLen = innerValue.getLength();
-    }else{
-      return false;
-    }
-    if (line == null) return false;
-    int tab = UTF8ByteArrayUtils.findTab(line, 0, lineLen);
-    if (tab == -1) {
-      tKey.set(line, 0, lineLen);
-      tValue.set("");
-    } else {
-      UTF8ByteArrayUtils.splitKeyVal(line, 0, lineLen, tKey, tValue, tab);
-    }
-    numRecStats(line, 0, lineLen);
-    return true;
-  }
-  
-  private void numRecStats(byte[] record, int start, int len) throws IOException {
-    numRec++;
-    if (numRec == nextStatusRec) {
-      String recordStr = new String(record, start, Math.min(len, statusMaxRecordChars), "UTF-8");
-      nextStatusRec += 100;//*= 10;
-      String status = getStatus(recordStr);
-      LOG.info(status);
-      reporter.setStatus(status);
-    }
-  }
-
-  private String getStatus(CharSequence record) {
-    long pos = -1;
-    try {
-      pos = getPos();
-    } catch (IOException io) {
-    }
-    String recStr;
-    if (record.length() > statusMaxRecordChars) {
-      recStr = record.subSequence(0, statusMaxRecordChars) + "...";
-    } else {
-      recStr = record.toString();
-    }
-    String unqualSplit = split.getFile().getName() + ":" + split.getStart() + "+"
-        + split.getLength();
-    String status = "HSTR " + StreamUtil.HOST + " " + numRec + ". pos=" + pos + " " + unqualSplit
-        + " Processing record=" + recStr;
-    status += " " + splitName;
-    return status;
+  public StreamLineRecordReader(Configuration job, FileSplit split)
+      throws IOException {
+    super(job, split);
   }
 }

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java?view=diff&rev=529378&r1=529377&r2=529378
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java Mon Apr 16 12:35:15 2007
@@ -18,56 +18,11 @@
 
 package org.apache.hadoop.streaming;
 
-import java.io.IOException;
+import org.apache.hadoop.mapred.TextOutputFormat;
 
-import org.apache.hadoop.mapred.*;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
-
-import org.apache.hadoop.util.Progressable;
-
-/** Similar to org.apache.hadoop.mapred.TextOutputFormat, 
- * but delimits key and value with a TAB.
- * @author Michel Tourn
+/** Same as org.apache.hadoop.mapred.TextOutputFormat.
+ * @deprecated
  */
-public class StreamOutputFormat implements OutputFormat {
-
-  public RecordWriter getRecordWriter(FileSystem fs, JobConf job, String name, Progressable progr) throws IOException {
-
-    Path file = new Path(job.getOutputPath(), name);
-
-    final FSDataOutputStream out = fs.create(file);
-
-    return new RecordWriter() {
-
-      public synchronized void write(WritableComparable key, Writable value) throws IOException {
-        out.write(key.toString().getBytes("UTF-8"));
-        out.writeByte('\t');
-        out.write(value.toString().getBytes("UTF-8"));
-        out.writeByte('\n');
-      }
-
-      public synchronized void close(Reporter reporter) throws IOException {
-        out.close();
-      }
-    };
-  }
-
-  /** Check whether the output specification for a job is appropriate.  Called
-   * when a job is submitted.  Typically checks that it does not already exist,
-   * throwing an exception when it already exists, so that output is not
-   * overwritten.
-   *
-   * @param job the job whose output will be written
-   * @throws IOException when output should not be attempted
-   */
-  public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
-    // allow existing data (for app-level restartability)
-  }
+public class StreamOutputFormat extends TextOutputFormat {
 
 }

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java?view=diff&rev=529378&r1=529377&r2=529378
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java Mon Apr 16 12:35:15 2007
@@ -20,80 +20,19 @@
 
 import java.io.*;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
 
-import org.apache.hadoop.util.ReflectionUtils;
-
-public class StreamSequenceRecordReader extends StreamBaseRecordReader {
-
-  public StreamSequenceRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter,
-      JobConf job, FileSystem fs) throws IOException {
-    super(in, split, reporter, job, fs);
-    numFailed_ = 0;
-    seekNextRecordBoundary();
-    // super.in_ ignored, using rin_ instead
-  }
-
-  public synchronized boolean next(Writable key, Writable value) throws IOException {
-    boolean success;
-    do {
-      if (!more_) return false;
-      success = false;
-      try {
-        long pos = rin_.getPosition();
-        boolean eof = rin_.next(key, value);
-        if (pos >= end_ && rin_.syncSeen()) {
-          more_ = false;
-        } else {
-          more_ = eof;
-        }
-        success = true;
-      } catch (IOException io) {
-        numFailed_++;
-        if (numFailed_ < 100 || numFailed_ % 100 == 0) {
-          err_.println("StreamSequenceRecordReader: numFailed_/numRec_=" + numFailed_ + "/"
-              + numRec_);
-        }
-        io.printStackTrace(err_);
-        success = false;
-      }
-    } while (!success);
-
-    numRecStats(new byte[0], 0, 0);
-    return more_;
+/**
+ * Same as org.apache.hadoop.mapred.SequenceFileRecordReader.
+ * 
+ * @deprecated
+ */
+public class StreamSequenceRecordReader extends SequenceFileRecordReader {
+
+  public StreamSequenceRecordReader(Configuration conf, FileSplit split)
+      throws IOException {
+    super(conf, split);
   }
-
-  public void seekNextRecordBoundary() throws IOException {
-    rin_ = new SequenceFile.Reader(fs_, split_.getPath(), job_);
-    end_ = split_.getStart() + split_.getLength();
-
-    if (split_.getStart() > rin_.getPosition()) rin_.sync(split_.getStart()); // sync to start
-
-    more_ = rin_.getPosition() < end_;
-
-    reporter_.setStatus(split_.toString());
-
-    //return new SequenceFileRecordReader(job_, split_);
-  }
-
-  public WritableComparable createKey() {
-    return (WritableComparable) ReflectionUtils.newInstance(rin_.getKeyClass(), null);
-  }
-
-  public Writable createValue() {
-    return (Writable) ReflectionUtils.newInstance(rin_.getValueClass(), null);
-  }
-
-  boolean more_;
-  SequenceFile.Reader rin_;
-  int numFailed_;
-  PrintStream err_ = System.err;
-
 }

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java?view=diff&rev=529378&r1=529377&r2=529378
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java Mon Apr 16 12:35:15 2007
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.streaming;
 
+import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.zip.GZIPOutputStream;
@@ -29,6 +30,7 @@
 {
 
   public TestGzipInput() throws IOException {
+    INPUT_FILE = new File("input.txt.gz");
   }
   
   protected void createInput() throws IOException
@@ -38,6 +40,7 @@
     out.write(input.getBytes("UTF-8"));
     out.close();
   }
+
 
   protected String[] genArgs() {
     return new String[] {

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?view=diff&rev=529378&r1=529377&r2=529378
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Mon Apr 16 12:35:15 2007
@@ -77,6 +77,11 @@
   public void testCommandLine()
   {
     try {
+      try {
+         OUTPUT_DIR.getAbsoluteFile().delete();
+      } catch (Exception e) {
+      }
+
       createInput();
       boolean mayExit = false;
 
@@ -93,8 +98,10 @@
     } catch(Exception e) {
       failTrace(e);
     } finally {
+      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
       INPUT_FILE.delete();
-      OUTPUT_DIR.delete();
+      outFileCRC.delete();
+      OUTPUT_DIR.getAbsoluteFile().delete();
     }
   }
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java?view=diff&rev=529378&r1=529377&r2=529378
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java Mon Apr 16 12:35:15 2007
@@ -79,7 +79,6 @@
         };
 
         fileSys.delete(new Path(OUTPUT_DIR));
-        fileSys.mkdirs(new Path(OUTPUT_DIR));
         
         DataOutputStream file = fileSys.create(new Path(INPUT_FILE));
         file.writeBytes(mapString);

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java?view=diff&rev=529378&r1=529377&r2=529378
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java Mon Apr 16 12:35:15 2007
@@ -41,13 +41,11 @@
     // test that some JobConf properties are exposed as expected     
     // Note the dots translated to underscore: 
     // property names have been escaped in PipeMapRed.safeEnvVarName()
-    expect("mapred_input_format_class", "org.apache.hadoop.streaming.StreamInputFormat");
+    expect("mapred_input_format_class", "org.apache.hadoop.mapred.KeyValueTextInputFormat");
     expect("mapred_job_tracker", "local");
-    expect("mapred_input_key_class", "org.apache.hadoop.io.Text");
-    expect("mapred_input_value_class", "org.apache.hadoop.io.Text");
     //expect("mapred_local_dir", "build/test/mapred/local");
     expectDefined("mapred_local_dir");
-    expect("mapred_output_format_class", "org.apache.hadoop.streaming.StreamOutputFormat");
+    expect("mapred_output_format_class", "org.apache.hadoop.mapred.TextOutputFormat");
     expect("mapred_output_key_class", "org.apache.hadoop.io.Text");
     expect("mapred_output_value_class", "org.apache.hadoop.io.Text");
 

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java?view=auto&rev=529378
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java Mon Apr 16 12:35:15 2007
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * This class treats a line in the input as a key/value pair separated by a
+ * separator character. The separator can be specified in the config file
+ * under the attribute name key.value.separator.in.input.line. The default
+ * separator is the tab character ('\t'). Only the first character of the
+ * configured value is used, and it is narrowed to a single byte. A line
+ * that does not contain the separator becomes the key, with an empty value.
+ */
+public class KeyValueLineRecordReader extends LineRecordReader {
+
+  /** Byte that splits each line into key and value; tab unless configured. */
+  private byte separator = (byte) '\t';
+
+  /** Receives the key produced by the parent reader; it is never read. */
+  private WritableComparable dummyKey = super.createKey();
+
+  /** Reusable buffer that receives the raw line from the parent reader. */
+  private Text innerValue = (Text) super.createValue();
+
+  public Class getKeyClass() { return Text.class; }
+
+  /** Keys handed to {@link #next(Writable, Writable)} must be Text. */
+  public Text createKey() {
+    return new Text();
+  }
+
+  /**
+   * Creates a reader over the given split and picks up the separator from
+   * the configuration.
+   *
+   * @param job configuration consulted for key.value.separator.in.input.line
+   * @param split the part of the file to read
+   * @throws IOException if the parent reader cannot be opened
+   */
+  public KeyValueLineRecordReader(Configuration job, FileSplit split)
+      throws IOException {
+    super(job, split);
+    String sepStr = job.get("key.value.separator.in.input.line", "\t");
+    // An empty property value has no first character; keep the tab default
+    // rather than failing with StringIndexOutOfBoundsException.
+    if (sepStr != null && sepStr.length() > 0) {
+      this.separator = (byte) sepStr.charAt(0);
+    }
+  }
+
+  /**
+   * Finds the first occurrence of a byte within a region of an array.
+   *
+   * @param utf the bytes to search
+   * @param start index of the first byte to examine
+   * @param length number of bytes to examine, beginning at start
+   * @param sep the byte to look for
+   * @return the index into utf (not relative to start) of the first match,
+   *         or -1 if the region does not contain sep
+   */
+  public static int findSeparator(byte[] utf, int start, int length, byte sep) {
+    int end = start + length;
+    for (int i = start; i < end; i++) {
+      if (utf[i] == sep) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * Reads the next line and splits it at the first separator byte.
+   *
+   * @param key a Text that receives the bytes before the separator, or the
+   *        whole line when the line has no separator
+   * @param value a Text that receives the bytes after the separator, or the
+   *        empty string when the line has no separator
+   * @return false when the parent reader has no further line
+   */
+  public synchronized boolean next(Writable key, Writable value)
+      throws IOException {
+    Text tKey = (Text) key;
+    Text tValue = (Text) value;
+    if (!super.next(dummyKey, innerValue)) {
+      return false;
+    }
+    byte[] line = innerValue.getBytes();
+    if (line == null) {
+      return false;
+    }
+    // The backing array may be longer than the line; only the first
+    // getLength() bytes belong to it.
+    int lineLen = innerValue.getLength();
+    int pos = findSeparator(line, 0, lineLen, this.separator);
+    if (pos == -1) {
+      tKey.set(line, 0, lineLen);
+      tValue.set("");
+    } else {
+      // Copy straight out of the line buffer; no temporary arrays needed.
+      tKey.set(line, 0, pos);
+      tValue.set(line, pos + 1, lineLen - pos - 1);
+    }
+    return true;
+  }
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java?view=auto&rev=529378
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java Mon Apr 16 12:35:15 2007
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+/**
+ * An {@link InputFormat} for plain text files. Files are broken into lines,
+ * where either a linefeed or a carriage-return signals the end of a line.
+ * Each line is divided into key and value parts by a separator byte. If no
+ * such byte exists, the key will be the entire line and the value will be
+ * empty.
+ */
+public class KeyValueTextInputFormat extends TextInputFormat {
+
+  /**
+   * Reports the split's string form as the task status, then returns a
+   * {@link KeyValueLineRecordReader} over that split.
+   *
+   * @param genericSplit the split to read; it is cast to FileSplit
+   * @param job the job configuration handed to the reader
+   * @param reporter receives the status string
+   */
+  public RecordReader getRecordReader(InputSplit genericSplit, JobConf job,
+      Reporter reporter) throws IOException {
+    String status = genericSplit.toString();
+    reporter.setStatus(status);
+    FileSplit fileSplit = (FileSplit) genericSplit;
+    return new KeyValueLineRecordReader(job, fileSplit);
+  }
+
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java?view=auto&rev=529378
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java Mon Apr 16 12:35:15 2007
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+/**
+ * A variant of SequenceFileInputFormat whose record reader is a
+ * SequenceFileAsTextRecordReader, which converts the input keys and values
+ * to their String forms by calling their toString() methods.
+ */
+public class SequenceFileAsTextInputFormat extends SequenceFileInputFormat {
+
+  public SequenceFileAsTextInputFormat() {
+  }
+
+  /**
+   * Reports the split's string form as the task status, then returns a
+   * {@link SequenceFileAsTextRecordReader} over that split.
+   *
+   * @param split the split to read; it is cast to FileSplit
+   * @param job the job configuration handed to the reader
+   * @param reporter receives the status string
+   */
+  public RecordReader getRecordReader(InputSplit split, JobConf job,
+      Reporter reporter) throws IOException {
+    String status = split.toString();
+    reporter.setStatus(status);
+    FileSplit fileSplit = (FileSplit) split;
+    return new SequenceFileAsTextRecordReader(job, fileSplit);
+  }
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java?view=auto&rev=529378
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java Mon Apr 16 12:35:15 2007
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * This class converts the input keys and values to their String forms by
+ * calling their toString() methods. It plays the same role for
+ * SequenceFileAsTextInputFormat that LineRecordReader plays for
+ * TextInputFormat.
+ */
+public class SequenceFileAsTextRecordReader extends SequenceFileRecordReader {
+
+  /** Holder of the parent reader's key type; refilled on every record. */
+  private Writable innerKey = super.createKey();
+
+  /** Holder of the parent reader's value type; refilled on every record. */
+  private Writable innerValue = super.createValue();
+
+  public SequenceFileAsTextRecordReader(Configuration conf, FileSplit split)
+      throws IOException {
+    super(conf, split);
+  }
+
+  /** Keys handed to {@link #next(Writable, Writable)} must be Text. */
+  public WritableComparable createKey() {
+    return new Text();
+  }
+
+  /** Values handed to {@link #next(Writable, Writable)} must be Text. */
+  public Writable createValue() {
+    return new Text();
+  }
+
+  /**
+   * Reads the next record from the parent reader and stores the toString()
+   * forms of its key and value into the given Text objects.
+   *
+   * @param key a Text that receives the key's string form
+   * @param value a Text that receives the value's string form
+   * @return false when the parent reader has no further record, in which
+   *         case key and value are left untouched
+   */
+  public synchronized boolean next(Writable key, Writable value)
+      throws IOException {
+    Text textKey = (Text) key;
+    Text textValue = (Text) value;
+    boolean advanced = super.next(innerKey, innerValue);
+    if (advanced) {
+      textKey.set(innerKey.toString());
+      textValue.set(innerValue.toString());
+    }
+    return advanced;
+  }
+}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java?view=diff&rev=529378&r1=529377&r2=529378
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java Mon Apr 16 12:35:15 2007
@@ -32,7 +32,7 @@
   private long start;
   private long end;
   private boolean more = true;
-  private Configuration conf;
+  protected Configuration conf;
 
   public SequenceFileRecordReader(Configuration conf, FileSplit split)
     throws IOException {

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java?view=auto&rev=529378
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java Mon Apr 16 12:35:15 2007
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Tests for {@link KeyValueTextInputFormat}: randomized splitting of a
+ * tab-separated file, line reading of UTF-8 and mixed line endings, and
+ * reading of gzip-compressed input.
+ */
+public class TestKeyValueTextInputFormat extends TestCase {
+  private static final Log LOG =
+    LogFactory.getLog(TestKeyValueTextInputFormat.class.getName());
+
+  private static int MAX_LENGTH = 10000;
+  
+  private static JobConf defaultConf = new JobConf();
+  private static FileSystem localFs = null; 
+  static {
+    try {
+      localFs = FileSystem.getLocal(defaultConf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+  private static Path workDir = 
+    new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+             "TestKeyValueTextInputFormat");
+  
+  /**
+   * Writes files of "2*i TAB i" lines for a variety of lengths, splits each
+   * file in a variety of ways, and checks that every record is read exactly
+   * once and that its key and value were separated correctly.
+   */
+  public void testFormat() throws Exception {
+    JobConf job = new JobConf();
+    Path file = new Path(workDir, "test.txt");
+
+    // A reporter that does nothing
+    Reporter reporter = Reporter.NULL;
+    
+    // The seed is logged so that a failing run can be reproduced.
+    int seed = new Random().nextInt();
+    LOG.info("seed = "+seed);
+    Random random = new Random(seed);
+
+    localFs.delete(workDir);
+    job.setInputPath(workDir);
+
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length+= random.nextInt(MAX_LENGTH/10)+1) {
+
+      LOG.debug("creating; entries = " + length);
+
+      // create a file with length entries
+      Writer writer = new OutputStreamWriter(localFs.create(file));
+      try {
+        for (int i = 0; i < length; i++) {
+          writer.write(Integer.toString(i*2));
+          writer.write("\t");
+          writer.write(Integer.toString(i));
+          writer.write("\n");
+        }
+      } finally {
+        writer.close();
+      }
+
+      // try splitting the file in a variety of sizes
+      TextInputFormat format = new KeyValueTextInputFormat();
+      format.configure(job);
+      for (int i = 0; i < 3; i++) {
+        int numSplits = random.nextInt(MAX_LENGTH/20)+1;
+        LOG.debug("splitting: requesting = " + numSplits);
+        InputSplit[] splits = format.getSplits(job, numSplits);
+        LOG.debug("splitting: got =        " + splits.length);
+
+        // check each split
+        BitSet bits = new BitSet(length);
+        for (int j = 0; j < splits.length; j++) {
+          LOG.debug("split["+j+"]= " + splits[j]);
+          RecordReader reader =
+            format.getRecordReader(splits[j], job, reporter);
+          Class readerClass = reader.getClass();
+          assertEquals("reader class is KeyValueLineRecordReader.", KeyValueLineRecordReader.class, readerClass);        
+
+          Writable key = reader.createKey();
+          Class keyClass = key.getClass();
+          Writable value = reader.createValue();
+          Class valueClass = value.getClass();
+          assertEquals("Key class is Text.", Text.class, keyClass);
+          assertEquals("Value class is Text.", Text.class, valueClass);
+          try {
+            int count = 0;
+            while (reader.next(key, value)) {
+              int v = Integer.parseInt(value.toString());
+              LOG.debug("read " + v);
+              // Each line was written as "<2*i>\t<i>", so the key must be
+              // exactly twice the value if the line was split correctly.
+              assertEquals("Key does not match value.", 2 * v,
+                           Integer.parseInt(key.toString()));
+              if (bits.get(v)) {
+                LOG.warn("conflict with " + v + 
+                         " in split " + j +
+                         " at position "+reader.getPos());
+              }
+              assertFalse("Key in multiple partitions.", bits.get(v));
+              bits.set(v);
+              count++;
+            }
+            LOG.debug("splits["+j+"]="+splits[j]+" count=" + count);
+          } finally {
+            reader.close();
+          }
+        }
+        assertEquals("Some keys in no partition.", length, bits.cardinality());
+      }
+
+    }
+  }
+
+  /** Returns a stream over the UTF-8 encoding of the given string. */
+  private InputStream makeStream(String str) throws IOException {
+    Text text = new Text(str);
+    return new ByteArrayInputStream(text.getBytes(), 0, text.getLength());
+  }
+  
+  /**
+   * Checks that readLine leaves multi-byte characters intact and does not
+   * split on U+200A.
+   */
+  public void testUTF8() throws Exception {
+    InputStream in = makeStream("abcd\u20acbdcd\u20ac");
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    LineRecordReader.readLine(in, out);
+    Text line = new Text();
+    line.set(out.toByteArray());
+    assertEquals("readLine changed utf8 characters", 
+                 "abcd\u20acbdcd\u20ac", line.toString());
+    in = makeStream("abc\u200axyz");
+    out.reset();
+    LineRecordReader.readLine(in, out);
+    line.set(out.toByteArray());
+    assertEquals("split on fake newline", "abc\u200axyz", line.toString());
+  }
+
+  /**
+   * Checks the length of each line read from input that mixes "\n", "\r"
+   * and "\r\n" terminators and ends without a terminator.
+   */
+  public void testNewLines() throws Exception {
+    InputStream in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    LineRecordReader.readLine(in, out);
+    assertEquals("line1 length", 1, out.size());
+    out.reset();
+    LineRecordReader.readLine(in, out);
+    assertEquals("line2 length", 2, out.size());
+    out.reset();
+    LineRecordReader.readLine(in, out);
+    assertEquals("line3 length", 0, out.size());
+    out.reset();
+    LineRecordReader.readLine(in, out);
+    assertEquals("line4 length", 3, out.size());
+    out.reset();
+    LineRecordReader.readLine(in, out);
+    assertEquals("line5 length", 4, out.size());
+    out.reset();
+    LineRecordReader.readLine(in, out);
+    assertEquals("line6 length", 5, out.size());
+    assertEquals("end of file", 0, LineRecordReader.readLine(in, out));
+  }
+  
+  /**
+   * Writes contents to the named file, through the codec when one is given.
+   * The stream is closed even if the write fails.
+   */
+  private static void writeFile(FileSystem fs, Path name, 
+                                CompressionCodec codec,
+                                String contents) throws IOException {
+    OutputStream stm;
+    if (codec == null) {
+      stm = fs.create(name);
+    } else {
+      stm = codec.createOutputStream(fs.create(name));
+    }
+    try {
+      // Explicit charset so the bytes do not depend on the platform default.
+      stm.write(contents.getBytes("UTF-8"));
+    } finally {
+      stm.close();
+    }
+  }
+  
+  private static final Reporter voidReporter = Reporter.NULL;
+  
+  /**
+   * Reads every record of a split and returns the values in read order.
+   * A fresh value object is used per record so the returned entries are
+   * independent. The reader is always closed.
+   */
+  private static List<Text> readSplit(InputFormat format, 
+                                      InputSplit split, 
+                                      JobConf job) throws IOException {
+    List<Text> result = new ArrayList<Text>();
+    RecordReader reader = format.getRecordReader(split, job,
+                                                 voidReporter);
+    try {
+      Text key = (Text) reader.createKey();
+      Text value = (Text) reader.createValue();
+      while (reader.next(key, value)) {
+        result.add(value);
+        value = (Text) reader.createValue();
+      }
+    } finally {
+      reader.close();
+    }
+    return result;
+  }
+  
+  /**
+   * Test using the gzip codec for reading
+   */
+  public static void testGzip() throws IOException {
+    JobConf job = new JobConf();
+    CompressionCodec gzip = new GzipCodec();
+    ReflectionUtils.setConf(gzip, job);
+    localFs.delete(workDir);
+    writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, 
+              "line-1\tthe quick\nline-2\tbrown\nline-3\tfox jumped\nline-4\tover\nline-5\t the lazy\nline-6\t dog\n");
+    writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
+              "line-1\tthis is a test\nline-1\tof gzip\n");
+    job.setInputPath(workDir);
+    KeyValueTextInputFormat format = new KeyValueTextInputFormat();
+    format.configure(job);
+    InputSplit[] splits = format.getSplits(job, 100);
+    assertEquals("compressed splits == 2", 2, splits.length);
+    // The order of the splits is not fixed; put part1 first.
+    FileSplit tmp = (FileSplit) splits[0];
+    if (tmp.getPath().getName().equals("part2.txt.gz")) {
+      splits[0] = splits[1];
+      splits[1] = tmp;
+    }
+    List<Text> results = readSplit(format, splits[0], job);
+    assertEquals("splits[0] length", 6, results.size());
+    assertEquals("splits[0][5]", " dog", results.get(5).toString());
+    results = readSplit(format, splits[1], job);
+    assertEquals("splits[1] length", 2, results.size());
+    assertEquals("splits[1][0]", "this is a test", 
+                 results.get(0).toString());    
+    assertEquals("splits[1][1]", "of gzip", 
+                 results.get(1).toString());    
+  }
+  
+  public static void main(String[] args) throws Exception {
+    new TestKeyValueTextInputFormat().testFormat();
+  }
+}

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsTextInputFormat.java?view=auto&rev=529378
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsTextInputFormat.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsTextInputFormat.java Mon Apr 16 12:35:15 2007
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.conf.*;
+
+/**
+ * Tests {@link SequenceFileAsTextInputFormat} by writing sequence files of
+ * (IntWritable i, LongWritable 10*i) records, splitting them in a variety
+ * of ways, and checking that every record is read exactly once as Text.
+ */
+public class TestSequenceFileAsTextInputFormat extends TestCase {
+  private static final Log LOG = InputFormatBase.LOG;
+
+  private static int MAX_LENGTH = 10000;
+  private static Configuration conf = new Configuration();
+
+  public void testFormat() throws Exception {
+    JobConf job = new JobConf(conf);
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
+    Path file = new Path(dir, "test.seq");
+    
+    Reporter reporter = Reporter.NULL;
+    
+    // The seed is logged so that a failing run can be reproduced.
+    int seed = new Random().nextInt();
+    LOG.info("seed = "+seed);
+    Random random = new Random(seed);
+
+    fs.delete(dir);
+
+    job.setInputPath(dir);
+
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length+= random.nextInt(MAX_LENGTH/10)+1) {
+
+      // create a file with length entries
+      SequenceFile.Writer writer =
+        SequenceFile.createWriter(fs, conf, file,
+                                  IntWritable.class, LongWritable.class);
+      try {
+        for (int i = 0; i < length; i++) {
+          IntWritable key = new IntWritable(i);
+          LongWritable value = new LongWritable(10 * i);
+          writer.append(key, value);
+        }
+      } finally {
+        writer.close();
+      }
+
+      // try splitting the file in a variety of sizes
+      InputFormat format = new SequenceFileAsTextInputFormat();
+      
+      for (int i = 0; i < 3; i++) {
+        int numSplits =
+          random.nextInt(MAX_LENGTH/(SequenceFile.SYNC_INTERVAL/20))+1;
+        InputSplit[] splits = format.getSplits(job, numSplits);
+
+        // check each split
+        BitSet bits = new BitSet(length);
+        for (int j = 0; j < splits.length; j++) {
+          RecordReader reader =
+            format.getRecordReader(splits[j], job, reporter);
+          Class readerClass = reader.getClass();
+          assertEquals("reader class is SequenceFileAsTextRecordReader.", SequenceFileAsTextRecordReader.class, readerClass);        
+          Text value = (Text)reader.createValue();
+          Text key = (Text)reader.createKey();
+          try {
+            while (reader.next(key, value)) {
+              int keyInt = Integer.parseInt(key.toString());
+              // Records were written as (i, 10*i); the text form of the
+              // value must still parse back to ten times the key.
+              assertEquals("Value does not match key.", 10L * keyInt,
+                           Long.parseLong(value.toString()));
+              assertFalse("Key in multiple partitions.", bits.get(keyInt));
+              bits.set(keyInt);
+            }
+          } finally {
+            reader.close();
+          }
+        }
+        assertEquals("Some keys in no partition.", length, bits.cardinality());
+      }
+
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    new TestSequenceFileAsTextInputFormat().testFormat();
+  }
+}