You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/03 23:00:17 UTC

svn commit: r399432 [1/2] - in /lucene/hadoop/trunk: ./ src/contrib/ src/contrib/streaming/ src/contrib/streaming/src/ src/contrib/streaming/src/java/ src/contrib/streaming/src/java/org/ src/contrib/streaming/src/java/org/apache/ src/contrib/streaming/...

Author: cutting
Date: Wed May  3 14:00:13 2006
New Revision: 399432

URL: http://svn.apache.org/viewcvs?rev=399432&view=rev
Log:
HADOOP-191.  Add streaming contrib package.  Contributed by Michel Tourn.

Added:
    lucene/hadoop/trunk/src/contrib/
    lucene/hadoop/trunk/src/contrib/build-contrib.xml
    lucene/hadoop/trunk/src/contrib/build.xml
    lucene/hadoop/trunk/src/contrib/streaming/
    lucene/hadoop/trunk/src/contrib/streaming/build.xml
    lucene/hadoop/trunk/src/contrib/streaming/src/
    lucene/hadoop/trunk/src/contrib/streaming/src/java/
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/test/
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/build.xml

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=399432&r1=399431&r2=399432&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed May  3 14:00:13 2006
@@ -161,9 +161,15 @@
 42. HADOOP-184. Re-structure some test code to better support testing
     on a cluster.  (Mahadev Konar via cutting)
 
-40. HADOOP-189.  Fix MapReduce in standalone configuration to
+43. HADOOP-189.  Fix MapReduce in standalone configuration to
     correctly handle job jar files that contain a lib directory with
     nested jar files.  (cutting)
+
+44. HADOOP-191  Add streaming package, Hadoop's first contrib module.
+    This permits folks to easily submit MapReduce jobs whose map and
+    reduce functions are implemented by shell commands.  Use
+    'bin/hadoop jar build/hadoop-streaming.jar' to get details.
+    (Michel Tourn via cutting)
 
 
 Release 0.1.1 - 2006-04-08

Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/build.xml?rev=399432&r1=399431&r2=399432&view=diff
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Wed May  3 14:00:13 2006
@@ -353,4 +353,24 @@
     <delete dir="${build.dir}"/>
   </target>
 
+  <!-- ================================================================== -->
+  <!-- Contrib targets. For now, they must be called explicitely          -->
+  <!-- Using subant instead of ant as a workaround for 30569              -->
+  <!-- ================================================================== -->
+  <target name="deploy-contrib" depends="compile">
+     <subant target="deploy">        
+        <fileset file="src/contrib/build.xml"/>
+     </subant>  	
+  </target>
+  <target name="test-contrib" depends="compile">
+     <subant target="test">        
+        <fileset file="src/contrib/build.xml"/>
+     </subant>  	
+  </target>
+  <target name="clean-contrib" depends="compile">
+     <subant target="clean">        
+        <fileset file="src/contrib/build.xml"/>
+     </subant>  	
+  </target>
+
 </project>

Added: lucene/hadoop/trunk/src/contrib/build-contrib.xml
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/build-contrib.xml?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/build-contrib.xml (added)
+++ lucene/hadoop/trunk/src/contrib/build-contrib.xml Wed May  3 14:00:13 2006
@@ -0,0 +1,166 @@
+<?xml version="1.0"?>
+
+<!-- Imported by contrib/*/build.xml files to share generic targets. -->
+
+<project name="hadoopbuildcontrib">
+
+  <property name="name" value="${ant.project.name}"/>
+  <property name="root" value="${basedir}"/>
+
+  <!-- Load all the default properties, and any the user wants    -->
+  <!-- to contribute (without having to type -D or edit this file -->
+  <property file="${user.home}/${name}.build.properties" />
+  <property file="${root}/build.properties" />
+
+  <property name="hadoop.root" location="${root}/../../../"/>
+
+  <property name="src.dir"  location="${root}/src/java"/>
+  <property name="src.test" location="${root}/src/test"/>
+
+  <available file="${src.test}" type="dir" property="test.available"/>
+
+  <property name="conf.dir" location="${hadoop.root}/conf"/>
+
+  <property name="build.dir" location="${hadoop.root}/build/contrib/${name}"/>
+  <property name="build.classes" location="${build.dir}/classes"/>
+  <property name="build.test" location="${build.dir}/test"/>
+  <!-- all jars together -->
+  <property name="deploy.dir" location="${hadoop.root}/build/"/>
+
+  <property name="javac.deprecation" value="off"/>
+  <property name="javac.debug" value="on"/>
+
+  <property name="javadoc.link"
+            value="http://java.sun.com/j2se/1.4/docs/api/"/>
+
+  <property name="build.encoding" value="ISO-8859-1"/>
+
+  <fileset id="lib.jars" dir="${root}" includes="lib/*.jar"/>
+
+  <!-- the normal classpath -->
+  <path id="classpath">
+    <pathelement location="${build.classes}"/>
+    <fileset refid="lib.jars"/>
+    <pathelement location="${hadoop.root}/build/classes"/>
+    <fileset dir="${hadoop.root}/lib">
+      <include name="*.jar" />
+    </fileset>
+  </path>
+
+  <!-- the unit test classpath -->
+  <path id="test.classpath">
+    <pathelement location="${build.test}" />
+    <pathelement location="${hadoop.root}/build/test/classes"/>
+    <pathelement location="${hadoop.root}/src/test"/>
+    <pathelement location="${conf.dir}"/>
+    <pathelement location="${hadoop.root}/build"/>
+    <path refid="classpath"/>
+  </path>
+
+
+  <!-- to be overridden by sub-projects -->
+  <target name="init-contrib"/>
+
+  <!-- ====================================================== -->
+  <!-- Stuff needed by all targets                            -->
+  <!-- ====================================================== -->
+  <target name="init">
+    <mkdir dir="${build.dir}"/>
+    <mkdir dir="${build.classes}"/>
+    <mkdir dir="${build.test}"/>
+
+    <antcall target="init-contrib"/>
+  </target>
+
+
+  <!-- ====================================================== -->
+  <!-- Compile a Hadoop contrib's files                       -->
+  <!-- ====================================================== -->
+  <target name="compile" depends="init">
+    <echo message="Compiling contrib: ${name} destdir=${build.classes}"/>
+    <javac
+     encoding="${build.encoding}"
+     srcdir="${src.dir}"
+     includes="**/*.java"
+     destdir="${build.classes}"
+     debug="${javac.debug}"
+     deprecation="${javac.deprecation}">
+      <classpath refid="classpath"/>
+    </javac>
+  </target>
+
+  <!-- ================================================================== -->
+  <!-- Compile test code                                                  -->
+  <!-- ================================================================== -->
+  <target name="compile-test" depends="compile" if="test.available">
+    <javac
+     encoding="${build.encoding}"
+     srcdir="${src.test}"
+     includes="**/*.java"
+     destdir="${build.test}"
+     debug="${debug}">
+      <classpath refid="test.classpath"/>
+    </javac>
+  </target>
+  
+
+  <!-- ====================================================== -->
+  <!-- Make a Hadoop contrib's jar                            -->
+  <!-- ====================================================== -->
+  <target name="jar" depends="compile">
+    <jar
+      jarfile="${build.dir}/hadoop-${name}.jar"
+      basedir="${build.classes}"      
+    />
+  </target>
+  
+  <target name="deploy" depends="jar">
+    <mkdir dir="${deploy.dir}"/>
+    <copy file="${build.dir}/hadoop-${name}.jar" todir="${deploy.dir}"/>
+    <!-- <copy todir="${deploy.dir}" flatten="true">
+      <fileset refid="lib.jars"/>
+    </copy> -->
+  </target>
+  
+  <!-- ================================================================== -->
+  <!-- Run unit tests                                                     -->
+  <!-- ================================================================== -->
+  <target name="test" depends="compile-test, deploy" if="test.available">
+    <echo message="Testing contrib: ${name}"/>
+    <junit 
+      printsummary="withOutAndErr" haltonfailure="no" fork="yes"
+      errorProperty="tests.failed" failureProperty="tests.failed">
+      
+      <sysproperty key="test.build.data" value="${build.test}/data"/>
+      
+      <!-- requires fork=yes for: 
+        relative File paths to use the specified user.dir 
+        classpath to use build/contrib/*.jar
+      -->
+      <sysproperty key="user.dir" value="${build.test}/data"/>
+      
+      <classpath refid="test.classpath"/>
+      <formatter type="plain" />
+      <batchtest todir="${build.test}" unless="testcase">
+        <fileset dir="${src.test}"
+                 includes="**/Test*.java" excludes="**/${test.exclude}.java" />
+      </batchtest>
+      <batchtest todir="${build.test}" if="testcase">
+        <fileset dir="${src.test}" includes="**/${testcase}.java"/>
+      </batchtest>
+    </junit>
+    <fail if="tests.failed">Tests failed!</fail>
+
+  </target>
+
+
+  <!-- ================================================================== -->
+  <!-- Clean.  Delete the build files, and their directories              -->
+  <!-- ================================================================== -->
+  <target name="clean">
+    <echo message="Cleaning: ${name} builddir=${build.dir}"/>
+    <delete dir="${build.dir}"/>
+    <!--<delete dir="${deploy.dir}"/> -->
+  </target>
+
+</project>

