You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/09/19 01:08:08 UTC

svn commit: r447626 [2/3] - in /lucene/hadoop/trunk: ./ src/contrib/ src/contrib/streaming/ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Mon Sep 18 16:08:06 2006
@@ -30,60 +30,55 @@
  *  It delegates operations to an external program via stdin and stdout.
  *  @author Michel Tourn
  */
-public class PipeMapper extends PipeMapRed implements Mapper
-{
+public class PipeMapper extends PipeMapRed implements Mapper {
 
-  String getPipeCommand(JobConf job)
-  {
+  String getPipeCommand(JobConf job) {
     return job.get("stream.map.streamprocessor");
   }
 
-  String getKeyColPropName()
-  {
+  String getKeyColPropName() {
     return "mapKeyCols";
-  }  
+  }
 
-  boolean getUseSideEffect()
-  {
-    String reduce = job_.get("stream.reduce.streamprocessor");
-    if(StreamJob.REDUCE_NONE.equals(reduce)) {
-      return true;  
-    }
-    return false;
+  boolean getUseSideEffect() {
+    return StreamUtil.getUseMapSideEffect(job_);
+  }
+
+  boolean getDoPipe() {
+    return true;
   }
-  
 
   // Do NOT declare default constructor
   // (MapRed creates it reflectively)
 
-  public void map(WritableComparable key, Writable value,
-                  OutputCollector output, Reporter reporter)
-    throws IOException
-  {
+  public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
     // init
-    if(outThread_ == null) {
+    if (outThread_ == null) {
       startOutputThreads(output, reporter);
     }
     try {
       // 1/4 Hadoop in
       numRecRead_++;
       maybeLogRecord();
+      if (debugFailDuring_ && numRecRead_ == 3) {
+        throw new IOException("debugFailDuring_");
+      }
 
       // 2/4 Hadoop to Tool
-      if(numExceptions_==0) {
-          if(optUseKey_) {
-              write(key);
-              clientOut_.write('\t');
-          }
-          write(value);
-          clientOut_.write('\n');
-          clientOut_.flush();
+      if (numExceptions_ == 0) {
+        if (optUseKey_) {
+          write(key);
+          clientOut_.write('\t');
+        }
+        write(value);
+        clientOut_.write('\n');
+        clientOut_.flush();
       } else {
-          numRecSkipped_++;
+        numRecSkipped_++;
       }
-    } catch(IOException io) {
+    } catch (IOException io) {
       numExceptions_++;
-      if(numExceptions_ > 1 || numRecWritten_ < minRecWrittenToEnableSkip_) {
+      if (numExceptions_ > 1 || numRecWritten_ < minRecWrittenToEnableSkip_) {
         // terminate with failure
         String msg = logFailure(io);
         appendLogToJobLog("failure");
@@ -95,11 +90,10 @@
       }
     }
   }
-  
-  public void close()
-  {
+
+  public void close() {
     appendLogToJobLog("success");
     mapRedFinished();
   }
-  
+
 }

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Mon Sep 18 16:08:06 2006
@@ -31,33 +31,35 @@
  *  It delegates operations to an external program via stdin and stdout.
  *  @author Michel Tourn
  */
-public class PipeReducer extends PipeMapRed implements Reducer
-{
+public class PipeReducer extends PipeMapRed implements Reducer {
 
-  String getPipeCommand(JobConf job)
-  {
+  String getPipeCommand(JobConf job) {
     return job.get("stream.reduce.streamprocessor");
   }
 
-  String getKeyColPropName()
-  {
+  boolean getDoPipe() {
+    String argv = getPipeCommand(job_);
+    // Currently: null is identity reduce. REDUCE_NONE is no-map-outputs.
+    return (argv != null) && !StreamJob.REDUCE_NONE.equals(argv);
+  }
+
+  String getKeyColPropName() {
     return "reduceKeyCols";
-  }  
-  
-  public void reduce(WritableComparable key, Iterator values,
-                     OutputCollector output, Reporter reporter)
-    throws IOException {
+  }
+
+  public void reduce(WritableComparable key, Iterator values, OutputCollector output,
+      Reporter reporter) throws IOException {
 
     // init
-    if(doPipe_ && outThread_ == null) {
+    if (doPipe_ && outThread_ == null) {
       startOutputThreads(output, reporter);
     }
     try {
       while (values.hasNext()) {
-        Writable val = (Writable)values.next();
+        Writable val = (Writable) values.next();
         numRecRead_++;
         maybeLogRecord();
-        if(doPipe_) {
+        if (doPipe_) {
           write(key);
           clientOut_.write('\t');
           write(val);
@@ -68,15 +70,14 @@
           output.collect(key, val);
         }
       }
-    } catch(IOException io) {
+    } catch (IOException io) {
       appendLogToJobLog("failure");
       mapRedFinished();
-      throw new IOException(getContext() + io.getMessage());    
+      throw new IOException(getContext() + io.getMessage());
     }
   }
 
-  public void close()
-  {
+  public void close() {
     appendLogToJobLog("success");
     mapRedFinished();
   }

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java Mon Sep 18 16:08:06 2006
@@ -30,7 +30,6 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.commons.logging.*;
 
-
 /** 
  * Shared functionality for hadoopStreaming formats.
  * A custom reader can be defined to be a RecordReader with the constructor below
@@ -39,18 +38,15 @@
  * @see StreamXmlRecordReader 
  * @author Michel Tourn
  */
-public abstract class StreamBaseRecordReader implements RecordReader
-{
-    
+public abstract class StreamBaseRecordReader implements RecordReader {
+
   protected static final Log LOG = LogFactory.getLog(StreamBaseRecordReader.class.getName());
-  
+
   // custom JobConf properties for this class are prefixed with this namespace
   final static String CONF_NS = "stream.recordreader.";
 
-  public StreamBaseRecordReader(
-    FSDataInputStream in, FileSplit split, Reporter reporter, JobConf job, FileSystem fs)
-    throws IOException
-  {
+  public StreamBaseRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter,
+      JobConf job, FileSystem fs) throws IOException {
     in_ = in;
     split_ = split;
     start_ = split_.getStart();
@@ -60,99 +56,90 @@
     reporter_ = reporter;
     job_ = job;
     fs_ = fs;
-    
+
     statusMaxRecordChars_ = job_.getInt(CONF_NS + "statuschars", 200);
   }
 
   /// RecordReader API
-  
+
   /** Read a record. Implementation should call numRecStats at the end
-   */  
+   */
   public abstract boolean next(Writable key, Writable value) throws IOException;
 
   /** This implementation always returns true. */
-  public boolean[] areValidInputDirectories(FileSystem fileSys,
-                                     Path[] inputDirs) throws IOException
-  {
+  public boolean[] areValidInputDirectories(FileSystem fileSys, Path[] inputDirs) throws IOException {
     int n = inputDirs.length;
     boolean[] b = new boolean[n];
-    for(int i=0; i<n; i++) {
+    for (int i = 0; i < n; i++) {
       b[i] = true;
     }
     return b;
   }
 
   /** Returns the current position in the input. */
-  public synchronized long getPos() throws IOException 
-  { 
-    return in_.getPos(); 
+  public synchronized long getPos() throws IOException {
+    return in_.getPos();
   }
 
   /** Close this to future operations.*/
-  public synchronized void close() throws IOException 
-  { 
-    in_.close(); 
+  public synchronized void close() throws IOException {
+    in_.close();
   }
 
   public WritableComparable createKey() {
     return new Text();
   }
-  
+
   public Writable createValue() {
     return new Text();
   }
-  
+
   /// StreamBaseRecordReader API
 
-  public void init() throws IOException
-  {
-    LOG.info("StreamBaseRecordReader.init: " +
-    " start_=" + start_ + " end_=" + end_ + " length_=" + length_ +
-    " start_ > in_.getPos() =" 
-        + (start_ > in_.getPos()) + " " + start_ 
-        + " > " + in_.getPos() );
+  public void init() throws IOException {
+    LOG.info("StreamBaseRecordReader.init: " + " start_=" + start_ + " end_=" + end_ + " length_="
+        + length_ + " start_ > in_.getPos() =" + (start_ > in_.getPos()) + " " + start_ + " > "
+        + in_.getPos());
     if (start_ > in_.getPos()) {
       in_.seek(start_);
-    }  
+    }
     seekNextRecordBoundary();
   }
-  
+
   /** Implementation should seek forward in_ to the first byte of the next record.
    *  The initial byte offset in the stream is arbitrary.
    */
   public abstract void seekNextRecordBoundary() throws IOException;
-  
-    
-  void numRecStats(byte[] record, int start, int len) throws IOException
-  {
-    numRec_++;          
-    if(numRec_ == nextStatusRec_) {
-      String recordStr = new String(record, start, 
-                Math.min(len, statusMaxRecordChars_), "UTF-8");    
-      nextStatusRec_ +=100;//*= 10;
+
+  void numRecStats(byte[] record, int start, int len) throws IOException {
+    numRec_++;
+    if (numRec_ == nextStatusRec_) {
+      String recordStr = new String(record, start, Math.min(len, statusMaxRecordChars_), "UTF-8");
+      nextStatusRec_ += 100;//*= 10;
       String status = getStatus(recordStr);
       LOG.info(status);
       reporter_.setStatus(status);
     }
   }
 
- long lastMem =0;
- String getStatus(CharSequence record)
- {
+  long lastMem = 0;
+
+  String getStatus(CharSequence record) {
     long pos = -1;
-    try { 
+    try {
       pos = getPos();
-    } catch(IOException io) {
+    } catch (IOException io) {
     }
     String recStr;
-    if(record.length() > statusMaxRecordChars_) {
-        recStr = record.subSequence(0, statusMaxRecordChars_) + "...";
+    if (record.length() > statusMaxRecordChars_) {
+      recStr = record.subSequence(0, statusMaxRecordChars_) + "...";
     } else {
-    	recStr = record.toString();
+      recStr = record.toString();
     }
-    String unqualSplit = split_.getFile().getName() + ":" + split_.getStart() + "+" + split_.getLength();
-    String status = "HSTR " + StreamUtil.HOST + " " + numRec_ + ". pos=" + pos
-     + " " + unqualSplit + " Processing record=" + recStr;
+    String unqualSplit = split_.getFile().getName() + ":" + split_.getStart() + "+"
+        + split_.getLength();
+    String status = "HSTR " + StreamUtil.HOST + " " + numRec_ + ". pos=" + pos + " " + unqualSplit
+        + " Processing record=" + recStr;
     status += " " + splitName_;
     return status;
   }
@@ -169,5 +156,5 @@
   int numRec_ = 0;
   int nextStatusRec_ = 1;
   int statusMaxRecordChars_;
-  
+
 }

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Mon Sep 18 16:08:06 2006
@@ -39,49 +39,38 @@
  * selects a RecordReader based on a JobConf property.
  * @author Michel Tourn
  */
-public class StreamInputFormat extends InputFormatBase
-{
+public class StreamInputFormat extends InputFormatBase {
 
   // an InputFormat should be public with the synthetic public default constructor
   // JobTracker's JobInProgress will instantiate with clazz.newInstance() (and a custom ClassLoader)
 
   protected static final Log LOG = LogFactory.getLog(StreamInputFormat.class.getName());
 
-  static {
-    //LOG.setLevel(Level.FINE);
-  }
-
   /** This implementation always returns true. */
-  public boolean[] areValidInputDirectories(FileSystem fileSys,
-                                            Path[] inputDirs
-                                            ) throws IOException {
+  public boolean[] areValidInputDirectories(FileSystem fileSys, Path[] inputDirs) throws IOException {
     boolean[] b = new boolean[inputDirs.length];
-    for(int i=0; i < inputDirs.length; ++i) {
+    for (int i = 0; i < inputDirs.length; ++i) {
       b[i] = true;
     }
     return b;
   }
 
-  static boolean isGzippedInput(JobConf job)
-  {
+  static boolean isGzippedInput(JobConf job) {
     String val = job.get(StreamBaseRecordReader.CONF_NS + "compression");
     return "gzip".equals(val);
   }
 
-  public FileSplit[] getSplits(FileSystem fs, JobConf job, int numSplits)
-    throws IOException {
-      
-    if(isGzippedInput(job)) {
+  public FileSplit[] getSplits(FileSystem fs, JobConf job, int numSplits) throws IOException {
+
+    if (isGzippedInput(job)) {
       return getFullFileSplits(fs, job);
     } else {
       return super.getSplits(fs, job, numSplits);
-    }   
+    }
   }
-  
+
   /** For the compressed-files case: override InputFormatBase to produce one split. */
-  FileSplit[] getFullFileSplits(FileSystem fs, JobConf job)
-    throws IOException
-  {
+  FileSplit[] getFullFileSplits(FileSystem fs, JobConf job) throws IOException {
     Path[] files = listPaths(fs, job);
     int numSplits = files.length;
     ArrayList splits = new ArrayList(numSplits);
@@ -90,37 +79,35 @@
       long splitSize = fs.getLength(file);
       splits.add(new FileSplit(file, 0, splitSize));
     }
-    return (FileSplit[])splits.toArray(new FileSplit[splits.size()]);
+    return (FileSplit[]) splits.toArray(new FileSplit[splits.size()]);
   }
 
-  protected Path[] listPaths(FileSystem fs, JobConf job)
-    throws IOException
-  {
+  protected Path[] listPaths(FileSystem fs, JobConf job) throws IOException {
     Path[] globs = job.getInputPaths();
     ArrayList list = new ArrayList();
     int dsup = globs.length;
-    for(int d=0; d<dsup; d++) {
+    for (int d = 0; d < dsup; d++) {
       String leafName = globs[d].getName();
       LOG.info("StreamInputFormat: globs[" + d + "] leafName = " + leafName);
-      Path[] paths; Path dir;
+      Path[] paths;
+      Path dir;
       PathFilter filter = new GlobFilter(fs, leafName);
       dir = new Path(globs[d].getParent().toString());
-      if(dir == null) dir = new Path(".");
+      if (dir == null) dir = new Path(".");
       paths = fs.listPaths(dir, filter);
       list.addAll(Arrays.asList(paths));
     }
-    return (Path[])list.toArray(new Path[]{});
+    return (Path[]) list.toArray(new Path[] {});
   }
 
-  class GlobFilter implements PathFilter
-  {
-    public GlobFilter(FileSystem fs, String glob)
-    {
+  class GlobFilter implements PathFilter {
+
+    public GlobFilter(FileSystem fs, String glob) {
       fs_ = fs;
       pat_ = Pattern.compile(globToRegexp(glob));
     }
-    String globToRegexp(String glob)
-    {
+
+    String globToRegexp(String glob) {
       String re = glob;
       re = re.replaceAll("\\.", "\\\\.");
       re = re.replaceAll("\\+", "\\\\+");
@@ -130,11 +117,10 @@
       return re;
     }
 
-    public boolean accept(Path pathname)
-    {
+    public boolean accept(Path pathname) {
       boolean acc = !fs_.isChecksumFile(pathname);
-      if(acc) {
-          acc = pat_.matcher(pathname.getName()).matches();
+      if (acc) {
+        acc = pat_.matcher(pathname.getName()).matches();
       }
       LOG.info("matches " + pat_ + ", " + pathname + " = " + acc);
       return acc;
@@ -144,10 +130,9 @@
     FileSystem fs_;
   }
 
-  public RecordReader getRecordReader(FileSystem fs, final FileSplit split,
-                                      JobConf job, Reporter reporter)
-    throws IOException {
-    LOG.info("getRecordReader start.....");
+  public RecordReader getRecordReader(FileSystem fs, final FileSplit split, JobConf job,
+      Reporter reporter) throws IOException {
+    LOG.info("getRecordReader start.....split=" + split);
     reporter.setStatus(split.toString());
 
     final long start = split.getStart();
@@ -160,45 +145,41 @@
     // Factory dispatch based on available params..
     Class readerClass;
     String c = job.get("stream.recordreader.class");
-    if(c == null) {
+    if (c == null) {
       readerClass = StreamLineRecordReader.class;
     } else {
       readerClass = StreamUtil.goodClassOrNull(c, null);
-      if(readerClass == null) {
+      if (readerClass == null) {
         throw new RuntimeException("Class not found: " + c);
       }
     }
 
     Constructor ctor;
     try {
-      ctor = readerClass.getConstructor(new Class[]{
-        FSDataInputStream.class, FileSplit.class, Reporter.class, JobConf.class, FileSystem.class});
-    } catch(NoSuchMethodException nsm) {
+      ctor = readerClass.getConstructor(new Class[] { FSDataInputStream.class, FileSplit.class,
+          Reporter.class, JobConf.class, FileSystem.class });
+    } catch (NoSuchMethodException nsm) {
       throw new RuntimeException(nsm);
     }
 
-
     StreamBaseRecordReader reader;
     try {
-        reader = (StreamBaseRecordReader) ctor.newInstance(new Object[]{
-            in, split, reporter, job, fs});
-    } catch(Exception nsm) {
+      reader = (StreamBaseRecordReader) ctor.newInstance(new Object[] { in, split, reporter, job,
+          fs });
+    } catch (Exception nsm) {
       throw new RuntimeException(nsm);
     }
 
     reader.init();
 
-
-    if(reader instanceof StreamSequenceRecordReader) {
+    if (reader instanceof StreamSequenceRecordReader) {
       // override k/v class types with types stored in SequenceFile
-      StreamSequenceRecordReader ss = (StreamSequenceRecordReader)reader;
+      StreamSequenceRecordReader ss = (StreamSequenceRecordReader) reader;
       job.setInputKeyClass(ss.rin_.getKeyClass());
       job.setInputValueClass(ss.rin_.getValueClass());
     }
 
-
     return reader;
   }
 
-  
 }

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Mon Sep 18 16:08:06 2006
@@ -18,11 +18,14 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.net.URL;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
 
 import org.apache.commons.logging.*;
 
@@ -39,20 +42,19 @@
  * (Jar packaging, MapRed job submission and monitoring)
  * @author Michel Tourn
  */
-public class StreamJob
-{
-  protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());
+public class StreamJob {
 
+  protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());
   final static String REDUCE_NONE = "NONE";
 
-  public StreamJob(String[] argv, boolean mayExit)
-  {
+  private boolean reducerNone_;
+
+  public StreamJob(String[] argv, boolean mayExit) {
     argv_ = argv;
     mayExit_ = mayExit;
   }
 
-  public void go() throws IOException
-  {
+  public void go() throws IOException {
     init();
 
     preProcessArgs();
@@ -63,40 +65,37 @@
     submitAndMonitorJob();
   }
 
-  protected void init()
-  {
-     try {
-        env_ = new Environment();
-     } catch(IOException io) {
-        throw new RuntimeException(io);
-     }
+  protected void init() {
+    try {
+      env_ = new Environment();
+    } catch (IOException io) {
+      throw new RuntimeException(io);
+    }
   }
 
-  void preProcessArgs()
-  {
+  void preProcessArgs() {
     verbose_ = false;
     addTaskEnvironment_ = "";
   }
 
-  void postProcessArgs() throws IOException
-  {
-    if(cluster_ == null) {
-        // hadoop-default.xml is standard, hadoop-local.xml is not.
-        cluster_ = "default";
+  void postProcessArgs() throws IOException {
+    if (cluster_ == null) {
+      // hadoop-default.xml is standard, hadoop-local.xml is not.
+      cluster_ = "default";
     }
     hadoopAliasConf_ = "hadoop-" + getClusterNick() + ".xml";
-    if(inputGlobs_.size() == 0) {
-        fail("Required argument: -input <name>");
+    if (inputSpecs_.size() == 0) {
+      fail("Required argument: -input <name>");
     }
-    if(output_ == null) {
-        fail("Required argument: -output ");
+    if (output_ == null) {
+      fail("Required argument: -output ");
     }
     msg("addTaskEnvironment=" + addTaskEnvironment_);
 
     Iterator it = packageFiles_.iterator();
-    while(it.hasNext()) {
-      File f = new File((String)it.next());
-      if(f.isFile()) {
+    while (it.hasNext()) {
+      File f = new File((String) it.next());
+      if (f.isFile()) {
         shippedCanonFiles_.add(f.getCanonicalPath());
       }
     }
@@ -108,37 +107,40 @@
     redCmd_ = unqualifyIfLocalPath(redCmd_);
   }
 
-  void validateNameEqValue(String neqv)
-  {
+  void validateNameEqValue(String neqv) {
     String[] nv = neqv.split("=", 2);
-    if(nv.length < 2) {
-        fail("Invalid name=value spec: " + neqv);
+    if (nv.length < 2) {
+      fail("Invalid name=value spec: " + neqv);
     }
     msg("Recording name=value: name=" + nv[0] + " value=" + nv[1]);
   }
 
-  String unqualifyIfLocalPath(String cmd) throws IOException
-  {
-    if(cmd == null) {
+  String unqualifyIfLocalPath(String cmd) throws IOException {
+    if (cmd == null) {
       //
     } else {
       String prog = cmd;
       String args = "";
       int s = cmd.indexOf(" ");
-      if(s != -1) {
+      if (s != -1) {
         prog = cmd.substring(0, s);
-        args = cmd.substring(s+1);
+        args = cmd.substring(s + 1);
+      }
+      String progCanon;
+      try {
+        progCanon = new File(prog).getCanonicalPath();
+      } catch (IOException io) {
+        progCanon = prog;
       }
-      String progCanon = new File(prog).getCanonicalPath();
       boolean shipped = shippedCanonFiles_.contains(progCanon);
       msg("shipped: " + shipped + " " + progCanon);
-      if(shipped) {
+      if (shipped) {
         // Change path to simple filename.
         // That way when PipeMapRed calls Runtime.exec(),
         // it will look for the excutable in Task's working dir.
         // And this is where TaskRunner unjars our job jar.
         prog = new File(prog).getName();
-        if(args.length() > 0) {
+        if (args.length() > 0) {
           cmd = prog + " " + args;
         } else {
           cmd = prog;
@@ -149,68 +151,70 @@
     return cmd;
   }
 
-  String getHadoopAliasConfFile()
-  {
+  String getHadoopAliasConfFile() {
     return new File(getHadoopClientHome() + "/conf", hadoopAliasConf_).getAbsolutePath();
   }
 
-
-  void parseArgv()
-  {
-    if(argv_.length==0) {
+  void parseArgv() {
+    if (argv_.length == 0) {
       exitUsage(false);
     }
-    int i=0;
-    while(i < argv_.length) {
+    int i = 0;
+    while (i < argv_.length) {
       String s;
-      if(argv_[i].equals("-verbose")) {
+      if (argv_[i].equals("-verbose")) {
         verbose_ = true;
-      } else if(argv_[i].equals("-info")) {
+      } else if (argv_[i].equals("-info")) {
         detailedUsage_ = true;
-      } else if(argv_[i].equals("-debug")) {
+      } else if (argv_[i].equals("-debug")) {
         debug_++;
-      } else if((s = optionArg(argv_, i, "-input", false)) != null) {
+      } else if ((s = optionArg(argv_, i, "-input", false)) != null) {
         i++;
-        inputGlobs_.add(s);
-      } else if((s = optionArg(argv_, i, "-output", output_ != null)) != null) {
+        inputSpecs_.add(s);
+      } else if (argv_[i].equals("-inputtagged")) {
+        inputTagged_ = true;
+      } else if ((s = optionArg(argv_, i, "-output", output_ != null)) != null) {
         i++;
         output_ = s;
-      } else if((s = optionArg(argv_, i, "-mapper", mapCmd_ != null)) != null) {
+      } else if ((s = optionArg(argv_, i, "-mapsideoutput", mapsideoutURI_ != null)) != null) {
+        i++;
+        mapsideoutURI_ = s;
+      } else if ((s = optionArg(argv_, i, "-mapper", mapCmd_ != null)) != null) {
         i++;
         mapCmd_ = s;
-      } else if((s = optionArg(argv_, i, "-combiner", comCmd_ != null)) != null) {
+      } else if ((s = optionArg(argv_, i, "-combiner", comCmd_ != null)) != null) {
         i++;
         comCmd_ = s;
-      } else if((s = optionArg(argv_, i, "-reducer", redCmd_ != null)) != null) {
+      } else if ((s = optionArg(argv_, i, "-reducer", redCmd_ != null)) != null) {
         i++;
         redCmd_ = s;
-      } else if((s = optionArg(argv_, i, "-file", false)) != null) {
+      } else if ((s = optionArg(argv_, i, "-file", false)) != null) {
         i++;
         packageFiles_.add(s);
-      } else if((s = optionArg(argv_, i, "-cluster", cluster_ != null)) != null) {
+      } else if ((s = optionArg(argv_, i, "-cluster", cluster_ != null)) != null) {
         i++;
         cluster_ = s;
-      } else if((s = optionArg(argv_, i, "-config", false)) != null) {
+      } else if ((s = optionArg(argv_, i, "-config", false)) != null) {
         i++;
         configPath_.add(s);
-      } else if((s = optionArg(argv_, i, "-dfs", false)) != null) {
+      } else if ((s = optionArg(argv_, i, "-dfs", false)) != null) {
         i++;
-        userJobConfProps_.add("fs.default.name="+s);
-      } else if((s = optionArg(argv_, i, "-jt", false)) != null) {
+        userJobConfProps_.add("fs.default.name=" + s);
+      } else if ((s = optionArg(argv_, i, "-jt", false)) != null) {
         i++;
-        userJobConfProps_.add("mapred.job.tracker="+s);
-      } else if((s = optionArg(argv_, i, "-jobconf", false)) != null) {
+        userJobConfProps_.add("mapred.job.tracker=" + s);
+      } else if ((s = optionArg(argv_, i, "-jobconf", false)) != null) {
         i++;
         validateNameEqValue(s);
         userJobConfProps_.add(s);
-      } else if((s = optionArg(argv_, i, "-cmdenv", false)) != null) {
+      } else if ((s = optionArg(argv_, i, "-cmdenv", false)) != null) {
         i++;
         validateNameEqValue(s);
-        if(addTaskEnvironment_.length() > 0) {
-            addTaskEnvironment_ += " ";
+        if (addTaskEnvironment_.length() > 0) {
+          addTaskEnvironment_ += " ";
         }
         addTaskEnvironment_ += s;
-      } else if((s = optionArg(argv_, i, "-inputreader", inReaderSpec_ != null)) != null) {
+      } else if ((s = optionArg(argv_, i, "-inputreader", inReaderSpec_ != null)) != null) {
         i++;
         inReaderSpec_ = s;
       } else {
@@ -219,37 +223,35 @@
       }
       i++;
     }
-    if(detailedUsage_) {
-        exitUsage(true);
+    if (detailedUsage_) {
+      exitUsage(true);
     }
   }
 
-  String optionArg(String[] args, int index, String arg, boolean argSet)
-  {
-    if(index >= args.length || ! args[index].equals(arg)) {
+  String optionArg(String[] args, int index, String arg, boolean argSet) {
+    if (index >= args.length || !args[index].equals(arg)) {
       return null;
     }
-    if(argSet) {
+    if (argSet) {
       throw new IllegalArgumentException("Can only have one " + arg + " option");
     }
-    if(index >= args.length-1) {
+    if (index >= args.length - 1) {
       throw new IllegalArgumentException("Expected argument after option " + args[index]);
     }
-    return args[index+1];
+    return args[index + 1];
   }
 
-  protected void msg(String msg)
-  {
-    if(verbose_) {
+  protected void msg(String msg) {
+    if (verbose_) {
       System.out.println("STREAM: " + msg);
     }
   }
 
-  public void exitUsage(boolean detailed)
-  {
-                      //         1         2         3         4         5         6         7
-                      //1234567890123456789012345678901234567890123456789012345678901234567890123456789
-    System.out.println("Usage: $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar [options]");
+  public void exitUsage(boolean detailed) {
+    //         1         2         3         4         5         6         7
+    //1234567890123456789012345678901234567890123456789012345678901234567890123456789
+    System.out.println("Usage: $HADOOP_HOME/bin/hadoop [--config dir] jar \\");
+    System.out.println("          $HADOOP_HOME/hadoop-streaming.jar [options]");
     System.out.println("Options:");
     System.out.println("  -input    <path>     DFS input file(s) for the Map step");
     System.out.println("  -output   <path>     DFS output directory for the Reduce step");
@@ -257,58 +259,82 @@
     System.out.println("  -combiner <cmd>      The streaming command to run");
     System.out.println("  -reducer  <cmd>      The streaming command to run");
     System.out.println("  -file     <file>     File/dir to be shipped in the Job jar file");
-    System.out.println("  -cluster  <name>     Default uses hadoop-default.xml and hadoop-site.xml");
-    System.out.println("  -config   <file>     Optional. One or more paths to xml config files");
-    System.out.println("  -dfs      <h:p>      Optional. Override DFS configuration");
-    System.out.println("  -jt       <h:p>      Optional. Override JobTracker configuration");
+    //Only advertise the standard way: [--config dir] in our launcher 
+    //System.out.println("  -cluster  <name>     Default uses hadoop-default.xml and hadoop-site.xml");
+    //System.out.println("  -config   <file>     Optional. One or more paths to xml config files");
+    System.out.println("  -dfs    <h:p>|local  Optional. Override DFS configuration");
+    System.out.println("  -jt     <h:p>|local  Optional. Override JobTracker configuration");
     System.out.println("  -inputreader <spec>  Optional.");
-    System.out.println("  -jobconf  <n>=<v>    Optional.");
+    System.out.println("  -jobconf  <n>=<v>    Optional. Add or override a JobConf property");
     System.out.println("  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands");
     System.out.println("  -verbose");
     System.out.println();
-    if(!detailed) {
-    System.out.println("For more details about these options:");
-    System.out.println("Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info");
-        fail("");
+    if (!detailed) {
+      System.out.println("For more details about these options:");
+      System.out.println("Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info");
+      fail("");
     }
     System.out.println("In -input: globbing on <path> is supported and can have multiple -input");
     System.out.println("Default Map input format: a line is a record in UTF-8");
     System.out.println("  the key part ends at first TAB, the rest of the line is the value");
     System.out.println("Custom Map input format: -inputreader package.MyRecordReader,n=v,n=v ");
-    System.out.println("  comma-separated name-values can be specified to configure the InputFormat");
+    System.out
+        .println("  comma-separated name-values can be specified to configure the InputFormat");
     System.out.println("  Ex: -inputreader 'StreamXmlRecordReader,begin=<doc>,end=</doc>'");
     System.out.println("Map output format, reduce input/output format:");
-    System.out.println("  Format defined by what mapper command outputs. Line-oriented");
+    System.out.println("  Format defined by what the mapper command outputs. Line-oriented");
     System.out.println();
-    System.out.println("Use -cluster <name> to switch between \"local\" Hadoop and one or more remote ");
-    System.out.println("  Hadoop clusters. ");
-    System.out.println("  The default is to use the normal hadoop-default.xml and hadoop-site.xml");
-    System.out.println("  Else configuration will use $HADOOP_HOME/conf/hadoop-<name>.xml");
+    System.out.println("The files or directories named in the -file argument[s] end up in the");
+    System.out.println("  working directory when the mapper and reducer are run.");
+    System.out.println("  The location of this working directory is unspecified.");
     System.out.println();
-    System.out.println("To skip the shuffle/sort/reduce step:" );
+    //System.out.println("Use -cluster <name> to switch between \"local\" Hadoop and one or more remote ");
+    //System.out.println("  Hadoop clusters. ");
+    //System.out.println("  The default is to use the normal hadoop-default.xml and hadoop-site.xml");
+    //System.out.println("  Else configuration will use $HADOOP_HOME/conf/hadoop-<name>.xml");
+    //System.out.println();
+    System.out.println("To skip the sort/combine/shuffle/sort/reduce step:");
     System.out.println("  Use -reducer " + REDUCE_NONE);
-    System.out.println("  This preserves the map input order and speeds up processing");
+    System.out
+        .println("  A Task's Map output then becomes a 'side-effect output' rather than a reduce input");
+    System.out
+        .println("  This speeds up processing, This also feels more like \"in-place\" processing");
+    System.out.println("  because the input filename and the map input order are preserved");
+    System.out.println("To specify a single side-effect output file");
+    System.out.println("    -mapsideoutput [file:/C:/win|file:/unix/|socket://host:port]");//-output for side-effects will be soon deprecated
+    System.out.println("  If the jobtracker is local this is a local file");
+    System.out.println("  This currently requires -reducer NONE");
     System.out.println();
     System.out.println("To set the number of reduce tasks (num. of output files):");
     System.out.println("  -jobconf mapred.reduce.tasks=10");
-    System.out.println("To name the job (appears in the JobTrack Web UI):");
+    System.out.println("To speed up the last reduces:");
+    System.out.println("  -jobconf mapred.speculative.execution=true");
+    System.out.println("  Do not use this along -reducer " + REDUCE_NONE);
+    System.out.println("To name the job (appears in the JobTracker Web UI):");
     System.out.println("  -jobconf mapred.job.name='My Job' ");
     System.out.println("To specify that line-oriented input is in gzip format:");
-    System.out.println("(at this time ALL input files must be gzipped and this is not recognized based on file extension)");
+    System.out
+        .println("(at this time ALL input files must be gzipped and this is not recognized based on file extension)");
     System.out.println("   -jobconf stream.recordreader.compression=gzip ");
     System.out.println("To change the local temp directory:");
-    System.out.println("  -jobconf dfs.data.dir=/tmp");
+    System.out.println("  -jobconf dfs.data.dir=/tmp/dfs");
+    System.out.println("  -jobconf stream.tmpdir=/tmp/streaming");
     System.out.println("Additional local temp directories with -cluster local:");
     System.out.println("  -jobconf mapred.local.dir=/tmp/local");
     System.out.println("  -jobconf mapred.system.dir=/tmp/system");
     System.out.println("  -jobconf mapred.temp.dir=/tmp/temp");
+    System.out.println("Use a custom hadoopStreaming build along a standard hadoop install:");
+    System.out.println("  $HADOOP_HOME/bin/hadoop jar /path/my-hadoop-streaming.jar [...]\\");
+    System.out
+        .println("    [...] -jobconf stream.shipped.hadoopstreaming=/path/my-hadoop-streaming.jar");
     System.out.println("For more details about jobconf parameters see:");
     System.out.println("  http://wiki.apache.org/lucene-hadoop/JobConfFile");
     System.out.println("To set an environement variable in a streaming command:");
     System.out.println("   -cmdenv EXAMPLE_DIR=/home/example/dictionaries/");
     System.out.println();
-    System.out.println("Shortcut to run from any directory:");
-    System.out.println("   setenv HSTREAMING \"$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/build/hadoop-streaming.jar\"");
+    System.out.println("Shortcut:");
+    System.out
+        .println("   setenv HSTREAMING \"$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar\"");
     System.out.println();
     System.out.println("Example: $HSTREAMING -mapper \"/usr/local/bin/perl5 filter.pl\"");
     System.out.println("           -file /local/filter.pl -input \"/logs/0604*/*\" [...]");
@@ -318,81 +344,87 @@
     fail("");
   }
 
-  public void fail(String message)
-  {
-    if(mayExit_) {
-        System.err.println(message);
-        System.exit(1);
+  public void fail(String message) {
+    if (mayExit_) {
+      System.err.println(message);
+      System.exit(1);
     } else {
-       throw new IllegalArgumentException(message);
+      throw new IllegalArgumentException(message);
     }
   }
 
   // --------------------------------------------
 
-
-  protected String getHadoopClientHome()
-  {
+  protected String getHadoopClientHome() {
     String h = env_.getProperty("HADOOP_HOME"); // standard Hadoop
-    if(h == null) {
+    if (h == null) {
       //fail("Missing required environment variable: HADOOP_HOME");
       h = "UNDEF";
     }
     return h;
   }
 
-
-  protected boolean isLocalHadoop()
-  {
+  protected boolean isLocalHadoop() {
     boolean local;
-    if(jobConf_ == null) {
-        local = getClusterNick().equals("local");
+    if (jobConf_ == null) {
+      local = getClusterNick().equals("local");
     } else {
-        local = jobConf_.get("mapred.job.tracker", "").equals("local");
+      local = StreamUtil.isLocalJobTracker(jobConf_);
     }
     return local;
   }
-  protected String getClusterNick()
-  {
+
+  protected String getClusterNick() {
     return cluster_;
   }
 
   /** @return path to the created Jar file or null if no files are necessary.
-  */
-  protected String packageJobJar() throws IOException
-  {
+   */
+  protected String packageJobJar() throws IOException {
     ArrayList unjarFiles = new ArrayList();
 
     // Runtime code: ship same version of code as self (job submitter code)
     // usually found in: build/contrib or build/hadoop-<version>-dev-streaming.jar
-    String runtimeClasses = StreamUtil.findInClasspath(StreamJob.class.getName());
-    if(runtimeClasses == null) {
-        throw new IOException("runtime classes not found: " + getClass().getPackage());
+
+    // First try an explicit spec: it's too hard to find our own location in this case:
+    // $HADOOP_HOME/bin/hadoop jar /not/first/on/classpath/custom-hadoop-streaming.jar
+    // where findInClasspath() would find the version of hadoop-streaming.jar in $HADOOP_HOME
+    String runtimeClasses = jobConf_.get("stream.shipped.hadoopstreaming"); // jar or class dir
+
+    if (runtimeClasses == null) {
+      runtimeClasses = StreamUtil.findInClasspath(StreamJob.class.getName());
+    }
+    if (runtimeClasses == null) {
+      throw new IOException("runtime classes not found: " + getClass().getPackage());
     } else {
-        msg("Found runtime classes in: " + runtimeClasses);
+      msg("Found runtime classes in: " + runtimeClasses);
     }
-    if(isLocalHadoop()) {
+    if (isLocalHadoop()) {
       // don't package class files (they might get unpackaged in "." and then
       //  hide the intended CLASSPATH entry)
       // we still package everything else (so that scripts and executable are found in
       //  Task workdir like distributed Hadoop)
     } else {
-      if(new File(runtimeClasses).isDirectory()) {
-          packageFiles_.add(runtimeClasses);
+      if (new File(runtimeClasses).isDirectory()) {
+        packageFiles_.add(runtimeClasses);
       } else {
-          unjarFiles.add(runtimeClasses);
+        unjarFiles.add(runtimeClasses);
       }
     }
-    if(packageFiles_.size() + unjarFiles.size()==0) {
+    if (packageFiles_.size() + unjarFiles.size() == 0) {
       return null;
     }
-    File jobJar = File.createTempFile("streamjob", ".jar");
-    System.out.println("packageJobJar: " + packageFiles_ + " " + unjarFiles + " " + jobJar);
-    if(debug_ == 0) {
+    String tmp = jobConf_.get("stream.tmpdir"); //, "/tmp/${user.name}/"
+    File tmpDir = (tmp == null) ? null : new File(tmp);
+    // tmpDir=null means OS default tmp dir
+    File jobJar = File.createTempFile("streamjob", ".jar", tmpDir);
+    System.out.println("packageJobJar: " + packageFiles_ + " " + unjarFiles + " " + jobJar
+        + " tmpDir=" + tmpDir);
+    if (debug_ == 0) {
       jobJar.deleteOnExit();
     }
     JarBuilder builder = new JarBuilder();
-    if(verbose_) {
+    if (verbose_) {
       builder.setVerbose(true);
     }
     String jobJarName = jobJar.getAbsolutePath();
@@ -400,53 +432,81 @@
     return jobJarName;
   }
 
-  protected void setJobConf() throws IOException
-  {
+  /** Applies the -jobconf n=v pairs: only fs.default.name when doEarlyProps is true, all the others when false. */
+  protected void setUserJobConfProps(boolean doEarlyProps) {
+    for (Iterator it = userJobConfProps_.iterator(); it.hasNext();) {
+      String[] nv = ((String) it.next()).split("=", 2);
+      if (nv.length < 2) fail("-jobconf: expected <n>=<v> but got: " + nv[0]); // was a bare ArrayIndexOutOfBounds
+      if (doEarlyProps == nv[0].equals("fs.default.name")) {
+        msg("xxxJobConf: set(" + nv[0] + ", " + nv[1] + ") early=" + doEarlyProps);
+        jobConf_.set(nv[0], nv[1]);
+      }
+    }
+  }
+
+  protected void setJobConf() throws IOException {
     msg("hadoopAliasConf_ = " + hadoopAliasConf_);
     config_ = new Configuration();
-    if(!cluster_.equals("default")) {
-        config_.addFinalResource(new Path(getHadoopAliasConfFile()));
+    if (!cluster_.equals("default")) {
+      config_.addFinalResource(new Path(getHadoopAliasConfFile()));
     } else {
       // use only defaults: hadoop-default.xml and hadoop-site.xml
     }
     Iterator it = configPath_.iterator();
-    while(it.hasNext()) {
-        String pathName = (String)it.next();
-        config_.addFinalResource(new Path(pathName));
+    while (it.hasNext()) {
+      String pathName = (String) it.next();
+      config_.addFinalResource(new Path(pathName));
     }
+
+    testMerge_ = (-1 != userJobConfProps_.toString().indexOf("stream.testmerge"));
+
     // general MapRed job properties
     jobConf_ = new JobConf(config_);
-    for(int i=0; i<inputGlobs_.size(); i++) {
-      jobConf_.addInputPath(new Path((String)inputGlobs_.get(i)));
+
+    setUserJobConfProps(true);
+
+    // The correct FS must be set before this is called!
+    // (to resolve local vs. dfs drive letter differences) 
+    // (mapred.working.dir will be lazily initialized ONCE and depends on FS)
+    for (int i = 0; i < inputSpecs_.size(); i++) {
+      addInputSpec((String) inputSpecs_.get(i), i);
+    }
+    jobConf_.setBoolean("stream.inputtagged", inputTagged_);
+    jobConf_.set("stream.numinputspecs", "" + inputSpecs_.size());
+
+    Class fmt;
+    if (testMerge_ && false == hasSimpleInputSpecs_) {
+      // this ignores -inputreader
+      fmt = MergerInputFormat.class;
+    } else {
+      // need to keep this case to support custom -inputreader 
+      // and their parameters ,n=v,n=v
+      fmt = StreamInputFormat.class;
     }
+    jobConf_.setInputFormat(fmt);
 
-    jobConf_.setInputFormat(StreamInputFormat.class);
     // for SequenceFile, input classes may be overriden in getRecordReader
     jobConf_.setInputKeyClass(Text.class);
     jobConf_.setInputValueClass(Text.class);
 
     jobConf_.setOutputKeyClass(Text.class);
     jobConf_.setOutputValueClass(Text.class);
-    //jobConf_.setCombinerClass();
-
-    jobConf_.setOutputPath(new Path(output_));
-    jobConf_.setOutputFormat(StreamOutputFormat.class);
 
     jobConf_.set("stream.addenvironment", addTaskEnvironment_);
 
     String defaultPackage = this.getClass().getPackage().getName();
 
     Class c = StreamUtil.goodClassOrNull(mapCmd_, defaultPackage);
-    if(c != null) {
+    if (c != null) {
       jobConf_.setMapperClass(c);
     } else {
       jobConf_.setMapperClass(PipeMapper.class);
       jobConf_.set("stream.map.streamprocessor", mapCmd_);
     }
 
-    if(comCmd_ != null) {
+    if (comCmd_ != null) {
       c = StreamUtil.goodClassOrNull(comCmd_, defaultPackage);
-      if(c != null) {
+      if (c != null) {
         jobConf_.setCombinerClass(c);
       } else {
         jobConf_.setCombinerClass(PipeCombiner.class);
@@ -454,9 +514,11 @@
       }
     }
 
-    if(redCmd_ != null) {
+    reducerNone_ = false;
+    if (redCmd_ != null) {
+      reducerNone_ = redCmd_.equals(REDUCE_NONE);
       c = StreamUtil.goodClassOrNull(redCmd_, defaultPackage);
-      if(c != null) {
+      if (c != null) {
         jobConf_.setReducerClass(c);
       } else {
         jobConf_.setReducerClass(PipeReducer.class);
@@ -464,66 +526,165 @@
       }
     }
 
-    if(inReaderSpec_ != null) {
-        String[] args = inReaderSpec_.split(",");
-        String readerClass = args[0];
-        // this argument can only be a Java class
-        c = StreamUtil.goodClassOrNull(readerClass, defaultPackage);
-        if(c != null) {
-            jobConf_.set("stream.recordreader.class", c.getName());
-        } else {
-            fail("-inputreader: class not found: " + readerClass);
-        }
-        for(int i=1; i<args.length; i++) {
-            String[] nv = args[i].split("=", 2);
-            String k = "stream.recordreader." + nv[0];
-            String v = (nv.length>1) ? nv[1] : "";
-            jobConf_.set(k, v);
-        }
+    if (inReaderSpec_ != null) {
+      String[] args = inReaderSpec_.split(",");
+      String readerClass = args[0];
+      // this argument can only be a Java class
+      c = StreamUtil.goodClassOrNull(readerClass, defaultPackage);
+      if (c != null) {
+        jobConf_.set("stream.recordreader.class", c.getName());
+      } else {
+        fail("-inputreader: class not found: " + readerClass);
+      }
+      for (int i = 1; i < args.length; i++) {
+        String[] nv = args[i].split("=", 2);
+        String k = "stream.recordreader." + nv[0];
+        String v = (nv.length > 1) ? nv[1] : "";
+        jobConf_.set(k, v);
+      }
     }
 
-    jar_ = packageJobJar();
-    if(jar_ != null) {
-        jobConf_.setJar(jar_);
+    // output setup is done late so we can customize for reducerNone_
+    //jobConf_.setOutputDir(new File(output_));
+    setOutputSpec();
+    if (testMerge_) {
+      fmt = MuxOutputFormat.class;
+    } else {
+      fmt = StreamOutputFormat.class;
     }
+    jobConf_.setOutputFormat(fmt);
 
     // last, allow user to override anything
     // (although typically used with properties we didn't touch)
-    it = userJobConfProps_.iterator();
-    while(it.hasNext()) {
-        String prop = (String)it.next();
-        String[] nv = prop.split("=", 2);
-        msg("xxxJobConf: set(" + nv[0] + ", " + nv[1]+")");
-        jobConf_.set(nv[0], nv[1]);
+    setUserJobConfProps(false);
+
+    jar_ = packageJobJar();
+    if (jar_ != null) {
+      jobConf_.setJar(jar_);
     }
+
+    if(verbose_) {
+      listJobConfProperties();
+    }
+    
     msg("submitting to jobconf: " + getJobTrackerHostPort());
   }
 
-  protected String getJobTrackerHostPort()
+  protected void listJobConfProperties()
   {
+    // Dumps every JobConf entry through msg() (which prints only when verbose_ is set).
+    msg("==== JobConf properties:");
+    TreeMap byName = new TreeMap();
+    for (Iterator src = jobConf_.entries(); src.hasNext();) {
+      Map.Entry e = (Map.Entry) src.next();
+      byName.put(e.getKey(), e.getValue());
+    }
+    // the TreeMap hands the entries back ordered by property name
+    for (Iterator out = byName.entrySet().iterator(); out.hasNext();) {
+      Map.Entry e = (Map.Entry) out.next();
+      msg(e.getKey() + "=" + e.getValue());
+    }
+    msg("====");
+  }
+  
+  /** Registers one -input spec. InputSpec-s encode: a glob pattern x additional column files x additional joins */
+  protected void addInputSpec(String inSpec, int index) {
+    if (!testMerge_) { // plain mode: the spec itself is the input path
+      jobConf_.addInputPath(new Path(inSpec));
+      return;
+    }
+    CompoundDirSpec parsed = new CompoundDirSpec(inSpec, true);
+    msg("Parsed -input:\n" + parsed.toTableString());
+    if (index == 0) {
+      hasSimpleInputSpecs_ = (parsed.paths_.length == 0);
+      msg("hasSimpleInputSpecs_=" + hasSimpleInputSpecs_);
+    }
+    String primary = parsed.primarySpec();
+    if (!seenPrimary_.add(primary)) {
+      // duplicate check is textual only: glob overlaps and noncanonical path variations go undetected
+      fail("Primary used in multiple -input spec: " + primary);
+    }
+    jobConf_.addInputPath(new Path(primary));
+    // the full spec is kept under an indexed key; it is reparsed into a CompoundDirSpec during Job execution
+    jobConf_.set("stream.inputspecs." + index, inSpec);
+  }
+
+  /** Sets the job output path and the stream.* output properties from output_, mapsideoutURI_ and reducerNone_. */
+  protected void setOutputSpec() throws IOException {
+    CompoundDirSpec spec = new CompoundDirSpec(output_, false);
+    msg("Parsed -output:\n" + spec.toTableString());
+    String primary = spec.primarySpec();
+    String channel0;
+    // TODO simplify cases, encapsulate in a StreamJobConf
+    if (!reducerNone_) {
+      channel0 = primary;
+    } else {
+      if (mapsideoutURI_ != null) {
+        // user can override in case this is in a different filesystem; only validated here
+        try {
+          URI uri = new URI(mapsideoutURI_);
+          if (uri.getScheme() == null || uri.getScheme().equals("file")) { // || uri.getScheme().equals("hdfs")
+            if (!new Path(uri.getSchemeSpecificPart()).isAbsolute()) {
+              fail("Must be absolute: " + mapsideoutURI_);
+            }
+          } else if (uri.getScheme().equals("socket")) {
+            // ok
+          } else {
+            fail("Invalid scheme: " + uri.getScheme() + " for -mapsideoutput " + mapsideoutURI_);
+          }
+        } catch (URISyntaxException e) {
+          throw (IOException) new IOException("Invalid -mapsideoutput URI: " + mapsideoutURI_).initCause(e);
+        }
+      } else {
+        mapsideoutURI_ = primary;
+      }
+      // an empty reduce output named "part-00002" will go here and not collide.
+      channel0 = primary + ".NONE";
+      // the side-effect of the first split of an input named "part-00002"
+      // will go in this directory
+      jobConf_.set("stream.sideoutput.dir", primary);
+      // caveat: user -jobconf overrides are applied after this method runs, so they are not visible yet
+      boolean localjt = StreamUtil.isLocalJobTracker(jobConf_);
+      // just a guess: with a non-local jobtracker the user may prefer the remote filesystem
+      jobConf_.setBoolean("stream.sideoutput.localfs", localjt);
+    }
+    // a path in the fs.default.name filesystem (the two printlns are unconditional trace output)
+    System.out.println(channel0);
+    System.out.println(new Path(channel0));
+    jobConf_.setOutputPath(new Path(channel0));
+    // the raw -output spec is stored too; it will be reparsed remotely
+    jobConf_.set("stream.outputspec", output_);
+    if (null != mapsideoutURI_) {
+      // a path in "jobtracker's filesystem"
+      // overrides sideoutput.dir
+      jobConf_.set("stream.sideoutput.uri", mapsideoutURI_);
+    }
+  }
+
+  protected String getJobTrackerHostPort() {
     return jobConf_.get("mapred.job.tracker");
   }
 
-  protected void jobInfo()
-  {
-    if(isLocalHadoop()) {
+  protected void jobInfo() {
+    if (isLocalHadoop()) {
       LOG.info("Job running in-process (local Hadoop)");
     } else {
       String hp = getJobTrackerHostPort();
       LOG.info("To kill this job, run:");
-      LOG.info(getHadoopClientHome() + "/bin/hadoop job  -Dmapred.job.tracker=" + hp + " -kill " + jobId_);
+      LOG.info(getHadoopClientHome() + "/bin/hadoop job  -Dmapred.job.tracker=" + hp + " -kill "
+          + jobId_);
       //LOG.info("Job file: " + running_.getJobFile() );
-      LOG.info("Tracking URL: "  + StreamUtil.qualifyHost(running_.getTrackingURL()));
+      LOG.info("Tracking URL: " + StreamUtil.qualifyHost(running_.getTrackingURL()));
     }
   }
 
   // Based on JobClient
   public void submitAndMonitorJob() throws IOException {
 
-    if(jar_ != null && isLocalHadoop()) {
-        // getAbs became required when shell and subvm have different working dirs...
-        File wd = new File(".").getAbsoluteFile();
-        StreamUtil.unJar(new File(jar_), wd);
+    if (jar_ != null && isLocalHadoop()) {
+      // getAbs became required when shell and subvm have different working dirs...
+      File wd = new File(".").getAbsoluteFile();
+      StreamUtil.unJar(new File(jar_), wd);
     }
 
     // if jobConf_ changes must recreate a JobClient
@@ -542,11 +703,12 @@
       while (!running_.isComplete()) {
         try {
           Thread.sleep(1000);
-        } catch (InterruptedException e) {}
+        } catch (InterruptedException e) {
+        }
         running_ = jc_.getJob(jobId_);
         String report = null;
-        report = " map "+Math.round(running_.mapProgress()*100)
-        +"%  reduce " + Math.round(running_.reduceProgress()*100)+"%";
+        report = " map " + Math.round(running_.mapProgress() * 100) + "%  reduce "
+            + Math.round(running_.reduceProgress() * 100) + "%";
 
         if (!report.equals(lastReport)) {
           LOG.info(report);
@@ -569,7 +731,6 @@
     }
   }
 
-
   protected boolean mayExit_;
   protected String[] argv_;
   protected boolean verbose_;
@@ -585,11 +746,15 @@
   protected JobClient jc_;
 
   // command-line arguments
-  protected ArrayList inputGlobs_       = new ArrayList(); // <String>
-  protected ArrayList packageFiles_     = new ArrayList(); // <String>
-  protected ArrayList shippedCanonFiles_= new ArrayList(); // <String>
-  protected ArrayList userJobConfProps_ = new ArrayList(); // <String>
+  protected ArrayList inputSpecs_ = new ArrayList(); // <String>
+  protected boolean inputTagged_ = false;
+  protected TreeSet seenPrimary_ = new TreeSet(); // <String>
+  protected boolean hasSimpleInputSpecs_;
+  protected ArrayList packageFiles_ = new ArrayList(); // <String>
+  protected ArrayList shippedCanonFiles_ = new ArrayList(); // <String>
+  protected ArrayList userJobConfProps_ = new ArrayList(); // <String> name=value
   protected String output_;
+  protected String mapsideoutURI_;
   protected String mapCmd_;
   protected String comCmd_;
   protected String redCmd_;
@@ -598,6 +763,7 @@
   protected String hadoopAliasConf_;
   protected String inReaderSpec_;
 
+  protected boolean testMerge_;
 
   // Use to communicate config to the external processes (ex env.var.HADOOP_USER)
   // encoding "a=b c=d"
@@ -609,6 +775,4 @@
   protected RunningJob running_;
   protected String jobId_;
 
-
 }
-

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java Mon Sep 18 16:08:06 2006
@@ -18,7 +18,8 @@
 
 import java.io.*;
 import java.nio.charset.MalformedInputException;
-import java.util.zip.GZIPInputStream; 
+import java.util.Arrays;
+import java.util.zip.GZIPInputStream;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Text;
@@ -35,25 +36,21 @@
  * but delimits key and value with a TAB.
  * @author Michel Tourn
  */
-public class StreamLineRecordReader extends StreamBaseRecordReader 
-{
+public class StreamLineRecordReader extends StreamBaseRecordReader {
 
-  public StreamLineRecordReader(
-    FSDataInputStream in, FileSplit split, Reporter reporter, JobConf job, FileSystem fs)
-    throws IOException
-  {
+  public StreamLineRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter,
+      JobConf job, FileSystem fs) throws IOException {
     super(in, split, reporter, job, fs);
     gzipped_ = StreamInputFormat.isGzippedInput(job);
-    if(gzipped_) {
+    if (gzipped_) {
       din_ = new DataInputStream(new GZIPInputStream(in_));
     } else {
       din_ = in_;
     }
   }
 
-  public void seekNextRecordBoundary() throws IOException
-  {
-    if(gzipped_) {
+  public void seekNextRecordBoundary() throws IOException {
+    if (gzipped_) {
       // no skipping: use din_ as-is 
       // assumes splitter created only one split per file
       return;
@@ -63,7 +60,7 @@
         in_.seek(start_ - 1);
         // scan to the next newline in the file
         while (in_.getPos() < end_) {
-          char c = (char)in_.read();
+          char c = (char) in_.read();
           bytesSkipped++;
           if (c == '\r' || c == '\n') {
             break;
@@ -75,51 +72,54 @@
     }
   }
 
-  public synchronized boolean next(Writable key, Writable value)
-    throws IOException {  
-    if(!(key instanceof Text)) {
-        throw new IllegalArgumentException(
-                "Key should be of type Text but: "+key.getClass().getName());
+  public synchronized boolean next(Writable key, Writable value) throws IOException {
+    if (!(key instanceof Text)) {
+      throw new IllegalArgumentException("Key should be of type Text but: "
+          + key.getClass().getName());
     }
-    if(!(value instanceof Text)) {
-        throw new IllegalArgumentException(
-                "Value should be of type Text but: "+value.getClass().getName());
+    if (!(value instanceof Text)) {
+      throw new IllegalArgumentException("Value should be of type Text but: "
+          + value.getClass().getName());
     }
 
-    Text tKey = (Text)key;
-    Text tValue = (Text)value;
-    byte [] line;
-    
+    Text tKey = (Text) key;
+    Text tValue = (Text) value;
+    byte[] line;
+
     while (true) {
-        if(gzipped_) {
-            // figure EOS from readLine
+      if (gzipped_) {
+        // figure EOS from readLine
+      } else {
+        long pos = in_.getPos();
+        if (pos >= end_) return false;
+      }
+
+      line = UTF8ByteArrayUtils.readLine((InputStream) in_);
+      if (line == null) return false; // end of input: must be tested before the array is validated
+      try {
+        Text.validateUTF8(line);
+      } catch (MalformedInputException m) { // diagnostic only: the line is still processed below
+        System.err.println("line=" + line + "|" + new Text(line));
+        System.out.flush();
+      }
+      try {
+        int tab = UTF8ByteArrayUtils.findTab(line);
+        if (tab == -1) {
+          tKey.set(line);
+          tValue.set("");
         } else {
-            long pos = in_.getPos();
-            if (pos >= end_)
-                return false;
-        }
-        
-        line = UTF8ByteArrayUtils.readLine((InputStream)in_);
-        if(line==null)
-            return false;
-        try {
-            int tab=UTF8ByteArrayUtils.findTab(line);
-            if(tab == -1) {
-                tKey.set(line);
-                tValue.set("");
-            } else {
-                UTF8ByteArrayUtils.splitKeyVal(line, tKey, tValue, tab);
-            }
-            break;
-        } catch (MalformedInputException e) {
-            LOG.warn(e);
-            StringUtils.stringifyException(e);
+          UTF8ByteArrayUtils.splitKeyVal(line, tKey, tValue, tab);
         }
+        break;
+      } catch (MalformedInputException e) {
+        LOG.warn(StringUtils.stringifyException(e));
+      }
     }
-    numRecStats( line, 0, line.length );
+    numRecStats(line, 0, line.length);
     return true;
   }
+
   boolean gzipped_;
   GZIPInputStream zin_;
-  DataInputStream din_; // GZIP or plain
+  DataInputStream din_; // GZIP or plain  
 }

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java Mon Sep 18 16:08:06 2006
@@ -35,28 +35,27 @@
  */
 public class StreamOutputFormat implements OutputFormat {
 
-  public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
-                                      String name, Progressable progr) throws IOException {
- 
+  public RecordWriter getRecordWriter(FileSystem fs, JobConf job, String name, Progressable progr) throws IOException {
+
     File file = new File(job.getOutputDir(), name);
 
     final FSDataOutputStream out = fs.create(file);
 
     return new RecordWriter() {
-        public synchronized void write(WritableComparable key, Writable value)
-          throws IOException {
-          out.write(key.toString().getBytes("UTF-8"));
-          out.writeByte('\t');
-          out.write(value.toString().getBytes("UTF-8"));
-          out.writeByte('\n');
-        }
-        public synchronized void close(Reporter reporter) throws IOException {
-          out.close();
-        }
-      };
+
+      public synchronized void write(WritableComparable key, Writable value) throws IOException {
+        out.write(key.toString().getBytes("UTF-8"));
+        out.writeByte('\t');
+        out.write(value.toString().getBytes("UTF-8"));
+        out.writeByte('\n');
+      }
+
+      public synchronized void close(Reporter reporter) throws IOException {
+        out.close();
+      }
+    };
   }
-  
-  
+
   /** Check whether the output specification for a job is appropriate.  Called
    * when a job is submitted.  Typically checks that it does not already exist,
    * throwing an exception when it already exists, so that output is not
@@ -65,9 +64,8 @@
    * @param job the job whose output will be written
    * @throws IOException when output should not be attempted
    */
-  public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException
-  {
+  public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
     // allow existing data (for app-level restartability)
   }
-  
+
 }

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java Mon Sep 18 16:08:06 2006
@@ -29,22 +29,16 @@
 
 import org.apache.hadoop.util.ReflectionUtils;
 
-public class StreamSequenceRecordReader extends StreamBaseRecordReader
-{
+public class StreamSequenceRecordReader extends StreamBaseRecordReader {
 
-  public StreamSequenceRecordReader (
-    FSDataInputStream in, FileSplit split, Reporter reporter, JobConf job, FileSystem fs)
-    throws IOException
-  {
+  public StreamSequenceRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter,
+      JobConf job, FileSystem fs) throws IOException {
     super(in, split, reporter, job, fs);
     numFailed_ = 0;
     // super.in_ ignored, using rin_ instead
   }
 
-
-  public synchronized boolean next(Writable key, Writable value)
-   throws IOException
-  {
+  public synchronized boolean next(Writable key, Writable value) throws IOException {
     boolean success;
     do {
       if (!more_) return false;
@@ -58,29 +52,26 @@
           more_ = eof;
         }
         success = true;
-      } catch(IOException io) {
+      } catch (IOException io) {
         numFailed_++;
-        if(numFailed_ < 100 || numFailed_ % 100 == 0) {
-          err_.println("StreamSequenceRecordReader: numFailed_/numRec_="
-            + numFailed_+ "/" + numRec_);
+        if (numFailed_ < 100 || numFailed_ % 100 == 0) {
+          err_.println("StreamSequenceRecordReader: numFailed_/numRec_=" + numFailed_ + "/"
+              + numRec_);
         }
         io.printStackTrace(err_);
         success = false;
       }
-    } while(!success);
-    
+    } while (!success);
+
     numRecStats(new byte[0], 0, 0);
     return more_;
   }
 
-
-  public void seekNextRecordBoundary() throws IOException
-  {
+  public void seekNextRecordBoundary() throws IOException {
     rin_ = new SequenceFile.Reader(fs_, split_.getPath(), job_);
     end_ = split_.getStart() + split_.getLength();
 
-    if (split_.getStart() > rin_.getPosition())
-      rin_.sync(split_.getStart());                  // sync to start
+    if (split_.getStart() > rin_.getPosition()) rin_.sync(split_.getStart()); // sync to start
 
     more_ = rin_.getPosition() < end_;
 
@@ -90,14 +81,13 @@
   }
 
   public WritableComparable createKey() {
-    return (WritableComparable) 
-           ReflectionUtils.newInstance(rin_.getKeyClass(), null);
+    return (WritableComparable) ReflectionUtils.newInstance(rin_.getKeyClass(), null);
   }
-  
+
   public Writable createValue() {
     return (Writable) ReflectionUtils.newInstance(rin_.getValueClass(), null);
   }
-  
+
   boolean more_;
   SequenceFile.Reader rin_;
   int numFailed_;

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java Mon Sep 18 16:08:06 2006
@@ -19,69 +19,81 @@
 import java.text.DecimalFormat;
 import java.io.*;
 import java.net.*;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.List;
 import java.util.jar.*;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+
 /** Utilities not available elsewhere in Hadoop.
  *  
  */
-public class StreamUtil
-{
+public class StreamUtil {
 
-  public static Class goodClassOrNull(String className, String defaultPackage)
-  {
-    if(className.indexOf('.') == -1 && defaultPackage != null) {
-        className = defaultPackage + "." + className;
+  /** It may seem strange to silently switch behaviour when a String
+   * is not a classname; the reason is simplified Usage:<pre>
+   * -mapper [classname | program ]
+   * instead of the explicit Usage:
+   * [-mapper program | -javamapper classname], -mapper and -javamapper are mutually exclusive.
+   * (repeat for -reducer, -combiner) </pre>
+   */
+  public static Class goodClassOrNull(String className, String defaultPackage) {
+    if (className.indexOf('.') == -1 && defaultPackage != null) {
+      className = defaultPackage + "." + className;
     }
     Class clazz = null;
     try {
-        clazz = Class.forName(className);
-    } catch(ClassNotFoundException cnf) {
-    } catch(LinkageError cnf) {
+      clazz = Class.forName(className);
+    } catch (ClassNotFoundException cnf) {
+    } catch (LinkageError cnf) {
     }
     return clazz;
   }
-  
-   /** @return a jar file path or a base directory or null if not found.
+
+  public static String findInClasspath(String className) {
+    return findInClasspath(className, StreamUtil.class.getClassLoader());
+  }
+
+  /** @return a jar file path or a base directory or null if not found.
    */
-   public static String findInClasspath(String className) 
-   {
+  public static String findInClasspath(String className, ClassLoader loader) {
 
     String relPath = className;
-    if (!relPath.startsWith("/")) {
-      relPath = "/" + relPath;
-    }
     relPath = relPath.replace('.', '/');
     relPath += ".class";
-
-    java.net.URL classUrl = StreamUtil.class.getResource(relPath);
+    java.net.URL classUrl = loader.getResource(relPath);
 
     String codePath;
     if (classUrl != null) {
-        boolean inJar = classUrl.getProtocol().equals("jar");
-        codePath = classUrl.toString();
-        if(codePath.startsWith("jar:")) {
-            codePath = codePath.substring("jar:".length());
-        }
-        if(codePath.startsWith("file:")) { // can have both
-            codePath = codePath.substring("file:".length());
-        }
-        if(inJar) {          
-          // A jar spec: remove class suffix in /path/my.jar!/package/Class
-          int bang = codePath.lastIndexOf('!');
-          codePath = codePath.substring(0, bang);
-        } else {
-          // A class spec: remove the /my/package/Class.class portion
-          int pos = codePath.lastIndexOf(relPath);
-          if(pos == -1) {
-            throw new IllegalArgumentException(
-              "invalid codePath: className=" + className + " codePath=" + codePath);
-          }
-          codePath = codePath.substring(0, pos);
+      boolean inJar = classUrl.getProtocol().equals("jar");
+      codePath = classUrl.toString();
+      if (codePath.startsWith("jar:")) {
+        codePath = codePath.substring("jar:".length());
+      }
+      if (codePath.startsWith("file:")) { // can have both
+        codePath = codePath.substring("file:".length());
+      }
+      if (inJar) {
+        // A jar spec: remove class suffix in /path/my.jar!/package/Class
+        int bang = codePath.lastIndexOf('!');
+        codePath = codePath.substring(0, bang);
+      } else {
+        // A class spec: remove the /my/package/Class.class portion
+        int pos = codePath.lastIndexOf(relPath);
+        if (pos == -1) {
+          throw new IllegalArgumentException("invalid codePath: className=" + className
+              + " codePath=" + codePath);
         }
+        codePath = codePath.substring(0, pos);
+      }
     } else {
-        codePath = null;
+      codePath = null;
     }
     return codePath;
   }
@@ -92,7 +104,7 @@
     try {
       Enumeration entries = jar.entries();
       while (entries.hasMoreElements()) {
-        JarEntry entry = (JarEntry)entries.nextElement();
+        JarEntry entry = (JarEntry) entries.nextElement();
         if (!entry.isDirectory()) {
           InputStream in = jar.getInputStream(entry);
           try {
@@ -117,9 +129,7 @@
       jar.close();
     }
   }
-  
 
-  
   final static long KB = 1024L * 1;
   final static long MB = 1024L * KB;
   final static long GB = 1024L * MB;
@@ -128,64 +138,61 @@
 
   static DecimalFormat dfm = new DecimalFormat("####.000");
   static DecimalFormat ifm = new DecimalFormat("###,###,###,###,###");
-  
-  public static String dfmt(double d)
-  {
+
+  public static String dfmt(double d) {
     return dfm.format(d);
   }
-  public static String ifmt(double d)
-  {
+
+  public static String ifmt(double d) {
     return ifm.format(d);
   }
-  
-  public static String formatBytes(long numBytes)
-  {
+
+  public static String formatBytes(long numBytes) {
     StringBuffer buf = new StringBuffer();
-    boolean bDetails = true;    
+    boolean bDetails = true;
     double num = numBytes;
-    
-    if(numBytes < KB) {
+
+    if (numBytes < KB) {
       buf.append(numBytes + " B");
       bDetails = false;
-    } else if(numBytes < MB) {
-      buf.append(dfmt(num/KB) + " KB");
-    } else if(numBytes < GB) {
-      buf.append(dfmt(num/MB) + " MB");
-    } else if(numBytes < TB) {
-      buf.append(dfmt(num/GB) + " GB");
-    } else if(numBytes < PB) {
-      buf.append(dfmt(num/TB) + " TB");
+    } else if (numBytes < MB) {
+      buf.append(dfmt(num / KB) + " KB");
+    } else if (numBytes < GB) {
+      buf.append(dfmt(num / MB) + " MB");
+    } else if (numBytes < TB) {
+      buf.append(dfmt(num / GB) + " GB");
+    } else if (numBytes < PB) {
+      buf.append(dfmt(num / TB) + " TB");
     } else {
-      buf.append(dfmt(num/PB) + " PB");
+      buf.append(dfmt(num / PB) + " PB");
     }
-    if(bDetails) {
+    if (bDetails) {
       buf.append(" (" + ifmt(numBytes) + " bytes)");
     }
     return buf.toString();
   }
 
-  public static String formatBytes2(long numBytes)
-  {
+  public static String formatBytes2(long numBytes) {
     StringBuffer buf = new StringBuffer();
     long u = 0;
-    if(numBytes >= TB) {
-      u = numBytes/TB;
-      numBytes -= u*TB;
+    if (numBytes >= TB) {
+      u = numBytes / TB;
+      numBytes -= u * TB;
       buf.append(u + " TB ");
     }
-    if(numBytes >= GB) {
-      u = numBytes/GB;
-      numBytes -= u*GB;
+    if (numBytes >= GB) {
+      u = numBytes / GB;
+      numBytes -= u * GB;
       buf.append(u + " GB ");
     }
-    if(numBytes >= MB) {
-      u = numBytes/MB;
-      numBytes -= u*MB;
+    if (numBytes >= MB) {
+      u = numBytes / MB;
+      numBytes -= u * MB;
       buf.append(u + " MB ");
     }
-    if(numBytes >= KB) {
-      u = numBytes/KB;
-      numBytes -= u*KB;
+    if (numBytes >= KB) {
+      u = numBytes / KB;
+      numBytes -= u * KB;
       buf.append(u + " KB ");
     }
     buf.append(u + " B"); //even if zero
@@ -194,125 +201,295 @@
 
   static Environment env;
   static String HOST;
-  
+
   static {
     try {
       env = new Environment();
       HOST = env.getHost();
-    } catch(IOException io) {
+    } catch (IOException io) {
       io.printStackTrace();
     }
   }
 
-  static class StreamConsumer extends Thread
-  {
-    StreamConsumer(InputStream in, OutputStream out)
-    {
-      this.bin = new LineNumberReader(
-        new BufferedReader(new InputStreamReader(in)));
-      if(out != null) {
+  static class StreamConsumer extends Thread {
+
+    StreamConsumer(InputStream in, OutputStream out) {
+      this.bin = new LineNumberReader(new BufferedReader(new InputStreamReader(in)));
+      if (out != null) {
         this.bout = new DataOutputStream(out);
       }
     }
-    public void run()
-    {
+
+    public void run() {
       try {
         String line;
-        while((line=bin.readLine()) != null) {
-          if(bout != null) {
+        while ((line = bin.readLine()) != null) {
+          if (bout != null) {
             bout.writeUTF(line); //writeChars
             bout.writeChar('\n');
           }
         }
         bout.flush();
-      } catch(IOException io) {        
+      } catch (IOException io) {
       }
     }
+
     LineNumberReader bin;
     DataOutputStream bout;
   }
 
-  static void exec(String arg, PrintStream log)
-  {
-    exec( new String[] {arg}, log );
+  static void exec(String arg, PrintStream log) {
+    exec(new String[] { arg }, log);
   }
-  
-  static void exec(String[] args, PrintStream log)
-  {
-      try {
-        log.println("Exec: start: " + Arrays.asList(args));
-        Process proc = Runtime.getRuntime().exec(args);
-        new StreamConsumer(proc.getErrorStream(), log).start();
-        new StreamConsumer(proc.getInputStream(), log).start();
-        int status = proc.waitFor();
-        //if status != 0
-        log.println("Exec: status=" + status + ": " + Arrays.asList(args));
-      } catch(InterruptedException in) {
-        in.printStackTrace();
-      } catch(IOException io) {
-        io.printStackTrace();
-      }
+
+  static void exec(String[] args, PrintStream log) {
+    try {
+      log.println("Exec: start: " + Arrays.asList(args));
+      Process proc = Runtime.getRuntime().exec(args);
+      new StreamConsumer(proc.getErrorStream(), log).start();
+      new StreamConsumer(proc.getInputStream(), log).start();
+      int status = proc.waitFor();
+      //if status != 0
+      log.println("Exec: status=" + status + ": " + Arrays.asList(args));
+    } catch (InterruptedException in) {
+      in.printStackTrace();
+    } catch (IOException io) {
+      io.printStackTrace();
+    }
   }
-  
-  static String qualifyHost(String url)
-  {
+
+  static String qualifyHost(String url) {
     try {
-        return qualifyHost(new URL(url)).toString();
-    } catch(IOException io) {
-        return url;
+      return qualifyHost(new URL(url)).toString();
+    } catch (IOException io) {
+      return url;
     }
   }
-  
-  static URL qualifyHost(URL url)
-  {    
+
+  static URL qualifyHost(URL url) {
     try {
       InetAddress a = InetAddress.getByName(url.getHost());
       String qualHost = a.getCanonicalHostName();
       URL q = new URL(url.getProtocol(), qualHost, url.getPort(), url.getFile());
       return q;
-    } catch(IOException io) {
+    } catch (IOException io) {
       return url;
     }
   }
-  
+
   static final String regexpSpecials = "[]()?*+|.!^-\\~@";
-  
-  public static String regexpEscape(String plain)
-  {
+
+  public static String regexpEscape(String plain) {
     StringBuffer buf = new StringBuffer();
     char[] ch = plain.toCharArray();
     int csup = ch.length;
-    for(int c=0; c<csup; c++) {
-      if(regexpSpecials.indexOf(ch[c]) != -1) {
-        buf.append("\\");    
+    for (int c = 0; c < csup; c++) {
+      if (regexpSpecials.indexOf(ch[c]) != -1) {
+        buf.append("\\");
       }
       buf.append(ch[c]);
     }
     return buf.toString();
   }
-  
-  static String slurp(File f) throws IOException
-  {
+
+  public static String safeGetCanonicalPath(File f) {
+    try {
+      String s = f.getCanonicalPath();
+      return (s == null) ? f.toString() : s;
+    } catch (IOException io) {
+      return f.toString();
+    }
+  }
+
+  static String slurp(File f) throws IOException {
+    int len = (int) f.length();
+    byte[] buf = new byte[len];
     FileInputStream in = new FileInputStream(f);
-    int len = (int)f.length();
+    String contents = null;
+    try {
+      in.read(buf, 0, len);
+      contents = new String(buf, "UTF-8");
+    } finally {
+      in.close();
+    }
+    return contents;
+  }
+
+  static String slurpHadoop(Path p, FileSystem fs) throws IOException {
+    int len = (int) fs.getLength(p);
     byte[] buf = new byte[len];
-    in.read(buf, 0, len);
-    return new String(buf);
+    InputStream in = fs.open(p);
+    String contents = null;
+    try {
+      in.read(buf, 0, len);
+      contents = new String(buf, "UTF-8");
+    } finally {
+      in.close();
+    }
+    return contents;
   }
-  
+
+  public static String rjustify(String s, int width) {
+    if (s == null) s = "null";
+    if (width > s.length()) {
+      s = getSpace(width - s.length()) + s;
+    }
+    return s;
+  }
+
+  public static String ljustify(String s, int width) {
+    if (s == null) s = "null";
+    if (width > s.length()) {
+      s = s + getSpace(width - s.length());
+    }
+    return s;
+  }
+
+  static char[] space;
+  static {
+    space = new char[300];
+    Arrays.fill(space, '\u0020');
+  }
+
+  public static String getSpace(int len) {
+    if (len > space.length) {
+      space = new char[Math.max(len, 2 * space.length)];
+      Arrays.fill(space, '\u0020');
+    }
+    return new String(space, 0, len);
+  }
+
   static private Environment env_;
-  
-  static Environment env()
-  {
-    if(env_ != null) {
+
+  static Environment env() {
+    if (env_ != null) {
       return env_;
     }
     try {
       env_ = new Environment();
-    } catch(IOException io) {      
+    } catch (IOException io) {
       io.printStackTrace();
     }
     return env_;
   }
+
+  public static String makeJavaCommand(Class main, String[] argv) {
+    ArrayList vargs = new ArrayList();
+    File javaHomeBin = new File(System.getProperty("java.home"), "bin");
+    File jvm = new File(javaHomeBin, "java");
+    vargs.add(jvm.toString());
+    // copy parent classpath
+    vargs.add("-classpath");
+    vargs.add("\"" + System.getProperty("java.class.path") + "\"");
+
+    // Add main class and its arguments
+    vargs.add(main.getName());
+    for (int i = 0; i < argv.length; i++) {
+      vargs.add(argv[i]);
+    }
+    return collate(vargs, " ");
+  }
+
+  public static String collate(Object[] args, String sep) {
+    return collate(Arrays.asList(args), sep);
+  }
+
+  public static String collate(List args, String sep) {
+    StringBuffer buf = new StringBuffer();
+    Iterator it = args.iterator();
+    while (it.hasNext()) {
+      if (buf.length() > 0) {
+        buf.append(" ");
+      }
+      buf.append(it.next());
+    }
+    return buf.toString();
+  }
+
+  // JobConf helpers
+
+  public static FileSplit getCurrentSplit(JobConf job) {
+    String path = job.get("map.input.file");
+    if (path == null) {
+      return null;
+    }
+    Path p = new Path(path);
+    long start = Long.parseLong(job.get("map.input.start"));
+    long length = Long.parseLong(job.get("map.input.length"));
+    return new FileSplit(p, start, length);
+  }
+
+  static class TaskId {
+
+    boolean mapTask;
+    String jobid;
+    int taskid;
+    int execid;
+  }
+
+  public static boolean isLocalJobTracker(JobConf job) {
+    return job.get("mapred.job.tracker", "local").equals("local");
+  }
+
+  public static TaskId getTaskInfo(JobConf job) {
+    TaskId res = new TaskId();
+
+    String id = job.get("mapred.task.id");
+    if (isLocalJobTracker(job)) {
+      // the local job tracker uses a different naming scheme
+      res.mapTask = job.getBoolean("mapred.task.is.map", true);
+      res.jobid = "0";
+      res.taskid = 0;
+      res.execid = 0;
+    } else {
+      String[] e = id.split("_");
+      res.mapTask = e[2].equals("m");
+      res.jobid = e[1];
+      res.taskid = Integer.parseInt(e[3]);
+      res.execid = Integer.parseInt(e[4]);
+    }
+    return res;
+  }
+
+  static boolean getUseMapSideEffect(JobConf job) {
+    String reduce = job.get("stream.reduce.streamprocessor");
+    return StreamJob.REDUCE_NONE.equals(reduce);
+  }
+
+  public static void touch(File file) throws IOException {
+    file = file.getAbsoluteFile();
+    FileOutputStream out = new FileOutputStream(file);
+    out.close();
+    if (!file.exists()) {
+      throw new IOException("touch failed: " + file);
+    }
+  }
+
+  public static boolean isCygwin() {
+    String OS = System.getProperty("os.name");
+    return (OS.indexOf("Windows") > -1);
+  }
+
+  public static String localizeBin(String path) {
+    if (isCygwin()) {
+      path = "C:/cygwin/" + path;
+    }
+    return path;
+  }
   
+  /** @param name foo where &lt;junit&gt;&lt;sysproperty key="foo" value="${foo}"/&gt;
+   * If foo is undefined, Ant passes the unevaluated literal "${foo}" as the value.
+   * Take this into account when setting defaultVal. */
+  public static String getBoundAntProperty(String name, String defaultVal)
+  {
+    String val = System.getProperty(name);
+    if(val != null && val.indexOf("${") >= 0) {
+      val = null;
+    }
+    if(val == null) {
+      val = defaultVal;
+    }
+    return val;
+  }
+
 }