You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/07/14 09:55:16 UTC

svn commit: r421829 [1/2] - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/

Author: cutting
Date: Fri Jul 14 00:55:15 2006
New Revision: 421829

URL: http://svn.apache.org/viewvc?rev=421829&view=rev
Log:
HADOOP-361.  Remove unix dependencies from streaming contrib module tests.  Contributed by Michel.

Added:
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UniqApp.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=421829&r1=421828&r2=421829&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jul 14 00:55:15 2006
@@ -31,6 +31,9 @@
     protocol.  This lets one, e.g., more easily copy log files into
     DFS.  (Arun C Murthy via cutting)
 
+ 9. HADOOP-361.  Remove unix dependencies from streaming contrib
+    module tests, making them pure java. (Michel Tourn via cutting)
+
 
 Release 0.4.0 - 2006-06-28
 

Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java?rev=421829&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java Fri Jul 14 00:55:15 2006
@@ -0,0 +1,51 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/** A generic Combiner bridge.<br>
+ *  To use a Combiner specify -combiner myprogram in hadoopStreaming.
+ *  It delegates operations to an external program via stdin and stdout.
+ *  In one run of the external program, you can expect all records with
+ *  the same key to appear together.
+ *  You should not make assumptions about how many times the combiner is
+ *  run on your data.
+ *  Ideally the combiner and the reducer are the same program: the combiner
+ *  partially aggregates the data zero or more times and the reducer
+ *  applies the last aggregation pass.
+ *  Do not use a Combiner if your reduce logic does not support
+ *  such a multipass aggregation.
+ *  @author Michel Tourn
+ */
+public class PipeCombiner extends PipeReducer
+{
+
+  String getPipeCommand(JobConf job)
+  {
+    return job.get("stream.combine.streamprocessor");
+  }
+
+}

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=421829&r1=421828&r2=421829&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Fri Jul 14 00:55:15 2006
@@ -1,566 +1,566 @@
-/**
- * Copyright 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.streaming;
-
-import java.io.*;
-import java.nio.channels.*;
-import java.io.IOException;
-import java.util.Date;
-import java.util.Map;
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.Properties;
-import java.util.regex.*;
-
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.OutputCollector;
-
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataOutputStream;
-
-/** Shared functionality for PipeMapper, PipeReducer.
- *  @author Michel Tourn
- */
-public abstract class PipeMapRed {
-
-  protected static final Log LOG = LogFactory.getLog(PipeMapRed.class.getName());  
-  
-  /** The command to be spawned as a subprocess.
-   * Mapper/Reducer operations will delegate to it
-   */
-  abstract String getPipeCommand(JobConf job);
-  /*
-  */
-  abstract String getKeyColPropName();
-  
-  /** Write output as side-effect files rather than as map outputs. 
-      This is useful to do "Map" tasks rather than "MapReduce" tasks. */
-  boolean getUseSideEffect() 
-  {
-    return false;
-  }
-  
-  /**
-   * @returns how many TABS before the end of the key part 
-   * usually: 1 or "ALL"
-   * used for tool output of both Map and Reduce
-   * configured via tool's argv: splitKeyVal=ALL or 1..
-   * although it is interpreted here, not by tool
-   */
-  int getKeyColsFromPipeCommand(String cmd)
-  {          
-    String key = getKeyColPropName();
-    Pattern kcPat = Pattern.compile(".*" + key + "=([^\\s]*).*");
-    Matcher match = kcPat.matcher(cmd);
-    String kc;
-    if(!match.matches()) {
-      kc = null;
-    } else {
-      kc = match.group(1);
-    }
-
-    int cols;
-    if(kc== null) {
-      // default value is 1 and the Stream applications could instead 
-      // add/remove the \t separator on lines to get the same effect as value 0, 1, ALL
-      cols = 1;
-    } else if(kc.equals("ALL")) {
-      cols = ALL_COLS;
-    } else {
-      try {
-        cols = Integer.parseInt(kc);    
-      } catch(NumberFormatException nf) {
-        cols = Integer.MAX_VALUE;
-      }
-    }
-
-    System.out.println("getKeyColsFromPipeCommand:" + key + " parse:" + cols + " from cmd=" + cmd);
-    
-    return cols;
-  }
-  
-  final static int OUTSIDE = 1;
-  final static int SINGLEQ = 2;
-  final static int DOUBLEQ = 3;
-  
-  static String[] splitArgs(String args)
-  {
-    ArrayList argList = new ArrayList();
-    char[] ch = args.toCharArray();
-    int clen = ch.length;
-    int state = OUTSIDE;
-    int argstart = 0;
-    for(int c=0; c<=clen; c++) {
-        boolean last = (c==clen);
-        int lastState = state;
-        boolean endToken = false;
-        if(!last) {
-          if(ch[c]=='\'') {
-            if(state == OUTSIDE) {
-              state = SINGLEQ;
-            } else if(state == SINGLEQ) {
-              state = OUTSIDE;  
-            }
-            endToken = (state != lastState);
-          } else if(ch[c]=='"') {
-            if(state == OUTSIDE) {
-              state = DOUBLEQ;
-            } else if(state == DOUBLEQ) {
-              state = OUTSIDE;  
-            }          
-            endToken = (state != lastState);
-          } else if(ch[c]==' ') {
-            if(state == OUTSIDE) {
-              endToken = true;
-            }            
-          }
-        }
-        if(last || endToken) {
-          if(c == argstart) {
-            // unquoted space
-          } else {
-            String a;
-            a = args.substring(argstart, c); 
-            argList.add(a);
-          }
-          argstart = c+1;
-          lastState = state;
-        }
-    }
-    return (String[])argList.toArray(new String[0]);
-  }
-
-  public void configure(JobConf job)
-  {
-
-    try {
-      String argv = getPipeCommand(job);
-      keyCols_ = getKeyColsFromPipeCommand(argv);
-      
-      job_ = job;      
-      
-      // Currently: null is identity reduce. REDUCE_NONE is no-map-outputs.
-      doPipe_ = (argv != null) && !StreamJob.REDUCE_NONE.equals(argv);
-      if(!doPipe_) return;
-
-      setStreamJobDetails(job);
-      setStreamProperties();
-            
-      String[] argvSplit = splitArgs(argv);
-      String prog = argvSplit[0];
-      String userdir = System.getProperty("user.dir");
-      if(new File(prog).isAbsolute()) {
-        // we don't own it. Hope it is executable
-      } else {
-        new MustangFile(prog).setExecutable(true, true);
-      }
-      
-      
-      if(job_.getInputValueClass().equals(BytesWritable.class)) {
-        // TODO expose as separate config: 
-        // job or semistandard inputformat property
-        optUseKey_ = false;
-      }
-      
-      optSideEffect_ = getUseSideEffect();
-      
-      if(optSideEffect_) {
-        String fileName = job_.get("mapred.task.id");
-        sideEffectPath_ = new Path(job_.getOutputPath(), fileName);
-        FileSystem fs = FileSystem.get(job_);
-        sideEffectOut_ = fs.create(sideEffectPath_);
-      }
-      
-      // argvSplit[0]: 
-      // An absolute path should be a preexisting valid path on all TaskTrackers
-	  // A  relative path should match in the unjarred Job data
-      // In this case, force an absolute path to make sure exec finds it.
-      argvSplit[0] = new File(argvSplit[0]).getAbsolutePath();
-      logprintln("PipeMapRed exec " + Arrays.asList(argvSplit));
-      logprintln("sideEffectPath_=" + sideEffectPath_);      
-      
-      Environment childEnv = (Environment)StreamUtil.env().clone();
-      addEnvironment(childEnv, job_.get("stream.addenvironment"));
-      sim = Runtime.getRuntime().exec(argvSplit, childEnv.toArray());
-      
-      /* // This way required jdk1.5
-      ProcessBuilder processBuilder = new ProcessBuilder(argvSplit);
-      Map<String, String> env = processBuilder.environment();
-      addEnvironment(env, job_.get("stream.addenvironment"));
-      sim = processBuilder.start();
-      */
-      
-      clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));
-      clientIn_  = new DataInputStream(new BufferedInputStream(sim.getInputStream()));
-      clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));
-      doneLock_  = new Object();
-      startTime_ = System.currentTimeMillis();
-      
-    } catch(Exception e) {
-        e.printStackTrace();
-        e.printStackTrace(log_);
-    } 
-  }
-  
-  void setStreamJobDetails(JobConf job)
-  {
-    jobLog_ = job.get("stream.jobLog_");
-    String s = job.get("stream.minRecWrittenToEnableSkip_");
-    if(s != null) {
-      minRecWrittenToEnableSkip_ = Long.parseLong(s);
-      logprintln("JobConf set minRecWrittenToEnableSkip_ =" + minRecWrittenToEnableSkip_);
-    }
-  }
-  
-  void setStreamProperties()
-  {
-    taskid_ = System.getProperty("stream.taskid");
-    if(taskid_ == null) {
-      taskid_ = "noid" + System.currentTimeMillis();
-    }
-    String s = System.getProperty("stream.port");
-    if(s != null) {
-      reportPortPlusOne_ = Integer.parseInt(s);
-    }
-    
-  }
-    
-  void logprintln(String s)
-  {
-    if(log_ != null) {
-      log_.println(s);
-    } else {
-      System.err.println(s); // or LOG.info()
-    }
-  }
-  
-  void logflush()
-  {
-    if(log_ != null) {
-      log_.flush();
-    }
-  }
-  
-  void addEnvironment(Properties env, String nameVals)
-  {
-    // encoding "a=b c=d" from StreamJob
-    if(nameVals == null) return;
-    String[] nv = nameVals.split(" ");
-    for(int i=0; i<nv.length; i++) {
-      String[] pair = nv[i].split("=", 2);
-      if(pair.length != 2) {
-        logprintln("Skip ev entry:" + nv[i]);
-      } else {
-        logprintln("Add  ev entry:" + nv[i]);
-        env.put(pair[0], pair[1]);
-      }
-    }
-  }
-  
-  /** .. and if successful: delete the task log */
-  void appendLogToJobLog(String status)
-  {
-    if(jobLog_ == null) {
-      return; // not using a common joblog
-    }
-    StreamUtil.exec("/bin/rm " + LOGNAME, log_);
-    // TODO socket-based aggregator (in JobTrackerInfoServer)
-  }
-  
-  
-  void startOutputThreads(OutputCollector output, Reporter reporter)
-  {
-      outputDone_ = false;
-      errorDone_ = false;
-      outThread_ = new MROutputThread(output, reporter);
-      outThread_.start();
-      errThread_ = new MRErrorThread(reporter);
-      errThread_.start();
-  }
-    
-  void splitKeyVal(String line, UTF8 key, UTF8 val)
-  {
-    int pos;
-    if(keyCols_ == ALL_COLS) {
-      pos = -1;
-    } else {
-      pos = line.indexOf('\t');
-    }    
-    if(pos == -1) {
-      key.set(line);
-      val.set("");      
-    } else {
-      key.set(line.substring(0, pos));
-      val.set(line.substring(pos+1));
-    }
-  }
-
-  class MROutputThread extends Thread
-  {
-    MROutputThread(OutputCollector output, Reporter reporter)
-    {
-      setDaemon(true);
-      this.output = output;
-      this.reporter = reporter;
-    }
-    public void run() {
-      try {
-            try {
-              UTF8 EMPTY = new UTF8("");
-              UTF8 key = new UTF8();
-              UTF8 val = new UTF8();
-              // 3/4 Tool to Hadoop
-              while((answer = clientIn_.readLine()) != null) {
-                // 4/4 Hadoop out 
-                if(optSideEffect_) {
-                  sideEffectOut_.write(answer.getBytes());
-                  sideEffectOut_.write('\n');
-                } else {
-                  splitKeyVal(answer, key, val);
-                  output.collect(key, val);
-                  numRecWritten_++;
-                  if(numRecWritten_ % 100 == 0) {
-                    logprintln(numRecRead_+"/"+numRecWritten_);
-                    logflush();
-                  }
-                }
-              }
-            } catch(IOException io) {
-              io.printStackTrace(log_);
-            }
-            logprintln("MROutputThread done");
-      } finally {
-          outputDone_ = true;
-          synchronized(doneLock_) {
-            doneLock_.notifyAll();
-          }
-      }
-    }
-    OutputCollector output;
-    Reporter reporter;
-    String answer;
-  }
-
-  class MRErrorThread extends Thread
-  {
-    public MRErrorThread(Reporter reporter)
-    {
-      this.reporter = reporter;
-      setDaemon(true);
-    }
-    public void run()
-    {
-      String line;
-      try {
-        long num = 0;
-        int bucket = 100;
-        while((line=clientErr_.readLine()) != null) {
-          num++;
-          logprintln(line);
-          if(num < 10) {
-            String hline = "MRErr: " + line;
-            System.err.println(hline);
-            reporter.setStatus(hline);
-          }
-        }
-      } catch(IOException io) {
-        io.printStackTrace(log_);
-      } finally {
-        errorDone_ = true;
-        synchronized(doneLock_) {
-          doneLock_.notifyAll();
-        }
-      }
-    }
-    Reporter reporter;
-  }
-
-  public void mapRedFinished()
-  {
-    logprintln("mapRedFinished");
-    try {
-    if(!doPipe_) return;
-    try {
-      if(optSideEffect_) {
-        logprintln("closing " + sideEffectPath_);
-        sideEffectOut_.close();
-        logprintln("closed  " + sideEffectPath_);
-      }
-    } catch(IOException io) {
-      io.printStackTrace();
-    }
-    try {
-      if(clientOut_ != null) {
-      	clientOut_.close();
-      }
-    } catch(IOException io) {
-    }
-    if(outThread_ == null) {
-      // no input records: threads were never spawned
-    } else {
-      try {
-        while(!outputDone_ || !errorDone_) {
-          synchronized(doneLock_) {
-            doneLock_.wait();
-          }
-        }
-      } catch(InterruptedException ie) {
-        ie.printStackTrace();
-      }
-    }
-      sim.destroy();
-    } catch(RuntimeException e) {
-      e.printStackTrace(log_);
-      throw e;
-    }
-  }
-  
-  void maybeLogRecord()
-  {
-    if(numRecRead_ >= nextRecReadLog_) {
-      String info = numRecInfo();
-      logprintln(info);
-      logflush();      
-      System.err.println(info);
-      //nextRecReadLog_ *= 10;
-      nextRecReadLog_ += 100;
-    }    
-  }
-  
-  public String getContext()
-  {
-    
-    String s = numRecInfo() + "\n";
-    s += "minRecWrittenToEnableSkip_=" + minRecWrittenToEnableSkip_ + " ";
-    s += "LOGNAME=" + LOGNAME + "\n";
-    s += envline("HOST");
-    s += envline("USER");
-    s += envline("HADOOP_USER");
-    //s += envline("PWD"); // =/home/crawler/hadoop/trunk 
-    s += "last Hadoop input: |" + mapredKey_ + "|\n";
-    s += "last tool output: |" + outThread_.answer + "|\n";
-    s += "Date: " + new Date() + "\n";
-    // s += envline("HADOOP_HOME");
-    // s += envline("REMOTE_HOST");
-    return s;
-  }
-  
-  String envline(String var)
-  {
-    return var + "=" + StreamUtil.env().get(var) + "\n";
-  }
-  
-  String numRecInfo()
-  {
-    long elapsed = (System.currentTimeMillis() - startTime_)/1000;
-    long total = numRecRead_+numRecWritten_+numRecSkipped_;
-    return "R/W/S=" + numRecRead_+"/"+numRecWritten_+"/"+numRecSkipped_
-     + " in:"  + safeDiv(numRecRead_, elapsed) + " [rec/s]"
-     + " out:" + safeDiv(numRecWritten_, elapsed) + " [rec/s]";
-  }
-  String safeDiv(long n, long d)
-  {
-    return (d==0) ? "NA" : ""+n/d + "=" + n + "/" + d;
-  }
-  String logFailure(Exception e)
-  {
-      StringWriter sw = new StringWriter();
-      PrintWriter pw = new PrintWriter(sw);
-      e.printStackTrace(pw);    
-      String msg = "log:" + jobLog_ + "\n" + getContext() + sw + "\n";
-      logprintln(msg);
-      return msg;  
-  }
-    
-
-  long startTime_;
-  long numRecRead_ = 0;
-  long numRecWritten_ = 0;
-  long numRecSkipped_ = 0;
-  long nextRecReadLog_ = 1;
-  
-  long minRecWrittenToEnableSkip_ = Long.MAX_VALUE;
-  
-  int keyCols_;
-  final static int ALL_COLS = Integer.MAX_VALUE;
-  
-  JobConf job_;
-
-  // generic MapRed parameters passed on by hadoopStreaming
-  String taskid_;
-  int reportPortPlusOne_;
-
-  boolean doPipe_;
-  
-  Process sim;
-  Object doneLock_;
-  MROutputThread outThread_;
-  MRErrorThread errThread_;
-  boolean outputDone_;
-  boolean errorDone_;
-  DataOutputStream clientOut_;
-  DataInputStream  clientErr_;
-  DataInputStream   clientIn_;
-
-  String jobLog_;
-  // set in PipeMapper/PipeReducer subclasses
-  String mapredKey_;
-  int numExceptions_;
-  
-  boolean optUseKey_ = true;
-
-  boolean optSideEffect_;
-  Path sideEffectPath_;
-  FSDataOutputStream sideEffectOut_;
-
-  String LOGNAME;
-  PrintStream log_;
-  
-  /* curr. going to stderr so that it is preserved
-  { // instance initializer
-    try {
-      int id = (int)((System.currentTimeMillis()/2000) % 10);
-      String sid = id+ "." + StreamUtil.env().get("USER");
-      LOGNAME = "/tmp/PipeMapRed." + sid + ".log";
-      log_ = new PrintStream(new FileOutputStream(LOGNAME));
-      logprintln(new java.util.Date());
-      logflush();
-    } catch(IOException io) {
-      System.err.println("LOGNAME=" + LOGNAME);
-      io.printStackTrace();
-    } finally {
-      if(log_ == null) {
-        log_ = System.err;
-      }
-    }    
-  }
-  */
-}
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+import java.nio.channels.*;
+import java.io.IOException;
+import java.util.Date;
+import java.util.Map;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.regex.*;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+/** Shared functionality for PipeMapper, PipeReducer.
+ *  @author Michel Tourn
+ */
+public abstract class PipeMapRed {
+
+  protected static final Log LOG = LogFactory.getLog(PipeMapRed.class.getName());
+
+  /** The command to be spawned as a subprocess.
+   * Mapper/Reducer operations will delegate to it
+   */
+  abstract String getPipeCommand(JobConf job);
+  /*
+  */
+  abstract String getKeyColPropName();
+
+  /** Write output as side-effect files rather than as map outputs.
+      This is useful to do "Map" tasks rather than "MapReduce" tasks. */
+  boolean getUseSideEffect()
+  {
+    return false;
+  }
+
+  /**
+   * @return how many TABS before the end of the key part
+   * usually: 1 or "ALL"
+   * used for tool output of both Map and Reduce
+   * configured via tool's argv: splitKeyVal=ALL or 1..
+   * although it is interpreted here, not by tool
+   */
+  int getKeyColsFromPipeCommand(String cmd)
+  {
+    String key = getKeyColPropName();
+    Pattern kcPat = Pattern.compile(".*" + key + "=([^\\s]*).*");
+    Matcher match = kcPat.matcher(cmd);
+    String kc;
+    if(!match.matches()) {
+      kc = null;
+    } else {
+      kc = match.group(1);
+    }
+
+    int cols;
+    if(kc== null) {
+      // default value is 1 and the Stream applications could instead
+      // add/remove the \t separator on lines to get the same effect as value 0, 1, ALL
+      cols = 1;
+    } else if(kc.equals("ALL")) {
+      cols = ALL_COLS;
+    } else {
+      try {
+        cols = Integer.parseInt(kc);
+      } catch(NumberFormatException nf) {
+        cols = Integer.MAX_VALUE;
+      }
+    }
+
+    System.out.println("getKeyColsFromPipeCommand:" + key + " parse:" + cols + " from cmd=" + cmd);
+
+    return cols;
+  }
+
+  final static int OUTSIDE = 1;
+  final static int SINGLEQ = 2;
+  final static int DOUBLEQ = 3;
+
+  static String[] splitArgs(String args)
+  {
+    ArrayList argList = new ArrayList();
+    char[] ch = args.toCharArray();
+    int clen = ch.length;
+    int state = OUTSIDE;
+    int argstart = 0;
+    for(int c=0; c<=clen; c++) {
+        boolean last = (c==clen);
+        int lastState = state;
+        boolean endToken = false;
+        if(!last) {
+          if(ch[c]=='\'') {
+            if(state == OUTSIDE) {
+              state = SINGLEQ;
+            } else if(state == SINGLEQ) {
+              state = OUTSIDE;
+            }
+            endToken = (state != lastState);
+          } else if(ch[c]=='"') {
+            if(state == OUTSIDE) {
+              state = DOUBLEQ;
+            } else if(state == DOUBLEQ) {
+              state = OUTSIDE;
+            }
+            endToken = (state != lastState);
+          } else if(ch[c]==' ') {
+            if(state == OUTSIDE) {
+              endToken = true;
+            }
+          }
+        }
+        if(last || endToken) {
+          if(c == argstart) {
+            // unquoted space
+          } else {
+            String a;
+            a = args.substring(argstart, c);
+            argList.add(a);
+          }
+          argstart = c+1;
+          lastState = state;
+        }
+    }
+    return (String[])argList.toArray(new String[0]);
+  }
+
+  public void configure(JobConf job)
+  {
+
+    try {
+      String argv = getPipeCommand(job);
+      keyCols_ = getKeyColsFromPipeCommand(argv);
+
+      job_ = job;
+
+      // Currently: null is identity reduce. REDUCE_NONE is no-map-outputs.
+      doPipe_ = (argv != null) && !StreamJob.REDUCE_NONE.equals(argv);
+      if(!doPipe_) return;
+
+      setStreamJobDetails(job);
+      setStreamProperties();
+
+      String[] argvSplit = splitArgs(argv);
+      String prog = argvSplit[0];
+      String userdir = System.getProperty("user.dir");
+      if(new File(prog).isAbsolute()) {
+        // we don't own it. Hope it is executable
+      } else {
+        new MustangFile(prog).setExecutable(true, true);
+      }
+
+
+      if(job_.getInputValueClass().equals(BytesWritable.class)) {
+        // TODO expose as separate config:
+        // job or semistandard inputformat property
+        optUseKey_ = false;
+      }
+
+      optSideEffect_ = getUseSideEffect();
+
+      if(optSideEffect_) {
+        String fileName = job_.get("mapred.task.id");
+        sideEffectPath_ = new Path(job_.getOutputPath(), fileName);
+        FileSystem fs = FileSystem.get(job_);
+        sideEffectOut_ = fs.create(sideEffectPath_);
+      }
+
+      // argvSplit[0]:
+      // An absolute path should be a preexisting valid path on all TaskTrackers
+      // A  relative path should match in the unjarred Job data
+      // In this case, force an absolute path to make sure exec finds it.
+      argvSplit[0] = new File(argvSplit[0]).getAbsolutePath();
+      logprintln("PipeMapRed exec " + Arrays.asList(argvSplit));
+      logprintln("sideEffectPath_=" + sideEffectPath_);
+
+      Environment childEnv = (Environment)StreamUtil.env().clone();
+      addEnvironment(childEnv, job_.get("stream.addenvironment"));
+      sim = Runtime.getRuntime().exec(argvSplit, childEnv.toArray());
+
+      /* // This way required jdk1.5
+      ProcessBuilder processBuilder = new ProcessBuilder(argvSplit);
+      Map<String, String> env = processBuilder.environment();
+      addEnvironment(env, job_.get("stream.addenvironment"));
+      sim = processBuilder.start();
+      */
+
+      clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));
+      clientIn_  = new DataInputStream(new BufferedInputStream(sim.getInputStream()));
+      clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));
+      doneLock_  = new Object();
+      startTime_ = System.currentTimeMillis();
+
+    } catch(Exception e) {
+        e.printStackTrace();
+        e.printStackTrace(log_);
+    }
+  }
+
+  void setStreamJobDetails(JobConf job)
+  {
+    jobLog_ = job.get("stream.jobLog_");
+    String s = job.get("stream.minRecWrittenToEnableSkip_");
+    if(s != null) {
+      minRecWrittenToEnableSkip_ = Long.parseLong(s);
+      logprintln("JobConf set minRecWrittenToEnableSkip_ =" + minRecWrittenToEnableSkip_);
+    }
+  }
+
+  void setStreamProperties()
+  {
+    taskid_ = System.getProperty("stream.taskid");
+    if(taskid_ == null) {
+      taskid_ = "noid" + System.currentTimeMillis();
+    }
+    String s = System.getProperty("stream.port");
+    if(s != null) {
+      reportPortPlusOne_ = Integer.parseInt(s);
+    }
+
+  }
+
+  void logprintln(String s)
+  {
+    if(log_ != null) {
+      log_.println(s);
+    } else {
+      System.err.println(s); // or LOG.info()
+    }
+  }
+
+  void logflush()
+  {
+    if(log_ != null) {
+      log_.flush();
+    }
+  }
+
+  void addEnvironment(Properties env, String nameVals)
+  {
+    // encoding "a=b c=d" from StreamJob
+    if(nameVals == null) return;
+    String[] nv = nameVals.split(" ");
+    for(int i=0; i<nv.length; i++) {
+      String[] pair = nv[i].split("=", 2);
+      if(pair.length != 2) {
+        logprintln("Skip ev entry:" + nv[i]);
+      } else {
+        logprintln("Add  ev entry:" + nv[i]);
+        env.put(pair[0], pair[1]);
+      }
+    }
+  }
+
+  /** .. and if successful: delete the task log */
+  void appendLogToJobLog(String status)
+  {
+    if(jobLog_ == null) {
+      return; // not using a common joblog
+    }
+    StreamUtil.exec("/bin/rm " + LOGNAME, log_);
+    // TODO socket-based aggregator (in JobTrackerInfoServer)
+  }
+
+
+  void startOutputThreads(OutputCollector output, Reporter reporter)
+  {
+      outputDone_ = false;
+      errorDone_ = false;
+      outThread_ = new MROutputThread(output, reporter);
+      outThread_.start();
+      errThread_ = new MRErrorThread(reporter);
+      errThread_.start();
+  }
+
+  void splitKeyVal(String line, UTF8 key, UTF8 val)
+  {
+    int pos;
+    if(keyCols_ == ALL_COLS) {
+      pos = -1;
+    } else {
+      pos = line.indexOf('\t');
+    }
+    if(pos == -1) {
+      key.set(line);
+      val.set("");
+    } else {
+      key.set(line.substring(0, pos));
+      val.set(line.substring(pos+1));
+    }
+  }
+  /** Daemon thread: sends each line read from clientIn_ to the side-effect file or, as key/value, to the collector. */
+  class MROutputThread extends Thread
+  {
+    MROutputThread(OutputCollector output, Reporter reporter)
+    {
+      setDaemon(true);
+      this.output = output;
+      this.reporter = reporter;
+    }
+    public void run() {
+      try {
+        try {
+          UTF8 key = new UTF8();
+          UTF8 val = new UTF8();
+          // 3/4 Tool to Hadoop
+          while((answer = clientIn_.readLine()) != null) {
+            // 4/4 Hadoop out
+            if(optSideEffect_) {
+              // readLine() maps bytes to chars 1:1, so ISO-8859-1 restores the tool's exact bytes
+              sideEffectOut_.write(answer.getBytes("ISO-8859-1"));
+              sideEffectOut_.write('\n');
+            } else {
+              splitKeyVal(answer, key, val);
+              output.collect(key, val);
+              numRecWritten_++;
+              if(numRecWritten_ % 100 == 0) {
+                logprintln(numRecRead_+"/"+numRecWritten_);
+                logflush();
+              }
+            }
+          }
+        } catch(IOException io) {
+          io.printStackTrace(log_);
+        }
+        logprintln("MROutputThread done");
+      } finally {
+        synchronized(doneLock_) {
+          outputDone_ = true; // set under the same lock that is used for notifyAll()
+          doneLock_.notifyAll();
+        }
+      }
+    }
+    OutputCollector output;
+    Reporter reporter;
+    String answer; // last line read from the tool; also reported by getContext()
+  }
+  /** Daemon thread draining clientErr_: every line is logged, and the first
+   *  nine are also echoed to System.err and set as the task status. */
+  class MRErrorThread extends Thread
+  {
+    public MRErrorThread(Reporter reporter)
+    {
+      this.reporter = reporter;
+      setDaemon(true);
+    }
+    public void run()
+    {
+      String line;
+      try {
+        long num = 0;
+        while((line=clientErr_.readLine()) != null) {
+          num++;
+          logprintln(line);
+          if(num < 10) {
+            String hline = "MRErr: " + line;
+            System.err.println(hline);
+            reporter.setStatus(hline);
+          }
+        }
+      } catch(IOException io) {
+        io.printStackTrace(log_);
+      } finally {
+        synchronized(doneLock_) {
+          errorDone_ = true; // set under the same lock that is used for notifyAll()
+          doneLock_.notifyAll();
+        }
+      }
+    }
+    Reporter reporter;
+  }
+  /** Closes the side-effect file and clientOut_, waits until both reader
+   *  threads are done, then destroys the process. Returns early when doPipe_ is false. */
+  public void mapRedFinished()
+  {
+    logprintln("mapRedFinished");
+    try {
+      if(!doPipe_) return;
+      try {
+        if(optSideEffect_) {
+          logprintln("closing " + sideEffectPath_);
+          sideEffectOut_.close();
+          logprintln("closed  " + sideEffectPath_);
+        }
+      } catch(IOException io) {
+        io.printStackTrace();
+      }
+      try {
+        if(clientOut_ != null) {
+          clientOut_.close();
+        }
+      } catch(IOException io) { // ignored on purpose: closing is best effort
+      }
+      if(outThread_ != null) { // null: no input records, threads were never spawned
+        try {
+          synchronized(doneLock_) { // flags are tested under the lock so no notifyAll() is missed
+            while(!outputDone_ || !errorDone_) {
+              doneLock_.wait();
+            }
+          }
+        } catch(InterruptedException ie) {
+          ie.printStackTrace();
+          Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+        }
+      }
+      sim.destroy();
+    } catch(RuntimeException e) {
+      e.printStackTrace(log_);
+      throw e;
+    }
+  }
+
+  /** Prints the record counters to the log and System.err whenever
+   *  numRecRead_ reaches nextRecReadLog_, then raises that mark by 100. */
+  void maybeLogRecord()
+  {
+    if(numRecRead_ < nextRecReadLog_) return;
+    String counters = numRecInfo();
+    logprintln(counters);
+    logflush();
+    System.err.println(counters);
+    nextRecReadLog_ += 100;
+  }
+
+  /** Builds a multi-line diagnostic text: record counters, a few
+   *  environment variables, the last input key, the last tool output
+   *  line and the current date. */
+  public String getContext()
+  {
+    String lastAnswer = (outThread_ == null) ? null : outThread_.answer; // no thread yet: avoid NPE
+    String s = numRecInfo() + "\n";
+    s += "minRecWrittenToEnableSkip_=" + minRecWrittenToEnableSkip_ + " ";
+    s += "LOGNAME=" + LOGNAME + "\n";
+    s += envline("HOST");
+    s += envline("USER");
+    s += envline("HADOOP_USER");
+    s += "last Hadoop input: |" + mapredKey_ + "|\n";
+    s += "last tool output: |" + lastAnswer + "|\n";
+    s += "Date: " + new Date() + "\n";
+    return s;
+  }
+  /** Formats one variable as "NAME=value" plus a newline, reading the value from StreamUtil.env(). */
+  String envline(String var)
+  {
+    return var + "=" + StreamUtil.env().get(var) + "\n";
+  }
+
+  /** Summarizes records read/written/skipped and the read and write rates since startTime_. */
+  String numRecInfo()
+  {
+    long elapsed = (System.currentTimeMillis() - startTime_)/1000; // whole seconds
+    return "R/W/S=" + numRecRead_+"/"+numRecWritten_+"/"+numRecSkipped_
+     + " in:"  + safeDiv(numRecRead_, elapsed) + " [rec/s]"
+     + " out:" + safeDiv(numRecWritten_, elapsed) + " [rec/s]";
+  }
+  String safeDiv(long n, long d) { // renders "quotient=n/d", or "NA" when d is zero
+    if(d == 0) return "NA";
+    return (n/d) + "=" + n + "/" + d;
+  }
+  /** Logs the job log name, getContext() and the stack trace of e; returns that same text. */
+  String logFailure(Exception e)
+  {
+    StringWriter trace = new StringWriter();
+    e.printStackTrace(new PrintWriter(trace));
+    String msg = "log:" + jobLog_ + "\n" + getContext() + trace + "\n";
+    logprintln(msg);
+    return msg;
+  }
+
+
+  long startTime_; // subtracted from the current time in numRecInfo()
+  long numRecRead_ = 0; // incremented per input record by map()/reduce()
+  long numRecWritten_ = 0; // incremented by MROutputThread per collected record
+  long numRecSkipped_ = 0; // incremented by PipeMapper.map() when a record is not forwarded
+  long nextRecReadLog_ = 1; // numRecRead_ value at which maybeLogRecord() logs next
+
+  long minRecWrittenToEnableSkip_ = Long.MAX_VALUE; // map() fails on a write error while numRecWritten_ is below this
+
+  int keyCols_; // compared against ALL_COLS in splitKeyVal()
+  final static int ALL_COLS = Integer.MAX_VALUE; // makes splitKeyVal() use the whole line as key
+
+  JobConf job_;
+
+  // generic MapRed parameters passed on by hadoopStreaming
+  String taskid_;
+  int reportPortPlusOne_;
+
+  boolean doPipe_; // false: mapRedFinished() returns early and reduce() collects pairs directly
+
+  Process sim; // destroyed at the end of mapRedFinished()
+  Object doneLock_; // monitor on which thread completion is signalled and awaited
+  MROutputThread outThread_; // null until startOutputThreads() has run
+  MRErrorThread errThread_;
+  boolean outputDone_; // set by MROutputThread when it ends
+  boolean errorDone_; // set by MRErrorThread when it ends
+  DataOutputStream clientOut_; // written by map()/reduce(), closed in mapRedFinished()
+  DataInputStream  clientErr_; // read line by line by MRErrorThread
+  DataInputStream   clientIn_; // read line by line by MROutputThread
+
+  String jobLog_; // null: appendLogToJobLog() does nothing
+  // set in PipeMapper/PipeReducer subclasses
+  String mapredKey_;
+  int numExceptions_; // write failures seen by PipeMapper.map()
+
+  boolean optUseKey_ = true; // true: map() writes the key and a tab before the value
+
+  boolean optSideEffect_; // true: tool output lines go to sideEffectOut_ instead of the collector
+  Path sideEffectPath_;
+  FSDataOutputStream sideEffectOut_;
+
+  String LOGNAME; // path of the task log; NOTE(review): only assigned in the commented-out initializer below - confirm
+  PrintStream log_; // receives stack traces from the reader threads and mapRedFinished()
+
+  /* curr. going to stderr so that it is preserved
+  { // instance initializer
+    try {
+      int id = (int)((System.currentTimeMillis()/2000) % 10);
+      String sid = id+ "." + StreamUtil.env().get("USER");
+      LOGNAME = "/tmp/PipeMapRed." + sid + ".log";
+      log_ = new PrintStream(new FileOutputStream(LOGNAME));
+      logprintln(new java.util.Date());
+      logflush();
+    } catch(IOException io) {
+      System.err.println("LOGNAME=" + LOGNAME);
+      io.printStackTrace();
+    } finally {
+      if(log_ == null) {
+        log_ = System.err;
+      }
+    }
+  }
+  */
+}

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=421829&r1=421828&r2=421829&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Fri Jul 14 00:55:15 2006
@@ -1,121 +1,121 @@
-/**
- * Copyright 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.streaming;
-
-import java.io.*;
-import java.util.Iterator;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.OutputCollector;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
-
-/** A generic Mapper bridge.
- *  It delegates operations to an external program via stdin and stdout.
- *  @author Michel Tourn
- */
-public class PipeMapper extends PipeMapRed implements Mapper
-{
-
-  String getPipeCommand(JobConf job)
-  {
-    return job.get("stream.map.streamprocessor");
-  }
-
-  String getKeyColPropName()
-  {
-    return "mapKeyCols";
-  }  
-
-  boolean getUseSideEffect()
-  {
-    String reduce = job_.get("stream.reduce.streamprocessor");
-    if(StreamJob.REDUCE_NONE.equals(reduce)) {
-      return true;  
-    }
-    return false;
-  }
-  
-
-  // Do NOT declare default constructor
-  // (MapRed creates it reflectively)
-
-  public void map(WritableComparable key, Writable value,
-                  OutputCollector output, Reporter reporter)
-    throws IOException
-  {
-    // init
-    if(outThread_ == null) {
-      startOutputThreads(output, reporter);
-    }
-    try {
-      // 1/4 Hadoop in
-      if(key instanceof BytesWritable) {
-        mapredKey_ = new String(((BytesWritable)key).get(), "UTF-8");
-      } else {
-        mapredKey_ = key.toString();        
-      }
-      numRecRead_++;
-
-      maybeLogRecord();
-
-      // 2/4 Hadoop to Tool
-      if(numExceptions_==0) {
-        String sval;
-        if(value instanceof BytesWritable) {
-          sval = new String(((BytesWritable)value).get(), "UTF-8");
-        } else {
-          sval = value.toString();
-        }
-        if(optUseKey_) {
-          clientOut_.writeBytes(mapredKey_);
-          clientOut_.writeBytes("\t");
-        }
-        clientOut_.writeBytes(sval);
-        clientOut_.writeBytes("\n");
-        clientOut_.flush();
-      } else {
-        numRecSkipped_++;
-      }
-    } catch(IOException io) {
-      numExceptions_++;
-      if(numExceptions_ > 1 || numRecWritten_ < minRecWrittenToEnableSkip_) {
-        // terminate with failure
-        String msg = logFailure(io);
-        appendLogToJobLog("failure");
-        throw new IOException(msg);
-      } else {
-        // terminate with success:
-        // swallow input records although the stream processor failed/closed
-      }
-    }
-  }
-  
-  public void close()
-  {
-    appendLogToJobLog("success");
-    mapRedFinished();
-  }
-  
-}
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+
+/** A generic Mapper bridge.
+ *  It delegates operations to an external program via stdin and stdout.
+ *  @author Michel Tourn
+ */
+public class PipeMapper extends PipeMapRed implements Mapper
+{
+  /** Returns the command stored under "stream.map.streamprocessor". */
+  String getPipeCommand(JobConf job)
+  {
+    return job.get("stream.map.streamprocessor");
+  }
+
+  /** Returns the fixed name "mapKeyCols". */
+  String getKeyColPropName()
+  {
+    return "mapKeyCols";
+  }
+
+  /** True exactly when "stream.reduce.streamprocessor" equals REDUCE_NONE. */
+  boolean getUseSideEffect()
+  {
+    String reduce = job_.get("stream.reduce.streamprocessor");
+    return StreamJob.REDUCE_NONE.equals(reduce);
+  }
+
+  // Do NOT declare default constructor
+  // (MapRed creates it reflectively)
+
+  /** BytesWritable: UTF-8 decoding of its getSize() valid bytes; otherwise toString(). */
+  private static String writableToText(Object o) throws IOException
+  {
+    if(!(o instanceof BytesWritable)) return o.toString();
+    BytesWritable b = (BytesWritable)o;
+    return new String(b.get(), 0, b.getSize(), "UTF-8"); // get() is the whole backing buffer
+  }
+
+  /** Sends one record to the tool as UTF-8 text: the key and a tab (when
+   *  optUseKey_), then the value and a newline. After a tolerated write
+   *  failure, further records are only counted as skipped.
+   *  @throws IOException when a write failure is not tolerated
+   */
+  public void map(WritableComparable key, Writable value,
+                  OutputCollector output, Reporter reporter) throws IOException
+  {
+    // init
+    if(outThread_ == null) {
+      startOutputThreads(output, reporter);
+    }
+    try {
+      // 1/4 Hadoop in
+      mapredKey_ = writableToText(key);
+      numRecRead_++;
+      maybeLogRecord();
+      // 2/4 Hadoop to Tool
+      if(numExceptions_==0) {
+        String sval = writableToText(value);
+        // explicit UTF-8: writeBytes() would keep only the low byte of each char
+        if(optUseKey_) {
+          clientOut_.write(mapredKey_.getBytes("UTF-8"));
+          clientOut_.write('\t');
+        }
+        clientOut_.write(sval.getBytes("UTF-8"));
+        clientOut_.write('\n');
+        clientOut_.flush();
+      } else {
+        numRecSkipped_++;
+      }
+    } catch(IOException io) {
+      numExceptions_++;
+      if(numExceptions_ > 1 || numRecWritten_ < minRecWrittenToEnableSkip_) {
+        // terminate with failure
+        String msg = logFailure(io);
+        appendLogToJobLog("failure");
+        throw new IOException(msg);
+      } else {
+        // terminate with success:
+        // swallow input records although the stream processor failed/closed
+      }
+    }
+  }
+
+  /** Calls appendLogToJobLog("success") and then mapRedFinished(). */
+  public void close()
+  {
+    appendLogToJobLog("success");
+    mapRedFinished();
+  }
+}

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?rev=421829&r1=421828&r2=421829&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Fri Jul 14 00:55:15 2006
@@ -1,86 +1,86 @@
-/**
- * Copyright 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.streaming;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.OutputCollector;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
-
-/** A generic Reducer bridge.
- *  It delegates operations to an external program via stdin and stdout.
- *  @author Michel Tourn
- */
-public class PipeReducer extends PipeMapRed implements Reducer
-{
-
-  String getPipeCommand(JobConf job)
-  {
-    return job.get("stream.reduce.streamprocessor");
-  }
-
-  String getKeyColPropName()
-  {
-    return "reduceKeyCols";
-  }  
-  
-  public void reduce(WritableComparable key, Iterator values,
-                     OutputCollector output, Reporter reporter)
-    throws IOException {
-
-    // init
-    if(doPipe_ && outThread_ == null) {
-      startOutputThreads(output, reporter);
-    }
-    try {
-      while (values.hasNext()) {
-        Writable val = (Writable)values.next();
-        numRecRead_++;
-        maybeLogRecord();
-        if(doPipe_) {
-          clientOut_.writeBytes(key.toString());
-          clientOut_.writeBytes("\t");
-          clientOut_.writeBytes(val.toString());
-          clientOut_.writeBytes("\n");
-          clientOut_.flush();
-        } else {
-          // "identity reduce"
-          output.collect(key, val);
-        }
-      }
-    } catch(IOException io) {
-      appendLogToJobLog("failure");
-      throw new IOException(getContext() + io.getMessage());    
-    }
-  }
-
-  public void close()
-  {
-    appendLogToJobLog("success");
-    mapRedFinished();
-  }
-
-}
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+
+/** A generic Reducer bridge.
+ *  It delegates operations to an external program via stdin and stdout.
+ *  @author Michel Tourn
+ */
+public class PipeReducer extends PipeMapRed implements Reducer
+{
+  /** Returns the command stored under "stream.reduce.streamprocessor". */
+  String getPipeCommand(JobConf job)
+  {
+    return job.get("stream.reduce.streamprocessor");
+  }
+  /** Returns the fixed name "reduceKeyCols". */
+  String getKeyColPropName()
+  {
+    return "reduceKeyCols";
+  }
+  /** Writes each value to the tool as a UTF-8 "key TAB value NEWLINE" line;
+   *  when doPipe_ is false the pairs are collected directly instead.
+   *  @throws IOException carrying getContext() plus the original message
+   */
+  public void reduce(WritableComparable key, Iterator values,
+                     OutputCollector output, Reporter reporter) throws IOException {
+    if(doPipe_ && outThread_ == null) {
+      startOutputThreads(output, reporter);
+    }
+    try {
+      while (values.hasNext()) {
+        Writable val = (Writable)values.next();
+        numRecRead_++;
+        maybeLogRecord();
+        if(doPipe_) {
+          clientOut_.write(key.toString().getBytes("UTF-8")); // writeBytes() would drop high bytes
+          clientOut_.write('\t');
+          clientOut_.write(val.toString().getBytes("UTF-8"));
+          clientOut_.write('\n');
+          clientOut_.flush();
+        } else {
+          // "identity reduce"
+          output.collect(key, val);
+        }
+      }
+    } catch(IOException io) {
+      appendLogToJobLog("failure");
+      IOException wrapped = new IOException(getContext() + io.getMessage());
+      throw (IOException)wrapped.initCause(io); // keep the original stack trace
+    }
+  }
+  /** Calls appendLogToJobLog("success") and then mapRedFinished(). */
+  public void close()
+  {
+    appendLogToJobLog("success");
+    mapRedFinished();
+  }
+}

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?rev=421829&r1=421828&r2=421829&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Fri Jul 14 00:55:15 2006
@@ -1,173 +1,173 @@
-/**
- * Copyright 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.streaming;
-
-import java.io.*;
-import java.lang.reflect.*;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.FSDataInputStream;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import org.apache.hadoop.mapred.*;
-
-/** An input format that performs globbing on DFS paths and 
- * selects a RecordReader based on a JobConf property.
- * @author Michel Tourn
- */
-public class StreamInputFormat extends InputFormatBase
-{
-
-  // an InputFormat should be public with the synthetic public default constructor
-  // JobTracker's JobInProgress will instantiate with clazz.newInstance() (and a custom ClassLoader)
-  
-  protected static final Log LOG = LogFactory.getLog(StreamInputFormat.class.getName());
-  
-  static {
-    //LOG.setLevel(Level.FINE);
-  }
-  
-  /** This implementation always returns true. */
-  public boolean[] areValidInputDirectories(FileSystem fileSys,
-                                            Path[] inputDirs
-                                            ) throws IOException {
-    boolean[] b = new boolean[inputDirs.length];
-    for(int i=0; i < inputDirs.length; ++i) {
-      b[i] = true;
-    }
-    return b;
-  }
-
-  
-  protected Path[] listPaths(FileSystem fs, JobConf job)
-    throws IOException
-  {
-    Path[] globs = job.getInputPaths();
-    ArrayList list = new ArrayList();
-    int dsup = globs.length;
-    for(int d=0; d<dsup; d++) {
-      String leafName = globs[d].getName();
-      LOG.info("StreamInputFormat: globs[" + d + "] leafName = " + leafName);
-      Path[] paths; Path dir;
-	  PathFilter filter = new GlobFilter(fs, leafName);
-	  dir = new Path(globs[d].getParent().toString());
-      if(dir == null) dir = new Path(".");
-	  paths = fs.listPaths(dir, filter);
-      list.addAll(Arrays.asList(paths));
-    }
-    return (Path[])list.toArray(new Path[]{});
-  }
-
-  class GlobFilter implements PathFilter
-  {
-    public GlobFilter(FileSystem fs, String glob)
-    {
-      fs_ = fs;
-      pat_ = Pattern.compile(globToRegexp(glob));
-    }
-    String globToRegexp(String glob)
-	{
-      String re = glob;
-      re = re.replaceAll("\\.", "\\\\.");
-      re = re.replaceAll("\\+", "\\\\+");
-	  re = re.replaceAll("\\*", ".*");
-      re = re.replaceAll("\\?", ".");
-      LOG.info("globToRegexp: |" + glob + "|  ->  |" + re + "|");
-      return re;
-	}
-
-    public boolean accept(Path pathname)
-    {
-      boolean acc = !fs_.isChecksumFile(pathname);
-      if(acc) {
-      	acc = pat_.matcher(pathname.getName()).matches();
-      }
-      LOG.info("matches " + pat_ + ", " + pathname + " = " + acc);
-      return acc;
-    }
-	
-	Pattern pat_;
-    FileSystem fs_;
-  }
-
-  public RecordReader getRecordReader(FileSystem fs, final FileSplit split,
-                                      JobConf job, Reporter reporter)
-    throws IOException {
-    LOG.info("getRecordReader start.....");
-    reporter.setStatus(split.toString());
-
-    final long start = split.getStart();
-    final long end = start + split.getLength();
-
-    String splitName = split.getFile() + ":" + start + "-" + end;
-    final FSDataInputStream in = fs.open(split.getFile());
-    
-    // will open the file and seek to the start of the split
-    // Factory dispatch based on available params..    
-    Class readerClass;
-    String c = job.get("stream.recordreader.class");
-    if(c == null) {
-      readerClass = StreamLineRecordReader.class;
-    } else {
-      readerClass = StreamUtil.goodClassOrNull(c, null);
-      if(readerClass == null) {
-        throw new RuntimeException("Class not found: " + c);
-      }    
-    }
-    
-    Constructor ctor;
-    try {
-      ctor = readerClass.getConstructor(new Class[]{
-        FSDataInputStream.class, FileSplit.class, Reporter.class, JobConf.class, FileSystem.class});
-    } catch(NoSuchMethodException nsm) {
-      throw new RuntimeException(nsm);
-    }
-
-    
-    StreamBaseRecordReader reader;
-    try {
-        reader = (StreamBaseRecordReader) ctor.newInstance(new Object[]{
-            in, split, reporter, job, fs});        
-    } catch(Exception nsm) {
-      throw new RuntimeException(nsm);
-    }
-        
-	reader.init();
-
-
-    if(reader instanceof StreamSequenceRecordReader) {
-      // override k/v class types with types stored in SequenceFile
-      StreamSequenceRecordReader ss = (StreamSequenceRecordReader)reader;
-      job.setInputKeyClass(ss.rin_.getKeyClass());
-      job.setInputValueClass(ss.rin_.getValueClass());
-    }
-    
-    
-    return reader;
-  }
-
-}
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+import java.lang.reflect.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import org.apache.hadoop.mapred.*;
+
+/** An input format that performs globbing on DFS paths and
+ * selects a RecordReader based on a JobConf property.
+ * @author Michel Tourn
+ */
+public class StreamInputFormat extends InputFormatBase
+{
+  // an InputFormat should be public with the synthetic public default constructor
+  // JobTracker's JobInProgress will instantiate with clazz.newInstance() (and a custom ClassLoader)
+
+  protected static final Log LOG = LogFactory.getLog(StreamInputFormat.class.getName());
+
+  /** This implementation always returns true. */
+  public boolean[] areValidInputDirectories(FileSystem fileSys,
+                                            Path[] inputDirs
+                                            ) throws IOException {
+    boolean[] b = new boolean[inputDirs.length];
+    Arrays.fill(b, true);
+    return b;
+  }
+
+  /** Expands the job's input paths: the last component of each path is
+   *  used as a glob over the entries of its parent directory.
+   */
+  protected Path[] listPaths(FileSystem fs, JobConf job)
+    throws IOException
+  {
+    Path[] globs = job.getInputPaths();
+    ArrayList list = new ArrayList();
+    for(int d=0; d<globs.length; d++) {
+      String leafName = globs[d].getName();
+      LOG.info("StreamInputFormat: globs[" + d + "] leafName = " + leafName);
+      PathFilter filter = new GlobFilter(fs, leafName);
+      // check the parent before using it: without one, list the current directory
+      Path parent = globs[d].getParent();
+      Path dir = (parent == null) ? new Path(".") : new Path(parent.toString());
+      Path[] paths = fs.listPaths(dir, filter);
+      list.addAll(Arrays.asList(paths));
+    }
+    return (Path[])list.toArray(new Path[]{});
+  }
+
+  /** Accepts non-checksum files whose name matches a glob pattern. */
+  class GlobFilter implements PathFilter
+  {
+    public GlobFilter(FileSystem fs, String glob)
+    {
+      fs_ = fs;
+      pat_ = Pattern.compile(globToRegexp(glob));
+    }
+
+    /** Escapes '.' and '+', turns '*' into ".*" and '?' into ".";
+     *  all other characters reach the regular expression unchanged.
+     */
+    String globToRegexp(String glob)
+    {
+      String re = glob;
+      re = re.replaceAll("\\.", "\\\\.");
+      re = re.replaceAll("\\+", "\\\\+");
+      re = re.replaceAll("\\*", ".*");
+      re = re.replaceAll("\\?", ".");
+      LOG.info("globToRegexp: |" + glob + "|  ->  |" + re + "|");
+      return re;
+    }
+
+    public boolean accept(Path pathname)
+    {
+      boolean acc = !fs_.isChecksumFile(pathname);
+      if(acc) {
+          acc = pat_.matcher(pathname.getName()).matches();
+      }
+      LOG.info("matches " + pat_ + ", " + pathname + " = " + acc);
+      return acc;
+    }
+
+    Pattern pat_;
+    FileSystem fs_;
+  }
+
+  /** Creates the reader for one split. The reader class is taken from
+   *  "stream.recordreader.class" (default: StreamLineRecordReader) and is
+   *  built through its (FSDataInputStream, FileSplit, Reporter, JobConf,
+   *  FileSystem) constructor, then init() is called on it.
+   *  @throws RuntimeException if the class or its constructor is unusable
+   */
+  public RecordReader getRecordReader(FileSystem fs, final FileSplit split,
+                                      JobConf job, Reporter reporter)
+    throws IOException {
+    LOG.info("getRecordReader start.....");
+    reporter.setStatus(split.toString());
+    // Factory dispatch based on available params..
+    Class readerClass;
+    String c = job.get("stream.recordreader.class");
+    if(c == null) {
+      readerClass = StreamLineRecordReader.class;
+    } else {
+      readerClass = StreamUtil.goodClassOrNull(c, null);
+      if(readerClass == null) {
+        throw new RuntimeException("Class not found: " + c);
+      }
+    }
+
+    Constructor ctor;
+    try {
+      ctor = readerClass.getConstructor(new Class[]{
+        FSDataInputStream.class, FileSplit.class, Reporter.class, JobConf.class, FileSystem.class});
+    } catch(NoSuchMethodException nsm) {
+      throw new RuntimeException(nsm);
+    }
+
+    // open the file only once the reader can be built, and close it again
+    // when construction or init() fails, so that the stream is not leaked
+    final FSDataInputStream in = fs.open(split.getFile());
+    StreamBaseRecordReader reader;
+    boolean ready = false;
+    try {
+      try {
+        reader = (StreamBaseRecordReader) ctor.newInstance(new Object[]{in, split, reporter, job, fs});
+      } catch(Exception nsm) {
+        throw new RuntimeException(nsm);
+      }
+      reader.init();
+      ready = true;
+    } finally {
+      if(!ready) {
+        try { in.close(); } catch(IOException ignored) { /* report the first failure */ }
+      }
+    }
+    if(reader instanceof StreamSequenceRecordReader) {
+      // override k/v class types with types stored in SequenceFile
+      StreamSequenceRecordReader ss = (StreamSequenceRecordReader)reader;
+      job.setInputKeyClass(ss.rin_.getKeyClass());
+      job.setInputValueClass(ss.rin_.getValueClass());
+    }
+    return reader;
+  }
+}

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=421829&r1=421828&r2=421829&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Fri Jul 14 00:55:15 2006
@@ -34,30 +34,30 @@
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.RunningJob;
 
