Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/03 23:00:17 UTC
svn commit: r399432 [1/2] - in /lucene/hadoop/trunk: ./ src/contrib/
src/contrib/streaming/ src/contrib/streaming/src/
src/contrib/streaming/src/java/ src/contrib/streaming/src/java/org/
src/contrib/streaming/src/java/org/apache/ src/contrib/streaming/...
Author: cutting
Date: Wed May 3 14:00:13 2006
New Revision: 399432
URL: http://svn.apache.org/viewcvs?rev=399432&view=rev
Log:
HADOOP-191. Add streaming contrib package. Contributed by Michel Tourn.
Added:
lucene/hadoop/trunk/src/contrib/
lucene/hadoop/trunk/src/contrib/build-contrib.xml
lucene/hadoop/trunk/src/contrib/build.xml
lucene/hadoop/trunk/src/contrib/streaming/
lucene/hadoop/trunk/src/contrib/streaming/build.xml
lucene/hadoop/trunk/src/contrib/streaming/src/
lucene/hadoop/trunk/src/contrib/streaming/src/java/
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
lucene/hadoop/trunk/src/contrib/streaming/src/test/
lucene/hadoop/trunk/src/contrib/streaming/src/test/org/
lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/
lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/
lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/
lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/build.xml
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=399432&r1=399431&r2=399432&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed May 3 14:00:13 2006
@@ -161,9 +161,15 @@
42. HADOOP-184. Re-structure some test code to better support testing
on a cluster. (Mahadev Konar via cutting)
-40. HADOOP-189. Fix MapReduce in standalone configuration to
+43. HADOOP-189. Fix MapReduce in standalone configuration to
correctly handle job jar files that contain a lib directory with
nested jar files. (cutting)
+
+44. HADOOP-191. Add streaming package, Hadoop's first contrib module.
+ This permits folks to easily submit MapReduce jobs whose map and
+ reduce functions are implemented by shell commands. Use
+ 'bin/hadoop jar build/hadoop-streaming.jar' to get details.
+ (Michel Tourn via cutting)
Release 0.1.1 - 2006-04-08
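For context, a job like the one this entry describes would be submitted along the lines of 'bin/hadoop jar build/hadoop-streaming.jar -input <path> -output <path> -mapper /bin/cat -reducer /usr/bin/wc' (an illustrative invocation only; the actual option list is printed by StreamJob.exitUsage, included below).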
Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/build.xml?rev=399432&r1=399431&r2=399432&view=diff
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Wed May 3 14:00:13 2006
@@ -353,4 +353,24 @@
<delete dir="${build.dir}"/>
</target>
+ <!-- ================================================================== -->
+ <!-- Contrib targets. For now, they must be called explicitly -->
+ <!-- Using subant instead of ant as a workaround for Ant bug 30569 -->
+ <!-- ================================================================== -->
+ <target name="deploy-contrib" depends="compile">
+ <subant target="deploy">
+ <fileset file="src/contrib/build.xml"/>
+ </subant>
+ </target>
+ <target name="test-contrib" depends="compile">
+ <subant target="test">
+ <fileset file="src/contrib/build.xml"/>
+ </subant>
+ </target>
+ <target name="clean-contrib" depends="compile">
+ <subant target="clean">
+ <fileset file="src/contrib/build.xml"/>
+ </subant>
+ </target>
+
</project>
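(With these targets in place, contrib modules are built explicitly from the top level, e.g. 'ant deploy-contrib', which hands off to the 'deploy' target of src/contrib/build.xml, added below.)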
Added: lucene/hadoop/trunk/src/contrib/build-contrib.xml
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/build-contrib.xml?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/build-contrib.xml (added)
+++ lucene/hadoop/trunk/src/contrib/build-contrib.xml Wed May 3 14:00:13 2006
@@ -0,0 +1,166 @@
+<?xml version="1.0"?>
+
+<!-- Imported by contrib/*/build.xml files to share generic targets. -->
+
+<project name="hadoopbuildcontrib">
+
+ <property name="name" value="${ant.project.name}"/>
+ <property name="root" value="${basedir}"/>
+
+ <!-- Load all the default properties, and any the user wants -->
+ <!-- to contribute (without having to type -D or edit this file) -->
+ <property file="${user.home}/${name}.build.properties" />
+ <property file="${root}/build.properties" />
+
+ <property name="hadoop.root" location="${root}/../../../"/>
+
+ <property name="src.dir" location="${root}/src/java"/>
+ <property name="src.test" location="${root}/src/test"/>
+
+ <available file="${src.test}" type="dir" property="test.available"/>
+
+ <property name="conf.dir" location="${hadoop.root}/conf"/>
+
+ <property name="build.dir" location="${hadoop.root}/build/contrib/${name}"/>
+ <property name="build.classes" location="${build.dir}/classes"/>
+ <property name="build.test" location="${build.dir}/test"/>
+ <!-- all jars together -->
+ <property name="deploy.dir" location="${hadoop.root}/build/"/>
+
+ <property name="javac.deprecation" value="off"/>
+ <property name="javac.debug" value="on"/>
+
+ <property name="javadoc.link"
+ value="http://java.sun.com/j2se/1.4/docs/api/"/>
+
+ <property name="build.encoding" value="ISO-8859-1"/>
+
+ <fileset id="lib.jars" dir="${root}" includes="lib/*.jar"/>
+
+ <!-- the normal classpath -->
+ <path id="classpath">
+ <pathelement location="${build.classes}"/>
+ <fileset refid="lib.jars"/>
+ <pathelement location="${hadoop.root}/build/classes"/>
+ <fileset dir="${hadoop.root}/lib">
+ <include name="*.jar" />
+ </fileset>
+ </path>
+
+ <!-- the unit test classpath -->
+ <path id="test.classpath">
+ <pathelement location="${build.test}" />
+ <pathelement location="${hadoop.root}/build/test/classes"/>
+ <pathelement location="${hadoop.root}/src/test"/>
+ <pathelement location="${conf.dir}"/>
+ <pathelement location="${hadoop.root}/build"/>
+ <path refid="classpath"/>
+ </path>
+
+
+ <!-- to be overridden by sub-projects -->
+ <target name="init-contrib"/>
+
+ <!-- ====================================================== -->
+ <!-- Stuff needed by all targets -->
+ <!-- ====================================================== -->
+ <target name="init">
+ <mkdir dir="${build.dir}"/>
+ <mkdir dir="${build.classes}"/>
+ <mkdir dir="${build.test}"/>
+
+ <antcall target="init-contrib"/>
+ </target>
+
+
+ <!-- ====================================================== -->
+ <!-- Compile a Hadoop contrib's files -->
+ <!-- ====================================================== -->
+ <target name="compile" depends="init">
+ <echo message="Compiling contrib: ${name} destdir=${build.classes}"/>
+ <javac
+ encoding="${build.encoding}"
+ srcdir="${src.dir}"
+ includes="**/*.java"
+ destdir="${build.classes}"
+ debug="${javac.debug}"
+ deprecation="${javac.deprecation}">
+ <classpath refid="classpath"/>
+ </javac>
+ </target>
+
+ <!-- ================================================================== -->
+ <!-- Compile test code -->
+ <!-- ================================================================== -->
+ <target name="compile-test" depends="compile" if="test.available">
+ <javac
+ encoding="${build.encoding}"
+ srcdir="${src.test}"
+ includes="**/*.java"
+ destdir="${build.test}"
+ debug="${debug}">
+ <classpath refid="test.classpath"/>
+ </javac>
+ </target>
+
+
+ <!-- ====================================================== -->
+ <!-- Make a Hadoop contrib's jar -->
+ <!-- ====================================================== -->
+ <target name="jar" depends="compile">
+ <jar
+ jarfile="${build.dir}/hadoop-${name}.jar"
+ basedir="${build.classes}"
+ />
+ </target>
+
+ <target name="deploy" depends="jar">
+ <mkdir dir="${deploy.dir}"/>
+ <copy file="${build.dir}/hadoop-${name}.jar" todir="${deploy.dir}"/>
+ <!-- <copy todir="${deploy.dir}" flatten="true">
+ <fileset refid="lib.jars"/>
+ </copy> -->
+ </target>
+
+ <!-- ================================================================== -->
+ <!-- Run unit tests -->
+ <!-- ================================================================== -->
+ <target name="test" depends="compile-test, deploy" if="test.available">
+ <echo message="Testing contrib: ${name}"/>
+ <junit
+ printsummary="withOutAndErr" haltonfailure="no" fork="yes"
+ errorProperty="tests.failed" failureProperty="tests.failed">
+
+ <sysproperty key="test.build.data" value="${build.test}/data"/>
+
+ <!-- requires fork=yes for:
+ relative File paths to use the specified user.dir
+ classpath to use build/contrib/*.jar
+ -->
+ <sysproperty key="user.dir" value="${build.test}/data"/>
+
+ <classpath refid="test.classpath"/>
+ <formatter type="plain" />
+ <batchtest todir="${build.test}" unless="testcase">
+ <fileset dir="${src.test}"
+ includes="**/Test*.java" excludes="**/${test.exclude}.java" />
+ </batchtest>
+ <batchtest todir="${build.test}" if="testcase">
+ <fileset dir="${src.test}" includes="**/${testcase}.java"/>
+ </batchtest>
+ </junit>
+ <fail if="tests.failed">Tests failed!</fail>
+
+ </target>
+
+
+ <!-- ================================================================== -->
+ <!-- Clean. Delete the build files, and their directories -->
+ <!-- ================================================================== -->
+ <target name="clean">
+ <echo message="Cleaning: ${name} builddir=${build.dir}"/>
+ <delete dir="${build.dir}"/>
+ <!--<delete dir="${deploy.dir}"/> -->
+ </target>
+
+</project>
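(For the streaming module below, per-user overrides would therefore be read from ${user.home}/streaming.build.properties, since ${name} defaults to the Ant project name; a line such as 'javac.deprecation=on' would override the default defined above.)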
Added: lucene/hadoop/trunk/src/contrib/build.xml
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/build.xml?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/build.xml (added)
+++ lucene/hadoop/trunk/src/contrib/build.xml Wed May 3 14:00:13 2006
@@ -0,0 +1,34 @@
+<?xml version="1.0"?>
+
+<project name="hadoopcontrib" default="deploy" basedir=".">
+
+ <!-- ====================================================== -->
+ <!-- Build & deploy all the contrib jars. -->
+ <!-- ====================================================== -->
+
+ <target name="deploy">
+ <subant target="deploy">
+ <fileset dir="." includes="*/build.xml"/>
+ </subant>
+ </target>
+
+ <!-- ====================================================== -->
+ <!-- Test all the contribs. -->
+ <!-- ====================================================== -->
+ <target name="test">
+ <subant target="test">
+ <fileset dir="." includes="*/build.xml"/>
+ </subant>
+ </target>
+
+
+ <!-- ====================================================== -->
+ <!-- Clean all the contribs. -->
+ <!-- ====================================================== -->
+ <target name="clean">
+ <subant target="clean">
+ <fileset dir="." includes="*/build.xml"/>
+ </subant>
+ </target>
+
+</project>
Added: lucene/hadoop/trunk/src/contrib/streaming/build.xml
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/build.xml?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/build.xml (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/build.xml Wed May 3 14:00:13 2006
@@ -0,0 +1,19 @@
+<?xml version="1.0"?>
+
+<project name="streaming" default="jar">
+
+ <import file="../build-contrib.xml"/>
+
+ <!-- Override jar target to specify main class -->
+ <target name="jar" depends="compile">
+ <jar
+ jarfile="${build.dir}/hadoop-${name}.jar"
+ basedir="${build.classes}"
+ >
+ <manifest>
+ <attribute name="Main-Class" value="org.apache.hadoop.streaming.HadoopStreaming"/>
+ </manifest>
+ </jar>
+ </target>
+
+</project>
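(The Main-Class manifest attribute is what lets the jar be invoked as 'bin/hadoop jar build/hadoop-streaming.jar' without naming a class, as advertised in the CHANGES.txt entry above.)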
Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java Wed May 3 14:00:13 2006
@@ -0,0 +1,86 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+import java.util.*;
+
+public class Environment extends Properties
+{
+ public Environment()
+ throws IOException
+ {
+ // Extend this code to fit all operating
+ // environments that you expect to run in
+
+ String command = null;
+ String OS = System.getProperty("os.name");
+ if (OS.equals("Windows NT")) {
+ command = "cmd /C set";
+ } else if (OS.indexOf("ix") > -1 || OS.indexOf("inux") > -1) {
+ command = "env";
+ } else {
+ // Add others here
+ }
+
+ if (command == null) {
+ throw new RuntimeException("Operating system " + OS
+ + " not supported by this class");
+ }
+
+ // Read the environment variables
+
+ Process pid = Runtime.getRuntime().exec(command);
+ BufferedReader in =
+ new BufferedReader(
+ new InputStreamReader(
+ pid.getInputStream()));
+ while(true) {
+ String line = in.readLine();
+ if (line == null)
+ break;
+ int p = line.indexOf("=");
+ if (p != -1) {
+ String name = line.substring(0, p);
+ String value = line.substring(p+1);
+ setProperty(name, value);
+ }
+ }
+ in.close();
+ try {
+ pid.waitFor();
+ }
+ catch (InterruptedException e) {
+ throw new IOException(e.getMessage());
+ }
+ }
+
+ // to be used with Runtime.exec(String[] cmdarray, String[] envp)
+ String[] toArray()
+ {
+ String[] arr = new String[super.size()];
+ Enumeration it = super.keys();
+ int i = -1;
+ while(it.hasMoreElements()) {
+ String key = (String)it.nextElement();
+ String val = (String)get(key);
+ i++;
+ arr[i] = key + "=" + val;
+ }
+ return arr;
+ }
+}
\ No newline at end of file
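A minimal sketch of how this class is meant to be used (PipeMapRed.configure, below, clones the shared instance and passes toArray() to Runtime.exec). It assumes a POSIX 'env' command, must sit in the org.apache.hadoop.streaming package because toArray() is package-private, runs inside a method declared to throw IOException, and the variable name is illustrative:

    Environment env = new Environment();    // snapshot of the current environment
    env.setProperty("MY_FLAG", "1");        // add one variable for the child
    Process child = Runtime.getRuntime().exec(
        new String[] {"/bin/sh", "-c", "echo $MY_FLAG"},
        env.toArray());                     // NAME=value pairs for exec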
Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java Wed May 3 14:00:13 2006
@@ -0,0 +1,31 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+
+/** The main entry point. Usually invoked with the script bin/hadoopStreaming.
+*/
+public class HadoopStreaming
+{
+ public static void main(String[] args) throws IOException
+ {
+ boolean mayExit = true;
+ StreamJob job = new StreamJob(args, mayExit);
+ job.go();
+ }
+}
Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java Wed May 3 14:00:13 2006
@@ -0,0 +1,210 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+import java.util.*;
+import java.util.jar.*;
+import java.util.zip.ZipException;
+
+public class JarBuilder
+{
+ public JarBuilder()
+ {
+ }
+
+ public void setVerbose(boolean v)
+ {
+ this.verbose = v;
+ }
+
+ public void merge(List srcNames, List srcUnjar, String dstJar)
+ throws IOException
+ {
+ String source = null;
+ JarOutputStream jarOut = null;
+ JarFile jarSource = null;
+ jarOut = new JarOutputStream(new FileOutputStream(dstJar));
+ boolean throwing = false;
+ try {
+ if(srcNames != null) {
+ Iterator iter = srcNames.iterator();
+ while(iter.hasNext()) {
+ source = (String)iter.next();
+ File fsource = new File(source);
+ String base = getBasePathInJarOut(source);
+ if(!fsource.exists()) {
+ throwing = true;
+ throw new FileNotFoundException(fsource.getAbsolutePath());
+ }
+ if(fsource.isDirectory()) {
+ addDirectory(jarOut, base, fsource, 0);
+ } else {
+ addFileStream(jarOut, base, fsource);
+ }
+ }
+ }
+ if(srcUnjar != null) {
+ Iterator iter = srcUnjar.iterator();
+ while(iter.hasNext()) {
+ source = (String)iter.next();
+ jarSource = new JarFile(source);
+ addJarEntries(jarOut, jarSource);
+ jarSource.close();
+ }
+
+ }
+ } finally {
+ try {
+ jarOut.close();
+ } catch(ZipException z) {
+ if(! throwing) {
+ throw new IOException(z.toString());
+ }
+ }
+ }
+ }
+
+ protected String fileExtension(String file)
+ {
+ int leafPos = file.lastIndexOf('/');
+ if(leafPos == file.length()-1) return "";
+ String leafName = file.substring(leafPos+1);
+ int dotPos = leafName.lastIndexOf('.');
+ if(dotPos == -1) return "";
+ String ext = leafName.substring(dotPos+1);
+ return ext;
+ }
+
+ /** @return empty or a jar base path. Must not start with '/' */
+ protected String getBasePathInJarOut(String sourceFile)
+ {
+ // TaskRunner will unjar and append to classpath: .:classes/:lib/*
+ String ext = fileExtension(sourceFile);
+ if(ext.equals("class")) {
+ return "classes/"; // or ""
+ } else if(ext.equals("jar") || ext.equals("zip")) {
+ return "lib/";
+ } else {
+ return "";
+ }
+ }
+
+ private void addJarEntries(JarOutputStream dst, JarFile src)
+ throws IOException
+ {
+ Enumeration entries = src.entries();
+ JarEntry entry = null;
+ while(entries.hasMoreElements()) {
+ entry = (JarEntry)entries.nextElement();
+ //if(entry.getName().startsWith("META-INF/")) continue;
+ InputStream in = src.getInputStream(entry);
+ addNamedStream(dst, entry.getName(), in);
+ }
+ }
+
+ /** @param name path in jar for this jar element. Must not start with '/' */
+ void addNamedStream(JarOutputStream dst, String name, InputStream in)
+ throws IOException
+ {
+ if(verbose) {
+ System.err.println("JarBuilder.addNamedStream " + name);
+ }
+ try {
+ dst.putNextEntry(new JarEntry(name));
+ int bytesRead = 0;
+ while((bytesRead = in.read(buffer, 0, BUFF_SIZE)) != -1) {
+ dst.write(buffer, 0, bytesRead);
+ }
+ } catch(ZipException ze) {
+ if(ze.getMessage().indexOf("duplicate entry") >= 0) {
+ if(verbose) {
+ System.err.println(ze + " Skip duplicate entry " + name);
+ }
+ } else {
+ throw ze;
+ }
+ } finally {
+ in.close();
+ dst.flush();
+ dst.closeEntry();
+ }
+ }
+
+ void addFileStream(JarOutputStream dst, String jarBaseName, File file)
+ throws IOException
+ {
+ FileInputStream in = new FileInputStream(file);
+ String name = jarBaseName + file.getName();
+ addNamedStream(dst, name, in);
+ in.close();
+ }
+
+ void addDirectory(JarOutputStream dst, String jarBaseName, File dir, int depth)
+ throws IOException
+ {
+ File[] contents = dir.listFiles();
+ if(contents != null) {
+ for(int i=0; i<contents.length; i++) {
+ File f = contents[i];
+ String fBaseName = (depth==0) ? "" : dir.getName();
+ if(jarBaseName.length()>0) {
+ fBaseName = jarBaseName + "/" + fBaseName;
+ }
+ if(f.isDirectory()) {
+ addDirectory(dst, fBaseName, f, depth+1);
+ } else {
+ addFileStream(dst, fBaseName+"/", f);
+ }
+ }
+ }
+ }
+
+ /** Test program */
+ public static void main(String args[])
+ {
+ // args = new String[] { "C:/Temp/merged.jar", "C:/jdk1.5.0/jre/lib/ext/dnsns.jar", "/Temp/addtojar2.log", "C:/jdk1.5.0/jre/lib/ext/mtest.jar", "C:/Temp/base"};
+ if(args.length < 2) {
+ System.err.println("Usage: JarFiles merged.jar [src.jar | dir | file ]+");
+ } else {
+ JarBuilder jarFiles = new JarBuilder();
+ List names = new ArrayList();
+ List unjar = new ArrayList();
+ for(int i = 1; i < args.length; i++) {
+ String f = args[i];
+ String ext = jarFiles.fileExtension(f);
+ boolean expandAsJar = ext.equals("jar") || ext.equals("zip");
+ if(expandAsJar) {
+ unjar.add(f);
+ } else {
+ names.add(f);
+ }
+ }
+ try {
+ jarFiles.merge(names, unjar, args[0]);
+ Date lastMod = new Date(new File(args[0]).lastModified());
+ System.out.println("Merge done to " + args[0] + " " + lastMod);
+ } catch(Exception ge) {
+ ge.printStackTrace(System.err);
+ }
+ }
+ }
+
+ private static final int BUFF_SIZE = 32*1024;
+ private byte buffer[] = new byte[BUFF_SIZE];
+ protected boolean verbose = false;
+}
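A sketch of the layout rules encoded in getBasePathInJarOut above (file names are hypothetical, merge throws IOException, and main() above is the command-line equivalent):

    JarBuilder jb = new JarBuilder();
    java.util.List names = java.util.Arrays.asList(
        new String[] {"Foo.class", "dep.jar", "data.txt"});
    jb.merge(names, null, "merged.jar");
    // Foo.class lands under classes/, dep.jar is nested whole under lib/,
    // and data.txt sits at the jar root. TaskRunner later unjars the job
    // jar and appends .:classes/:lib/* to the classpath.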
Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java Wed May 3 14:00:13 2006
@@ -0,0 +1,100 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * A simulation of some Java SE 6 File methods.
+ * http://java.sun.com/developer/technicalArticles/J2SE/Desktop/mustang/enhancements/
+ *
+ * Limitations of this version: requires Cygwin on Windows, does not perform SecurityManager checks,
+ * always returns true (success) without verifying that the operation worked.
+ *
+ * Note: not specifying ownerOnly maps to ownerOnly = false
+ * From man chmod: If no user specs are given, the effect is as if `a' were given.
+ *
+ */
+public class MustangFile extends File
+{
+
+ public MustangFile(File parent, String child)
+ {
+ super(parent, child);
+ }
+
+ public MustangFile(String pathname)
+ {
+ super(pathname);
+ }
+
+ public MustangFile(String parent, String child)
+ {
+ super(parent, child);
+ }
+
+ public boolean setReadable(boolean readable, boolean ownerOnly)
+ {
+ chmod("r", readable, ownerOnly);
+ return SUCCESS;
+ }
+
+ public boolean setReadable(boolean readable)
+ {
+ chmod("r", readable, false);
+ return SUCCESS;
+ }
+
+ public boolean setWritable(boolean writable, boolean ownerOnly)
+ {
+ chmod("w", writable, ownerOnly);
+ return SUCCESS;
+ }
+
+ public boolean setWritable(boolean writable)
+ {
+ chmod("w", writable, false);
+ return SUCCESS;
+ }
+
+ public boolean setExecutable(boolean executable, boolean ownerOnly)
+ {
+ chmod("x", executable, ownerOnly);
+ return SUCCESS;
+ }
+
+ public boolean setExecutable(boolean executable)
+ {
+ chmod("x", executable, false);
+ return SUCCESS;
+ }
+
+ void chmod(String perms, boolean plus, boolean ownerOnly)
+ {
+ String[] argv = new String[3];
+ argv[0] = "/bin/chmod";
+ String spec = ownerOnly ? "u" : "ugoa";
+ spec += (plus ? "+" : "-");
+ spec += perms;
+ argv[1] = spec;
+ argv[2] = getAbsolutePath();
+ StreamUtil.exec(argv, System.err);
+ }
+
+ final static boolean SUCCESS = true;
+}
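A sketch of the intended drop-in use (the work is delegated to /bin/chmod, so a Unix-like environment or Cygwin is assumed; the file name is hypothetical):

    MustangFile script = new MustangFile("my-mapper.sh");
    script.setExecutable(true, true);  // runs: /bin/chmod u+x my-mapper.sh
    script.setReadable(true);          // runs: /bin/chmod ugoa+r my-mapper.sh
    // Both calls return SUCCESS unconditionally, per the class comment.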
Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Wed May 3 14:00:13 2006
@@ -0,0 +1,443 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+import java.nio.channels.*;
+import java.io.IOException;
+import java.util.Date;
+import java.util.Map;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.regex.*;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.LongWritable;
+
+/** Shared functionality for PipeMapper, PipeReducer.
+ * @author Michel Tourn
+ */
+public abstract class PipeMapRed {
+
+ /** The command to be spawned as a subprocess.
+ * Mapper/Reducer operations will delegate to it
+ */
+ abstract String getPipeCommand(JobConf job);
+ /** Returns the name of the key-column token embedded in the pipe
+ * command (mapKeyCols or reduceKeyCols); see getKeyColsFromPipeCommand. */
+ abstract String getKeyColPropName();
+
+
+ /**
+ * @return how many TABs precede the end of the key part;
+ * usually 1 or "ALL"
+ * used for the tool output of both Map and Reduce
+ * configured via the tool's argv: <getKeyColPropName()>=ALL or 1..
+ * although it is interpreted here, not by the tool
+ */
+ int getKeyColsFromPipeCommand(String cmd)
+ {
+ String key = getKeyColPropName();
+ Pattern kcPat = Pattern.compile(".*" + key + "=([^\\s]*).*");
+ Matcher match = kcPat.matcher(cmd);
+ String kc;
+ if(!match.matches()) {
+ kc = null;
+ } else {
+ kc = match.group(1);
+ }
+
+ int cols;
+ if(kc== null) {
+ // default value is 1 and the Stream applications could instead
+ // add/remove the \t separator on lines to get the same effect as value 0, 1, ALL
+ cols = 1;
+ } else if(kc.equals("ALL")) {
+ cols = ALL_COLS;
+ } else {
+ try {
+ cols = Integer.parseInt(kc);
+ } catch(NumberFormatException nf) {
+ cols = Integer.MAX_VALUE;
+ }
+ }
+
+ System.out.println("getKeyColsFromPipeCommand:" + key + " parse:" + cols + " from cmd=" + cmd);
+
+ return cols;
+ }
+
+ String[] splitArgs(String args)
+ {
+ String regex = "\\s(?=(?:[^\"]*\"[^\"]*\")*[^\"]*\\z)";
+ String[] split = args.split(regex);
+ // remove outer quotes
+ for(int i=0; i<split.length; i++) {
+ String si = split[i].trim();
+ if(si.charAt(0)=='"' && si.charAt(si.length()-1)=='"') {
+ si = si.substring(1, si.length()-1);
+ split[i] = si;
+ }
+ }
+ return split;
+ }
+ public void configure(JobConf job)
+ {
+
+ try {
+ String argv = getPipeCommand(job);
+ keyCols_ = getKeyColsFromPipeCommand(argv);
+
+ doPipe_ = (argv != null);
+ if(!doPipe_) return;
+
+ setStreamJobDetails(job);
+ setStreamProperties();
+
+ String[] argvSplit = splitArgs(argv);
+ String prog = argvSplit[0];
+ String userdir = System.getProperty("user.dir");
+ if(new File(prog).isAbsolute()) {
+ // we don't own it. Hope it is executable
+ } else {
+ new MustangFile(prog).setExecutable(true, true);
+ }
+
+ // argvSplit[0]:
+ // An absolute path should be a preexisting valid path on all TaskTrackers
+ // A relative path should match in the unjarred Job data
+ // In this case, force an absolute path to make sure exec finds it.
+ argvSplit[0] = new File(argvSplit[0]).getAbsolutePath();
+ log_.println("PipeMapRed exec " + Arrays.toString(argvSplit));
+
+
+ Environment childEnv = (Environment)StreamUtil.env().clone();
+ addEnvironment(childEnv, job.get("stream.addenvironment"));
+ sim = Runtime.getRuntime().exec(argvSplit, childEnv.toArray());
+
+ /* // This alternative requires JDK 1.5:
+ ProcessBuilder processBuilder = new ProcessBuilder(argvSplit);
+ Map<String, String> env = processBuilder.environment();
+ addEnvironment(env, job.get("stream.addenvironment"));
+ sim = processBuilder.start();
+ */
+
+ clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));
+ clientIn_ = new BufferedReader(new InputStreamReader(sim.getInputStream()));
+ clientErr_ = new DataInputStream(sim.getErrorStream());
+ doneLock_ = new Object();
+
+ } catch(Exception e) {
+ e.printStackTrace();
+ e.printStackTrace(log_);
+ }
+ }
+
+ void setStreamJobDetails(JobConf job)
+ {
+ jobLog_ = job.get("stream.jobLog_");
+ String s = job.get("stream.minRecWrittenToEnableSkip_");
+ if(s != null) {
+ minRecWrittenToEnableSkip_ = Long.parseLong(s);
+ log_.println("JobConf set minRecWrittenToEnableSkip_ =" + minRecWrittenToEnableSkip_);
+ }
+ }
+
+ void setStreamProperties()
+ {
+ taskid_ = System.getProperty("stream.taskid");
+ if(taskid_ == null) {
+ taskid_ = "noid" + System.currentTimeMillis();
+ }
+ String s = System.getProperty("stream.port");
+ if(s != null) {
+ reportPortPlusOne_ = Integer.parseInt(s);
+ }
+
+ }
+
+ void addEnvironment(Properties env, String nameVals)
+ {
+ // encoding "a=b c=d" from StreamJob
+ if(nameVals == null) return;
+ String[] nv = nameVals.split(" ");
+ for(int i=0; i<nv.length; i++) {
+ String[] pair = nv[i].split("=", 2);
+ if(pair.length != 2) {
+ log_.println("Skip ev entry:" + nv[i]);
+ } else {
+ log_.println("Add ev entry:" + nv[i]);
+ env.put(pair[0], pair[1]);
+ }
+ }
+ }
+
+ /** Called with "success" or "failure"; when a common job log is configured,
+ deletes the per-task log (TODO: first append it to the job log). */
+ void appendLogToJobLog(String status)
+ {
+ if(jobLog_ == null) {
+ return; // not using a common joblog
+ }
+ StreamUtil.exec("/bin/rm " + LOGNAME, log_);
+ // TODO socket-based aggregator (in JobTrackerInfoServer)
+ }
+
+
+ void startOutputThreads(OutputCollector output, Reporter reporter)
+ {
+ outputDone_ = false;
+ errorDone_ = false;
+ outThread_ = new MROutputThread(output, reporter);
+ outThread_.start();
+ errThread_ = new MRErrorThread(reporter);
+ errThread_.start();
+ }
+
+ void splitKeyVal(String line, UTF8 key, UTF8 val)
+ {
+ int pos;
+ if(keyCols_ == ALL_COLS) {
+ pos = -1;
+ } else {
+ pos = line.indexOf('\t');
+ }
+ if(pos == -1) {
+ key.set(line);
+ val.set("");
+ } else {
+ key.set(line.substring(0, pos));
+ val.set(line.substring(pos+1));
+ }
+ }
+
+ class MROutputThread extends Thread
+ {
+ MROutputThread(OutputCollector output, Reporter reporter)
+ {
+ setDaemon(true);
+ this.output = output;
+ this.reporter = reporter;
+ }
+ public void run() {
+ try {
+ try {
+ UTF8 EMPTY = new UTF8("");
+ UTF8 key = new UTF8();
+ UTF8 val = new UTF8();
+ // 3/4 Tool to Hadoop
+ while((answer = clientIn_.readLine()) != null) {
+ // 4/4 Hadoop out
+ splitKeyVal(answer, key, val);
+ output.collect(key, val);
+ numRecWritten_++;
+ if(numRecWritten_ % 100 == 0) {
+ log_.println(numRecRead_+"/"+numRecWritten_);
+ log_.flush();
+ }
+ }
+ } catch(IOException io) {
+ io.printStackTrace(log_);
+ }
+ log_.println("MROutputThread done");
+ } finally {
+ outputDone_ = true;
+ synchronized(doneLock_) {
+ doneLock_.notifyAll();
+ }
+ }
+ }
+ OutputCollector output;
+ Reporter reporter;
+ String answer;
+ }
+
+ class MRErrorThread extends Thread
+ {
+ public MRErrorThread(Reporter reporter)
+ {
+ this.reporter = reporter;
+ setDaemon(true);
+ }
+ public void run()
+ {
+ String line;
+ try {
+ long num = 0;
+ int bucket = 100;
+ while((line=clientErr_.readLine()) != null) {
+ num++;
+ log_.println(line);
+ if(num < 10) {
+ String hline = "MRErr: " + line;
+ System.err.println(hline);
+ reporter.setStatus(hline);
+ }
+ }
+ } catch(IOException io) {
+ io.printStackTrace(log_);
+ } finally {
+ errorDone_ = true;
+ synchronized(doneLock_) {
+ doneLock_.notifyAll();
+ }
+ }
+ }
+ Reporter reporter;
+ }
+
+ public void mapRedFinished()
+ {
+ log_.println("mapRedFinished");
+ try {
+ if(!doPipe_) return;
+ try {
+ if(clientOut_ != null) {
+ clientOut_.close();
+ }
+ } catch(IOException io) {
+ }
+ if(outThread_ == null) {
+ // no input records: threads were never spawned
+ } else {
+ try {
+ while(!outputDone_ || !errorDone_) {
+ synchronized(doneLock_) {
+ doneLock_.wait();
+ }
+ }
+ } catch(InterruptedException ie) {
+ ie.printStackTrace();
+ }
+ }
+ sim.destroy();
+ } catch(RuntimeException e) {
+ e.printStackTrace(log_);
+ throw e;
+ }
+ }
+
+ void maybeLogRecord()
+ {
+ if(numRecRead_ >= nextRecReadLog_) {
+ log_.println(numRecInfo());
+ log_.flush();
+ nextRecReadLog_ *= 10;
+ //nextRecReadLog_ += 1000;
+ }
+ }
+
+ public String getContext()
+ {
+
+ String s = numRecInfo() + "\n";
+ s += "minRecWrittenToEnableSkip_=" + minRecWrittenToEnableSkip_ + " ";
+ s += "LOGNAME=" + LOGNAME + "\n";
+ s += envline("HOST");
+ s += envline("USER");
+ s += envline("HADOOP_USER");
+ //s += envline("PWD"); // =/home/crawler/hadoop/trunk
+ s += "last Hadoop input: |" + mapredKey_ + "|\n";
+ s += "last tool output: |" + outThread_.answer + "|\n";
+ s += "Date: " + new Date() + "\n";
+ // s += envline("HADOOP_HOME");
+ // s += envline("REMOTE_HOST");
+ return s;
+ }
+
+ String envline(String var)
+ {
+ return var + "=" + StreamUtil.env().get(var) + "\n";
+ }
+
+ String numRecInfo()
+ {
+ return "R/W/S=" + numRecRead_+"/"+numRecWritten_+"/"+numRecSkipped_;
+ }
+ String logFailure(Exception e)
+ {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ e.printStackTrace(pw);
+ String msg = "log:" + jobLog_ + "\n" + getContext() + sw + "\n";
+ log_.println(msg);
+ return msg;
+ }
+
+
+ long numRecRead_ = 0;
+ long numRecWritten_ = 0;
+ long numRecSkipped_ = 0;
+
+ long nextRecReadLog_ = 1;
+
+ long minRecWrittenToEnableSkip_ = Long.MAX_VALUE;
+
+ int keyCols_;
+ final static int ALL_COLS = Integer.MAX_VALUE;
+
+ // generic MapRed parameters passed on by hadoopStreaming
+ String taskid_;
+ int reportPortPlusOne_;
+
+ boolean doPipe_;
+
+ Process sim;
+ Object doneLock_;
+ MROutputThread outThread_;
+ MRErrorThread errThread_;
+ boolean outputDone_;
+ boolean errorDone_;
+ DataOutputStream clientOut_;
+ DataInputStream clientErr_;
+ BufferedReader clientIn_;
+
+ String jobLog_;
+ // set in PipeMapper/PipeReducer subclasses
+ String mapredKey_;
+ int numExceptions_;
+
+ String LOGNAME;
+ PrintStream log_;
+
+ { // instance initializer
+ try {
+ int id = (int)((System.currentTimeMillis()/2000) % 10);
+ String sid = id+ "." + StreamUtil.env().get("USER");
+ LOGNAME = "/tmp/PipeMapRed." + sid + ".log";
+ log_ = new PrintStream(new FileOutputStream(LOGNAME));
+ log_.println(new java.util.Date());
+ log_.flush();
+ } catch(IOException io) {
+ System.err.println("LOGNAME=" + LOGNAME);
+ io.printStackTrace();
+ } finally {
+ if(log_ == null) {
+ log_ = System.err;
+ }
+ }
+ }
+}
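A hedged illustration of the key/value framing above. The command string is hypothetical, 'mapKeyCols' is the token name returned by PipeMapper.getKeyColPropName (next file), and the snippet must sit in the org.apache.hadoop.streaming package since these methods are package-private:

    PipeMapRed pm = new PipeMapper();
    int cols = pm.getKeyColsFromPipeCommand("/bin/cat mapKeyCols=ALL");
    // cols == PipeMapRed.ALL_COLS, so splitKeyVal() will put the whole
    // line into the key and leave the value empty. Without a mapKeyCols=
    // token in the command, cols defaults to 1, i.e. each output line is
    // split at its first TAB.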
Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Wed May 3 14:00:13 2006
@@ -0,0 +1,101 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+
+/** A generic Mapper bridge.
+ * It delegates operations to an external program via stdin and stdout.
+ * @author Michel Tourn
+ */
+public class PipeMapper extends PipeMapRed implements Mapper
+{
+
+ String getPipeCommand(JobConf job)
+ {
+ return job.get("stream.map.streamprocessor");
+ }
+
+ String getKeyColPropName()
+ {
+ return "mapKeyCols";
+ }
+
+
+ // Do NOT declare default constructor
+ // (MapRed creates it reflectively)
+
+ public void map(WritableComparable key, Writable value,
+ OutputCollector output, Reporter reporter)
+ throws IOException
+ {
+ // init
+ if(outThread_ == null) {
+ startOutputThreads(output, reporter);
+ }
+ try {
+ // 1/4 Hadoop in
+ mapredKey_ = key.toString();
+ numRecRead_++;
+
+ maybeLogRecord();
+
+ // 2/4 Hadoop to Tool
+ if(numExceptions_==0) {
+ clientOut_.writeBytes(mapredKey_);
+ clientOut_.writeBytes("\t");
+ clientOut_.writeBytes(value.toString());
+ clientOut_.writeBytes("\n");
+ clientOut_.flush();
+ } else {
+ numRecSkipped_++;
+ }
+ } catch(IOException io) {
+ numExceptions_++;
+ if(numExceptions_ > 1 || numRecWritten_ < minRecWrittenToEnableSkip_) {
+ // terminate with failure
+ String msg = logFailure(io);
+ appendLogToJobLog("failure");
+ throw new IOException(msg);
+ } else {
+ // terminate with success:
+ // swallow input records although the stream processor failed/closed
+ }
+ }
+ }
+
+
+ public void close()
+ {
+ appendLogToJobLog("success");
+ mapRedFinished();
+ }
+
+}
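For reference, the external program at the other end of the pipe just reads records from stdin and writes records to stdout, one per line with the key ending at the first TAB. A minimal hypothetical tool (any executable would do, e.g. /bin/cat):

    import java.io.*;

    /** Hypothetical streaming tool: swaps the key and value of each record. */
    public class SwapFields {
      public static void main(String[] args) throws IOException {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String line;
        while ((line = in.readLine()) != null) {
          int tab = line.indexOf('\t');      // key ends at the first TAB
          if (tab == -1) {
            System.out.println(line);        // no value part: pass through
          } else {
            System.out.println(line.substring(tab + 1)
                + "\t" + line.substring(0, tab));
          }
        }
      }
    }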
Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Wed May 3 14:00:13 2006
@@ -0,0 +1,87 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+
+/** A generic Reducer bridge.
+ * It delegates operations to an external program via stdin and stdout.
+ * @author Michel Tourn
+ */
+public class PipeReducer extends PipeMapRed implements Reducer
+{
+
+ String getPipeCommand(JobConf job)
+ {
+ return job.get("stream.reduce.streamprocessor");
+ }
+
+ String getKeyColPropName()
+ {
+ return "reduceKeyCols";
+ }
+
+ public void reduce(WritableComparable key, Iterator values,
+ OutputCollector output, Reporter reporter)
+ throws IOException {
+
+ // init
+ if(doPipe_ && outThread_ == null) {
+ startOutputThreads(output, reporter);
+ }
+ try {
+ while (values.hasNext()) {
+ Writable val = (Writable)values.next();
+ numRecRead_++;
+ maybeLogRecord();
+ if(doPipe_) {
+ clientOut_.writeBytes(key.toString());
+ clientOut_.writeBytes("\t");
+ clientOut_.writeBytes(val.toString());
+ clientOut_.writeBytes("\n");
+ clientOut_.flush();
+ } else {
+ // "identity reduce"
+ output.collect(key, val);
+ }
+ }
+ } catch(IOException io) {
+ appendLogToJobLog("failure");
+ throw new IOException(getContext() + io.getMessage());
+ }
+ }
+
+ public void close()
+ {
+ appendLogToJobLog("success");
+ mapRedFinished();
+ }
+
+}
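Note the "identity reduce" branch above: with no stream processor the records pass straight through. When a reducer command is configured it receives one key-TAB-value line per value, with all lines for a given key contiguous (MapReduce sorts by key), so a hypothetical aggregating tool only needs to watch for key changes:

    import java.io.*;

    /** Hypothetical streaming reducer: emits a count per key. */
    public class CountByKey {
      public static void main(String[] args) throws IOException {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String line, prev = null;
        long count = 0;
        while ((line = in.readLine()) != null) {
          int tab = line.indexOf('\t');
          String key = (tab == -1) ? line : line.substring(0, tab);
          if (prev != null && !key.equals(prev)) {
            System.out.println(prev + "\t" + count);   // key changed: flush
            count = 0;
          }
          prev = key;
          count++;
        }
        if (prev != null) System.out.println(prev + "\t" + count);
      }
    }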
Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java Wed May 3 14:00:13 2006
@@ -0,0 +1,128 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.LogFormatter;
+
+/**
+ * Shared functionality for hadoopStreaming formats.
+ * A custom reader can be defined as a RecordReader with the constructor below
+ * and is selected with the option bin/hadoopStreaming -inputreader ...
+ * @see StreamLineRecordReader
+ * @see StreamXmlRecordReader
+ * @author Michel Tourn
+ */
+public abstract class StreamBaseRecordReader implements RecordReader
+{
+
+ protected static final Logger LOG = LogFormatter.getLogger(StreamBaseRecordReader.class.getName());
+
+ public StreamBaseRecordReader(
+ FSDataInputStream in, long start, long end,
+ String splitName, Reporter reporter, JobConf job)
+ throws IOException
+ {
+ in_ = in;
+ start_ = start;
+ splitName_ = splitName;
+ end_ = end;
+ reporter_ = reporter;
+ job_ = job;
+ }
+
+ /** Called once before the first call to next */
+ public void init() throws IOException
+ {
+ seekNextRecordBoundary();
+ }
+
+ /** Implementation should seek forward in_ to the first byte of the next record.
+ * The initial byte offset in the stream is arbitrary.
+ */
+ public abstract void seekNextRecordBoundary() throws IOException;
+
+
+ /** Read a record. Implementation should call numRecStats at the end
+ */
+ public abstract boolean next(Writable key, Writable value) throws IOException;
+
+
+ void numRecStats(CharSequence record) throws IOException
+ {
+ numRec_++;
+ if(numRec_ == nextStatusRec_) {
+ nextStatusRec_ +=100000;//*= 10;
+ String status = getStatus(record);
+ LOG.info(status);
+ reporter_.setStatus(status);
+ }
+ }
+
+ long lastMem =0;
+ String getStatus(CharSequence record)
+ {
+ long pos = -1;
+ try {
+ pos = getPos();
+ } catch(IOException io) {
+ }
+ final int M = 2000;
+ String recStr;
+ if(record.length() > M) {
+ recStr = record.subSequence(0, M) + "...";
+ } else {
+ recStr = record.toString();
+ }
+ String status = "HSTR " + StreamUtil.HOST + " " + numRec_ + ". pos=" + pos + " Processing record=" + recStr;
+ status += " " + splitName_;
+ return status;
+ }
+
+ /** Returns the current position in the input. */
+ public synchronized long getPos() throws IOException
+ {
+ return in_.getPos();
+ }
+
+ /** Close this to future operations.*/
+ public synchronized void close() throws IOException
+ {
+ in_.close();
+ }
+
+ FSDataInputStream in_;
+ long start_;
+ long end_;
+ String splitName_;
+ Reporter reporter_;
+ JobConf job_;
+ int numRec_ = 0;
+ int nextStatusRec_ = 1;
+
+}
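Because StreamInputFormat (next file) instantiates readers reflectively with a fixed argument list, a custom reader has to keep exactly this constructor signature. A skeletal, hypothetical subclass, assumed to sit in the org.apache.hadoop.streaming package so it can see the package-private members:

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Reporter;

    /** Selected via the JobConf property stream.recordreader.class. */
    public class MyRecordReader extends StreamBaseRecordReader {
      public MyRecordReader(FSDataInputStream in, long start, long end,
                            String splitName, Reporter reporter, JobConf job)
          throws IOException {
        super(in, start, end, splitName, reporter, job);
      }
      public void seekNextRecordBoundary() throws IOException {
        // advance in_ to the first byte of the next record; no-op sketch
      }
      public boolean next(Writable key, Writable value) throws IOException {
        // fill key/value, then call numRecStats(...); this sketch reads nothing
        return false;
      }
    }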
Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Wed May 3 14:00:13 2006
@@ -0,0 +1,148 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+import java.lang.reflect.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.logging.*;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.UTF8;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.LogFormatter;
+
+
+/** An input format that performs globbing on DFS paths and
+ * selects a RecordReader based on a JobConf property.
+ * @author Michel Tourn
+ */
+public class StreamInputFormat extends InputFormatBase
+{
+
+ // an InputFormat should be public with the synthetic public default constructor
+ // JobTracker's JobInProgress will instantiate with clazz.newInstance() (and a custom ClassLoader)
+
+ protected static final Logger LOG = LogFormatter.getLogger(StreamInputFormat.class.getName());
+ static {
+ //LOG.setLevel(Level.FINE);
+ }
+
+ protected Path[] listPaths(FileSystem fs, JobConf job)
+ throws IOException
+ {
+ Path[] globs = job.getInputPaths();
+ ArrayList list = new ArrayList();
+ int dsup = globs.length;
+ for(int d=0; d<dsup; d++) {
+ String leafName = globs[d].getName();
+ LOG.fine("StreamInputFormat: globs[" + d + "] leafName = " + leafName);
+ Path[] paths; Path dir;
+ PathFilter filter = new GlobFilter(fs, leafName);
+ dir = new Path(globs[d].getParent().toString());
+ if(dir == null) dir = new Path(".");
+ paths = fs.listPaths(dir, filter);
+ list.addAll(Arrays.asList(paths));
+ }
+ return (Path[])list.toArray(new Path[]{});
+ }
+
+ class GlobFilter implements PathFilter
+ {
+ public GlobFilter(FileSystem fs, String glob)
+ {
+ fs_ = fs;
+ pat_ = Pattern.compile(globToRegexp(glob));
+ }
+ String globToRegexp(String glob)
+ {
+ return glob.replaceAll("\\*", ".*");
+ }
+
+ public boolean accept(Path pathname)
+ {
+ boolean acc = !fs_.isChecksumFile(pathname);
+ if(acc) {
+ acc = pat_.matcher(pathname.getName()).matches();
+ }
+ LOG.finer("matches " + pat_ + ", " + pathname + " = " + acc);
+ return acc;
+ }
+
+ Pattern pat_;
+ FileSystem fs_;
+ }
+
+ public RecordReader getRecordReader(FileSystem fs, final FileSplit split,
+ JobConf job, Reporter reporter)
+ throws IOException {
+ LOG.finer("getRecordReader start.....");
+ reporter.setStatus(split.toString());
+
+ final long start = split.getStart();
+ final long end = start + split.getLength();
+
+ String splitName = split.getFile() + ":" + start + "-" + end;
+ final FSDataInputStream in = fs.open(split.getFile());
+
+ // will open the file and seek to the start of the split
+ // Factory dispatch based on available params..
+ Class readerClass;
+ String c = job.get("stream.recordreader.class");
+ if(c == null) {
+ readerClass = StreamLineRecordReader.class;
+ } else {
+ readerClass = StreamUtil.goodClassOrNull(c, null);
+ if(readerClass == null) {
+ throw new RuntimeException("Class not found: " + c);
+ }
+ }
+
+ Constructor ctor;
+ try {
+ // reader = new StreamLineRecordReader(in, start, end, splitName, reporter, job);
+ ctor = readerClass.getConstructor(new Class[]{
+ FSDataInputStream.class, long.class, long.class, String.class, Reporter.class, JobConf.class});
+ } catch(NoSuchMethodException nsm) {
+ throw new RuntimeException(nsm);
+ }
+
+
+ StreamBaseRecordReader reader;
+ try {
+ reader = (StreamBaseRecordReader) ctor.newInstance(new Object[]{
+ in, new Long(start), new Long(end), splitName, reporter, job});
+ } catch(Exception nsm) {
+ throw new RuntimeException(nsm);
+ }
+
+ reader.init();
+
+ return reader;
+ }
+
+}
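A hedged illustration of the globbing rules above (the path is hypothetical, and this assumes the JobConf input-path setters of this era):

    JobConf job = new JobConf();
    job.setInputPath(new Path("/data/logs/part-*"));
    // Only the leaf name is globbed: listPaths() lists /data/logs and
    // GlobFilter keeps names matching the regexp part-.* ('*' -> '.*'),
    // silently dropping checksum files.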
Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Wed May 3 14:00:13 2006
@@ -0,0 +1,510 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.logging.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.RunningJob;
+
+import org.apache.hadoop.util.LogFormatter;
+
+/** All the client-side work happens here.
+ * (Jar packaging, MapRed job submission and monitoring)
+ * @author Michel Tourn
+ */
+public class StreamJob
+{
+ protected static final Logger LOG = LogFormatter.getLogger(StreamJob.class.getName());
+
+ public StreamJob(String[] argv, boolean mayExit)
+ {
+ argv_ = argv;
+ mayExit_ = mayExit;
+ }
+
+ public void go() throws IOException
+ {
+ init();
+
+ preProcessArgs();
+ parseArgv();
+ postProcessArgs();
+
+ setJobConf();
+ submitAndMonitorJob();
+ }
+
+ protected void init()
+ {
+ try {
+ env_ = new Environment();
+ } catch(IOException io) {
+ throw new RuntimeException(io);
+ }
+ }
+
+ void preProcessArgs()
+ {
+ verbose_ = false;
+ }
+
+ void postProcessArgs()
+ {
+ if(cluster_ == null) {
+ // hadoop-default.xml is standard, hadoop-local.xml is not.
+ cluster_ = "default";
+ }
+ hadoopAliasConf_ = "hadoop-" + getClusterNick() + ".xml";
+ if(inputGlobs_.size() == 0) {
+ fail("Required argument: -input <name>");
+ }
+ if(output_ == null) {
+ fail("Required argument: -output ");
+ }
+ // careful with class names..
+ mapCmd_ = packageOrTrimNoShip(mapCmd_);
+ redCmd_ = packageOrTrimNoShip(redCmd_);
+
+ // TBD: -D format or something on the cmdline.
+ // Plus maybe a standard list originating on client or server
+ addTaskEnvironment_ = "";
+ }
+
+ String packageOrTrimNoShip(String cmd)
+ {
+ if(cmd == null) {
+ //
+ } else if(cmd.startsWith(NOSHIP)) {
+ // don't package the file, but keep the absolute path
+ cmd = cmd.substring(NOSHIP.length());
+ } else {
+ String prog = cmd;
+ String args = "";
+ int s = cmd.indexOf(" ");
+ if(s != -1) {
+ prog = cmd.substring(0, s);
+ args = cmd.substring(s+1);
+ }
+ packageFiles_.add(new File(prog).getAbsolutePath());
+ // Change path to simple filename.
+ // That way when PipeMapRed calls Runtime.exec(),
+ // it will look for the executable in the Task's working dir.
+ // And this is where TaskRunner unjars our job jar.
+ prog = new File(prog).getName();
+ if(args.length() > 0) {
+ cmd = prog + " " + args;
+ } else {
+ cmd = prog;
+ }
+ }
+ return cmd;
+ }
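+
+ // Illustration of the two cases above (paths are hypothetical):
+ //   packageOrTrimNoShip("noship:/usr/local/bin/perl5 filter.pl")
+ //     -> "/usr/local/bin/perl5 filter.pl"; nothing is packaged.
+ //   packageOrTrimNoShip("/local/filter.pl -v")
+ //     -> "filter.pl -v"; /local/filter.pl is added to packageFiles_
+ //        and later unpacked into the task's working directory.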
+
+ String getHadoopAliasConfFile()
+ {
+ return new File(getHadoopClientHome() + "/conf", hadoopAliasConf_).getAbsolutePath();
+ }
+
+ void parseArgv()
+ {
+ if(argv_.length==0) {
+ exitUsage();
+ }
+ int i=0;
+ while(i < argv_.length) {
+ String s;
+ if(argv_[i].equals("-verbose")) {
+ verbose_ = true;
+ } else if(argv_[i].equals("-debug")) {
+ debug_++;
+ } else if((s = optionArg(argv_, i, "-input", false)) != null) {
+ i++;
+ inputGlobs_.add(s);
+ } else if((s = optionArg(argv_, i, "-output", output_ != null)) != null) {
+ i++;
+ output_ = s;
+ } else if((s = optionArg(argv_, i, "-mapper", mapCmd_ != null)) != null) {
+ i++;
+ mapCmd_ = s;
+ } else if((s = optionArg(argv_, i, "-reducer", redCmd_ != null)) != null) {
+ i++;
+ redCmd_ = s;
+ } else if((s = optionArg(argv_, i, "-files", false)) != null) {
+ i++;
+ packageFiles_.add(s);
+ } else if((s = optionArg(argv_, i, "-cluster", cluster_ != null)) != null) {
+ i++;
+ cluster_ = s;
+ } else if((s = optionArg(argv_, i, "-config", false)) != null) {
+ i++;
+ configPath_.add(s);
+ } else if((s = optionArg(argv_, i, "-inputreader", inReaderSpec_ != null)) != null) {
+ i++;
+ inReaderSpec_ = s;
+ } else {
+ System.err.println("Unexpected argument: " + argv_[i]);
+ exitUsage();
+ }
+ i++;
+ }
+ }
+
+ String optionArg(String[] args, int index, String arg, boolean argSet)
+ {
+ if(index >= args.length || ! args[index].equals(arg)) {
+ return null;
+ }
+ if(argSet) {
+ throw new IllegalArgumentException("Can only have one " + arg + " option");
+ }
+ if(index >= args.length-1) {
+ throw new IllegalArgumentException("Expected argument after option " + args[index]);
+ }
+ return args[index+1];
+ }
+
+ protected void msg(String msg)
+ {
+ if(verbose_) {
+ System.out.println("STREAM: " + msg);
+ }
+ }
+
+ public void exitUsage()
+ {
+ // 1 2 3 4 5 6 7
+ //1234567890123456789012345678901234567890123456789012345678901234567890123456789
+ System.out.println("Usage: bin/hadoop jar build/hadoop-streaming.jar [options]");
+ System.out.println("Options:");
+ System.out.println(" -input <path> DFS input file(s) for the Map step");
+ System.out.println(" -output <path> DFS output directory for the Reduce step");
+ System.out.println(" -mapper <cmd> The streaming command to run");
+ System.out.println(" -reducer <cmd> The streaming command to run");
+ System.out.println(" -files <file> Additional files to be shipped in the Job jar file");
+ System.out.println(" -cluster <name> Default uses hadoop-default.xml and hadoop-site.xml");
+ System.out.println(" -config <file> Optional. One or more paths to xml config files");
+ System.out.println(" -inputreader <spec> Optional. See below");
+ System.out.println(" -verbose");
+ System.out.println();
+ System.out.println("In -input: globbing on <path> is supported and can have multiple -input");
+ System.out.println("Default Map input format: a line is a record in UTF-8");
+ System.out.println(" the key part ends at first TAB, the rest of the line is the value");
+ System.out.println("Custom Map input format: -inputreader package.MyRecordReader,n=v,n=v ");
+ System.out.println(" comma-separated name-values can be specified to configure the InputFormat");
+ System.out.println(" Ex: -inputreader 'StreamXmlRecordReader,begin=<doc>,end=</doc>'");
+ System.out.println("Map output format, reduce input/output format:");
+ System.out.println(" Format defined by what mapper command outputs. Line-oriented");
+ System.out.println("Mapper and Reducer <cmd> syntax: ");
+ System.out.println(" If the mapper or reducer programs are prefixed with " + NOSHIP + " then ");
+ System.out.println(" the paths are assumed to be valid absolute paths on the task tracker machines");
+ System.out.println(" and are NOT packaged with the Job jar file.");
+ System.out.println("Use -cluster <name> to switch between \"local\" Hadoop and one or more remote ");
+ System.out.println(" Hadoop clusters. ");
+ System.out.println(" The default is to use the normal hadoop-default.xml and hadoop-site.xml");
+ System.out.println(" Else configuration will use $HADOOP_HOME/conf/hadoop-<name>.xml");
+ System.out.println();
+ System.out.println("Example: hadoopStreaming -mapper \"noship:/usr/local/bin/perl5 filter.pl\"");
+ System.out.println(" -files /local/filter.pl -input \"/logs/0604*/*\" [...]");
+ System.out.println(" Ships a script, invokes the non-shipped perl interpreter");
+ System.out.println(" Shipped files go to the working directory so filter.pl is found by perl");
+ System.out.println(" Input files are all the daily logs for days in month 2006-04");
+ fail("");
+ }
+
+ public void fail(String message)
+ {
+ if(mayExit_) {
+ System.err.println(message);
+ System.exit(1);
+ } else {
+ throw new IllegalArgumentException(message);
+ }
+ }
+
+ // --------------------------------------------
+
+
+ protected String getHadoopClientHome()
+ {
+ String h = env_.getProperty("HADOOP_HOME"); // standard Hadoop
+ if(h == null) {
+ //fail("Missing required environment variable: HADOOP_HOME");
+ h = "UNDEF";
+ }
+ return h;
+ }
+
+
+ protected boolean isLocalHadoop()
+ {
+ boolean local;
+ if(jobConf_ == null) {
+ local = getClusterNick().equals("local");
+ } else {
+ local = jobConf_.get("mapred.job.tracker", "").equals("local");
+ }
+ return local;
+ }
+ protected String getClusterNick()
+ {
+ return cluster_;
+ }
+
+ /** @return path to the created Jar file or null if no files are necessary.
+ */
+ protected String packageJobJar() throws IOException
+ {
+ ArrayList unjarFiles = new ArrayList();
+
+ // Runtime code: ship same version of code as self (job submitter code)
+ // usually found in: build/contrib or build/hadoop-<version>-dev-streaming.jar
+ String runtimeClasses = StreamUtil.findInClasspath(StreamJob.class.getName());
+ if(runtimeClasses == null) {
+ throw new IOException("runtime classes not found: " + getClass().getPackage());
+ } else {
+ msg("Found runtime classes in: " + runtimeClasses);
+ }
+ if(isLocalHadoop()) {
+ // don't package class files (they might get unpackaged in . and then
+ // hide the intended CLASSPATH entry)
+ // we still package everything else (so that scripts and executables are found in
+ // Task workdir like distributed Hadoop)
+ } else {
+ if(new File(runtimeClasses).isDirectory()) {
+ packageFiles_.add(runtimeClasses);
+ } else {
+ unjarFiles.add(runtimeClasses);
+ }
+ }
+ if(packageFiles_.size() + unjarFiles.size()==0) {
+ return null;
+ }
+ File jobJar = File.createTempFile("streamjob", ".jar");
+ System.out.println("packageJobJar: " + packageFiles_ + " " + unjarFiles + " " + jobJar);
+ if(debug_ == 0) {
+ jobJar.deleteOnExit();
+ }
+ JarBuilder builder = new JarBuilder();
+ if(verbose_) {
+ builder.setVerbose(true);
+ }
+ String jobJarName = jobJar.getAbsolutePath();
+ builder.merge(packageFiles_, unjarFiles, jobJarName);
+ return jobJarName;
+ }
+
+ protected void setJobConf() throws IOException
+ {
+ msg("hadoopAliasConf_ = " + hadoopAliasConf_);
+ config_ = new Configuration();
+ if(!cluster_.equals("default")) {
+ config_.addFinalResource(new Path(getHadoopAliasConfFile()));
+ } else {
+ // use only defaults: hadoop-default.xml and hadoop-site.xml
+ }
+ Iterator it = configPath_.iterator();
+ while(it.hasNext()) {
+ String pathName = (String)it.next();
+ config_.addFinalResource(new Path(pathName));
+ }
+ // general MapRed job properties
+ jobConf_ = new JobConf(config_);
+ for(int i=0; i<inputGlobs_.size(); i++) {
+ jobConf_.addInputDir(new File((String)inputGlobs_.get(i)));
+ }
+
+ jobConf_.setInputFormat(StreamInputFormat.class);
+ jobConf_.setInputKeyClass(UTF8.class);
+ jobConf_.setInputValueClass(UTF8.class);
+ jobConf_.setOutputKeyClass(UTF8.class);
+ jobConf_.setOutputValueClass(UTF8.class);
+ //jobConf_.setCombinerClass();
+
+ jobConf_.setOutputDir(new File(output_));
+ jobConf_.setOutputFormat(StreamOutputFormat.class);
+
+ jobConf_.set("stream.addenvironment", addTaskEnvironment_);
+
+ String defaultPackage = this.getClass().getPackage().getName();
+
+ Class c = StreamUtil.goodClassOrNull(mapCmd_, defaultPackage);
+ if(c != null) {
+ jobConf_.setMapperClass(c);
+ } else {
+ jobConf_.setMapperClass(PipeMapper.class);
+ jobConf_.set("stream.map.streamprocessor", mapCmd_);
+ }
+
+ if(redCmd_ != null) {
+ c = StreamUtil.goodClassOrNull(redCmd_, defaultPackage);
+ if(c != null) {
+ jobConf_.setReducerClass(c);
+ } else {
+ jobConf_.setReducerClass(PipeReducer.class);
+ jobConf_.set("stream.reduce.streamprocessor", redCmd_);
+ }
+ }
+
+ if(inReaderSpec_ != null) {
+ String[] args = inReaderSpec_.split(",");
+ String readerClass = args[0];
+ // this argument can only be a Java class
+ c = StreamUtil.goodClassOrNull(readerClass, defaultPackage);
+ if(c != null) {
+ jobConf_.set("stream.recordreader.class", c.getName());
+ } else {
+ fail("-inputreader: class not found: " + readerClass);
+ }
+ for(int i=1; i<args.length; i++) {
+ String[] nv = args[i].split("=", 2);
+ String k = "stream.recordreader." + nv[0];
+ String v = (nv.length>1) ? nv[1] : "";
+ jobConf_.set(k, v);
+ }
+ }
+
+ jar_ = packageJobJar();
+ if(jar_ != null) {
+ jobConf_.setJar(jar_);
+ }
+ //jobConf_.mtdump();System.exit(1);
+ }
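+
+ // Illustration of the -inputreader handling above: the spec from the
+ // usage text,
+ //   -inputreader 'StreamXmlRecordReader,begin=<doc>,end=</doc>'
+ // resolves the class in this package and sets:
+ //   stream.recordreader.class = org.apache.hadoop.streaming.StreamXmlRecordReader
+ //   stream.recordreader.begin = <doc>
+ //   stream.recordreader.end   = </doc>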
+
+ protected String getJobTrackerHostPort()
+ {
+ return jobConf_.get("mapred.job.tracker");
+ }
+
+ protected void jobInfo()
+ {
+ if(isLocalHadoop()) {
+ LOG.info("Job running in-process (local Hadoop)");
+ } else {
+ String hp = getJobTrackerHostPort();
+ LOG.info("To kill this job, run:");
+ LOG.info(getHadoopClientHome() + "/bin/hadoop job -Dmapred.job.tracker=" + hp + " -kill " + jobId_);
+ //LOG.info("Job file: " + running_.getJobFile() );
+ LOG.info("Tracking URL: " + StreamUtil.qualifyHost(running_.getTrackingURL()));
+ }
+ }
+
+ // Based on JobClient
+ public void submitAndMonitorJob() throws IOException {
+
+ if(jar_ != null && isLocalHadoop()) {
+ // getAbsoluteFile() became required when the shell and the sub-VM have different working directories
+ File wd = new File(".").getAbsoluteFile();
+ StreamUtil.unJar(new File(jar_), wd);
+ }
+
+ // if jobConf_ changes must recreate a JobClient
+ jc_ = new JobClient(jobConf_);
+ boolean error = true;
+ running_ = null;
+ String lastReport = null;
+ try {
+ running_ = jc_.submitJob(jobConf_);
+ jobId_ = running_.getJobID();
+
+ LOG.info("getLocalDirs(): " + Arrays.toString(jobConf_.getLocalDirs()));
+ LOG.info("Running job: " + jobId_);
+ jobInfo();
+
+ while (!running_.isComplete()) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {}
+ running_ = jc_.getJob(jobId_);
+ String report = null;
+ report = " map "+Math.round(running_.mapProgress()*100)
+ +"% reduce " + Math.round(running_.reduceProgress()*100)+"%";
+
+ if (!report.equals(lastReport)) {
+ LOG.info(report);
+ lastReport = report;
+ }
+ }
+ if (!running_.isSuccessful()) {
+ jobInfo();
+ throw new IOException("Job not Successful!");
+ }
+ LOG.info("Job complete: " + jobId_);
+ LOG.info("Output: " + output_);
+ error = false;
+ } finally {
+ if (error && (running_ != null)) {
+ LOG.info("killJob...");
+ running_.killJob();
+ }
+ jc_.close();
+ }
+ }
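+
+ // A sketch of driving StreamJob programmatically instead of from the
+ // shell (paths are illustrative only):
+ //
+ //   StreamJob job = new StreamJob(new String[] {
+ //     "-input", "/logs/0604*/*",
+ //     "-output", "/out/filtered",
+ //     "-mapper", "/local/filter.pl",
+ //     "-reducer", "/local/aggregate.pl"
+ //   }, false); // mayExit=false: failures throw instead of System.exit
+ //   job.go();  // throws IOException if the job fails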
+
+
+ public final static String NOSHIP = "noship:";
+
+ protected boolean mayExit_;
+ protected String[] argv_;
+ protected boolean verbose_;
+ protected int debug_;
+
+ protected Environment env_;
+
+ protected String jar_;
+ protected boolean localHadoop_;
+ protected Configuration config_;
+ protected JobConf jobConf_;
+ protected JobClient jc_;
+
+ // command-line arguments
+ protected ArrayList inputGlobs_ = new ArrayList(); // <String>
+ protected ArrayList packageFiles_ = new ArrayList(); // <String>
+ protected String output_;
+ protected String mapCmd_;
+ protected String redCmd_;
+ protected String cluster_;
+ protected ArrayList configPath_ = new ArrayList(); // <String>
+ protected String hadoopAliasConf_;
+ protected String inReaderSpec_;
+
+
+ // Use to communicate config to the external processes (ex env.var.HADOOP_USER)
+ // encoding "a=b c=d"
+ protected String addTaskEnvironment_;
+
+ protected boolean outputSingleNode_;
+ protected long minRecWrittenToEnableSkip_;
+
+ protected RunningJob running_;
+ protected String jobId_;
+
+
+}
+
Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java Wed May 3 14:00:13 2006
@@ -0,0 +1,114 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Similar to org.apache.hadoop.mapred.TextRecordReader,
+ * but delimits key and value with a TAB.
+ * @author Michel Tourn
+ */
+public class StreamLineRecordReader extends StreamBaseRecordReader
+{
+
+ public StreamLineRecordReader(
+ FSDataInputStream in, long start, long end,
+ String splitName, Reporter reporter, JobConf job)
+ throws IOException
+ {
+ super(in, start, end, splitName, reporter, job);
+ }
+
+ public void seekNextRecordBoundary() throws IOException
+ {
+ int bytesSkipped = 0;
+ if (start_ != 0) {
+ in_.seek(start_ - 1);
+ // scan to the next newline in the file
+ while (in_.getPos() < end_) {
+ char c = (char)in_.read();
+ bytesSkipped++;
+ if (c == '\r' || c == '\n') {
+ break;
+ }
+ }
+ }
+
+ System.out.println("getRecordReader start="+start_ + " end=" + end_ + " bytesSkipped"+bytesSkipped);
+ }
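+
+ // Worked example (offsets are illustrative): for a file "abc\ndef\nghi\n"
+ // and a split with start_=5 (mid-line in "def"), seek(4) then scan to the
+ // '\n' at offset 7, so this reader starts at offset 8 ("ghi") and the
+ // straddling line "def" is read entirely by the previous split.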
+
+ public synchronized boolean next(Writable key, Writable value)
+ throws IOException {
+ long pos = in_.getPos();
+ if (pos >= end_)
+ return false;
+
+ //((LongWritable)key).set(pos); // key is position
+ //((UTF8)value).set(readLine(in)); // value is line
+ String line = readLine(in_);
+
+ // key is line up to TAB, value is rest
+ final boolean NOVAL = false;
+ if(NOVAL) {
+ ((UTF8)key).set(line);
+ ((UTF8)value).set("");
+ } else {
+ int tab = line.indexOf('\t');
+ if(tab == -1) {
+ ((UTF8)key).set(line);
+ ((UTF8)value).set("");
+ } else {
+ ((UTF8)key).set(line.substring(0, tab));
+ ((UTF8)value).set(line.substring(tab+1));
+ }
+ }
+ numRecStats(line);
+ return true;
+ }
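+
+ // Examples of the TAB split above (input lines are illustrative):
+ //   "K1\tsome value" -> key="K1", value="some value"
+ //   "no tab here"    -> key="no tab here", value=""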
+
+
+ // from TextInputFormat
+ private static String readLine(FSDataInputStream in) throws IOException {
+ StringBuffer buffer = new StringBuffer();
+ while (true) {
+
+ int b = in.read();
+ if (b == -1)
+ break;
+
+ char c = (char)b; // bug: this assumes eight-bit characters.
+ if (c == '\r' || c == '\n') // TODO || c == '\t' here
+ break;
+
+ buffer.append(c);
+ }
+
+ return buffer.toString();
+ }
+
+}
Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java Wed May 3 14:00:13 2006
@@ -0,0 +1,71 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+import java.io.File;
+
+import org.apache.hadoop.mapred.*;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+
+/** Similar to org.apache.hadoop.mapred.TextOutputFormat,
+ * but delimits key and value with a TAB.
+ * @author Michel Tourn
+ */
+public class StreamOutputFormat implements OutputFormat {
+
+ public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
+ String name) throws IOException {
+
+ File file = new File(job.getOutputDir(), name);
+
+ final FSDataOutputStream out = fs.create(file);
+
+ return new RecordWriter() {
+ public synchronized void write(WritableComparable key, Writable value)
+ throws IOException {
+ out.write(key.toString().getBytes("UTF-8"));
+ out.writeByte('\t');
+ out.write(value.toString().getBytes("UTF-8"));
+ out.writeByte('\n');
+ }
+ public synchronized void close(Reporter reporter) throws IOException {
+ out.close();
+ }
+ };
+ }
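+
+ // For key="cat" and value="3" the writer above emits the bytes
+ //   cat\t3\n
+ // i.e. one UTF-8 encoded, TAB-delimited record per line.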
+
+
+ /** Check whether the output specification for a job is appropriate. Called
+ * when a job is submitted. Typically checks that it does not already exist,
+ * throwing an exception when it already exists, so that output is not
+ * overwritten.
+ *
+ * @param job the job whose output will be written
+ * @throws IOException when output should not be attempted
+ */
+ public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException
+ {
+ // allow existing data (for app-level restartability)
+ }
+
+}