Added: lucene/hadoop/trunk/src/contrib/build.xml
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/build.xml?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/build.xml (added)
+++ lucene/hadoop/trunk/src/contrib/build.xml Wed May  3 14:00:13 2006
@@ -0,0 +1,34 @@
+<?xml version="1.0"?>
+
+<project name="hadoopcontrib" default="deploy" basedir=".">
+
+  <!-- ====================================================== -->
+  <!-- Build & deploy all the contrib jars.                   -->
+  <!-- ====================================================== -->
+  
+  <target name="deploy">
+    <subant target="deploy">
+      <fileset dir="." includes="*/build.xml"/>
+    </subant>
+  </target>
+  
+  <!-- ====================================================== -->
+  <!-- Test all the contribs.                               -->
+  <!-- ====================================================== -->
+  <target name="test">
+    <subant target="test">
+      <fileset dir="." includes="*/build.xml"/>
+    </subant>
+  </target>
+  
+  
+  <!-- ====================================================== -->
+  <!-- Clean all the contribs.                              -->
+  <!-- ====================================================== -->
+  <target name="clean">
+    <subant target="clean">
+      <fileset dir="." includes="*/build.xml"/>
+    </subant>
+  </target>
+
+</project>

Added: lucene/hadoop/trunk/src/contrib/streaming/build.xml
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/build.xml?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/build.xml (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/build.xml Wed May  3 14:00:13 2006
@@ -0,0 +1,19 @@
+<?xml version="1.0"?>
+
+<project name="streaming" default="jar">
+
+  <import file="../build-contrib.xml"/>
+
+  <!-- Override jar target to specify main class -->
+  <target name="jar" depends="compile">
+    <jar
+      jarfile="${build.dir}/hadoop-${name}.jar"
+      basedir="${build.classes}"      
+    >
+  	<manifest>
+	    <attribute name="Main-Class" value="org.apache.hadoop.streaming.HadoopStreaming"/>
+	</manifest>
+    </jar>
+  </target>
+  
+</project>

Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java Wed May  3 14:00:13 2006
@@ -0,0 +1,86 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+import java.util.*;
+
+public class Environment extends Properties
+{
+   public Environment()
+      throws IOException
+   {
+      // Extend this code to fit all operating
+      // environments that you expect to run in
+
+      String command = null;
+      String OS = System.getProperty("os.name");
+      if (OS.equals("Windows NT")) {
+         command = "cmd /C set";
+      } else if (OS.indexOf("ix") > -1 || OS.indexOf("inux") > -1) {
+         command = "env";
+      } else {
+         // Add others here
+      }
+
+      if (command == null) {
+         throw new RuntimeException("Operating system " + OS
+            + " not supported by this class");
+      }
+
+      // Read the environment variables
+
+      Process pid = Runtime.getRuntime().exec(command);
+      BufferedReader in =
+         new BufferedReader(
+         new InputStreamReader(
+         pid.getInputStream()));
+      while(true) {
+         String line = in.readLine();
+         if (line == null)
+            break;
+         int p = line.indexOf("=");
+         if (p != -1) {
+            String name = line.substring(0, p);
+            String value = line.substring(p+1);
+            setProperty(name, value);
+         }
+      }
+      in.close();
+      try {
+         pid.waitFor();
+      }
+      catch (InterruptedException e) {
+         throw new IOException(e.getMessage());
+      }
+   }
+   
+   // to be used with Runtime.exec(String[] cmdarray, String[] envp) 
+   String[] toArray()
+   {
+     String[] arr = new String[super.size()];
+     Enumeration it = super.keys();
+     int i = -1;
+     while(it.hasMoreElements()) {
+        String key = (String)it.nextElement();
+        String val = (String)get(key);
+        i++;   
+        arr[i] = key + "=" + val;
+     }     
+     return arr;
+   }
+} 
\ No newline at end of file

Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java Wed May  3 14:00:13 2006
@@ -0,0 +1,31 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+
+/** The main entrypoint. Usually invoked with the script bin/hadoopStreaming
+*/
+public class HadoopStreaming 
+{
+  public static void main(String[] args) throws IOException
+  {
+    boolean mayExit = true;
+    StreamJob job = new StreamJob(args, mayExit);
+    job.go();
+  }  
+}

Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java Wed May  3 14:00:13 2006
@@ -0,0 +1,210 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+import java.util.*;
+import java.util.jar.*;
+import java.util.zip.ZipException;
+
+public class JarBuilder
+{
+    public JarBuilder()
+    {
+    }
+
+    public void setVerbose(boolean v)
+    {
+        this.verbose = v;
+    }
+    
+    public void merge(List srcNames, List srcUnjar, String dstJar) 
+        throws IOException
+    {
+        String source = null;
+        JarOutputStream jarOut = null;
+        JarFile jarSource = null;
+        jarOut = new JarOutputStream(new FileOutputStream(dstJar));
+        boolean throwing = false;
+        try {
+          if(srcNames != null) {
+            Iterator iter = srcNames.iterator(); 
+            while(iter.hasNext()) {
+                source = (String)iter.next();
+                File fsource = new File(source);
+                String base = getBasePathInJarOut(source);
+                if(!fsource.exists()) {
+                    throwing = true;
+                    throw new FileNotFoundException(fsource.getAbsolutePath());
+                }
+                if(fsource.isDirectory()) {
+                    addDirectory(jarOut, base, fsource, 0);
+                } else {
+                    addFileStream(jarOut, base, fsource);
+                }
+            }
+          }
+          if(srcUnjar != null) {
+            Iterator iter = srcUnjar.iterator(); 
+            while(iter.hasNext()) {
+                source = (String)iter.next();
+                jarSource = new JarFile(source);
+                addJarEntries(jarOut, jarSource);
+                jarSource.close();
+            }
+          
+          }
+        } finally {
+            try {
+              jarOut.close();
+            } catch(ZipException z) {
+                if(! throwing) {
+                    throw new IOException(z.toString());
+                }
+            }
+        }
+    }
+
+    protected String fileExtension(String file)
+    {
+    	int leafPos = file.lastIndexOf('/');
+    	if(leafPos == file.length()-1) return "";
+        String leafName = file.substring(leafPos+1);
+    	int dotPos = leafName.lastIndexOf('.');
+    	if(dotPos == -1) return "";
+        String ext = leafName.substring(dotPos+1);
+    	return ext;
+    }
+    
+    /** @return empty or a jar base path. Must not start with '/' */
+    protected String getBasePathInJarOut(String sourceFile)
+    {
+        // TaskRunner will unjar and append to classpath: .:classes/:lib/*    	
+    	String ext = fileExtension(sourceFile);
+    	if(ext.equals("class")) {
+    		return "classes/"; // or ""
+        } else if(ext.equals("jar") || ext.equals("zip")) {
+    		return "lib/";
+    	} else {
+            return "";
+        }
+    }
+    
+    private void addJarEntries(JarOutputStream dst, JarFile src)
+        throws IOException
+    {
+        Enumeration entries = src.entries();
+        JarEntry entry = null;
+        while(entries.hasMoreElements()) {
+            entry = (JarEntry)entries.nextElement();
+            //if(entry.getName().startsWith("META-INF/")) continue; 
+            InputStream in = src.getInputStream(entry);
+            addNamedStream(dst, entry.getName(), in);
+        }
+    }
+    
+    /** @param name path in jar for this jar element. Must not start with '/' */
+    void addNamedStream(JarOutputStream dst, String name, InputStream in) 
+        throws IOException
+    {
+        if(verbose) {
+            System.err.println("JarBuilder.addNamedStream " + name);
+        }
+        try {
+          dst.putNextEntry(new JarEntry(name));
+          int bytesRead = 0;
+          while((bytesRead = in.read(buffer, 0, BUFF_SIZE)) != -1) {
+              dst.write(buffer, 0, bytesRead);
+          }
+        } catch(ZipException ze) {
+            if(ze.getMessage().indexOf("duplicate entry") >= 0) {
+              if(verbose) {
+                  System.err.println(ze + " Skip duplicate entry " + name);
+              }
+            } else {
+                throw ze;
+            }
+        } finally {
+          in.close();
+          dst.flush();
+          dst.closeEntry();        
+        }
+    }
+
+    void addFileStream(JarOutputStream dst, String jarBaseName, File file) 
+        throws IOException 
+    {
+    	FileInputStream in = new FileInputStream(file);
+    	String name = jarBaseName + file.getName();
+    	addNamedStream(dst, name, in);
+    	in.close();
+    }
+    
+    void addDirectory(JarOutputStream dst, String jarBaseName, File dir, int depth) 
+        throws IOException
+    {
+    	File[] contents = dir.listFiles();
+    	if(contents != null) {
+    		for(int i=0; i<contents.length; i++) {
+    			File f = contents[i];
+    			String fBaseName = (depth==0) ? "" : dir.getName();
+    			if(jarBaseName.length()>0) {
+    				fBaseName = jarBaseName + "/" + fBaseName;
+    			}
+    			if(f.isDirectory()) {
+    				addDirectory(dst, fBaseName, f, depth+1);
+    			} else {
+    				addFileStream(dst, fBaseName+"/", f);
+    			}
+    		}
+    	}
+    }
+
+    /** Test program */    
+    public static void main(String args[])
+    {
+        // args = new String[] { "C:/Temp/merged.jar", "C:/jdk1.5.0/jre/lib/ext/dnsns.jar",  "/Temp/addtojar2.log", "C:/jdk1.5.0/jre/lib/ext/mtest.jar", "C:/Temp/base"};
+        if(args.length < 2) {
+            System.err.println("Usage: JarFiles merged.jar [src.jar | dir | file ]+");
+        } else {
+            JarBuilder jarFiles = new JarBuilder();
+            List names = new ArrayList();
+            List unjar = new ArrayList();
+            for(int i = 1; i < args.length; i++) {
+                String f = args[i];
+                String ext = jarFiles.fileExtension(f);
+                boolean expandAsJar = ext.equals("jar") || ext.equals("zip");                
+                if(expandAsJar) {
+                    unjar.add(f);
+                } else {
+                    names.add(f);
+                }                
+            }
+            try {
+                jarFiles.merge(names, unjar, args[0]);
+                Date lastMod = new Date(new File(args[0]).lastModified());
+                System.out.println("Merge done to " + args[0] + " " + lastMod);
+            } catch(Exception ge) {
+                ge.printStackTrace(System.err);
+            }
+        }
+    }
+    
+    private static final int BUFF_SIZE = 32*1024;
+    private byte buffer[] = new byte[BUFF_SIZE];
+    protected boolean verbose = false;
+}

Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java Wed May  3 14:00:13 2006
@@ -0,0 +1,100 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * A simulation of some Java SE 6 File methods.
+ * http://java.sun.com/developer/technicalArticles/J2SE/Desktop/mustang/enhancements/
+ *
+ * Limitations of this version: requires Cygwin on Windows, does not perform SecurityManager checks,
+ *  always returns true (success) without verifying that the operation worked.
+ *
+ * Note: not specifying ownerOnly maps to ownerOnly = false
+ * From man chmod: If no user specs are given, the effect is as if `a' were given. 
+ * 
+ */
+public class MustangFile extends File
+{
+
+    public MustangFile(File parent, String child)
+    {
+      super(parent, child);
+    }
+
+    public MustangFile(String pathname)
+    {
+      super(pathname);
+    }
+
+    public MustangFile(String parent, String child) 
+    {
+      super(parent, child);
+    }
+
+    public boolean setReadable(boolean readable, boolean ownerOnly) 
+    {
+      chmod("r", readable, ownerOnly);
+      return SUCCESS;
+    }
+
+    public boolean setReadable(boolean readable)
+    {
+      chmod("r", readable, false);
+      return SUCCESS;
+    }
+
+    public boolean setWritable(boolean writable, boolean ownerOnly) 
+    {
+      chmod("w", writable, ownerOnly);
+      return SUCCESS;
+    }
+    
+    public boolean setWritable(boolean writable) 
+    {
+      chmod("w", writable, false);
+      return SUCCESS;
+    }
+
+    public boolean setExecutable(boolean executable, boolean ownerOnly) 
+    {
+      chmod("x", executable, ownerOnly);
+      return SUCCESS;
+    }
+    
+    public boolean setExecutable(boolean executable)
+    {
+      chmod("x", executable, false);
+      return SUCCESS;
+    }
+    
+    void chmod(String perms, boolean plus, boolean ownerOnly)
+    {
+       String[] argv = new String[3];
+       argv[0] = "/bin/chmod";
+       String spec = ownerOnly ? "u" : "ugoa";
+       spec += (plus ? "+" : "-");
+       spec += perms;
+       argv[1] = spec;
+       argv[2] = getAbsolutePath();
+       StreamUtil.exec(argv, System.err);
+    }
+    
+    final static boolean SUCCESS = true;
+}    

Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Wed May  3 14:00:13 2006
@@ -0,0 +1,443 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+import java.nio.channels.*;
+import java.io.IOException;
+import java.util.Date;
+import java.util.Map;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.regex.*;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.LongWritable;
+
+/** Shared functionality for PipeMapper, PipeReducer.
+ *  @author Michel Tourn
+ */
+public abstract class PipeMapRed {
+
+  /** The command to be spawned as a subprocess.
+   * Mapper/Reducer operations will delegate to it
+   */
+  abstract String getPipeCommand(JobConf job);
+  /*
+  */
+  abstract String getKeyColPropName();
+  
+
+  /**
+   * @returns ow many TABS before the end of the key part 
+   * usually: 1 or "ALL"
+   * used both for tool output of both Map and Reduce
+   * configured via tool's argv: splitKeyVal=ALL or 1..
+   * although it is interpreted here, not by tool
+   */
+  int getKeyColsFromPipeCommand(String cmd)
+  {          
+    String key = getKeyColPropName();
+    Pattern kcPat = Pattern.compile(".*" + key + "=([^\\s]*).*");
+    Matcher match = kcPat.matcher(cmd);
+    String kc;
+    if(!match.matches()) {
+      kc = null;
+    } else {
+      kc = match.group(1);
+    }
+
+    int cols;
+    if(kc== null) {
+      // default value is 1 and the Stream applications could instead 
+      // add/remove the \t separator on lines to get the same effect as value 0, 1, ALL
+      cols = 1;
+    } else if(kc.equals("ALL")) {
+      cols = ALL_COLS;
+    } else {
+      try {
+        cols = Integer.parseInt(kc);    
+      } catch(NumberFormatException nf) {
+        cols = Integer.MAX_VALUE;
+      }
+    }
+
+    System.out.println("getKeyColsFromPipeCommand:" + key + " parse:" + cols + " from cmd=" + cmd);
+    
+    return cols;
+  }
+  
+  String[] splitArgs(String args)
+  {
+    String regex = "\\s(?=(?:[^\"]*\"[^\"]*\")*[^\"]*\\z)";
+    String[] split = args.split(regex);
+    // remove outer quotes
+    for(int i=0; i<split.length; i++) {
+        String si = split[i].trim();
+        if(si.charAt(0)=='"' && si.charAt(si.length()-1)=='"') {
+            si = si.substring(1, si.length()-1);
+            split[i] = si;
+        }
+    }
+    return split;
+  }
+  public void configure(JobConf job)
+  {
+
+    try {
+      String argv = getPipeCommand(job);
+      keyCols_ = getKeyColsFromPipeCommand(argv);
+      
+      doPipe_ = (argv != null);
+      if(!doPipe_) return;
+
+      setStreamJobDetails(job);
+      setStreamProperties();
+            
+      String[] argvSplit = splitArgs(argv);
+      String prog = argvSplit[0];
+      String userdir = System.getProperty("user.dir");
+      if(new File(prog).isAbsolute()) {
+        // we don't own it. Hope it is executable
+      } else {
+        new MustangFile(prog).setExecutable(true, true);
+      }
+      
+      // argvSplit[0]: 
+      // An absolute path should be a preexisting valid path on all TaskTrackers
+	  // A  relative path should match in the unjarred Job data
+      // In this case, force an absolute path to make sure exec finds it.
+      argvSplit[0] = new File(argvSplit[0]).getAbsolutePath();
+      log_.println("PipeMapRed exec " + Arrays.toString(argvSplit));
+            
+      
+      Environment childEnv = (Environment)StreamUtil.env().clone();
+      addEnvironment(childEnv, job.get("stream.addenvironment"));
+      sim = Runtime.getRuntime().exec(argvSplit, childEnv.toArray());
+      
+      /* // This way required jdk1.5
+      ProcessBuilder processBuilder = new ProcessBuilder(argvSplit);
+      Map<String, String> env = processBuilder.environment();
+      addEnvironment(env, job.get("stream.addenvironment"));
+      sim = processBuilder.start();
+      */
+      
+      clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));
+      clientIn_  = new BufferedReader(new InputStreamReader(sim.getInputStream()));
+      clientErr_ = new DataInputStream(sim.getErrorStream());
+      doneLock_  = new Object();
+      
+    } catch(Exception e) {
+        e.printStackTrace();
+        e.printStackTrace(log_);
+    } 
+  }
+  
+  void setStreamJobDetails(JobConf job)
+  {
+    jobLog_ = job.get("stream.jobLog_");
+    String s = job.get("stream.minRecWrittenToEnableSkip_");
+    if(s != null) {
+      minRecWrittenToEnableSkip_ = Long.parseLong(s);
+      log_.println("JobConf set minRecWrittenToEnableSkip_ =" + minRecWrittenToEnableSkip_);
+    }
+  }
+  
+  void setStreamProperties()
+  {
+    taskid_ = System.getProperty("stream.taskid");
+    if(taskid_ == null) {
+      taskid_ = "noid" + System.currentTimeMillis();
+    }
+    String s = System.getProperty("stream.port");
+    if(s != null) {
+      reportPortPlusOne_ = Integer.parseInt(s);
+    }
+    
+  }
+    
+  void addEnvironment(Properties env, String nameVals)
+  {
+    // encoding "a=b c=d" from StreamJob
+    if(nameVals == null) return;
+    String[] nv = nameVals.split(" ");
+    for(int i=0; i<nv.length; i++) {
+      String[] pair = nv[i].split("=", 2);
+      if(pair.length != 2) {
+        log_.println("Skip ev entry:" + nv[i]);
+      } else {
+        log_.println("Add  ev entry:" + nv[i]);
+        env.put(pair[0], pair[1]);
+      }
+    }
+  }
+  
+  /** .. and if successful: delete the task log */
+  void appendLogToJobLog(String status)
+  {
+    if(jobLog_ == null) {
+      return; // not using a common joblog
+    }
+    StreamUtil.exec("/bin/rm " + LOGNAME, log_);
+    // TODO socket-based aggregator (in JobTrackerInfoServer)
+  }
+  
+  
+  void startOutputThreads(OutputCollector output, Reporter reporter)
+  {
+      outputDone_ = false;
+      errorDone_ = false;
+      outThread_ = new MROutputThread(output, reporter);
+      outThread_.start();
+      errThread_ = new MRErrorThread(reporter);
+      errThread_.start();
+  }
+    
+  void splitKeyVal(String line, UTF8 key, UTF8 val)
+  {
+    int pos;
+    if(keyCols_ == ALL_COLS) {
+      pos = -1;
+    } else {
+      pos = line.indexOf('\t');
+    }    
+    if(pos == -1) {
+      key.set(line);
+      val.set("");      
+    } else {
+      key.set(line.substring(0, pos));
+      val.set(line.substring(pos+1));
+    }
+  }
+
+  class MROutputThread extends Thread
+  {
+    MROutputThread(OutputCollector output, Reporter reporter)
+    {
+      setDaemon(true);
+      this.output = output;
+      this.reporter = reporter;
+    }
+    public void run() {
+      try {
+            try {
+              UTF8 EMPTY = new UTF8("");
+              UTF8 key = new UTF8();
+              UTF8 val = new UTF8();
+              // 3/4 Tool to Hadoop
+              while((answer = clientIn_.readLine()) != null) {
+                // 4/4 Hadoop out 
+                splitKeyVal(answer, key, val);
+                output.collect(key, val);
+                numRecWritten_++;
+                if(numRecWritten_ % 100 == 0) {
+                  log_.println(numRecRead_+"/"+numRecWritten_);
+                  log_.flush();
+                }
+              }
+            } catch(IOException io) {
+              io.printStackTrace(log_);
+            }
+            log_.println("MROutputThread done");
+      } finally {
+          outputDone_ = true;
+          synchronized(doneLock_) {
+            doneLock_.notifyAll();
+          }
+      }
+    }
+    OutputCollector output;
+    Reporter reporter;
+    String answer;
+  }
+
+  class MRErrorThread extends Thread
+  {
+    public MRErrorThread(Reporter reporter)
+    {
+      this.reporter = reporter;
+      setDaemon(true);
+    }
+    public void run()
+    {
+      String line;
+      try {
+        long num = 0;
+        int bucket = 100;
+        while((line=clientErr_.readLine()) != null) {
+          num++;
+          log_.println(line);
+          if(num < 10) {
+            String hline = "MRErr: " + line;
+            System.err.println(hline);
+            reporter.setStatus(hline);
+          }
+        }
+      } catch(IOException io) {
+        io.printStackTrace(log_);
+      } finally {
+        errorDone_ = true;
+        synchronized(doneLock_) {
+          doneLock_.notifyAll();
+        }
+      }
+    }
+    Reporter reporter;
+  }
+
+  public void mapRedFinished()
+  {
+    log_.println("mapRedFinished");
+    try {
+    if(!doPipe_) return;
+    try {
+      if(clientOut_ != null) {
+      	clientOut_.close();
+      }
+    } catch(IOException io) {
+    }
+    if(outThread_ == null) {
+      // no input records: threads were never spawned
+    } else {
+      try {
+        while(!outputDone_ || !errorDone_) {
+          synchronized(doneLock_) {
+            doneLock_.wait();
+          }
+        }
+      } catch(InterruptedException ie) {
+        ie.printStackTrace();
+      }
+    }
+      sim.destroy();
+    } catch(RuntimeException e) {
+      e.printStackTrace(log_);
+      throw e;
+    }
+  }
+  
+  void maybeLogRecord()
+  {
+    if(numRecRead_ >= nextRecReadLog_) {
+      log_.println(numRecInfo());
+      log_.flush();      
+      nextRecReadLog_ *= 10;
+      //nextRecReadLog_ += 1000;
+    }    
+  }
+  
+  public String getContext()
+  {
+    
+    String s = numRecInfo() + "\n";
+    s += "minRecWrittenToEnableSkip_=" + minRecWrittenToEnableSkip_ + " ";
+    s += "LOGNAME=" + LOGNAME + "\n";
+    s += envline("HOST");
+    s += envline("USER");
+    s += envline("HADOOP_USER");
+    //s += envline("PWD"); // =/home/crawler/hadoop/trunk 
+    s += "last Hadoop input: |" + mapredKey_ + "|\n";
+    s += "last tool output: |" + outThread_.answer + "|\n";
+    s += "Date: " + new Date() + "\n";
+    // s += envline("HADOOP_HOME");
+    // s += envline("REMOTE_HOST");
+    return s;
+  }
+  
+  String envline(String var)
+  {
+    return var + "=" + StreamUtil.env().get(var) + "\n";
+  }
+  
+  String numRecInfo()
+  {
+    return "R/W/S=" + numRecRead_+"/"+numRecWritten_+"/"+numRecSkipped_;
+  }
+  String logFailure(Exception e)
+  {
+      StringWriter sw = new StringWriter();
+      PrintWriter pw = new PrintWriter(sw);
+      e.printStackTrace(pw);    
+      String msg = "log:" + jobLog_ + "\n" + getContext() + sw + "\n";
+      log_.println(msg);
+      return msg;  
+  }
+    
+
+  long numRecRead_ = 0;
+  long numRecWritten_ = 0;
+  long numRecSkipped_ = 0;
+  
+  long nextRecReadLog_ = 1;
+  
+  long minRecWrittenToEnableSkip_ = Long.MAX_VALUE;
+  
+  int keyCols_;
+  final static int ALL_COLS = Integer.MAX_VALUE;
+  
+  // generic MapRed parameters passed on by hadoopStreaming
+  String taskid_;
+  int reportPortPlusOne_;
+
+  boolean doPipe_;
+  
+  Process sim;
+  Object doneLock_;
+  MROutputThread outThread_;
+  MRErrorThread errThread_;
+  boolean outputDone_;
+  boolean errorDone_;
+  DataOutputStream clientOut_;
+  DataInputStream  clientErr_;
+  BufferedReader   clientIn_;
+
+  String jobLog_;
+  // set in PipeMapper/PipeReducer subclasses
+  String mapredKey_;
+  int numExceptions_;
+  
+  String LOGNAME;
+  PrintStream log_;
+  
+  { // instance initializer
+    try {
+      int id = (int)((System.currentTimeMillis()/2000) % 10);
+      String sid = id+ "." + StreamUtil.env().get("USER");
+      LOGNAME = "/tmp/PipeMapRed." + sid + ".log";
+      log_ = new PrintStream(new FileOutputStream(LOGNAME));
+      log_.println(new java.util.Date());
+      log_.flush();
+    } catch(IOException io) {
+      System.err.println("LOGNAME=" + LOGNAME);
+      io.printStackTrace();
+    } finally {
+      if(log_ == null) {
+        log_ = System.err;
+      }
+    }    
+  }
+}

Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Wed May  3 14:00:13 2006
@@ -0,0 +1,101 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+
+/** A generic Mapper bridge.
+ *  It delegates operations to an external program via stdin and stdout.
+ *  @author Michel Tourn
+ */
+public class PipeMapper extends PipeMapRed implements Mapper
+{
+
+  String getPipeCommand(JobConf job)
+  {
+    return job.get("stream.map.streamprocessor");
+  }
+
+  String getKeyColPropName()
+  {
+    return "mapKeyCols";
+  }  
+
+
+  // Do NOT declare default constructor
+  // (MapRed creates it reflectively)
+
+  public void map(WritableComparable key, Writable value,
+                  OutputCollector output, Reporter reporter)
+    throws IOException
+  {
+    // init
+    if(outThread_ == null) {
+      startOutputThreads(output, reporter);
+    }
+    try {
+      // 1/4 Hadoop in
+      mapredKey_ = key.toString();
+      numRecRead_++;
+
+      maybeLogRecord();
+
+      // 2/4 Hadoop to Tool
+      if(numExceptions_==0) {
+        clientOut_.writeBytes(mapredKey_);
+        clientOut_.writeBytes("\t");
+        clientOut_.writeBytes(value.toString());
+        clientOut_.writeBytes("\n");
+        clientOut_.flush();
+      } else {
+        numRecSkipped_++;
+      }
+    } catch(IOException io) {
+      numExceptions_++;
+      if(numExceptions_ > 1 || numRecWritten_ < minRecWrittenToEnableSkip_) {
+        // terminate with failure
+        String msg = logFailure(io);
+        appendLogToJobLog("failure");
+        throw new IOException(msg);
+      } else {
+        // terminate with success:
+        // swallow input records although the stream processor failed/closed
+      }
+    }
+  }
+  
+  
+  public void close()
+  {
+    appendLogToJobLog("success");
+    mapRedFinished();
+  }
+
+}

Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Wed May  3 14:00:13 2006
@@ -0,0 +1,87 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+
+/** A generic Reducer bridge.
+ *  It delegates operations to an external program via stdin and stdout.
+ *  @author Michel Tourn
+ */
+public class PipeReducer extends PipeMapRed implements Reducer
+{
+
+  String getPipeCommand(JobConf job)
+  {
+    return job.get("stream.reduce.streamprocessor");
+  }
+
+  String getKeyColPropName()
+  {
+    return "reduceKeyCols";
+  }  
+  
+  public void reduce(WritableComparable key, Iterator values,
+                     OutputCollector output, Reporter reporter)
+    throws IOException {
+
+    // init
+    if(doPipe_ && outThread_ == null) {
+      startOutputThreads(output, reporter);
+    }
+    try {
+      while (values.hasNext()) {
+        Writable val = (Writable)values.next();
+        numRecRead_++;
+        maybeLogRecord();
+        if(doPipe_) {
+          clientOut_.writeBytes(key.toString());
+          clientOut_.writeBytes("\t");
+          clientOut_.writeBytes(val.toString());
+          clientOut_.writeBytes("\n");
+          clientOut_.flush();
+        } else {
+          // "identity reduce"
+          output.collect(key, val);
+        }
+      }
+    } catch(IOException io) {
+      appendLogToJobLog("failure");
+      throw new IOException(getContext() + io.getMessage());    
+    }
+  }
+
+  public void close()
+  {
+    appendLogToJobLog("success");
+    mapRedFinished();
+  }
+
+}

Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java Wed May  3 14:00:13 2006
@@ -0,0 +1,128 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.LogFormatter;
+
+/** 
+ * Shared functionality for hadoopStreaming formats.
+ * A custom reader can be defined to be a RecordReader with the constructor below
+ * and is selected with the option bin/hadoopStreaming -inputreader ...
+ * @see StreamLineRecordReader
+ * @see StreamXmlRecordReader 
+ * @author Michel Tourn
+ */
+public abstract class StreamBaseRecordReader implements RecordReader
+{
+    
+  protected static final Logger LOG = LogFormatter.getLogger(StreamBaseRecordReader.class.getName());
+
+  public StreamBaseRecordReader(
+    FSDataInputStream in, long start, long end, 
+    String splitName, Reporter reporter, JobConf job)
+    throws IOException
+  {
+    in_ = in;
+    start_ = start;
+    splitName_ = splitName;
+    end_ = end;
+    reporter_ = reporter;
+    job_ = job;
+  }
+
+  /** Called once before the first call to next */
+  public void init() throws IOException
+  {
+    seekNextRecordBoundary();
+  }
+  
+  /** Implementation should seek forward in_ to the first byte of the next record.
+   *  The initial byte offset in the stream is arbitrary.
+   */
+  public abstract void seekNextRecordBoundary() throws IOException;
+  
+  
+  /** Read a record. Implementation should call numRecStats at the end
+   */  
+  public abstract boolean next(Writable key, Writable value) throws IOException;
+
+  
+  void numRecStats(CharSequence record) throws IOException
+  {
+    numRec_++;          
+    if(numRec_ == nextStatusRec_) {
+      nextStatusRec_ +=100000;//*= 10;
+      String status = getStatus(record);
+      LOG.info(status);
+      reporter_.setStatus(status);
+    }
+  }
+
+ long lastMem =0;
+ String getStatus(CharSequence record)
+ {
+    long pos = -1;
+    try { 
+      pos = getPos();
+    } catch(IOException io) {
+    }
+    final int M = 2000;
+    String recStr;
+    if(record.length() > M) {
+    	recStr = record.subSequence(0, M) + "...";
+    } else {
+    	recStr = record.toString();
+    }
+    String status = "HSTR " + StreamUtil.HOST + " " + numRec_ + ". pos=" + pos + " Processing record=" + recStr;
+    status += " " + splitName_;
+    return status;
+  }
+
+  /** Returns the current position in the input. */
+  public synchronized long getPos() throws IOException 
+  { 
+    return in_.getPos(); 
+  }
+
+  /** Close this to future operations.*/
+  public synchronized void close() throws IOException 
+  { 
+    in_.close(); 
+  }
+
+  FSDataInputStream in_;
+  long start_;
+  long end_;
+  String splitName_;
+  Reporter reporter_;
+  JobConf job_;
+  int numRec_ = 0;
+  int nextStatusRec_ = 1;
+  
+}

Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Wed May  3 14:00:13 2006
@@ -0,0 +1,148 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+import java.lang.reflect.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.logging.*;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.UTF8;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.LogFormatter;
+
+
+/** An input format that performs globbing on DFS paths and 
+ * selects a RecordReader based on a JobConf property.
+ * @author Michel Tourn
+ */
+public class StreamInputFormat extends InputFormatBase
+{
+
+  // an InputFormat should be public with the synthetic public default constructor
+  // JobTracker's JobInProgress will instantiate with clazz.newInstance() (and a custom ClassLoader)
+  
+  protected static final Logger LOG = LogFormatter.getLogger(StreamInputFormat.class.getName());
+  static {
+    //LOG.setLevel(Level.FINE);
+  }
+  
+  protected Path[] listPaths(FileSystem fs, JobConf job)
+    throws IOException
+  {
+    Path[] globs = job.getInputPaths();
+    ArrayList list = new ArrayList();
+    int dsup = globs.length;
+    for(int d=0; d<dsup; d++) {
+      String leafName = globs[d].getName();
+      LOG.fine("StreamInputFormat: globs[" + d + "] leafName = " + leafName);
+      Path[] paths; Path dir;
+	  PathFilter filter = new GlobFilter(fs, leafName);
+	  dir = new Path(globs[d].getParent().toString());
+      if(dir == null) dir = new Path(".");
+	  paths = fs.listPaths(dir, filter);
+      list.addAll(Arrays.asList(paths));
+    }
+    return (Path[])list.toArray(new Path[]{});
+  }
+
+  class GlobFilter implements PathFilter
+  {
+    public GlobFilter(FileSystem fs, String glob)
+    {
+      fs_ = fs;
+      pat_ = Pattern.compile(globToRegexp(glob));
+    }
+    String globToRegexp(String glob)
+	{
+	  return glob.replaceAll("\\*", ".*");
+	}
+
+    public boolean accept(Path pathname)
+    {
+      boolean acc = !fs_.isChecksumFile(pathname);
+      if(acc) {
+      	acc = pat_.matcher(pathname.getName()).matches();
+      }
+      LOG.finer("matches " + pat_ + ", " + pathname + " = " + acc);
+      return acc;
+    }
+	
+	Pattern pat_;
+    FileSystem fs_;
+  }
+
+  public RecordReader getRecordReader(FileSystem fs, final FileSplit split,
+                                      JobConf job, Reporter reporter)
+    throws IOException {
+    LOG.finer("getRecordReader start.....");
+    reporter.setStatus(split.toString());
+
+    final long start = split.getStart();
+    final long end = start + split.getLength();
+
+    String splitName = split.getFile() + ":" + start + "-" + end;
+    final FSDataInputStream in = fs.open(split.getFile());
+    
+    // will open the file and seek to the start of the split
+    // Factory dispatch based on available params..    
+    Class readerClass;
+    String c = job.get("stream.recordreader.class");
+    if(c == null) {
+      readerClass = StreamLineRecordReader.class;
+    } else {
+      readerClass = StreamUtil.goodClassOrNull(c, null);
+      if(readerClass == null) {
+        throw new RuntimeException("Class not found: " + c);
+      }    
+    }
+    
+    Constructor ctor;
+    try {
+      // reader = new StreamLineRecordReader(in, start, end, splitName, reporter, job);
+      ctor = readerClass.getConstructor(new Class[]{
+        FSDataInputStream.class, long.class, long.class, String.class, Reporter.class, JobConf.class});
+    } catch(NoSuchMethodException nsm) {
+      throw new RuntimeException(nsm);
+    }
+
+    
+    StreamBaseRecordReader reader;
+    try {
+        reader = (StreamBaseRecordReader) ctor.newInstance(new Object[]{
+            in, new Long(start), new Long(end), splitName, reporter, job});        
+    } catch(Exception nsm) {
+      throw new RuntimeException(nsm);
+    }
+        
+	reader.init();
+    
+    return reader;
+  }
+  
+}

Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Wed May  3 14:00:13 2006
@@ -0,0 +1,510 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.logging.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.RunningJob;
+
+import org.apache.hadoop.util.LogFormatter;
+
+/** All the client-side work happens here. 
+ * (Jar packaging, MapRed job submission and monitoring)
+ * @author Michel Tourn
+ */
+public class StreamJob
+{
+  protected static final Logger LOG = LogFormatter.getLogger(StreamJob.class.getName());
+    
+  public StreamJob(String[] argv, boolean mayExit)
+  {
+    argv_ = argv;
+    mayExit_ = mayExit;    
+  }
+  
+  public void go() throws IOException
+  {
+    init();
+    
+    preProcessArgs();
+    parseArgv();
+    postProcessArgs();
+    
+    setJobConf();
+    submitAndMonitorJob();
+  }
+
+  protected void init()
+  {
+     try {
+        env_ = new Environment();
+     } catch(IOException io) {
+        throw new RuntimeException(io);
+     }
+  }
+  
+  void preProcessArgs()
+  {
+    verbose_ = false;
+  }
+  
+  void postProcessArgs()
+  {
+    if(cluster_ == null) {
+        // hadoop-default.xml is standard, hadoop-local.xml is not.
+        cluster_ = "default";
+    }
+    hadoopAliasConf_ = "hadoop-" + getClusterNick() + ".xml";
+    if(inputGlobs_.size() == 0) {
+        fail("Required argument: -input <name>");
+    }
+    if(output_ == null) {
+        fail("Required argument: -output ");
+    }
+    // careful with class names..
+    mapCmd_ = packageOrTrimNoShip(mapCmd_);
+    redCmd_ = packageOrTrimNoShip(redCmd_);
+    
+    // TBD -D format or sthg on cmdline. 
+    // Plus maybe a standard list originating on client or server    
+    addTaskEnvironment_ = ""; 
+  }
+  
+  String packageOrTrimNoShip(String cmd)
+  {
+    if(cmd == null) {
+      //    
+    } else if(cmd.startsWith(NOSHIP)) {
+      // don't package the file, but keep the abolute path
+      cmd = cmd.substring(NOSHIP.length());
+    } else {
+      String prog = cmd;
+      String args = "";
+      int s = cmd.indexOf(" ");
+      if(s != -1) {
+        prog = cmd.substring(0, s);
+        args = cmd.substring(s+1);
+      }
+      packageFiles_.add(new File(prog).getAbsolutePath());
+      // Change path to simple filename. 
+      // That way when PipeMapRed calls Runtime.exec(), 
+      // it will look for the excutable in Task's working dir.
+      // And this is where TaskRunner unjars our job jar.
+      prog = new File(prog).getName();
+      if(args.length() > 0) {
+        cmd = prog + " " + args;
+      } else {
+        cmd = prog;
+      }
+    }
+    return cmd;
+  }
+  
+  String getHadoopAliasConfFile()
+  {
+    return new File(getHadoopClientHome() + "/conf", hadoopAliasConf_).getAbsolutePath();
+  }
+  
+  void parseArgv()
+  {
+    if(argv_.length==0) {
+      exitUsage();
+    }
+    int i=0; 
+    while(i < argv_.length) {
+      String s;
+      if(argv_[i].equals("-verbose")) {
+        verbose_ = true;      
+      } else if(argv_[i].equals("-debug")) {
+        debug_++;
+      } else if((s = optionArg(argv_, i, "-input", false)) != null) {
+        i++;
+        inputGlobs_.add(s);
+      } else if((s = optionArg(argv_, i, "-output", output_ != null)) != null) {
+        i++;
+        output_ = s;
+      } else if((s = optionArg(argv_, i, "-mapper", mapCmd_ != null)) != null) {
+        i++;
+        mapCmd_ = s;
+      } else if((s = optionArg(argv_, i, "-reducer", redCmd_ != null)) != null) {
+        i++;
+        redCmd_ = s;
+      } else if((s = optionArg(argv_, i, "-files", false)) != null) {
+        i++;
+        packageFiles_.add(s);
+      } else if((s = optionArg(argv_, i, "-cluster", cluster_ != null)) != null) {
+        i++;
+        cluster_ = s;
+      } else if((s = optionArg(argv_, i, "-config", false)) != null) {
+        i++;
+        configPath_.add(s);
+      } else if((s = optionArg(argv_, i, "-inputreader", inReaderSpec_ != null)) != null) {
+        i++;
+        inReaderSpec_ = s;
+      } else {
+        System.err.println("Unexpected argument: " + argv_[i]);
+        exitUsage();
+      }
+      i++;
+    }
+  }
+  
+  String optionArg(String[] args, int index, String arg, boolean argSet)
+  {
+    if(index >= args.length || ! args[index].equals(arg)) {
+      return null;
+    }
+    if(argSet) {
+      throw new IllegalArgumentException("Can only have one " + arg + " option");
+    }
+    if(index >= args.length-1) {
+      throw new IllegalArgumentException("Expected argument after option " + args[index]);
+    }    
+    return args[index+1];
+  }
+  
+  protected void msg(String msg)
+  {
+    if(verbose_) {
+      System.out.println("STREAM: " + msg);
+    }
+  }
+
+  public void exitUsage()
+  {
+                      //         1         2         3         4         5         6         7         
+                      //1234567890123456789012345678901234567890123456789012345678901234567890123456789
+    System.out.println("Usage: bin/hadoop jar build/hadoop-streaming.jar [options]");
+    System.out.println("Options:");
+    System.out.println("  -input   <path>     DFS input file(s) for the Map step");
+    System.out.println("  -output  <path>     DFS output directory for the Reduce step");
+    System.out.println("  -mapper  <cmd>      The streaming command to run");
+    System.out.println("  -reducer <cmd>      The streaming command to run");
+    System.out.println("  -files   <file>     Additional files to be shipped in the Job jar file");
+    System.out.println("  -cluster <name>     Default uses hadoop-default.xml and hadoop-site.xml");
+    System.out.println("  -config  <file>     Optional. One or more paths to xml config files");
+    System.out.println("  -inputreader <spec> Optional. See below");
+    System.out.println("  -verbose");
+    System.out.println();
+    System.out.println("In -input: globbing on <path> is supported and can have multiple -input");
+    System.out.println("Default Map input format: a line is a record in UTF-8");
+    System.out.println("  the key part ends at first TAB, the rest of the line is the value");
+    System.out.println("Custom Map input format: -inputreader package.MyRecordReader,n=v,n=v ");
+    System.out.println("  comma-separated name-values can be specified to configure the InputFormat");
+    System.out.println("  Ex: -inputreader 'StreamXmlRecordReader,begin=<doc>,end=</doc>'");
+    System.out.println("Map output format, reduce input/output format:");
+    System.out.println("  Format defined by what mapper command outputs. Line-oriented");
+    System.out.println("Mapper and Reducer <cmd> syntax: ");
+    System.out.println("  If the mapper or reducer programs are prefixed with " + NOSHIP + " then ");
+    System.out.println("  the paths are assumed to be valid absolute paths on the task tracker machines");
+    System.out.println("  and are NOT packaged with the Job jar file.");
+    System.out.println("Use -cluster <name> to switch between \"local\" Hadoop and one or more remote ");
+    System.out.println("  Hadoop clusters. ");
+    System.out.println("  The default is to use the normal hadoop-default.xml and hadoop-site.xml");
+    System.out.println("  Else configuration will use $HADOOP_HOME/conf/hadoop-<name>.xml");
+    System.out.println();
+    System.out.println("Example: hadoopStreaming -mapper \"noship:/usr/local/bin/perl5 filter.pl\"");
+    System.out.println("           -files /local/filter.pl -input \"/logs/0604*/*\" [...]");
+    System.out.println("  Ships a script, invokes the non-shipped perl interpreter");
+    System.out.println("  Shipped files go to the working directory so filter.pl is found by perl");
+    System.out.println("  Input files are all the daily logs for days in month 2006-04");
+    fail("");    
+  }
+  
+  public void fail(String message)
+  {
+    if(mayExit_) {
+        System.err.println(message);
+        System.exit(1);
+    } else {
+       throw new IllegalArgumentException(message);
+    }
+  }
+
+  // --------------------------------------------
+  
+  
+  protected String getHadoopClientHome()
+  {
+    String h = env_.getProperty("HADOOP_HOME"); // standard Hadoop
+    if(h == null) {
+      //fail("Missing required environment variable: HADOOP_HOME");
+      h = "UNDEF";
+    }
+    return h;
+  }
+
+
+  protected boolean isLocalHadoop()
+  {
+    boolean local;
+    if(jobConf_ == null) {
+        local = getClusterNick().equals("local");
+    } else {
+        local = jobConf_.get("mapred.job.tracker", "").equals("local");
+    }
+    return local;
+  }
+  protected String getClusterNick() 
+  { 
+    return cluster_;
+  }
+  
+  /** @return path to the created Jar file or null if no files are necessary.
+  */
+  protected String packageJobJar() throws IOException
+  {
+    ArrayList unjarFiles = new ArrayList();
+
+    // Runtime code: ship same version of code as self (job submitter code)
+    // usually found in: build/contrib or build/hadoop-<version>-dev-streaming.jar
+    String runtimeClasses = StreamUtil.findInClasspath(StreamJob.class.getName());
+    if(runtimeClasses == null) {
+        throw new IOException("runtime classes not found: " + getClass().getPackage());
+    } else {
+        msg("Found runtime classes in: " + runtimeClasses);
+    }
+    if(isLocalHadoop()) {
+      // don't package class files (they might get unpackaged in . and then 
+      //  hide the intended CLASSPATH entry)
+      // we still package everything else (so that scripts and executable are found in 
+      //  Task workdir like distributed Hadoop)
+    } else {
+      if(new File(runtimeClasses).isDirectory()) {    
+          packageFiles_.add(runtimeClasses);
+      } else {
+          unjarFiles.add(runtimeClasses);
+      }
+    }
+    if(packageFiles_.size() + unjarFiles.size()==0) {
+      return null;
+    }
+    File jobJar = File.createTempFile("streamjob", ".jar");
+    System.out.println("packageJobJar: " + packageFiles_ + " " + unjarFiles + " " + jobJar);    
+    if(debug_ == 0) {
+      jobJar.deleteOnExit();
+    }
+    JarBuilder builder = new JarBuilder();
+    if(verbose_) {
+      builder.setVerbose(true);
+    }
+    String jobJarName = jobJar.getAbsolutePath();
+    builder.merge(packageFiles_, unjarFiles, jobJarName);
+    return jobJarName;
+  }
+  
+  protected void setJobConf() throws IOException
+  {
+    msg("hadoopAliasConf_ = " + hadoopAliasConf_);
+    config_ = new Configuration();
+    if(!cluster_.equals("default")) {
+        config_.addFinalResource(new Path(getHadoopAliasConfFile()));
+    } else {
+      // use only defaults: hadoop-default.xml and hadoop-site.xml
+    }
+    Iterator it = configPath_.iterator();
+    while(it.hasNext()) {
+        String pathName = (String)it.next();
+        config_.addFinalResource(new Path(pathName));
+    }   
+    // general MapRed job properties
+    jobConf_ = new JobConf(config_);
+    for(int i=0; i<inputGlobs_.size(); i++) {
+      jobConf_.addInputDir(new File((String)inputGlobs_.get(i)));
+    }
+    
+    jobConf_.setInputFormat(StreamInputFormat.class);
+    jobConf_.setInputKeyClass(UTF8.class);
+    jobConf_.setInputValueClass(UTF8.class);
+    jobConf_.setOutputKeyClass(UTF8.class);
+    jobConf_.setOutputValueClass(UTF8.class);
+    //jobConf_.setCombinerClass();
+
+    jobConf_.setOutputDir(new File(output_));
+    jobConf_.setOutputFormat(StreamOutputFormat.class);
+    
+    jobConf_.set("stream.addenvironment", addTaskEnvironment_);
+    
+    String defaultPackage = this.getClass().getPackage().getName();
+    
+    Class c = StreamUtil.goodClassOrNull(mapCmd_, defaultPackage);
+    if(c != null) {
+      jobConf_.setMapperClass(c);
+    } else {
+      jobConf_.setMapperClass(PipeMapper.class);
+      jobConf_.set("stream.map.streamprocessor", mapCmd_);
+    }
+
+    if(redCmd_ != null) {
+      c = StreamUtil.goodClassOrNull(redCmd_, defaultPackage);
+      if(c != null) {
+        jobConf_.setReducerClass(c);
+      } else {
+        jobConf_.setReducerClass(PipeReducer.class);
+        jobConf_.set("stream.reduce.streamprocessor", redCmd_);
+      }
+    }
+    
+    if(inReaderSpec_ != null) {
+        String[] args = inReaderSpec_.split(",");
+        String readerClass = args[0];
+        // this argument can only be a Java class
+        c = StreamUtil.goodClassOrNull(readerClass, defaultPackage);
+        if(c != null) {            
+            jobConf_.set("stream.recordreader.class", c.getName());
+        } else {
+            fail("-inputreader: class not found: " + readerClass);
+        }
+        for(int i=1; i<args.length; i++) {
+            String[] nv = args[i].split("=", 2);
+            String k = "stream.recordreader." + nv[0];
+            String v = (nv.length>1) ? nv[1] : "";
+            jobConf_.set(k, v);
+        }
+    }
+    
+    jar_ = packageJobJar();
+    if(jar_ != null) {
+        jobConf_.setJar(jar_);
+    }
+    //jobConf_.mtdump();System.exit(1);
+  }
+  
+  protected String getJobTrackerHostPort()
+  {
+    return jobConf_.get("mapred.job.tracker");
+  }
+  
+  protected void jobInfo()
+  {    
+    if(isLocalHadoop()) {
+      LOG.info("Job running in-process (local Hadoop)"); 
+    } else {
+      String hp = getJobTrackerHostPort();
+      LOG.info("To kill this job, run:"); 
+      LOG.info(getHadoopClientHome() + "/bin/hadoop job  -Dmapred.job.tracker=" + hp + " -kill " + jobId_);
+      //LOG.info("Job file: " + running_.getJobFile() );
+      LOG.info("Tracking URL: "  + StreamUtil.qualifyHost(running_.getTrackingURL()));
+    }
+  }
+  
+  // Based on JobClient
+  public void submitAndMonitorJob() throws IOException {
+    
+    if(jar_ != null && isLocalHadoop()) {
+        // getAbs became required when shell and subvm have different working dirs...
+        File wd = new File(".").getAbsoluteFile();
+        StreamUtil.unJar(new File(jar_), wd);
+    }
+    
+    // if jobConf_ changes must recreate a JobClient 
+    jc_ = new JobClient(jobConf_); 
+    boolean error = true;
+    running_ = null;
+    String lastReport = null;
+    try {
+      running_ = jc_.submitJob(jobConf_);
+      jobId_ = running_.getJobID();
+
+      LOG.info("getLocalDirs(): " + Arrays.toString(jobConf_.getLocalDirs()));     
+      LOG.info("Running job: " + jobId_);      
+      jobInfo();
+
+      while (!running_.isComplete()) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {}
+        running_ = jc_.getJob(jobId_);
+        String report = null;
+        report = " map "+Math.round(running_.mapProgress()*100)
+        +"%  reduce " + Math.round(running_.reduceProgress()*100)+"%";
+
+        if (!report.equals(lastReport)) {
+          LOG.info(report);
+          lastReport = report;
+        }
+      }
+      if (!running_.isSuccessful()) {
+        jobInfo();
+        throw new IOException("Job not Successful!");
+      }
+      LOG.info("Job complete: " + jobId_);
+      LOG.info("Output: " + output_);
+      error = false;
+    } finally {    
+      if (error && (running_ != null)) {
+        LOG.info("killJob...");
+        running_.killJob();
+      }
+      jc_.close();
+    }
+  }
+  
+
+  public final static String NOSHIP = "noship:";
+  
+  protected boolean mayExit_;
+  protected String[] argv_;
+  protected boolean verbose_;
+  protected int debug_;
+
+  protected Environment env_;
+  
+  protected String jar_;
+  protected boolean localHadoop_;
+  protected Configuration config_;
+  protected JobConf jobConf_;
+  protected JobClient jc_;
+
+  // command-line arguments
+  protected ArrayList inputGlobs_   = new ArrayList(); // <String>
+  protected ArrayList packageFiles_ = new ArrayList(); // <String>
+  protected String output_;
+  protected String mapCmd_;
+  protected String redCmd_;
+  protected String cluster_;
+  protected ArrayList configPath_ = new ArrayList(); // <String>
+  protected String hadoopAliasConf_;
+  protected String inReaderSpec_;
+  
+
+  // Use to communicate config to the external processes (ex env.var.HADOOP_USER)
+  // encoding "a=b c=d"
+  protected String addTaskEnvironment_;
+  
+  protected boolean outputSingleNode_;
+  protected long minRecWrittenToEnableSkip_;
+  
+  protected RunningJob running_;
+  protected String jobId_;
+  
+  
+}
+

Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java Wed May  3 14:00:13 2006
@@ -0,0 +1,114 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Similar to org.apache.hadoop.mapred.TextRecordReader, 
+ * but delimits key and value with a TAB.
+ * @author Michel Tourn
+ */
+public class StreamLineRecordReader extends StreamBaseRecordReader 
+{
+
+  public StreamLineRecordReader(
+    FSDataInputStream in, long start, long end, 
+    String splitName, Reporter reporter, JobConf job)
+    throws IOException
+  {
+    super(in, start, end, splitName, reporter, job);
+  }
+
+  public void seekNextRecordBoundary() throws IOException
+  {
+    int bytesSkipped = 0;
+    if (start_ != 0) {
+      in_.seek(start_ - 1);
+      // scan to the next newline in the file
+      while (in_.getPos() < end_) {
+        char c = (char)in_.read();
+        bytesSkipped++;
+        if (c == '\r' || c == '\n') {
+          break;
+        }
+      }
+    }
+
+    System.out.println("getRecordReader start="+start_ + " end=" + end_ + " bytesSkipped"+bytesSkipped);
+  }
+
+  public synchronized boolean next(Writable key, Writable value)
+    throws IOException {
+    long pos = in_.getPos();
+    if (pos >= end_)
+      return false;
+
+    //((LongWritable)key).set(pos);           // key is position
+    //((UTF8)value).set(readLine(in));        // value is line
+    String line = readLine(in_);
+
+    // key is line up to TAB, value is rest
+    final boolean NOVAL = false;
+    if(NOVAL) {
+        ((UTF8)key).set(line);
+        ((UTF8)value).set("");
+    } else {
+      int tab = line.indexOf('\t');
+      if(tab == -1) {
+        ((UTF8)key).set(line);
+        ((UTF8)value).set("");
+      } else {
+        ((UTF8)key).set(line.substring(0, tab));
+        ((UTF8)value).set(line.substring(tab+1));
+      }
+    }
+    numRecStats(line);
+    return true;
+  }
+
+
+  // from TextInputFormat
+  private static String readLine(FSDataInputStream in) throws IOException {
+    StringBuffer buffer = new StringBuffer();
+    while (true) {
+
+      int b = in.read();
+      if (b == -1)
+        break;
+
+      char c = (char)b;              // bug: this assumes eight-bit characters.
+      if (c == '\r' || c == '\n')    // TODO || c == '\t' here
+        break;
+
+      buffer.append(c);
+    }
+
+    return buffer.toString();
+  }
+
+}

Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java Wed May  3 14:00:13 2006
@@ -0,0 +1,71 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+import java.io.File;
+
+import org.apache.hadoop.mapred.*;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+
+/** Similar to org.apache.hadoop.mapred.TextOutputFormat, 
+ * but delimits key and value with a TAB.
+ * @author Michel Tourn
+ */
+public class StreamOutputFormat implements OutputFormat {
+
+  public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
+                                      String name) throws IOException {
+
+    File file = new File(job.getOutputDir(), name);
+
+    final FSDataOutputStream out = fs.create(file);
+
+    return new RecordWriter() {
+        public synchronized void write(WritableComparable key, Writable value)
+          throws IOException {
+          out.write(key.toString().getBytes("UTF-8"));
+          out.writeByte('\t');
+          out.write(value.toString().getBytes("UTF-8"));
+          out.writeByte('\n');
+        }
+        public synchronized void close(Reporter reporter) throws IOException {
+          out.close();
+        }
+      };
+  }
+  
+  
+  /** Check whether the output specification for a job is appropriate.  Called
+   * when a job is submitted.  Typically checks that it does not already exist,
+   * throwing an exception when it already exists, so that output is not
+   * overwritten.
+   *
+   * @param job the job whose output will be written
+   * @throws IOException when output should not be attempted
+   */
+  public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException
+  {
+    // allow existing data (for app-level restartability)
+  }
+  
+}