-/** All the client-side work happens here. 
+/** All the client-side work happens here.
  * (Jar packaging, MapRed job submission and monitoring)
  * @author Michel Tourn
  */
 public class StreamJob
 {
-  protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());    
-  
+  protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());
+
   final static String REDUCE_NONE = "NONE";
-  
+
   public StreamJob(String[] argv, boolean mayExit)
   {
     argv_ = argv;
-    mayExit_ = mayExit;    
+    mayExit_ = mayExit;
   }
-  
+
   public void go() throws IOException
   {
     init();
-    
+
     preProcessArgs();
     parseArgv();
     postProcessArgs();
-    
+
     setJobConf();
     submitAndMonitorJob();
   }
@@ -70,13 +70,13 @@
         throw new RuntimeException(io);
      }
   }
-  
+
   void preProcessArgs()
   {
     verbose_ = false;
     addTaskEnvironment_ = "";
   }
-  
+
   void postProcessArgs() throws IOException
   {
     if(cluster_ == null) {
@@ -94,18 +94,19 @@
 
     Iterator it = packageFiles_.iterator();
     while(it.hasNext()) {
-      File f = new File((String)it.next());    
+      File f = new File((String)it.next());
       if(f.isFile()) {
         shippedCanonFiles_.add(f.getCanonicalPath());
       }
     }
     msg("shippedCanonFiles_=" + shippedCanonFiles_);
-    
+
     // careful with class names..
     mapCmd_ = unqualifyIfLocalPath(mapCmd_);
-    redCmd_ = unqualifyIfLocalPath(redCmd_);    
+    comCmd_ = unqualifyIfLocalPath(comCmd_);
+    redCmd_ = unqualifyIfLocalPath(redCmd_);
   }
-  
+
   void validateNameEqValue(String neqv)
   {
     String[] nv = neqv.split("=", 2);
@@ -114,11 +115,11 @@
     }
     msg("Recording name=value: name=" + nv[0] + " value=" + nv[1]);
   }
