Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/05/03 21:33:28 UTC

svn commit: r534970 - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/

Author: cutting
Date: Thu May  3 12:33:27 2007
New Revision: 534970

URL: http://svn.apache.org/viewvc?view=rev&rev=534970
Log:
HADOOP-1315.  Clean up contrib/streaming, switching it to use more core classes and removing unused classes.  Contributed by Runping.

Removed:
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/CompoundDirSpec.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MuxOutputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=534970&r1=534969&r2=534970
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu May  3 12:33:27 2007
@@ -316,6 +316,9 @@
     that killed the heartbeat monitoring thread.
     (Dhruba Borthakur via cutting)
 
+94. HADOOP-1315.  Clean up contrib/streaming, switching it to use core
+    classes more and removing unused code.  (Runping Qi via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?view=diff&rev=534970&r1=534969&r2=534970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Thu May  3 12:33:27 2007
@@ -19,8 +19,6 @@
 package org.apache.hadoop.streaming;
 
 import java.io.*;
-import java.net.Socket;
-import java.net.URI;
 import java.nio.charset.CharacterCodingException;
 import java.io.IOException;
 import java.util.Date;
@@ -29,14 +27,11 @@
 import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Properties;
-import java.util.regex.*;
 
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.PhasedFileSystem;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.StringUtils;
@@ -45,7 +40,6 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 
 /** Shared functionality for PipeMapper, PipeReducer.
@@ -60,60 +54,12 @@
    */
   abstract String getPipeCommand(JobConf job);
 
-  /*
-   */
-  abstract String getKeyColPropName();
-
   abstract char getFieldSeparator();
   
   abstract int getNumOfKeyFields();
-  
-  /** Write output as side-effect files rather than as map outputs.
-      This is useful to do "Map" tasks rather than "MapReduce" tasks. */
-  boolean getUseSideEffect() {
-    return false;
-  }
 
   abstract boolean getDoPipe();
 
-  /**
-   * @returns how many TABS before the end of the key part
-   * usually: 1 or "ALL"
-   * used for tool output of both Map and Reduce
-   * configured via tool's argv: splitKeyVal=ALL or 1..
-   * although it is interpreted here, not by tool
-   */
-  int getKeyColsFromPipeCommand(String cmd) {
-    String key = getKeyColPropName();
-    Pattern kcPat = Pattern.compile(".*" + key + "=([^\\s]*).*");
-    Matcher match = kcPat.matcher(cmd);
-    String kc;
-    if (!match.matches()) {
-      kc = null;
-    } else {
-      kc = match.group(1);
-    }
-
-    int cols;
-    if (kc == null) {
-      // default value is 1 and the Stream applications could instead
-      // add/remove the \t separator on lines to get the same effect as value 0, 1, ALL
-      cols = 1;
-    } else if (kc.equals("ALL")) {
-      cols = ALL_COLS;
-    } else {
-      try {
-        cols = Integer.parseInt(kc);
-      } catch (NumberFormatException nf) {
-        cols = Integer.MAX_VALUE;
-      }
-    }
-
-    System.out.println("getKeyColsFromPipeCommand:" + key + " parse:" + cols + " from cmd=" + cmd);
-
-    return cols;
-  }
-
   final static int OUTSIDE = 1;
   final static int SINGLEQ = 2;
   final static int DOUBLEQ = 3;
@@ -164,54 +110,15 @@
     return (String[]) argList.toArray(new String[0]);
   }
 
-  OutputStream getURIOutputStream(URI uri, boolean allowSocket) throws IOException {
-    final String SOCKET = "socket";
-    if (uri.getScheme().equals(SOCKET)) {
-      if (!allowSocket) {
-        throw new IOException(SOCKET + " not allowed on outputstream " + uri);
-      }
-      final Socket sock = new Socket(uri.getHost(), uri.getPort());
-      OutputStream out = new FilterOutputStream(sock.getOutputStream()) {
-          public void close() throws IOException {
-            sock.close();
-            super.close();
-          }
-        };
-      return out;
-    } else {
-      // a FSDataOutputStreamm, localFS or HDFS.
-      // localFS file may be set up as a FIFO.
-      return sideFs_.create(new Path(uri.getSchemeSpecificPart()));
-    }
-  }
-
-  String getSideEffectFileName() {
-    FileSplit split = StreamUtil.getCurrentSplit(job_);
-    return new String(split.getPath().getName() + "-" + split.getStart() + 
-                      "-" + split.getLength());
-  }
-
   public void configure(JobConf job) {
     try {
       String argv = getPipeCommand(job);
 
-      keyCols_ = getKeyColsFromPipeCommand(argv);
-
-      debug_ = (job.get("stream.debug") != null);
-      if (debug_) {
-        System.out.println("PipeMapRed: stream.debug=true");
-      }
-
       joinDelay_ = job.getLong("stream.joindelay.milli", 0);
 
       job_ = job;
       fs_ = FileSystem.get(job_);
-      if (job_.getBoolean("stream.sideoutput.localfs", false)) {
-        //sideFs_ = new LocalFileSystem(job_);
-        sideFs_ = FileSystem.getLocal(job_);
-      } else {
-        sideFs_ = fs_;
-      }
+
       String mapOutputFieldSeparator = job_.get("stream.map.output.field.separator", "\t");
       String reduceOutputFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t");
       this.mapOutputFieldSeparator = mapOutputFieldSeparator.charAt(0);
@@ -219,22 +126,12 @@
       this.numOfMapOutputKeyFields = job_.getInt("stream.num.map.output.key.fields", 1);
       this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1);
       
-      if (debug_) {
-        System.out.println("kind   :" + this.getClass());
-        System.out.println("split  :" + StreamUtil.getCurrentSplit(job_));
-        System.out.println("fs     :" + fs_.toString());
-        System.out.println("sideFs :" + sideFs_.toString());
-      }
 
       doPipe_ = getDoPipe();
       if (!doPipe_) return;
 
       setStreamJobDetails(job);
-      setStreamProperties();
-
-      if (debugFailEarly_) {
-        throw new RuntimeException("debugFailEarly_");
-      }
+      
       String[] argvSplit = splitArgs(argv);
       String prog = argvSplit[0];
       File currentDir = new File(".").getAbsoluteFile();
@@ -245,39 +142,6 @@
         FileUtil.chmod(new File(jobCacheDir, prog).toString(), "a+x");
       }
 
-      if (job_.getInputValueClass().equals(BytesWritable.class)) {
-        // TODO expose as separate config:
-        // job or semistandard inputformat property
-        optUseKey_ = false;
-      }
-
-      optSideEffect_ = getUseSideEffect();
-
-      if (optSideEffect_) {
-        // during work: use a completely unique filename to avoid HDFS namespace conflicts
-        // after work: rename to a filename that depends only on the workload (the FileSplit)
-        //   it's a friendly name and in case of reexecution it will clobber. 
-        // reexecution can be due to: other job, failed task and speculative task
-        // See StreamJob.setOutputSpec(): if reducerNone_ aka optSideEffect then: 
-        // client has renamed outputPath and saved the argv's original output path as:
-        if (useSingleSideOutputURI_) {
-          finalOutputURI = new URI(sideOutputURI_);
-          sideEffectPathFinal_ = null; // in-place, no renaming to final
-        } else {
-          sideFs_ = new PhasedFileSystem(sideFs_, job);
-          String sideOutputPath = job_.get("stream.sideoutput.dir"); // was: job_.getOutputPath() 
-          String fileName = getSideEffectFileName(); // see HADOOP-444 for rationale
-          sideEffectPathFinal_ = new Path(sideOutputPath, fileName);
-          finalOutputURI = new URI(sideEffectPathFinal_.toString()); // implicit dfs: 
-        }
-        // apply default scheme
-        if (finalOutputURI.getScheme() == null) {
-          finalOutputURI = new URI("file", finalOutputURI.getSchemeSpecificPart(), null);
-        }
-        boolean allowSocket = useSingleSideOutputURI_;
-        sideEffectOut_ = getURIOutputStream(finalOutputURI, allowSocket);
-      }
-
       // 
       // argvSplit[0]:
       // An absolute path should be a preexisting valid path on all TaskTrackers
@@ -295,8 +159,6 @@
         f = null;
       }
       logprintln("PipeMapRed exec " + Arrays.asList(argvSplit));
-      logprintln("sideEffectURI_=" + finalOutputURI);
-
       Environment childEnv = (Environment) StreamUtil.env().clone();
       addJobConfToEnvironment(job_, childEnv);
       addEnvironment(childEnv, job_.get("stream.addenvironment"));
@@ -327,34 +189,6 @@
       logprintln("JobConf set minRecWrittenToEnableSkip_ =" + minRecWrittenToEnableSkip_);
     }
     taskId_ = StreamUtil.getTaskInfo(job_);
-    debugFailEarly_ = isDebugFail("early");
-    debugFailDuring_ = isDebugFail("during");
-    debugFailLate_ = isDebugFail("late");
-
-    sideOutputURI_ = job_.get("stream.sideoutput.uri");
-    useSingleSideOutputURI_ = (sideOutputURI_ != null);
-  }
-
-  boolean isDebugFail(String kind) {
-    String execidlist = job_.get("stream.debugfail.reexec." + kind);
-    if (execidlist == null) {
-      return false;
-    }
-    String[] e = execidlist.split(",");
-    for (int i = 0; i < e.length; i++) {
-      int ei = Integer.parseInt(e[i]);
-      if (taskId_.execid == ei) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  void setStreamProperties() {
-    String s = System.getProperty("stream.port");
-    if (s != null) {
-      reportPortPlusOne_ = Integer.parseInt(s);
-    }
   }
 
   void logStackTrace(Exception e) {
@@ -442,7 +276,6 @@
     if (log_ != null) {
       StreamUtil.exec("/bin/rm " + LOGNAME, log_);
     }
-    // TODO socket-based aggregator (in JobTrackerInfoServer)
   }
 
   void startOutputThreads(OutputCollector output, Reporter reporter) {
@@ -474,14 +307,7 @@
    * @throws IOException
    */
   void splitKeyVal(byte[] line, Text key, Text val) throws IOException {
-    int pos = -1;
-    if (keyCols_ != ALL_COLS) {
-      pos = UTF8ByteArrayUtils.findNthByte(line, (byte)this.getFieldSeparator(), this.getNumOfKeyFields());
-    }
-    LOG.info("FieldSeparator: " + this.getFieldSeparator());
-    LOG.info("NumOfKeyFields: " + this.getNumOfKeyFields());
-    LOG.info("Line: " + new String (line));
-    LOG.info("Pos: " + pos);
+    int pos = UTF8ByteArrayUtils.findNthByte(line, (byte)this.getFieldSeparator(), this.getNumOfKeyFields());
     try {
       if (pos == -1) {
         key.set(line);
@@ -508,15 +334,10 @@
         Text val = new Text();
         // 3/4 Tool to Hadoop
         while ((answer = UTF8ByteArrayUtils.readLine((InputStream) clientIn_)) != null) {
-          // 4/4 Hadoop out
-          if (optSideEffect_) {
-            sideEffectOut_.write(answer);
-            sideEffectOut_.write('\n');
-            sideEffectOut_.flush();
-          } else {
-            splitKeyVal(answer, key, val);
-            output.collect(key, val);
-          }
+          
+          splitKeyVal(answer, key, val);
+          output.collect(key, val);
+          
           numRecWritten_++;
           long now = System.currentTimeMillis();
           if (now-lastStdoutReport > reporterOutDelay_) {
@@ -584,18 +405,6 @@
       } catch (IOException io) {
       }
       waitOutputThreads();
-      try {
-        if (optSideEffect_) {
-          logprintln("closing " + finalOutputURI);
-          if (sideEffectOut_ != null) sideEffectOut_.close();
-          logprintln("closed  " + finalOutputURI);
-          if (!useSingleSideOutputURI_) {
-            ((PhasedFileSystem)sideFs_).commit(); 
-          }
-        }
-      } catch (IOException io) {
-        io.printStackTrace();
-      }
       if (sim != null) sim.destroy();
     } catch (RuntimeException e) {
       logStackTrace(e);
@@ -692,18 +501,11 @@
   
   long minRecWrittenToEnableSkip_ = Long.MAX_VALUE;
 
-  int keyCols_;
-  final static int ALL_COLS = Integer.MAX_VALUE;
-
   long reporterOutDelay_ = 10*1000L; 
   long reporterErrDelay_ = 10*1000L; 
   long joinDelay_;
   JobConf job_;
   FileSystem fs_;
-  FileSystem sideFs_;
-
-  // generic MapRed parameters passed on by hadoopStreaming
-  int reportPortPlusOne_;
 
   boolean doPipe_;
   boolean debug_;
@@ -723,17 +525,6 @@
   String mapredKey_;
   int numExceptions_;
   StreamUtil.TaskId taskId_;
-
-  boolean optUseKey_ = true;
-
-  private boolean optSideEffect_;
-  private URI finalOutputURI;
-  private Path sideEffectPathFinal_;
-
-  private boolean useSingleSideOutputURI_;
-  private String sideOutputURI_;
-
-  private OutputStream sideEffectOut_;
 
   protected volatile Throwable outerrThreadsThrowable;
 

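A note on the splitKeyVal() simplification above: the streaming command's output is now always split at the Nth occurrence of the configured field separator, driven by stream.map.output.field.separator and stream.num.map.output.key.fields (and their reduce-side counterparts). Below is a minimal, self-contained sketch of that splitting; the class name KeyValSplitSketch is hypothetical and its findNthByte only mirrors UTF8ByteArrayUtils.findNthByte. It is illustrative, not part of the commit.

// Illustrative sketch of the key/value split now used by PipeMapRed.splitKeyVal().
public class KeyValSplitSketch {

  // Return the index of the n-th occurrence of sep in line, or -1 if there are fewer than n.
  static int findNthByte(byte[] line, byte sep, int n) {
    int seen = 0;
    for (int i = 0; i < line.length; i++) {
      if (line[i] == sep && ++seen == n) {
        return i;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    byte sep = (byte) '\t';   // stream.map.output.field.separator (default "\t")
    int numKeyFields = 1;     // stream.num.map.output.key.fields (default 1)
    byte[] line = "apple\t3".getBytes();
    int pos = findNthByte(line, sep, numKeyFields);
    String key = (pos == -1) ? new String(line) : new String(line, 0, pos);
    String val = (pos == -1) ? "" : new String(line, pos + 1, line.length - pos - 1);
    System.out.println("key=" + key + " val=" + val);  // prints: key=apple val=3
  }
}
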
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?view=diff&rev=534970&r1=534969&r2=534970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Thu May  3 12:33:27 2007
@@ -25,8 +25,10 @@
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.util.StringUtils;
 
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
 
@@ -36,6 +38,8 @@
  */
 public class PipeMapper extends PipeMapRed implements Mapper {
 
+  private boolean ignoreKey = false;
+  
   String getPipeCommand(JobConf job) {
     String str = job.get("stream.map.streamprocessor");
     if (str == null) {
@@ -50,17 +54,15 @@
     }
   }
 
-  String getKeyColPropName() {
-    return "mapKeyCols";
-  }
-
-  boolean getUseSideEffect() {
-    return StreamUtil.getUseMapSideEffect(job_);
-  }
-
   boolean getDoPipe() {
     return true;
   }
+  
+  public void configure(JobConf job) {
+    super.configure(job);
+    String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
+    this.ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());
+  }
 
   // Do NOT declare default constructor
   // (MapRed creates it reflectively)
@@ -86,7 +88,7 @@
 
       // 2/4 Hadoop to Tool
       if (numExceptions_ == 0) {
-        if (optUseKey_) {
+        if (!this.ignoreKey) {
           write(key);
           clientOut_.write('\t');
         }

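The configure() override added above replaces the old optUseKey_/getUseSideEffect machinery: when the job's input format is the (new default) TextInputFormat, the key is just a LongWritable byte offset, so PipeMapper skips it and writes only the value to the streaming command's stdin. A hedged, standalone sketch of that check follows; the class name IgnoreKeySketch is hypothetical, while the property name and default come from the hunk above.

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class IgnoreKeySketch {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // Same lookup as the new PipeMapper.configure(): fall back to TextInputFormat
    // when no -inputformat was supplied.
    String inputFormatClassName =
        job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
    boolean ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());
    System.out.println("ignoreKey=" + ignoreKey);  // true for the default TextInputFormat
  }
}
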
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?view=diff&rev=534970&r1=534969&r2=534970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Thu May  3 12:33:27 2007
@@ -57,10 +57,6 @@
     return (argv != null) && !StreamJob.REDUCE_NONE.equals(argv);
   }
 
-  String getKeyColPropName() {
-    return "reduceKeyCols";
-  }
-
   public void reduce(WritableComparable key, Iterator values, OutputCollector output,
                      Reporter reporter) throws IOException {
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=534970&r1=534969&r2=534970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Thu May  3 12:33:27 2007
@@ -22,7 +22,6 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -41,7 +40,6 @@
 import org.apache.commons.cli2.option.PropertyOption;
 import org.apache.commons.cli2.resource.ResourceConstants;
 import org.apache.commons.cli2.util.HelpFormatter;
-import org.apache.commons.cli2.validation.FileValidator;
 import org.apache.commons.cli2.validation.InvalidArgumentException;
 import org.apache.commons.cli2.validation.Validator;
 import org.apache.commons.logging.*;
@@ -61,6 +59,7 @@
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.util.*;
@@ -73,8 +72,7 @@
 
   protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());
   final static String REDUCE_NONE = "NONE";
-  private boolean reducerNone_;
-
+    
   /** -----------Streaming CLI Implementation  **/
   private DefaultOptionBuilder builder = 
     new DefaultOptionBuilder("-","-", false);
@@ -214,7 +212,6 @@
       
       inputSpecs_.addAll(cmdLine.getValues("-input"));
       output_ = (String) cmdLine.getValue("-output"); 
-      mapsideoutURI_ = (String) cmdLine.getValue("-mapsideoutput");
       
       mapCmd_ = (String)cmdLine.getValue("-mapper"); 
       comCmd_ = (String)cmdLine.getValue("-combiner"); 
@@ -450,20 +447,17 @@
       System.out.println("Options:");
       System.out.println("  -input    <path>     DFS input file(s) for the Map step");
       System.out.println("  -output   <path>     DFS output directory for the Reduce step");
-      System.out.println("  -mapper   <cmd>      The streaming command to run");
-      System.out.println("  -combiner <cmd>      The streaming command to run");
-      System.out.println("  -reducer  <cmd>      The streaming command to run");
+      System.out.println("  -mapper   <cmd|JavaClassName>      The streaming command to run");
+      System.out.println("  -combiner <JavaClassName> Combiner has to be a Java class");
+      System.out.println("  -reducer  <cmd|JavaClassName>      The streaming command to run");
       System.out.println("  -file     <file>     File/dir to be shipped in the Job jar file");
-      //Only advertise the standard way: [--config dir] in our launcher 
-      //System.out.println("  -cluster  <name>     Default uses hadoop-default.xml and hadoop-site.xml");
-      //System.out.println("  -config   <file>     Optional. One or more paths to xml config files");
       System.out.println("  -dfs    <h:p>|local  Optional. Override DFS configuration");
       System.out.println("  -jt     <h:p>|local  Optional. Override JobTracker configuration");
       System.out.println("  -additionalconfspec specfile  Optional.");
-      System.out.println("  -inputformat KeyValueTextInputFormat(default)|SequenceFileInputFormat|XmlTextInputFormat  Optional.");
-      System.out.println("  -outputformat specfile  Optional.");
-      System.out.println("  -partitioner specfile  Optional.");
-      System.out.println("  -numReduceTasks specfile  Optional.");
+      System.out.println("  -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.");
+      System.out.println("  -outputformat TextOutputFormat(default)|JavaClassName  Optional.");
+      System.out.println("  -partitioner JavaClassName  Optional.");
+      System.out.println("  -numReduceTasks <num>  Optional.");
       System.out.println("  -inputreader <spec>  Optional.");
       System.out.println("  -jobconf  <n>=<v>    Optional. Add or override a JobConf property");
       System.out.println("  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands");
@@ -478,10 +472,7 @@
     System.out.println("In -input: globbing on <path> is supported and can have multiple -input");
     System.out.println("Default Map input format: a line is a record in UTF-8");
     System.out.println("  the key part ends at first TAB, the rest of the line is the value");
-    System.out.println("Custom Map input format: -inputreader package.MyRecordReader,n=v,n=v ");
-    System.out
-      .println("  comma-separated name-values can be specified to configure the InputFormat");
-    System.out.println("  Ex: -inputreader 'StreamXmlRecordReader,begin=<doc>,end=</doc>'");
+    System.out.println("Custom input format: -inputformat package.MyInputFormat ");
     System.out.println("Map output format, reduce input/output format:");
     System.out.println("  Format defined by what the mapper command outputs. Line-oriented");
     System.out.println();
@@ -489,34 +480,21 @@
     System.out.println("  working directory when the mapper and reducer are run.");
     System.out.println("  The location of this working directory is unspecified.");
     System.out.println();
-    //System.out.println("Use -cluster <name> to switch between \"local\" Hadoop and one or more remote ");
-    //System.out.println("  Hadoop clusters. ");
-    //System.out.println("  The default is to use the normal hadoop-default.xml and hadoop-site.xml");
-    //System.out.println("  Else configuration will use $HADOOP_HOME/conf/hadoop-<name>.xml");
-    //System.out.println();
+    System.out.println("To set the number of reduce tasks (num. of output files):");
+    System.out.println("  -jobconf mapred.reduce.tasks=10");
     System.out.println("To skip the sort/combine/shuffle/sort/reduce step:");
-    System.out.println("  Use -reducer " + REDUCE_NONE);
+    System.out.println("  Use -numReduceTasks 0");
     System.out
       .println("  A Task's Map output then becomes a 'side-effect output' rather than a reduce input");
     System.out
       .println("  This speeds up processing, This also feels more like \"in-place\" processing");
     System.out.println("  because the input filename and the map input order are preserved");
-    System.out.println("To specify a single side-effect output file");
-    System.out.println("    -mapsideoutput [file:/C:/win|file:/unix/|socket://host:port]");//-output for side-effects will be soon deprecated
-    System.out.println("  If the jobtracker is local this is a local file");
-    System.out.println("  This currently requires -reducer NONE");
+    System.out.println("  This equivalent -reducer NONE");
     System.out.println();
-    System.out.println("To set the number of reduce tasks (num. of output files):");
-    System.out.println("  -jobconf mapred.reduce.tasks=10");
     System.out.println("To speed up the last reduces:");
     System.out.println("  -jobconf mapred.speculative.execution=true");
-    System.out.println("  Do not use this along -reducer " + REDUCE_NONE);
     System.out.println("To name the job (appears in the JobTracker Web UI):");
     System.out.println("  -jobconf mapred.job.name='My Job' ");
-    System.out.println("To specify that line-oriented input is in gzip format:");
-    System.out
-      .println("(at this time ALL input files must be gzipped and this is not recognized based on file extension)");
-    System.out.println("   -jobconf stream.recordreader.compression=gzip ");
     System.out.println("To change the local temp directory:");
     System.out.println("  -jobconf dfs.data.dir=/tmp/dfs");
     System.out.println("  -jobconf stream.tmpdir=/tmp/streaming");
@@ -681,8 +659,6 @@
       config_.addFinalResource(new Path(pathName));
     }
 
-    testMerge_ = (-1 != userJobConfProps_.toString().indexOf("stream.testmerge"));
-
     // general MapRed job properties
     jobConf_ = new JobConf(config_);
     
@@ -695,25 +671,32 @@
     // (to resolve local vs. dfs drive letter differences) 
     // (mapred.working.dir will be lazily initialized ONCE and depends on FS)
     for (int i = 0; i < inputSpecs_.size(); i++) {
-      addInputSpec((String) inputSpecs_.get(i), i);
+      jobConf_.addInputPath(new Path(((String) inputSpecs_.get(i))));
     }
-    jobConf_.setBoolean("stream.inputtagged", inputTagged_);
     jobConf_.set("stream.numinputspecs", "" + inputSpecs_.size());
 
     String defaultPackage = this.getClass().getPackage().getName();
     Class c;
     Class fmt = null;
     if (inReaderSpec_ == null && inputFormatSpec_ == null) {
-      fmt = KeyValueTextInputFormat.class;
+      fmt = TextInputFormat.class;
     } else if (inputFormatSpec_ != null) {
-      if (inputFormatSpec_.equals(KeyValueTextInputFormat.class.getName())
-          || inputFormatSpec_.equals(KeyValueTextInputFormat.class.getCanonicalName())) {
-        fmt = KeyValueTextInputFormat.class;
-      } else if (inputFormatSpec_.equals(SequenceFileInputFormat.class.getName())
-          || inputFormatSpec_.equals(SequenceFileInputFormat.class.getCanonicalName())) {
-        fmt = SequenceFileInputFormat.class;
-      } else if (inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class.getName())
-          || inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class.getCanonicalName())) {
+      if (inputFormatSpec_.equals(TextInputFormat.class.getName())
+          || inputFormatSpec_.equals(TextInputFormat.class.getCanonicalName())) {
+        fmt = TextInputFormat.class;
+      } else if (inputFormatSpec_.equals(KeyValueTextInputFormat.class
+          .getName())
+          || inputFormatSpec_.equals(KeyValueTextInputFormat.class
+              .getCanonicalName())) {
+      } else if (inputFormatSpec_.equals(SequenceFileInputFormat.class
+          .getName())
+          || inputFormatSpec_
+              .equals(org.apache.hadoop.mapred.SequenceFileInputFormat.class
+                  .getCanonicalName())) {
+      } else if (inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class
+          .getName())
+          || inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class
+              .getCanonicalName())) {
         fmt = SequenceFileAsTextInputFormat.class;
       } else {
         c = StreamUtil.goodClassOrNull(inputFormatSpec_, defaultPackage);
@@ -725,14 +708,7 @@
       }
     } 
     if (fmt == null) {
-      if (testMerge_ && false == hasSimpleInputSpecs_) {
-        // this ignores -inputreader
-        fmt = MergerInputFormat.class;
-      } else {
-        // need to keep this case to support custom -inputreader
-        // and their parameters ,n=v,n=v
-        fmt = StreamInputFormat.class;
-      }
+      fmt = StreamInputFormat.class;
     }
 
     jobConf_.setInputFormat(fmt);
@@ -757,13 +733,10 @@
       c = StreamUtil.goodClassOrNull(comCmd_, defaultPackage);
       if (c != null) {
         jobConf_.setCombinerClass(c);
-      } else {
-        jobConf_.setCombinerClass(PipeCombiner.class);
-        jobConf_.set("stream.combine.streamprocessor", URLEncoder.encode(comCmd_, "UTF-8"));
-      }
+      } 
     }
 
-    reducerNone_ = false;
+    boolean reducerNone_ = false;
     if (redCmd_ != null) {
       reducerNone_ = redCmd_.equals(REDUCE_NONE);
       if (redCmd_.compareToIgnoreCase("aggregate") == 0) {
@@ -801,9 +774,7 @@
     }
     
     setUserJobConfProps(false);
-    // output setup is done late so we can customize for reducerNone_
-    //jobConf_.setOutputDir(new File(output_));
-    setOutputSpec();
+    jobConf_.setOutputPath(new Path(output_));
     fmt = null;
     if (outputFormatSpec_!= null) {
       c = StreamUtil.goodClassOrNull(outputFormatSpec_, defaultPackage);
@@ -812,11 +783,7 @@
       } 
     }
     if (fmt == null) {
-      if (testMerge_) {
-        fmt = MuxOutputFormat.class;
-      } else {
-        fmt = TextOutputFormat.class;
-      }
+      fmt = TextOutputFormat.class;
     }
     jobConf_.setOutputFormat(fmt);
 
@@ -831,6 +798,9 @@
       int numReduceTasks = Integer.parseInt(numReduceTasksSpec_);
       jobConf_.setNumReduceTasks(numReduceTasks);
     }
+    if (reducerNone_) {
+      jobConf_.setNumReduceTasks(0);
+    }
     
     // last, allow user to override anything
     // (although typically used with properties we didn't touch)
@@ -880,78 +850,6 @@
     }
     msg("====");
   }
-  
-  /** InputSpec-s encode: a glob pattern x additional column files x additional joins */
-  protected void addInputSpec(String inSpec, int index) {
-    if (!testMerge_) {
-      jobConf_.addInputPath(new Path(inSpec));
-    } else {
-      CompoundDirSpec spec = new CompoundDirSpec(inSpec, true);
-      msg("Parsed -input:\n" + spec.toTableString());
-      if (index == 0) {
-        hasSimpleInputSpecs_ = (spec.paths_.length == 0);
-        msg("hasSimpleInputSpecs_=" + hasSimpleInputSpecs_);
-      }
-      String primary = spec.primarySpec();
-      if (!seenPrimary_.add(primary)) {
-        // this won't detect glob overlaps and noncanonical path variations
-        fail("Primary used in multiple -input spec: " + primary);
-      }
-      jobConf_.addInputPath(new Path(primary));
-      // during Job execution, will reparse into a CompoundDirSpec 
-      jobConf_.set("stream.inputspecs." + index, inSpec);
-    }
-  }
-
-  /** uses output_ and mapsideoutURI_ */
-  protected void setOutputSpec() throws IOException {
-    CompoundDirSpec spec = new CompoundDirSpec(output_, false);
-    msg("Parsed -output:\n" + spec.toTableString());
-    String primary = spec.primarySpec();
-    String channel0;
-    // TODO simplify cases, encapsulate in a StreamJobConf
-    if (!reducerNone_) {
-      channel0 = primary;
-    } else {
-      if (mapsideoutURI_ != null) {
-        // user can override in case this is in a difft filesystem..
-        try {
-          URI uri = new URI(mapsideoutURI_);
-          if (uri.getScheme() == null || uri.getScheme().equals("file")) { // || uri.getScheme().equals("hdfs")
-            if (!new Path(uri.getSchemeSpecificPart()).isAbsolute()) {
-              fail("Must be absolute: " + mapsideoutURI_);
-            }
-          } else if (uri.getScheme().equals("socket")) {
-            // ok
-          } else {
-            fail("Invalid scheme: " + uri.getScheme() + " for -mapsideoutput " + mapsideoutURI_);
-          }
-        } catch (URISyntaxException e) {
-          throw (IOException) new IOException().initCause(e);
-        }
-      }
-      // an empty reduce output named "part-00002" will go here and not collide.
-      channel0 = primary + ".NONE";
-      // the side-effect of the first split of an input named "part-00002" 
-      // will go in this directory
-      jobConf_.set("stream.sideoutput.dir", primary);
-      // oops if user overrides low-level this isn't set yet :-(
-      boolean localjt = StreamUtil.isLocalJobTracker(jobConf_);
-      // just a guess user may prefer remote..
-      jobConf_.setBoolean("stream.sideoutput.localfs", localjt);
-    }
-    // a path in fs.name.default filesystem
-    System.out.println(channel0);
-    System.out.println(new Path(channel0));
-    jobConf_.setOutputPath(new Path(channel0));
-    // will reparse remotely
-    jobConf_.set("stream.outputspec", output_);
-    if (null != mapsideoutURI_) {
-      // a path in "jobtracker's filesystem"
-      // overrides sideoutput.dir
-      jobConf_.set("stream.sideoutput.uri", mapsideoutURI_);
-    }
-  }
 
   protected String getJobTrackerHostPort() {
     return jobConf_.get("mapred.job.tracker");
@@ -1099,15 +997,12 @@
 
   // command-line arguments
   protected ArrayList inputSpecs_ = new ArrayList(); // <String>
-  protected boolean inputTagged_ = false;
   protected TreeSet seenPrimary_ = new TreeSet(); // <String>
   protected boolean hasSimpleInputSpecs_;
   protected ArrayList packageFiles_ = new ArrayList(); // <String>
   protected ArrayList shippedCanonFiles_ = new ArrayList(); // <String>
-  //protected ArrayList userJobConfProps_ = new ArrayList(); // <String> name=value
   protected TreeMap<String, String> userJobConfProps_ = new TreeMap<String, String>(); 
   protected String output_;
-  protected String mapsideoutURI_;
   protected String mapCmd_;
   protected String comCmd_;
   protected String redCmd_;
@@ -1124,8 +1019,6 @@
   protected String partitionerSpec_;
   protected String numReduceTasksSpec_;
   protected String additionalConfSpec_;
-
-  protected boolean testMerge_;
 
   // Use to communicate config to the external processes (ex env.var.HADOOP_USER)
   // encoding "a=b c=d"

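Taken together, the StreamJob changes above boil down to: input paths are added directly, the default input format is now TextInputFormat, the output path is set without the old side-effect/CompoundDirSpec detour, and -reducer NONE is translated into zero reduce tasks. A rough sketch of that wiring follows; it is not the committed code, the paths are hypothetical, and the calls mirror the ones visible in the hunks above.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class StreamJobWiringSketch {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf();
    jobConf.addInputPath(new Path("/user/demo/in"));   // hypothetical input path
    jobConf.setOutputPath(new Path("/user/demo/out")); // hypothetical output path
    jobConf.setInputFormat(TextInputFormat.class);     // new default (was KeyValueTextInputFormat)
    jobConf.setOutputFormat(TextOutputFormat.class);
    boolean reducerNone = true;                        // i.e. the user passed -reducer NONE
    if (reducerNone) {
      jobConf.setNumReduceTasks(0);                    // map output goes straight to the output dir
    }
  }
}
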
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java?view=diff&rev=534970&r1=534969&r2=534970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java Thu May  3 12:33:27 2007
@@ -453,20 +453,6 @@
     return res;
   }
 
-  static boolean getUseMapSideEffect(JobConf job) {
-    String reduce = job.get("stream.reduce.streamprocessor");
-    if (reduce == null) {
-      return false;
-    }
-    try {
-      reduce = URLDecoder.decode(reduce, "UTF-8");
-    } catch (UnsupportedEncodingException e) {
-      System.err.println("stream.reduce.streamprocessor in jobconf not found");
-      return false;
-    }
-    return StreamJob.REDUCE_NONE.equals(reduce);
-  }
-
   public static void touch(File file) throws IOException {
     file = file.getAbsoluteFile();
     FileOutputStream out = new FileOutputStream(file);

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java?view=diff&rev=534970&r1=534969&r2=534970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java Thu May  3 12:33:27 2007
@@ -19,10 +19,8 @@
 package org.apache.hadoop.streaming;
 
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.PushbackInputStream;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.LineRecordReader;

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?view=diff&rev=534970&r1=534969&r2=534970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Thu May  3 12:33:27 2007
@@ -38,10 +38,11 @@
   protected String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
   // map behaves like "/usr/bin/tr . \\n"; (split words into lines)
   protected String map = StreamUtil.makeJavaCommand(TrApp.class, new String[]{".", "\\n"});
-  // combine, reduce behave like /usr/bin/uniq. But also prepend lines with C, R.
+  // reduce behaves like /usr/bin/uniq, but also prepends lines with R.
+  // A command-line combiner no longer has any effect.
   protected String combine  = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"C"});
   protected String reduce = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"R"});
-  protected String outputExpect = "RCare\t\nRCblue\t\nRCbunnies\t\nRCpink\t\nRCred\t\nRCroses\t\nRCviolets\t\n";
+  protected String outputExpect = "Rare\t\nRblue\t\nRbunnies\t\nRpink\t\nRred\t\nRroses\t\nRviolets\t\n";
 
   private StreamJob job;
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java?view=diff&rev=534970&r1=534969&r2=534970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java Thu May  3 12:33:27 2007
@@ -41,7 +41,7 @@
     // test that some JobConf properties are exposed as expected     
     // Note the dots translated to underscore: 
     // property names have been escaped in PipeMapRed.safeEnvVarName()
-    expect("mapred_input_format_class", "org.apache.hadoop.mapred.KeyValueTextInputFormat");
+    expect("mapred_input_format_class", "org.apache.hadoop.mapred.TextInputFormat");
     expect("mapred_job_tracker", "local");
     //expect("mapred_local_dir", "build/test/mapred/local");
     expectDefined("mapred_local_dir");