Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/07/11 10:58:06 UTC

svn commit: r420768 - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/

Author: cutting
Date: Tue Jul 11 01:58:05 2006
New Revision: 420768

URL: http://svn.apache.org/viewvc?rev=420768&view=rev
Log:
HADOOP-355.  Updates to streaming contrib module. Contributed by Michel Tourn.

Added:
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=420768&r1=420767&r2=420768&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jul 11 01:58:05 2006
@@ -14,6 +14,10 @@
     not yet complete, i.e., that are queued or running.
     (Mahadev Konar via cutting)
 
+ 4. HADOOP-355.  Updates to the streaming contrib module, including
+    API fixes, making reduce optional, and adding an input type for
+    StreamSequenceRecordReader.  (Michel Tourn via cutting)
+
 
 Release 0.4.0 - 2006-06-28
 

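For context on the "making reduce optional" item above: a minimal, hypothetical driver sketch showing how the new REDUCE_NONE sentinel is passed through the existing argv interface. The flags and the StreamJob(String[], boolean) constructor appear in the diffs below; the paths, the class name, and the commented-out submission call are placeholders, not part of this commit.

    import org.apache.hadoop.streaming.StreamJob;

    public class MapOnlyStreamingSketch {
      public static void main(String[] args) {
        // Passing the literal "NONE" (StreamJob.REDUCE_NONE) as the reducer skips
        // the shuffle/sort/reduce step; PipeMapper then writes its output as
        // side-effect files under the job output directory instead of map outputs.
        String[] argv = {
          "-input",   "/user/demo/in",   // placeholder DFS input directory
          "-output",  "/user/demo/out",  // placeholder DFS output directory
          "-mapper",  "/bin/cat",        // any streaming command
          "-reducer", "NONE"             // new REDUCE_NONE sentinel
        };
        StreamJob job = new StreamJob(argv, false);
        // job.go();  // hypothetical submission call; the entry point that parses
        //            // argv and submits the job is not shown in this diff
      }
    }
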
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=420768&r1=420767&r2=420768&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Tue Jul 11 01:58:05 2006
@@ -35,11 +35,16 @@
 import org.apache.hadoop.mapred.OutputCollector;
 
 import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
 
 /** Shared functionality for PipeMapper, PipeReducer.
  *  @author Michel Tourn
@@ -56,7 +61,13 @@
   */
   abstract String getKeyColPropName();
   
-
+  /** Write output as side-effect files rather than as map outputs. 
+      This is useful to do "Map" tasks rather than "MapReduce" tasks. */
+  boolean getUseSideEffect() 
+  {
+    return false;
+  }
+  
   /**
    * @returns how many TABS before the end of the key part 
    * usually: 1 or "ALL"
@@ -154,7 +165,10 @@
       String argv = getPipeCommand(job);
       keyCols_ = getKeyColsFromPipeCommand(argv);
       
-      doPipe_ = (argv != null);
+      job_ = job;      
+      
+      // Currently: null is identity reduce. REDUCE_NONE is no-map-outputs.
+      doPipe_ = (argv != null) && !StreamJob.REDUCE_NONE.equals(argv);
       if(!doPipe_) return;
 
       setStreamJobDetails(job);
@@ -169,29 +183,46 @@
         new MustangFile(prog).setExecutable(true, true);
       }
       
+      
+      if(job_.getInputValueClass().equals(BytesWritable.class)) {
+        // TODO expose as separate config: 
+        // job or semistandard inputformat property
+        optUseKey_ = false;
+      }
+      
+      optSideEffect_ = getUseSideEffect();
+      
+      if(optSideEffect_) {
+        String fileName = job_.get("mapred.task.id");
+        sideEffectPath_ = new Path(job_.getOutputPath(), fileName);
+        FileSystem fs = FileSystem.get(job_);
+        sideEffectOut_ = fs.create(sideEffectPath_);
+      }
+      
       // argvSplit[0]: 
       // An absolute path should be a preexisting valid path on all TaskTrackers
 	  // A  relative path should match in the unjarred Job data
       // In this case, force an absolute path to make sure exec finds it.
       argvSplit[0] = new File(argvSplit[0]).getAbsolutePath();
-      log_.println("PipeMapRed exec " + Arrays.asList(argvSplit));
-            
+      logprintln("PipeMapRed exec " + Arrays.asList(argvSplit));
+      logprintln("sideEffectPath_=" + sideEffectPath_);      
       
       Environment childEnv = (Environment)StreamUtil.env().clone();
-      addEnvironment(childEnv, job.get("stream.addenvironment"));
+      addEnvironment(childEnv, job_.get("stream.addenvironment"));
       sim = Runtime.getRuntime().exec(argvSplit, childEnv.toArray());
       
       /* // This way required jdk1.5
       ProcessBuilder processBuilder = new ProcessBuilder(argvSplit);
       Map<String, String> env = processBuilder.environment();
-      addEnvironment(env, job.get("stream.addenvironment"));
+      addEnvironment(env, job_.get("stream.addenvironment"));
       sim = processBuilder.start();
       */
       
       clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));
-      clientIn_  = new BufferedReader(new InputStreamReader(sim.getInputStream()));
-      clientErr_ = new DataInputStream(sim.getErrorStream());
+      clientIn_  = new DataInputStream(new BufferedInputStream(sim.getInputStream()));
+      clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));
       doneLock_  = new Object();
+      startTime_ = System.currentTimeMillis();
       
     } catch(Exception e) {
         e.printStackTrace();
@@ -205,7 +236,7 @@
     String s = job.get("stream.minRecWrittenToEnableSkip_");
     if(s != null) {
       minRecWrittenToEnableSkip_ = Long.parseLong(s);
-      log_.println("JobConf set minRecWrittenToEnableSkip_ =" + minRecWrittenToEnableSkip_);
+      logprintln("JobConf set minRecWrittenToEnableSkip_ =" + minRecWrittenToEnableSkip_);
     }
   }
   
@@ -222,6 +253,22 @@
     
   }
     
+  void logprintln(String s)
+  {
+    if(log_ != null) {
+      log_.println(s);
+    } else {
+      System.err.println(s); // or LOG.info()
+    }
+  }
+  
+  void logflush()
+  {
+    if(log_ != null) {
+      log_.flush();
+    }
+  }
+  
   void addEnvironment(Properties env, String nameVals)
   {
     // encoding "a=b c=d" from StreamJob
@@ -230,9 +277,9 @@
     for(int i=0; i<nv.length; i++) {
       String[] pair = nv[i].split("=", 2);
       if(pair.length != 2) {
-        log_.println("Skip ev entry:" + nv[i]);
+        logprintln("Skip ev entry:" + nv[i]);
       } else {
-        log_.println("Add  ev entry:" + nv[i]);
+        logprintln("Add  ev entry:" + nv[i]);
         env.put(pair[0], pair[1]);
       }
     }
@@ -293,18 +340,23 @@
               // 3/4 Tool to Hadoop
               while((answer = clientIn_.readLine()) != null) {
                 // 4/4 Hadoop out 
-                splitKeyVal(answer, key, val);
-                output.collect(key, val);
-                numRecWritten_++;
-                if(numRecWritten_ % 100 == 0) {
-                  log_.println(numRecRead_+"/"+numRecWritten_);
-                  log_.flush();
+                if(optSideEffect_) {
+                  sideEffectOut_.write(answer.getBytes());
+                  sideEffectOut_.write('\n');
+                } else {
+                  splitKeyVal(answer, key, val);
+                  output.collect(key, val);
+                  numRecWritten_++;
+                  if(numRecWritten_ % 100 == 0) {
+                    logprintln(numRecRead_+"/"+numRecWritten_);
+                    logflush();
+                  }
                 }
               }
             } catch(IOException io) {
               io.printStackTrace(log_);
             }
-            log_.println("MROutputThread done");
+            logprintln("MROutputThread done");
       } finally {
           outputDone_ = true;
           synchronized(doneLock_) {
@@ -332,7 +384,7 @@
         int bucket = 100;
         while((line=clientErr_.readLine()) != null) {
           num++;
-          log_.println(line);
+          logprintln(line);
           if(num < 10) {
             String hline = "MRErr: " + line;
             System.err.println(hline);
@@ -353,10 +405,19 @@
 
   public void mapRedFinished()
   {
-    log_.println("mapRedFinished");
+    logprintln("mapRedFinished");
     try {
     if(!doPipe_) return;
     try {
+      if(optSideEffect_) {
+        logprintln("closing " + sideEffectPath_);
+        sideEffectOut_.close();
+        logprintln("closed  " + sideEffectPath_);
+      }
+    } catch(IOException io) {
+      io.printStackTrace();
+    }
+    try {
       if(clientOut_ != null) {
       	clientOut_.close();
       }
@@ -385,10 +446,12 @@
   void maybeLogRecord()
   {
     if(numRecRead_ >= nextRecReadLog_) {
-      log_.println(numRecInfo());
-      log_.flush();      
-      nextRecReadLog_ *= 10;
-      //nextRecReadLog_ += 1000;
+      String info = numRecInfo();
+      logprintln(info);
+      logflush();      
+      System.err.println(info);
+      //nextRecReadLog_ *= 10;
+      nextRecReadLog_ += 100;
     }    
   }
   
@@ -417,7 +480,15 @@
   
   String numRecInfo()
   {
-    return "R/W/S=" + numRecRead_+"/"+numRecWritten_+"/"+numRecSkipped_;
+    long elapsed = (System.currentTimeMillis() - startTime_)/1000;
+    long total = numRecRead_+numRecWritten_+numRecSkipped_;
+    return "R/W/S=" + numRecRead_+"/"+numRecWritten_+"/"+numRecSkipped_
+     + " in:"  + safeDiv(numRecRead_, elapsed) + " [rec/s]"
+     + " out:" + safeDiv(numRecWritten_, elapsed) + " [rec/s]";
+  }
+  String safeDiv(long n, long d)
+  {
+    return (d==0) ? "NA" : ""+n/d + "=" + n + "/" + d;
   }
   String logFailure(Exception e)
   {
@@ -425,15 +496,15 @@
       PrintWriter pw = new PrintWriter(sw);
       e.printStackTrace(pw);    
       String msg = "log:" + jobLog_ + "\n" + getContext() + sw + "\n";
-      log_.println(msg);
+      logprintln(msg);
       return msg;  
   }
     
 
+  long startTime_;
   long numRecRead_ = 0;
   long numRecWritten_ = 0;
   long numRecSkipped_ = 0;
-  
   long nextRecReadLog_ = 1;
   
   long minRecWrittenToEnableSkip_ = Long.MAX_VALUE;
@@ -441,6 +512,8 @@
   int keyCols_;
   final static int ALL_COLS = Integer.MAX_VALUE;
   
+  JobConf job_;
+
   // generic MapRed parameters passed on by hadoopStreaming
   String taskid_;
   int reportPortPlusOne_;
@@ -455,24 +528,31 @@
   boolean errorDone_;
   DataOutputStream clientOut_;
   DataInputStream  clientErr_;
-  BufferedReader   clientIn_;
+  DataInputStream   clientIn_;
 
   String jobLog_;
   // set in PipeMapper/PipeReducer subclasses
   String mapredKey_;
   int numExceptions_;
   
+  boolean optUseKey_ = true;
+
+  boolean optSideEffect_;
+  Path sideEffectPath_;
+  FSDataOutputStream sideEffectOut_;
+
   String LOGNAME;
   PrintStream log_;
   
+  /* curr. going to stderr so that it is preserved
   { // instance initializer
     try {
       int id = (int)((System.currentTimeMillis()/2000) % 10);
       String sid = id+ "." + StreamUtil.env().get("USER");
       LOGNAME = "/tmp/PipeMapRed." + sid + ".log";
       log_ = new PrintStream(new FileOutputStream(LOGNAME));
-      log_.println(new java.util.Date());
-      log_.flush();
+      logprintln(new java.util.Date());
+      logflush();
     } catch(IOException io) {
       System.err.println("LOGNAME=" + LOGNAME);
       io.printStackTrace();
@@ -482,5 +562,5 @@
       }
     }    
   }
-  
+  */
 }

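The side-effect branch added to PipeMapRed above derives one output file per task from mapred.task.id and the job output directory. A standalone sketch of just that path construction, assuming a JobConf populated by hand with placeholder values (in a real task the framework sets them):

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class SideEffectPathSketch {
      public static void main(String[] args) throws IOException {
        JobConf job = new JobConf();
        job.set("mapred.task.id", "task_0001_m_000000_0");  // placeholder task id
        job.setOutputPath(new Path("/user/demo/out"));       // placeholder output dir

        // Same derivation as PipeMapRed: one file per task, named by the task id,
        // created directly under the job output directory.
        String fileName = job.get("mapred.task.id");
        Path sideEffectPath = new Path(job.getOutputPath(), fileName);
        FileSystem fs = FileSystem.get(job);
        FSDataOutputStream out = fs.create(sideEffectPath);
        out.write("one line of tool output\n".getBytes());
        out.close();
      }
    }
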
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=420768&r1=420767&r2=420768&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Tue Jul 11 01:58:05 2006
@@ -25,6 +25,7 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
 
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableComparable;
@@ -47,6 +48,15 @@
     return "mapKeyCols";
   }  
 
+  boolean getUseSideEffect()
+  {
+    String reduce = job_.get("stream.reduce.streamprocessor");
+    if(StreamJob.REDUCE_NONE.equals(reduce)) {
+      return true;  
+    }
+    return false;
+  }
+  
 
   // Do NOT declare default constructor
   // (MapRed creates it reflectively)
@@ -61,16 +71,28 @@
     }
     try {
       // 1/4 Hadoop in
-      mapredKey_ = key.toString();
+      if(key instanceof BytesWritable) {
+        mapredKey_ = new String(((BytesWritable)key).get(), "UTF-8");
+      } else {
+        mapredKey_ = key.toString();        
+      }
       numRecRead_++;
 
       maybeLogRecord();
 
       // 2/4 Hadoop to Tool
       if(numExceptions_==0) {
-        clientOut_.writeBytes(mapredKey_);
-        clientOut_.writeBytes("\t");
-        clientOut_.writeBytes(value.toString());
+        String sval;
+        if(value instanceof BytesWritable) {
+          sval = new String(((BytesWritable)value).get(), "UTF-8");
+        } else {
+          sval = value.toString();
+        }
+        if(optUseKey_) {
+          clientOut_.writeBytes(mapredKey_);
+          clientOut_.writeBytes("\t");
+        }
+        clientOut_.writeBytes(sval);
         clientOut_.writeBytes("\n");
         clientOut_.flush();
       } else {
@@ -90,11 +112,10 @@
     }
   }
   
-  
   public void close()
   {
     appendLogToJobLog("success");
     mapRedFinished();
   }
-
+  
 }

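PipeMapper above now accepts BytesWritable keys and values, decodes them as UTF-8 before piping them to the child process, and drops the key entirely when the input value class is BytesWritable. A small self-contained sketch of the same decoding rule; the class and method names are illustrative only:

    import java.io.UnsupportedEncodingException;

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.UTF8;
    import org.apache.hadoop.io.Writable;

    public class DecodeSketch {
      // Same rule as PipeMapper.map(): BytesWritable payloads are taken as
      // UTF-8 bytes, anything else falls back to toString().
      static String asText(Writable w) throws UnsupportedEncodingException {
        if (w instanceof BytesWritable) {
          return new String(((BytesWritable) w).get(), "UTF-8");
        }
        return w.toString();
      }

      public static void main(String[] args) throws UnsupportedEncodingException {
        System.out.println(asText(new UTF8("a text value")));
        System.out.println(asText(new BytesWritable("a binary value".getBytes())));
      }
    }
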
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java?rev=420768&r1=420767&r2=420768&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java Tue Jul 11 01:58:05 2006
@@ -20,6 +20,7 @@
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.mapred.Reporter;
@@ -46,17 +47,18 @@
   final String CONF_NS = "stream.recordreader.";
 
   public StreamBaseRecordReader(
-    FSDataInputStream in, long start, long end, 
-    String splitName, Reporter reporter, JobConf job)
+    FSDataInputStream in, FileSplit split, Reporter reporter, JobConf job, FileSystem fs)
     throws IOException
   {
     in_ = in;
-    start_ = start;
-    end_ = end;
-    length_ = end_ - start_;
-    splitName_ = splitName;
+    split_ = split;
+    start_ = split_.getStart();
+    length_ = split_.getLength();
+    end_ = start_ + length_;
+    splitName_ = split_.getFile().getName();
     reporter_ = reporter;
     job_ = job;
+    fs_ = fs;
     
     statusMaxRecordChars_ = job_.getInt(CONF_NS + "statuschars", 200);
   }
@@ -67,6 +69,18 @@
    */  
   public abstract boolean next(Writable key, Writable value) throws IOException;
 
+  /** This implementation always returns true. */
+  public boolean[] areValidInputDirectories(FileSystem fileSys,
+                                     Path[] inputDirs) throws IOException
+  {
+    int n = inputDirs.length;
+    boolean[] b = new boolean[n];
+    for(int i=0; i<n; i++) {
+      b[i] = true;
+    }
+    return b;
+  }
+
   /** Returns the current position in the input. */
   public synchronized long getPos() throws IOException 
   { 
@@ -125,18 +139,22 @@
     } else {
     	recStr = record.toString();
     }
-    String status = "HSTR " + StreamUtil.HOST + " " + numRec_ + ". pos=" + pos + " Processing record=" + recStr;
+    String unqualSplit = split_.getFile().getName() + ":" + split_.getStart() + "+" + split_.getLength();
+    String status = "HSTR " + StreamUtil.HOST + " " + numRec_ + ". pos=" + pos
+     + " " + unqualSplit + " Processing record=" + recStr;
     status += " " + splitName_;
     return status;
   }
 
   FSDataInputStream in_;
+  FileSplit split_;
   long start_;
   long end_;
   long length_;
   String splitName_;
   Reporter reporter_;
   JobConf job_;
+  FileSystem fs_;
   int numRec_ = 0;
   int nextStatusRec_ = 1;
   int statusMaxRecordChars_;

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?rev=420768&r1=420767&r2=420768&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Tue Jul 11 01:58:05 2006
@@ -51,6 +51,18 @@
     //LOG.setLevel(Level.FINE);
   }
   
+  /** This implementation always returns true. */
+  public boolean[] areValidInputDirectories(FileSystem fileSys,
+                                            Path[] inputDirs
+                                            ) throws IOException {
+    boolean[] b = new boolean[inputDirs.length];
+    for(int i=0; i < inputDirs.length; ++i) {
+      b[i] = true;
+    }
+    return b;
+  }
+
+  
   protected Path[] listPaths(FileSystem fs, JobConf job)
     throws IOException
   {
@@ -129,9 +141,8 @@
     
     Constructor ctor;
     try {
-      // reader = new StreamLineRecordReader(in, start, end, splitName, reporter, job);
       ctor = readerClass.getConstructor(new Class[]{
-        FSDataInputStream.class, long.class, long.class, String.class, Reporter.class, JobConf.class});
+        FSDataInputStream.class, FileSplit.class, Reporter.class, JobConf.class, FileSystem.class});
     } catch(NoSuchMethodException nsm) {
       throw new RuntimeException(nsm);
     }
@@ -140,12 +151,21 @@
     StreamBaseRecordReader reader;
     try {
         reader = (StreamBaseRecordReader) ctor.newInstance(new Object[]{
-            in, new Long(start), new Long(end), splitName, reporter, job});        
+            in, split, reporter, job, fs});        
     } catch(Exception nsm) {
       throw new RuntimeException(nsm);
     }
         
 	reader.init();
+
+
+    if(reader instanceof StreamSequenceRecordReader) {
+      // override k/v class types with types stored in SequenceFile
+      StreamSequenceRecordReader ss = (StreamSequenceRecordReader)reader;
+      job.setInputKeyClass(ss.rin_.getKeyClass());
+      job.setInputValueClass(ss.rin_.getValueClass());
+    }
+    
     
     return reader;
   }

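StreamInputFormat now instantiates record readers reflectively through a five-argument constructor, so a custom reader written against the old (in, start, end, splitName, reporter, job) signature has to be updated. A skeleton of the new shape, assuming the reader lives in the streaming package and inherits init() from StreamBaseRecordReader; the class name and placeholder bodies are illustrative only:

    package org.apache.hadoop.streaming;

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Reporter;

    public class SkeletonRecordReader extends StreamBaseRecordReader {

      // The constructor signature StreamInputFormat now looks up reflectively:
      // (FSDataInputStream, FileSplit, Reporter, JobConf, FileSystem)
      public SkeletonRecordReader(FSDataInputStream in, FileSplit split,
          Reporter reporter, JobConf job, FileSystem fs) throws IOException {
        super(in, split, reporter, job, fs);
      }

      public void seekNextRecordBoundary() throws IOException {
        // Placeholder: position in_ at the first record boundary at or after start_.
      }

      public synchronized boolean next(Writable key, Writable value) throws IOException {
        // Placeholder: read one record from in_ into key/value; return false at end_.
        return false;
      }
    }
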
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=420768&r1=420767&r2=420768&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Tue Jul 11 01:58:05 2006
@@ -42,6 +42,8 @@
 {
   protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());    
   
+  final static String REDUCE_NONE = "NONE";
+  
   public StreamJob(String[] argv, boolean mayExit)
   {
     argv_ = argv;
@@ -248,7 +250,7 @@
     System.out.println("  -output   <path>     DFS output directory for the Reduce step");
     System.out.println("  -mapper   <cmd>      The streaming command to run");
     System.out.println("  -combiner <cmd>      Not implemented. But you can pipe the mapper output");
-    System.out.println("  -reducer  <cmd>      The streaming command to run");
+    System.out.println("  -reducer  <cmd>      The streaming command to run.");
     System.out.println("  -file     <file>     File/dir to be shipped in the Job jar file");
     System.out.println("  -cluster  <name>     Default uses hadoop-default.xml and hadoop-site.xml");
     System.out.println("  -config   <file>     Optional. One or more paths to xml config files");
@@ -278,6 +280,10 @@
     System.out.println("  The default is to use the normal hadoop-default.xml and hadoop-site.xml");
     System.out.println("  Else configuration will use $HADOOP_HOME/conf/hadoop-<name>.xml");
     System.out.println();
+    System.out.println("To skip the shuffle/sort/reduce step:" );
+    System.out.println("  Use -reducer " + REDUCE_NONE);
+    System.out.println("  This preserves the map input order and speeds up processing");
+    System.out.println();
     System.out.println("To set the number of reduce tasks (num. of output files):");
     System.out.println("  -jobconf mapred.reduce.tasks=10");
     System.out.println("To change the local temp directory:");
@@ -405,8 +411,10 @@
     }
     
     jobConf_.setInputFormat(StreamInputFormat.class);
+    // for SequenceFile, input classes may be overriden in getRecordReader 
     jobConf_.setInputKeyClass(UTF8.class);
     jobConf_.setInputValueClass(UTF8.class);
+    
     jobConf_.setOutputKeyClass(UTF8.class);
     jobConf_.setOutputValueClass(UTF8.class);
     //jobConf_.setCombinerClass();
@@ -465,10 +473,10 @@
     while(it.hasNext()) {
         String prop = (String)it.next();
         String[] nv = prop.split("=", 2);
-        msg("JobConf: set(" + nv[0] + ", " + nv[1]+")");
+        msg("xxxJobConf: set(" + nv[0] + ", " + nv[1]+")");
         jobConf_.set(nv[0], nv[1]);
     }   
-    
+    msg("submitting to jobconf: " + getJobTrackerHostPort());
   }
   
   protected String getJobTrackerHostPort()
@@ -532,7 +540,7 @@
       LOG.info("Job complete: " + jobId_);
       LOG.info("Output: " + output_);
       error = false;
-    } finally {    
+    } finally {
       if (error && (running_ != null)) {
         LOG.info("killJob...");
         running_.killJob();

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java?rev=420768&r1=420767&r2=420768&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java Tue Jul 11 01:58:05 2006
@@ -37,11 +37,10 @@
 {
 
   public StreamLineRecordReader(
-    FSDataInputStream in, long start, long end, 
-    String splitName, Reporter reporter, JobConf job)
+    FSDataInputStream in, FileSplit split, Reporter reporter, JobConf job, FileSystem fs)
     throws IOException
   {
-    super(in, start, end, splitName, reporter, job);
+    super(in, split, reporter, job, fs);
   }
 
   public void seekNextRecordBoundary() throws IOException
@@ -59,7 +58,7 @@
       }
     }
 
-    System.out.println("getRecordReader start="+start_ + " end=" + end_ + " bytesSkipped"+bytesSkipped);
+    //System.out.println("getRecordReader start="+start_ + " end=" + end_ + " bytesSkipped"+bytesSkipped);
   }
 
   public synchronized boolean next(Writable key, Writable value)

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java?rev=420768&r1=420767&r2=420768&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java Tue Jul 11 01:58:05 2006
@@ -27,6 +27,8 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
 
+import org.apache.hadoop.util.Progressable;
+
 /** Similar to org.apache.hadoop.mapred.TextOutputFormat, 
  * but delimits key and value with a TAB.
  * @author Michel Tourn
@@ -34,8 +36,8 @@
 public class StreamOutputFormat implements OutputFormat {
 
   public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
-                                      String name) throws IOException {
-
+                                      String name, Progressable progr) throws IOException {
+ 
     File file = new File(job.getOutputDir(), name);
 
     final FSDataOutputStream out = fs.create(file);

Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java?rev=420768&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java Tue Jul 11 01:58:05 2006
@@ -0,0 +1,94 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+
+
+public class StreamSequenceRecordReader extends StreamBaseRecordReader
+{
+
+  public StreamSequenceRecordReader (
+    FSDataInputStream in, FileSplit split, Reporter reporter, JobConf job, FileSystem fs)
+    throws IOException
+  {
+    super(in, split, reporter, job, fs);
+    numFailed_ = 0;
+    // super.in_ ignored, using rin_ instead
+  }
+  
+    
+  public synchronized boolean next(Writable key, Writable value)
+   throws IOException
+  {         
+    boolean success;
+    do {    
+      if (!more_) return false;
+      success = false;
+      try {
+        long pos = rin_.getPosition();
+        boolean eof = rin_.next(key, value);
+        if (pos >= end_ && rin_.syncSeen()) {
+          more_ = false;
+        } else {
+          more_ = eof;
+        }
+        success = true;
+      } catch(IOException io) {
+        numFailed_++;
+        if(numFailed_ < 100 || numFailed_ % 100 == 0) {
+          err_.println("StreamSequenceRecordReader: numFailed_/numRec_=" 
+            + numFailed_+ "/" + numRec_);
+        }
+        io.printStackTrace(err_);
+        success = false;
+      }
+    } while(!success);
+    numRecStats("");
+    return more_;    
+  }
+  
+
+  public void seekNextRecordBoundary() throws IOException
+  {
+    rin_ = new SequenceFile.Reader(fs_, split_.getPath(), job_);
+    end_ = split_.getStart() + split_.getLength();
+
+    if (split_.getStart() > rin_.getPosition())
+      rin_.sync(split_.getStart());                  // sync to start
+
+    more_ = rin_.getPosition() < end_;
+
+    reporter_.setStatus(split_.toString());
+
+    //return new SequenceFileRecordReader(job_, split_);
+  }
+
+  boolean more_;
+  SequenceFile.Reader rin_;
+  int numFailed_;
+  PrintStream err_ = System.err;
+
+}

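The new reader above scans one FileSplit of a SequenceFile: it syncs to the split start, stops after the first sync point at or past the split end, and StreamInputFormat then overrides the job's input key/value classes with the classes stored in the file. A standalone sketch of that same SequenceFile.Reader pattern outside the MapReduce plumbing; the path and split bounds are placeholders:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapred.JobConf;

    public class SequenceSplitScanSketch {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        FileSystem fs = FileSystem.get(job);
        Path file = new Path("/user/demo/part-00000");  // placeholder SequenceFile
        long start = 0L, length = 1024L * 1024L;        // placeholder split bounds
        long end = start + length;

        SequenceFile.Reader rin = new SequenceFile.Reader(fs, file, job);
        // Key/value classes come from the file header, which is what lets
        // StreamInputFormat override the job's input classes for this reader.
        WritableComparable key = (WritableComparable) rin.getKeyClass().newInstance();
        Writable value = (Writable) rin.getValueClass().newInstance();

        if (start > rin.getPosition()) {
          rin.sync(start);               // skip to the first sync point in the split
        }
        boolean more = rin.getPosition() < end;
        while (more) {
          long pos = rin.getPosition();
          boolean eof = rin.next(key, value);
          // Same stop rule as StreamSequenceRecordReader.next(): done once a
          // sync point at or past the split end has been seen.
          more = (pos >= end && rin.syncSeen()) ? false : eof;
          // ... a real consumer would use key/value here ...
        }
        rin.close();
      }
    }
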
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java?rev=420768&r1=420767&r2=420768&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java Tue Jul 11 01:58:05 2006
@@ -47,11 +47,10 @@
 public class StreamXmlRecordReader extends StreamBaseRecordReader 
 {
   public StreamXmlRecordReader(
-    FSDataInputStream in, long start, long end, 
-    String splitName, Reporter reporter, JobConf job)
+    FSDataInputStream in, FileSplit split, Reporter reporter, JobConf job, FileSystem fs)
     throws IOException
   {
-    super(in, start, end, splitName, reporter, job);
+    super(in, split, reporter, job, fs);
     
     beginMark_ = checkJobGet(CONF_NS + "begin");
     endMark_   = checkJobGet(CONF_NS + "end");