-  
+
   String unqualifyIfLocalPath(String cmd) throws IOException
   {
     if(cmd == null) {
-      //    
+      //
     } else {
       String prog = cmd;
       String args = "";
@@ -131,8 +132,8 @@
       boolean shipped = shippedCanonFiles_.contains(progCanon);
       msg("shipped: " + shipped + " " + progCanon);
       if(shipped) {
-        // Change path to simple filename. 
-        // That way when PipeMapRed calls Runtime.exec(), 
+        // Change path to simple filename.
+        // That way when PipeMapRed calls Runtime.exec(),
         // it will look for the excutable in Task's working dir.
         // And this is where TaskRunner unjars our job jar.
         prog = new File(prog).getName();
@@ -146,25 +147,25 @@
     msg("cmd=" + cmd);
     return cmd;
   }
-  
+
   String getHadoopAliasConfFile()
   {
     return new File(getHadoopClientHome() + "/conf", hadoopAliasConf_).getAbsolutePath();
   }
-   
-  
+
+
   void parseArgv()
   {
     if(argv_.length==0) {
       exitUsage(false);
     }
-    int i=0; 
+    int i=0;
     while(i < argv_.length) {
       String s;
       if(argv_[i].equals("-verbose")) {
-        verbose_ = true;      
+        verbose_ = true;
       } else if(argv_[i].equals("-info")) {
-        detailedUsage_ = true;      
+        detailedUsage_ = true;
       } else if(argv_[i].equals("-debug")) {
         debug_++;
       } else if((s = optionArg(argv_, i, "-input", false)) != null) {
@@ -176,6 +177,9 @@
       } else if((s = optionArg(argv_, i, "-mapper", mapCmd_ != null)) != null) {
         i++;
         mapCmd_ = s;
+      } else if((s = optionArg(argv_, i, "-combiner", comCmd_ != null)) != null) {
+        i++;
+        comCmd_ = s;
       } else if((s = optionArg(argv_, i, "-reducer", redCmd_ != null)) != null) {
         i++;
         redCmd_ = s;
@@ -218,7 +222,7 @@
         exitUsage(true);
     }
   }
-  
+
   String optionArg(String[] args, int index, String arg, boolean argSet)
   {
     if(index >= args.length || ! args[index].equals(arg)) {
@@ -229,10 +233,10 @@
     }
     if(index >= args.length-1) {
       throw new IllegalArgumentException("Expected argument after option " + args[index]);
-    }    
+    }
     return args[index+1];
   }
-  
+
   protected void msg(String msg)
   {
     if(verbose_) {
@@ -242,15 +246,15 @@
 
   public void exitUsage(boolean detailed)
   {
-                      //         1         2         3         4         5         6         7         
+                      //         1         2         3         4         5         6         7
                       //1234567890123456789012345678901234567890123456789012345678901234567890123456789
     System.out.println("Usage: $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar [options]");
     System.out.println("Options:");
     System.out.println("  -input    <path>     DFS input file(s) for the Map step");
     System.out.println("  -output   <path>     DFS output directory for the Reduce step");
     System.out.println("  -mapper   <cmd>      The streaming command to run");
-    System.out.println("  -combiner <cmd>      Not implemented. But you can pipe the mapper output");
-    System.out.println("  -reducer  <cmd>      The streaming command to run.");
+    System.out.println("  -combiner <cmd>      The streaming command to run");
+    System.out.println("  -reducer  <cmd>      The streaming command to run");
     System.out.println("  -file     <file>     File/dir to be shipped in the Job jar file");
     System.out.println("  -cluster  <name>     Default uses hadoop-default.xml and hadoop-site.xml");
     System.out.println("  -config   <file>     Optional. One or more paths to xml config files");
@@ -261,7 +265,7 @@
     System.out.println("  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands");
     System.out.println("  -verbose");
     System.out.println();
-    if(!detailed) {    
+    if(!detailed) {
     System.out.println("For more details about these options:");
     System.out.println("Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info");
         fail("");
@@ -307,7 +311,7 @@
     System.out.println("  Input files are all the daily logs for days in month 2006-04");
     fail("");
   }
-  
+
   public void fail(String message)
   {
     if(mayExit_) {
@@ -319,8 +323,8 @@
   }
 
   // --------------------------------------------
-  
-  
+
+
   protected String getHadoopClientHome()
   {
     String h = env_.getProperty("HADOOP_HOME"); // standard Hadoop
@@ -342,11 +346,11 @@
     }
     return local;
   }
-  protected String getClusterNick() 
-  { 
+  protected String getClusterNick()
+  {
     return cluster_;
   }
-  
+
   /** @return path to the created Jar file or null if no files are necessary.
   */
   protected String packageJobJar() throws IOException
@@ -362,12 +366,12 @@
         msg("Found runtime classes in: " + runtimeClasses);
     }
     if(isLocalHadoop()) {
-      // don't package class files (they might get unpackaged in "." and then 
+      // don't package class files (they might get unpackaged in "." and then
       //  hide the intended CLASSPATH entry)
-      // we still package everything else (so that scripts and executable are found in 
+      // we still package everything else (so that scripts and executable are found in
       //  Task workdir like distributed Hadoop)
     } else {
-      if(new File(runtimeClasses).isDirectory()) {    
+      if(new File(runtimeClasses).isDirectory()) {
           packageFiles_.add(runtimeClasses);
       } else {
           unjarFiles.add(runtimeClasses);
@@ -377,7 +381,7 @@
       return null;
     }
     File jobJar = File.createTempFile("streamjob", ".jar");
-    System.out.println("packageJobJar: " + packageFiles_ + " " + unjarFiles + " " + jobJar);    
+    System.out.println("packageJobJar: " + packageFiles_ + " " + unjarFiles + " " + jobJar);
     if(debug_ == 0) {
       jobJar.deleteOnExit();
     }
@@ -389,7 +393,7 @@
     builder.merge(packageFiles_, unjarFiles, jobJarName);
     return jobJarName;
   }
-  
+
   protected void setJobConf() throws IOException
   {
     msg("hadoopAliasConf_ = " + hadoopAliasConf_);
@@ -403,29 +407,29 @@
     while(it.hasNext()) {
         String pathName = (String)it.next();
         config_.addFinalResource(new Path(pathName));
-    }   
+    }
     // general MapRed job properties
     jobConf_ = new JobConf(config_);
     for(int i=0; i<inputGlobs_.size(); i++) {
       jobConf_.addInputDir(new File((String)inputGlobs_.get(i)));
     }
-    
+
     jobConf_.setInputFormat(StreamInputFormat.class);
-    // for SequenceFile, input classes may be overriden in getRecordReader 
+    // for SequenceFile, input classes may be overridden in getRecordReader
     jobConf_.setInputKeyClass(UTF8.class);
     jobConf_.setInputValueClass(UTF8.class);
-    
+
     jobConf_.setOutputKeyClass(UTF8.class);
     jobConf_.setOutputValueClass(UTF8.class);
     //jobConf_.setCombinerClass();
 
     jobConf_.setOutputDir(new File(output_));
     jobConf_.setOutputFormat(StreamOutputFormat.class);
-    
+
     jobConf_.set("stream.addenvironment", addTaskEnvironment_);
-    
+
     String defaultPackage = this.getClass().getPackage().getName();
-    
+
     Class c = StreamUtil.goodClassOrNull(mapCmd_, defaultPackage);
     if(c != null) {
       jobConf_.setMapperClass(c);
@@ -434,6 +438,16 @@
       jobConf_.set("stream.map.streamprocessor", mapCmd_);
     }
 
+    if(comCmd_ != null) {
+      c = StreamUtil.goodClassOrNull(comCmd_, defaultPackage);
+      if(c != null) {
+        jobConf_.setCombinerClass(c);
+      } else {
+        jobConf_.setCombinerClass(PipeCombiner.class);
+        jobConf_.set("stream.combine.streamprocessor", comCmd_);
+      }
+    }
+
     if(redCmd_ != null) {
       c = StreamUtil.goodClassOrNull(redCmd_, defaultPackage);
       if(c != null) {
@@ -443,13 +457,13 @@
         jobConf_.set("stream.reduce.streamprocessor", redCmd_);
       }
     }
-    
+
     if(inReaderSpec_ != null) {
         String[] args = inReaderSpec_.split(",");
         String readerClass = args[0];
         // this argument can only be a Java class
         c = StreamUtil.goodClassOrNull(readerClass, defaultPackage);
-        if(c != null) {            
+        if(c != null) {
             jobConf_.set("stream.recordreader.class", c.getName());
         } else {
             fail("-inputreader: class not found: " + readerClass);
@@ -461,13 +475,13 @@
             jobConf_.set(k, v);
         }
     }
-    
+
     jar_ = packageJobJar();
     if(jar_ != null) {
         jobConf_.setJar(jar_);
     }
 
-    // last, allow user to override anything 
+    // last, allow user to override anything
     // (although typically used with properties we didn't touch)
     it = userJobConfProps_.iterator();
     while(it.hasNext()) {
@@ -475,39 +489,39 @@
         String[] nv = prop.split("=", 2);
         msg("xxxJobConf: set(" + nv[0] + ", " + nv[1]+")");
         jobConf_.set(nv[0], nv[1]);
-    }   
+    }
     msg("submitting to jobconf: " + getJobTrackerHostPort());
   }
-  
+
   protected String getJobTrackerHostPort()
   {
     return jobConf_.get("mapred.job.tracker");
   }
-  
+
   protected void jobInfo()
-  {    
+  {
     if(isLocalHadoop()) {
-      LOG.info("Job running in-process (local Hadoop)"); 
+      LOG.info("Job running in-process (local Hadoop)");
     } else {
       String hp = getJobTrackerHostPort();
-      LOG.info("To kill this job, run:"); 
+      LOG.info("To kill this job, run:");
       LOG.info(getHadoopClientHome() + "/bin/hadoop job  -Dmapred.job.tracker=" + hp + " -kill " + jobId_);
       //LOG.info("Job file: " + running_.getJobFile() );
       LOG.info("Tracking URL: "  + StreamUtil.qualifyHost(running_.getTrackingURL()));
     }
   }
-  
+
   // Based on JobClient
   public void submitAndMonitorJob() throws IOException {
-    
+
     if(jar_ != null && isLocalHadoop()) {
         // getAbs became required when shell and subvm have different working dirs...
         File wd = new File(".").getAbsoluteFile();
         StreamUtil.unJar(new File(jar_), wd);
     }
-    
-    // if jobConf_ changes must recreate a JobClient 
-    jc_ = new JobClient(jobConf_); 
+
+    // if jobConf_ changes must recreate a JobClient
+    jc_ = new JobClient(jobConf_);
     boolean error = true;
     running_ = null;
     String lastReport = null;
@@ -515,8 +529,8 @@
       running_ = jc_.submitJob(jobConf_);
       jobId_ = running_.getJobID();
 
-      LOG.info("getLocalDirs(): " + Arrays.asList(jobConf_.getLocalDirs()));     
-      LOG.info("Running job: " + jobId_);      
+      LOG.info("getLocalDirs(): " + Arrays.asList(jobConf_.getLocalDirs()));
+      LOG.info("Running job: " + jobId_);
       jobInfo();
 
       while (!running_.isComplete()) {
@@ -548,7 +562,7 @@
       jc_.close();
     }
   }
-  
+
 
   protected boolean mayExit_;
   protected String[] argv_;
@@ -557,7 +571,7 @@
   protected int debug_;
 
   protected Environment env_;
-  
+
   protected String jar_;
   protected boolean localHadoop_;
   protected Configuration config_;
@@ -567,27 +581,28 @@
   // command-line arguments
   protected ArrayList inputGlobs_       = new ArrayList(); // <String>
   protected ArrayList packageFiles_     = new ArrayList(); // <String>
-  protected ArrayList shippedCanonFiles_= new ArrayList(); // <String>  
+  protected ArrayList shippedCanonFiles_= new ArrayList(); // <String>
   protected ArrayList userJobConfProps_ = new ArrayList(); // <String>
   protected String output_;
   protected String mapCmd_;
+  protected String comCmd_;
   protected String redCmd_;
   protected String cluster_;
   protected ArrayList configPath_ = new ArrayList(); // <String>
   protected String hadoopAliasConf_;
   protected String inReaderSpec_;
-  
+
 
   // Use to communicate config to the external processes (ex env.var.HADOOP_USER)
   // encoding "a=b c=d"
   protected String addTaskEnvironment_;
-  
+
   protected boolean outputSingleNode_;
   protected long minRecWrittenToEnableSkip_;
-  
+
   protected RunningJob running_;
   protected String jobId_;
-  
-  
+
+
 }
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java?rev=421829&r1=421828&r2=421829&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java Fri Jul 14 00:55:15 2006
@@ -1,73 +1,73 @@
-/**
- * Copyright 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.streaming;
-
-import java.io.IOException;
-import java.io.File;
-
-import org.apache.hadoop.mapred.*;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataOutputStream;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
-
-import org.apache.hadoop.util.Progressable;
-
-/** Similar to org.apache.hadoop.mapred.TextOutputFormat, 
- * but delimits key and value with a TAB.
- * @author Michel Tourn
- */
-public class StreamOutputFormat implements OutputFormat {
-
-  public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
-                                      String name, Progressable progr) throws IOException {
- 
-    File file = new File(job.getOutputDir(), name);
-
-    final FSDataOutputStream out = fs.create(file);
-
-    return new RecordWriter() {
-        public synchronized void write(WritableComparable key, Writable value)
-          throws IOException {
-          out.write(key.toString().getBytes("UTF-8"));
-          out.writeByte('\t');
-          out.write(value.toString().getBytes("UTF-8"));
-          out.writeByte('\n');
-        }
-        public synchronized void close(Reporter reporter) throws IOException {
-          out.close();
-        }
-      };
-  }
-  
-  
-  /** Check whether the output specification for a job is appropriate.  Called
-   * when a job is submitted.  Typically checks that it does not already exist,
-   * throwing an exception when it already exists, so that output is not
-   * overwritten.
-   *
-   * @param job the job whose output will be written
-   * @throws IOException when output should not be attempted
-   */
-  public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException
-  {
-    // allow existing data (for app-level restartability)
-  }
-  
-}
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+import java.io.File;
+
+import org.apache.hadoop.mapred.*;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.hadoop.util.Progressable;
+
+/** Similar to org.apache.hadoop.mapred.TextOutputFormat, 
+ * but delimits key and value with a TAB.
+ * @author Michel Tourn
+ */
+public class StreamOutputFormat implements OutputFormat {
+
+  public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
+                                      String name, Progressable progr) throws IOException {
+ 
+    File file = new File(job.getOutputDir(), name);
+
+    final FSDataOutputStream out = fs.create(file);
+
+    return new RecordWriter() {
+        public synchronized void write(WritableComparable key, Writable value)
+          throws IOException {
+          out.write(key.toString().getBytes("UTF-8"));
+          out.writeByte('\t');
+          out.write(value.toString().getBytes("UTF-8"));
+          out.writeByte('\n');
+        }
+        public synchronized void close(Reporter reporter) throws IOException {
+          out.close();
+        }
+      };
+  }
+  
+  
+  /** Check whether the output specification for a job is appropriate.  Called
+   * when a job is submitted.  Typically checks that it does not already exist,
+   * throwing an exception when it already exists, so that output is not
+   * overwritten.
+   *
+   * @param job the job whose output will be written
+   * @throws IOException when output should not be attempted
+   */
+  public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException
+  {
+    // allow existing data (for app-level restartability)
+  }
+  
+}

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?rev=421829&r1=421828&r2=421829&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Fri Jul 14 00:55:15 2006
@@ -18,13 +18,13 @@
 
 import junit.framework.TestCase;
 import java.io.*;
+import java.util.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 /**
  * This class tests hadoopStreaming in MapReduce local mode.
- * It requires the Unix utilities tr and uniq.
  */
 public class TestStreaming extends TestCase
 {
@@ -34,9 +34,12 @@
   String INPUT_FILE = "input.txt";
   String OUTPUT_DIR = "out";
   String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
-  String map = "/usr/bin/tr . \\n"; // split words into lines. Careful with spaces in args
-  String reduce = "/usr/bin/uniq";
-  String outputExpect = "are\t\nblue\t\nbunnies\t\npink\t\nred\t\nroses\t\nviolets\t\n";
+  // map behaves like "/usr/bin/tr . \\n"; (split words into lines)
+  String map = makeJavaCommand(TrApp.class, new String[]{".", "\\n"});
+  // combine, reduce behave like /usr/bin/uniq. But also prepend lines with C, R.
+  String combine  = makeJavaCommand(UniqApp.class, new String[]{"C"});
+  String reduce = makeJavaCommand(UniqApp.class, new String[]{"R"});
+  String outputExpect = "RCare\t\nRCblue\t\nRCbunnies\t\nRCpink\t\nRCred\t\nRCroses\t\nRCviolets\t\n";
 
   StreamJob job;
 
@@ -60,6 +63,7 @@
     }
     System.out.println("test.build.data=" + antTestDir);
   }
+
   void createInput() throws IOException
   {
     String path = new File(".", INPUT_FILE).getAbsolutePath();// needed from junit forked vm
@@ -68,6 +72,37 @@
     out.close();
   }
 
+  /** Builds a space-separated command line: the java launcher under java.home,
+      this JVM's classpath, the name of class main, then argv in order. */
+  public String makeJavaCommand(Class main, String[] argv)
+  {
+    File binDir = new File(System.getProperty("java.home"), "bin");
+    File launcher = new File(binDir, "java");
+    ArrayList cmd = new ArrayList();
+    cmd.add(launcher.toString());
+    // hand the parent's classpath to the launched command
+    cmd.add("-classpath");
+    cmd.add(System.getProperty("java.class.path"));
+
+    // main class first, followed by its own arguments
+    cmd.add(main.getName());
+    cmd.addAll(Arrays.asList(argv));
+    return collate(cmd, " ");
+  }
+
+  /** Joins the string forms of args, placing sep between consecutive elements. */
+  String collate(ArrayList args, String sep)
+  {
+    StringBuffer buf = new StringBuffer();
+    for(Iterator it = args.iterator(); it.hasNext(); ) {
+      buf.append(it.next());
+      if(it.hasNext()) {
+        buf.append(sep); // was a hard-coded " ", which ignored the sep parameter
+      }
+    }
+    return buf.toString();
+  }
+
   public void testCommandLine()
   {
     try {
@@ -80,11 +115,11 @@
           "-input", INPUT_FILE,
           "-output", OUTPUT_DIR,
           "-mapper", map,
+          "-combiner", combine,
           "-reducer", reduce,
           /*"-debug",*/
           "-verbose"
       };
-
       job = new StreamJob(argv, mayExit);
       job.go();
       File outFile = new File(".", OUTPUT_DIR + "/part-00000").getAbsoluteFile();

Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java?rev=421829&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java Fri Jul 14 00:55:15 2006
@@ -0,0 +1,80 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+/** A minimal Java implementation of /usr/bin/tr.
+    Used to test the usage of external applications without adding
+    platform-specific dependencies.
+    Usage: TrApp find replace. Only the first character of each argument
+    is used; the two-character sequence \n denotes a newline.
+*/
+public class TrApp
+{
+
+  /** Creates a filter that turns every occurrence of find into replace. */
+  public TrApp(char find, char replace)
+  {
+    this.find = find;
+    this.replace = replace;
+  }
+
+  /** Copies standard input to standard output one line at a time,
+      applying the character replacement to each line. */
+  public void go() throws IOException
+  {
+    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+    String line;
+
+    while ((line = in.readLine()) != null) {
+        String out = line.replace(find, replace);
+        System.out.println(out);
+    }
+  }
+
+  /** Runs the filter over stdin/stdout.
+      @param args find and replace, see class comment
+      @throws IllegalArgumentException if either argument is missing or empty */
+  public static void main(String[] args) throws IOException
+  {
+    // fail with a usage message rather than an opaque index exception
+    if(args.length < 2 || args[0].length() == 0 || args[1].length() == 0) {
+      throw new IllegalArgumentException("Usage: TrApp <find> <replace>");
+    }
+    char find = CUnescape(args[0]).charAt(0);
+    char replace = CUnescape(args[1]).charAt(0);
+    TrApp app = new TrApp(find, replace);
+    app.go();
+  }
+
+  /** Translates the two-character escape \n (backslash, n) into a newline;
+      any other string is returned unchanged. */
+  public static String CUnescape(String s)
+  {
+    if(s.equals("\\n")) {
+      return "\n";
+    } else {
+      return s;
+    }
+  }
+
+  // character to search for, and the character written in its place
+  char find;
+  char replace;
+
+}