Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/07/14 09:55:16 UTC
svn commit: r421829 [1/2] - in /lucene/hadoop/trunk: ./
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
Author: cutting
Date: Fri Jul 14 00:55:15 2006
New Revision: 421829
URL: http://svn.apache.org/viewvc?rev=421829&view=rev
Log:
HADOOP-361. Remove unix dependencies from streaming contrib module tests. Contributed by Michel.
Added:
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java
lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UniqApp.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java
lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=421829&r1=421828&r2=421829&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Jul 14 00:55:15 2006
@@ -31,6 +31,9 @@
protocol. This lets one, e.g., more easily copy log files into
DFS. (Arun C Murthy via cutting)
+ 9. HADOOP-361. Remove unix dependencies from streaming contrib
+ module tests, making them pure java. (Michel Tourn via cutting)
+
Release 0.4.0 - 2006-06-28
Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java?rev=421829&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java Fri Jul 14 00:55:15 2006
@@ -0,0 +1,51 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/** A generic Combiner bridge.<br>
+ * To use a Combiner, specify -combiner myprogram in hadoopStreaming.
+ * It delegates operations to an external program via stdin and stdout.
+ * In one run of the external program, you can expect all records with
+ * the same key to appear together.
+ * You should not make assumptions about how many times the combiner is
+ * run on your data.
+ * Ideally the combiner and the reducer are the same program: the combiner
+ * partially aggregates the data zero or more times, and the reducer
+ * applies the final aggregation pass.
+ * Do not use a Combiner if your reduce logic does not support
+ * such a multipass aggregation.
+ * @author Michel Tourn
+ */
+public class PipeCombiner extends PipeReducer
+{
+
+ String getPipeCommand(JobConf job)
+ {
+ return job.get("stream.combine.streamprocessor");
+ }
+
+}
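For context, PipeCombiner only overrides getPipeCommand(), so wiring it into a job comes down to setting the stream.combine.streamprocessor property that it reads. A minimal sketch, assuming the streaming jar is on the classpath (the class name and the /bin/myaggregator command are illustrative placeholders, not part of this commit):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.streaming.PipeCombiner;

    public class CombinerSetupSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // Placeholder command: any executable that aggregates key\tvalue lines.
        job.set("stream.combine.streamprocessor", "/bin/myaggregator");
        job.setCombinerClass(PipeCombiner.class);
        // PipeCombiner.getPipeCommand(job) now returns the command above, and
        // PipeMapRed.configure() execs it, piping records over stdin/stdout.
      }
    }

In practice the -combiner option added to StreamJob below is the expected way to supply this command.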
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=421829&r1=421828&r2=421829&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Fri Jul 14 00:55:15 2006
@@ -1,566 +1,566 @@
-/**
- * Copyright 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.streaming;
-
-import java.io.*;
-import java.nio.channels.*;
-import java.io.IOException;
-import java.util.Date;
-import java.util.Map;
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.Properties;
-import java.util.regex.*;
-
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.OutputCollector;
-
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataOutputStream;
-
-/** Shared functionality for PipeMapper, PipeReducer.
- * @author Michel Tourn
- */
-public abstract class PipeMapRed {
-
- protected static final Log LOG = LogFactory.getLog(PipeMapRed.class.getName());
-
- /** The command to be spawned as a subprocess.
- * Mapper/Reducer operations will delegate to it
- */
- abstract String getPipeCommand(JobConf job);
- /*
- */
- abstract String getKeyColPropName();
-
- /** Write output as side-effect files rather than as map outputs.
- This is useful to do "Map" tasks rather than "MapReduce" tasks. */
- boolean getUseSideEffect()
- {
- return false;
- }
-
- /**
- * @returns how many TABS before the end of the key part
- * usually: 1 or "ALL"
- * used for tool output of both Map and Reduce
- * configured via tool's argv: splitKeyVal=ALL or 1..
- * although it is interpreted here, not by tool
- */
- int getKeyColsFromPipeCommand(String cmd)
- {
- String key = getKeyColPropName();
- Pattern kcPat = Pattern.compile(".*" + key + "=([^\\s]*).*");
- Matcher match = kcPat.matcher(cmd);
- String kc;
- if(!match.matches()) {
- kc = null;
- } else {
- kc = match.group(1);
- }
-
- int cols;
- if(kc== null) {
- // default value is 1 and the Stream applications could instead
- // add/remove the \t separator on lines to get the same effect as value 0, 1, ALL
- cols = 1;
- } else if(kc.equals("ALL")) {
- cols = ALL_COLS;
- } else {
- try {
- cols = Integer.parseInt(kc);
- } catch(NumberFormatException nf) {
- cols = Integer.MAX_VALUE;
- }
- }
-
- System.out.println("getKeyColsFromPipeCommand:" + key + " parse:" + cols + " from cmd=" + cmd);
-
- return cols;
- }
-
- final static int OUTSIDE = 1;
- final static int SINGLEQ = 2;
- final static int DOUBLEQ = 3;
-
- static String[] splitArgs(String args)
- {
- ArrayList argList = new ArrayList();
- char[] ch = args.toCharArray();
- int clen = ch.length;
- int state = OUTSIDE;
- int argstart = 0;
- for(int c=0; c<=clen; c++) {
- boolean last = (c==clen);
- int lastState = state;
- boolean endToken = false;
- if(!last) {
- if(ch[c]=='\'') {
- if(state == OUTSIDE) {
- state = SINGLEQ;
- } else if(state == SINGLEQ) {
- state = OUTSIDE;
- }
- endToken = (state != lastState);
- } else if(ch[c]=='"') {
- if(state == OUTSIDE) {
- state = DOUBLEQ;
- } else if(state == DOUBLEQ) {
- state = OUTSIDE;
- }
- endToken = (state != lastState);
- } else if(ch[c]==' ') {
- if(state == OUTSIDE) {
- endToken = true;
- }
- }
- }
- if(last || endToken) {
- if(c == argstart) {
- // unquoted space
- } else {
- String a;
- a = args.substring(argstart, c);
- argList.add(a);
- }
- argstart = c+1;
- lastState = state;
- }
- }
- return (String[])argList.toArray(new String[0]);
- }
-
- public void configure(JobConf job)
- {
-
- try {
- String argv = getPipeCommand(job);
- keyCols_ = getKeyColsFromPipeCommand(argv);
-
- job_ = job;
-
- // Currently: null is identity reduce. REDUCE_NONE is no-map-outputs.
- doPipe_ = (argv != null) && !StreamJob.REDUCE_NONE.equals(argv);
- if(!doPipe_) return;
-
- setStreamJobDetails(job);
- setStreamProperties();
-
- String[] argvSplit = splitArgs(argv);
- String prog = argvSplit[0];
- String userdir = System.getProperty("user.dir");
- if(new File(prog).isAbsolute()) {
- // we don't own it. Hope it is executable
- } else {
- new MustangFile(prog).setExecutable(true, true);
- }
-
-
- if(job_.getInputValueClass().equals(BytesWritable.class)) {
- // TODO expose as separate config:
- // job or semistandard inputformat property
- optUseKey_ = false;
- }
-
- optSideEffect_ = getUseSideEffect();
-
- if(optSideEffect_) {
- String fileName = job_.get("mapred.task.id");
- sideEffectPath_ = new Path(job_.getOutputPath(), fileName);
- FileSystem fs = FileSystem.get(job_);
- sideEffectOut_ = fs.create(sideEffectPath_);
- }
-
- // argvSplit[0]:
- // An absolute path should be a preexisting valid path on all TaskTrackers
- // A relative path should match in the unjarred Job data
- // In this case, force an absolute path to make sure exec finds it.
- argvSplit[0] = new File(argvSplit[0]).getAbsolutePath();
- logprintln("PipeMapRed exec " + Arrays.asList(argvSplit));
- logprintln("sideEffectPath_=" + sideEffectPath_);
-
- Environment childEnv = (Environment)StreamUtil.env().clone();
- addEnvironment(childEnv, job_.get("stream.addenvironment"));
- sim = Runtime.getRuntime().exec(argvSplit, childEnv.toArray());
-
- /* // This way required jdk1.5
- ProcessBuilder processBuilder = new ProcessBuilder(argvSplit);
- Map<String, String> env = processBuilder.environment();
- addEnvironment(env, job_.get("stream.addenvironment"));
- sim = processBuilder.start();
- */
-
- clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));
- clientIn_ = new DataInputStream(new BufferedInputStream(sim.getInputStream()));
- clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));
- doneLock_ = new Object();
- startTime_ = System.currentTimeMillis();
-
- } catch(Exception e) {
- e.printStackTrace();
- e.printStackTrace(log_);
- }
- }
-
- void setStreamJobDetails(JobConf job)
- {
- jobLog_ = job.get("stream.jobLog_");
- String s = job.get("stream.minRecWrittenToEnableSkip_");
- if(s != null) {
- minRecWrittenToEnableSkip_ = Long.parseLong(s);
- logprintln("JobConf set minRecWrittenToEnableSkip_ =" + minRecWrittenToEnableSkip_);
- }
- }
-
- void setStreamProperties()
- {
- taskid_ = System.getProperty("stream.taskid");
- if(taskid_ == null) {
- taskid_ = "noid" + System.currentTimeMillis();
- }
- String s = System.getProperty("stream.port");
- if(s != null) {
- reportPortPlusOne_ = Integer.parseInt(s);
- }
-
- }
-
- void logprintln(String s)
- {
- if(log_ != null) {
- log_.println(s);
- } else {
- System.err.println(s); // or LOG.info()
- }
- }
-
- void logflush()
- {
- if(log_ != null) {
- log_.flush();
- }
- }
-
- void addEnvironment(Properties env, String nameVals)
- {
- // encoding "a=b c=d" from StreamJob
- if(nameVals == null) return;
- String[] nv = nameVals.split(" ");
- for(int i=0; i<nv.length; i++) {
- String[] pair = nv[i].split("=", 2);
- if(pair.length != 2) {
- logprintln("Skip ev entry:" + nv[i]);
- } else {
- logprintln("Add ev entry:" + nv[i]);
- env.put(pair[0], pair[1]);
- }
- }
- }
-
- /** .. and if successful: delete the task log */
- void appendLogToJobLog(String status)
- {
- if(jobLog_ == null) {
- return; // not using a common joblog
- }
- StreamUtil.exec("/bin/rm " + LOGNAME, log_);
- // TODO socket-based aggregator (in JobTrackerInfoServer)
- }
-
-
- void startOutputThreads(OutputCollector output, Reporter reporter)
- {
- outputDone_ = false;
- errorDone_ = false;
- outThread_ = new MROutputThread(output, reporter);
- outThread_.start();
- errThread_ = new MRErrorThread(reporter);
- errThread_.start();
- }
-
- void splitKeyVal(String line, UTF8 key, UTF8 val)
- {
- int pos;
- if(keyCols_ == ALL_COLS) {
- pos = -1;
- } else {
- pos = line.indexOf('\t');
- }
- if(pos == -1) {
- key.set(line);
- val.set("");
- } else {
- key.set(line.substring(0, pos));
- val.set(line.substring(pos+1));
- }
- }
-
- class MROutputThread extends Thread
- {
- MROutputThread(OutputCollector output, Reporter reporter)
- {
- setDaemon(true);
- this.output = output;
- this.reporter = reporter;
- }
- public void run() {
- try {
- try {
- UTF8 EMPTY = new UTF8("");
- UTF8 key = new UTF8();
- UTF8 val = new UTF8();
- // 3/4 Tool to Hadoop
- while((answer = clientIn_.readLine()) != null) {
- // 4/4 Hadoop out
- if(optSideEffect_) {
- sideEffectOut_.write(answer.getBytes());
- sideEffectOut_.write('\n');
- } else {
- splitKeyVal(answer, key, val);
- output.collect(key, val);
- numRecWritten_++;
- if(numRecWritten_ % 100 == 0) {
- logprintln(numRecRead_+"/"+numRecWritten_);
- logflush();
- }
- }
- }
- } catch(IOException io) {
- io.printStackTrace(log_);
- }
- logprintln("MROutputThread done");
- } finally {
- outputDone_ = true;
- synchronized(doneLock_) {
- doneLock_.notifyAll();
- }
- }
- }
- OutputCollector output;
- Reporter reporter;
- String answer;
- }
-
- class MRErrorThread extends Thread
- {
- public MRErrorThread(Reporter reporter)
- {
- this.reporter = reporter;
- setDaemon(true);
- }
- public void run()
- {
- String line;
- try {
- long num = 0;
- int bucket = 100;
- while((line=clientErr_.readLine()) != null) {
- num++;
- logprintln(line);
- if(num < 10) {
- String hline = "MRErr: " + line;
- System.err.println(hline);
- reporter.setStatus(hline);
- }
- }
- } catch(IOException io) {
- io.printStackTrace(log_);
- } finally {
- errorDone_ = true;
- synchronized(doneLock_) {
- doneLock_.notifyAll();
- }
- }
- }
- Reporter reporter;
- }
-
- public void mapRedFinished()
- {
- logprintln("mapRedFinished");
- try {
- if(!doPipe_) return;
- try {
- if(optSideEffect_) {
- logprintln("closing " + sideEffectPath_);
- sideEffectOut_.close();
- logprintln("closed " + sideEffectPath_);
- }
- } catch(IOException io) {
- io.printStackTrace();
- }
- try {
- if(clientOut_ != null) {
- clientOut_.close();
- }
- } catch(IOException io) {
- }
- if(outThread_ == null) {
- // no input records: threads were never spawned
- } else {
- try {
- while(!outputDone_ || !errorDone_) {
- synchronized(doneLock_) {
- doneLock_.wait();
- }
- }
- } catch(InterruptedException ie) {
- ie.printStackTrace();
- }
- }
- sim.destroy();
- } catch(RuntimeException e) {
- e.printStackTrace(log_);
- throw e;
- }
- }
-
- void maybeLogRecord()
- {
- if(numRecRead_ >= nextRecReadLog_) {
- String info = numRecInfo();
- logprintln(info);
- logflush();
- System.err.println(info);
- //nextRecReadLog_ *= 10;
- nextRecReadLog_ += 100;
- }
- }
-
- public String getContext()
- {
-
- String s = numRecInfo() + "\n";
- s += "minRecWrittenToEnableSkip_=" + minRecWrittenToEnableSkip_ + " ";
- s += "LOGNAME=" + LOGNAME + "\n";
- s += envline("HOST");
- s += envline("USER");
- s += envline("HADOOP_USER");
- //s += envline("PWD"); // =/home/crawler/hadoop/trunk
- s += "last Hadoop input: |" + mapredKey_ + "|\n";
- s += "last tool output: |" + outThread_.answer + "|\n";
- s += "Date: " + new Date() + "\n";
- // s += envline("HADOOP_HOME");
- // s += envline("REMOTE_HOST");
- return s;
- }
-
- String envline(String var)
- {
- return var + "=" + StreamUtil.env().get(var) + "\n";
- }
-
- String numRecInfo()
- {
- long elapsed = (System.currentTimeMillis() - startTime_)/1000;
- long total = numRecRead_+numRecWritten_+numRecSkipped_;
- return "R/W/S=" + numRecRead_+"/"+numRecWritten_+"/"+numRecSkipped_
- + " in:" + safeDiv(numRecRead_, elapsed) + " [rec/s]"
- + " out:" + safeDiv(numRecWritten_, elapsed) + " [rec/s]";
- }
- String safeDiv(long n, long d)
- {
- return (d==0) ? "NA" : ""+n/d + "=" + n + "/" + d;
- }
- String logFailure(Exception e)
- {
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- e.printStackTrace(pw);
- String msg = "log:" + jobLog_ + "\n" + getContext() + sw + "\n";
- logprintln(msg);
- return msg;
- }
-
-
- long startTime_;
- long numRecRead_ = 0;
- long numRecWritten_ = 0;
- long numRecSkipped_ = 0;
- long nextRecReadLog_ = 1;
-
- long minRecWrittenToEnableSkip_ = Long.MAX_VALUE;
-
- int keyCols_;
- final static int ALL_COLS = Integer.MAX_VALUE;
-
- JobConf job_;
-
- // generic MapRed parameters passed on by hadoopStreaming
- String taskid_;
- int reportPortPlusOne_;
-
- boolean doPipe_;
-
- Process sim;
- Object doneLock_;
- MROutputThread outThread_;
- MRErrorThread errThread_;
- boolean outputDone_;
- boolean errorDone_;
- DataOutputStream clientOut_;
- DataInputStream clientErr_;
- DataInputStream clientIn_;
-
- String jobLog_;
- // set in PipeMapper/PipeReducer subclasses
- String mapredKey_;
- int numExceptions_;
-
- boolean optUseKey_ = true;
-
- boolean optSideEffect_;
- Path sideEffectPath_;
- FSDataOutputStream sideEffectOut_;
-
- String LOGNAME;
- PrintStream log_;
-
- /* curr. going to stderr so that it is preserved
- { // instance initializer
- try {
- int id = (int)((System.currentTimeMillis()/2000) % 10);
- String sid = id+ "." + StreamUtil.env().get("USER");
- LOGNAME = "/tmp/PipeMapRed." + sid + ".log";
- log_ = new PrintStream(new FileOutputStream(LOGNAME));
- logprintln(new java.util.Date());
- logflush();
- } catch(IOException io) {
- System.err.println("LOGNAME=" + LOGNAME);
- io.printStackTrace();
- } finally {
- if(log_ == null) {
- log_ = System.err;
- }
- }
- }
- */
-}
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+import java.nio.channels.*;
+import java.io.IOException;
+import java.util.Date;
+import java.util.Map;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.regex.*;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+/** Shared functionality for PipeMapper, PipeReducer.
+ * @author Michel Tourn
+ */
+public abstract class PipeMapRed {
+
+ protected static final Log LOG = LogFactory.getLog(PipeMapRed.class.getName());
+
+ /** The command to be spawned as a subprocess.
+ * Mapper/Reducer operations will delegate to it
+ */
+ abstract String getPipeCommand(JobConf job);
+ /*
+ */
+ abstract String getKeyColPropName();
+
+ /** Write output as side-effect files rather than as map outputs.
+ This is useful to do "Map" tasks rather than "MapReduce" tasks. */
+ boolean getUseSideEffect()
+ {
+ return false;
+ }
+
+ /**
+ * @returns how many TABS before the end of the key part
+ * usually: 1 or "ALL"
+ * used for tool output of both Map and Reduce
+ * configured via tool's argv: splitKeyVal=ALL or 1..
+ * although it is interpreted here, not by tool
+ */
+ int getKeyColsFromPipeCommand(String cmd)
+ {
+ String key = getKeyColPropName();
+ Pattern kcPat = Pattern.compile(".*" + key + "=([^\\s]*).*");
+ Matcher match = kcPat.matcher(cmd);
+ String kc;
+ if(!match.matches()) {
+ kc = null;
+ } else {
+ kc = match.group(1);
+ }
+
+ int cols;
+ if(kc== null) {
+ // default value is 1 and the Stream applications could instead
+ // add/remove the \t separator on lines to get the same effect as value 0, 1, ALL
+ cols = 1;
+ } else if(kc.equals("ALL")) {
+ cols = ALL_COLS;
+ } else {
+ try {
+ cols = Integer.parseInt(kc);
+ } catch(NumberFormatException nf) {
+ cols = Integer.MAX_VALUE;
+ }
+ }
+
+ System.out.println("getKeyColsFromPipeCommand:" + key + " parse:" + cols + " from cmd=" + cmd);
+
+ return cols;
+ }
+
+ final static int OUTSIDE = 1;
+ final static int SINGLEQ = 2;
+ final static int DOUBLEQ = 3;
+
+ static String[] splitArgs(String args)
+ {
+ ArrayList argList = new ArrayList();
+ char[] ch = args.toCharArray();
+ int clen = ch.length;
+ int state = OUTSIDE;
+ int argstart = 0;
+ for(int c=0; c<=clen; c++) {
+ boolean last = (c==clen);
+ int lastState = state;
+ boolean endToken = false;
+ if(!last) {
+ if(ch[c]=='\'') {
+ if(state == OUTSIDE) {
+ state = SINGLEQ;
+ } else if(state == SINGLEQ) {
+ state = OUTSIDE;
+ }
+ endToken = (state != lastState);
+ } else if(ch[c]=='"') {
+ if(state == OUTSIDE) {
+ state = DOUBLEQ;
+ } else if(state == DOUBLEQ) {
+ state = OUTSIDE;
+ }
+ endToken = (state != lastState);
+ } else if(ch[c]==' ') {
+ if(state == OUTSIDE) {
+ endToken = true;
+ }
+ }
+ }
+ if(last || endToken) {
+ if(c == argstart) {
+ // unquoted space
+ } else {
+ String a;
+ a = args.substring(argstart, c);
+ argList.add(a);
+ }
+ argstart = c+1;
+ lastState = state;
+ }
+ }
+ return (String[])argList.toArray(new String[0]);
+ }
+
+ public void configure(JobConf job)
+ {
+
+ try {
+ String argv = getPipeCommand(job);
+ keyCols_ = getKeyColsFromPipeCommand(argv);
+
+ job_ = job;
+
+ // Currently: null is identity reduce. REDUCE_NONE is no-map-outputs.
+ doPipe_ = (argv != null) && !StreamJob.REDUCE_NONE.equals(argv);
+ if(!doPipe_) return;
+
+ setStreamJobDetails(job);
+ setStreamProperties();
+
+ String[] argvSplit = splitArgs(argv);
+ String prog = argvSplit[0];
+ String userdir = System.getProperty("user.dir");
+ if(new File(prog).isAbsolute()) {
+ // we don't own it. Hope it is executable
+ } else {
+ new MustangFile(prog).setExecutable(true, true);
+ }
+
+
+ if(job_.getInputValueClass().equals(BytesWritable.class)) {
+ // TODO expose as separate config:
+ // job or semistandard inputformat property
+ optUseKey_ = false;
+ }
+
+ optSideEffect_ = getUseSideEffect();
+
+ if(optSideEffect_) {
+ String fileName = job_.get("mapred.task.id");
+ sideEffectPath_ = new Path(job_.getOutputPath(), fileName);
+ FileSystem fs = FileSystem.get(job_);
+ sideEffectOut_ = fs.create(sideEffectPath_);
+ }
+
+ // argvSplit[0]:
+ // An absolute path should be a preexisting valid path on all TaskTrackers
+ // A relative path should match in the unjarred Job data
+ // In this case, force an absolute path to make sure exec finds it.
+ argvSplit[0] = new File(argvSplit[0]).getAbsolutePath();
+ logprintln("PipeMapRed exec " + Arrays.asList(argvSplit));
+ logprintln("sideEffectPath_=" + sideEffectPath_);
+
+ Environment childEnv = (Environment)StreamUtil.env().clone();
+ addEnvironment(childEnv, job_.get("stream.addenvironment"));
+ sim = Runtime.getRuntime().exec(argvSplit, childEnv.toArray());
+
+ /* // This way required jdk1.5
+ ProcessBuilder processBuilder = new ProcessBuilder(argvSplit);
+ Map<String, String> env = processBuilder.environment();
+ addEnvironment(env, job_.get("stream.addenvironment"));
+ sim = processBuilder.start();
+ */
+
+ clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));
+ clientIn_ = new DataInputStream(new BufferedInputStream(sim.getInputStream()));
+ clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));
+ doneLock_ = new Object();
+ startTime_ = System.currentTimeMillis();
+
+ } catch(Exception e) {
+ e.printStackTrace();
+ e.printStackTrace(log_);
+ }
+ }
+
+ void setStreamJobDetails(JobConf job)
+ {
+ jobLog_ = job.get("stream.jobLog_");
+ String s = job.get("stream.minRecWrittenToEnableSkip_");
+ if(s != null) {
+ minRecWrittenToEnableSkip_ = Long.parseLong(s);
+ logprintln("JobConf set minRecWrittenToEnableSkip_ =" + minRecWrittenToEnableSkip_);
+ }
+ }
+
+ void setStreamProperties()
+ {
+ taskid_ = System.getProperty("stream.taskid");
+ if(taskid_ == null) {
+ taskid_ = "noid" + System.currentTimeMillis();
+ }
+ String s = System.getProperty("stream.port");
+ if(s != null) {
+ reportPortPlusOne_ = Integer.parseInt(s);
+ }
+
+ }
+
+ void logprintln(String s)
+ {
+ if(log_ != null) {
+ log_.println(s);
+ } else {
+ System.err.println(s); // or LOG.info()
+ }
+ }
+
+ void logflush()
+ {
+ if(log_ != null) {
+ log_.flush();
+ }
+ }
+
+ void addEnvironment(Properties env, String nameVals)
+ {
+ // encoding "a=b c=d" from StreamJob
+ if(nameVals == null) return;
+ String[] nv = nameVals.split(" ");
+ for(int i=0; i<nv.length; i++) {
+ String[] pair = nv[i].split("=", 2);
+ if(pair.length != 2) {
+ logprintln("Skip ev entry:" + nv[i]);
+ } else {
+ logprintln("Add ev entry:" + nv[i]);
+ env.put(pair[0], pair[1]);
+ }
+ }
+ }
+
+ /** .. and if successful: delete the task log */
+ void appendLogToJobLog(String status)
+ {
+ if(jobLog_ == null) {
+ return; // not using a common joblog
+ }
+ StreamUtil.exec("/bin/rm " + LOGNAME, log_);
+ // TODO socket-based aggregator (in JobTrackerInfoServer)
+ }
+
+
+ void startOutputThreads(OutputCollector output, Reporter reporter)
+ {
+ outputDone_ = false;
+ errorDone_ = false;
+ outThread_ = new MROutputThread(output, reporter);
+ outThread_.start();
+ errThread_ = new MRErrorThread(reporter);
+ errThread_.start();
+ }
+
+ void splitKeyVal(String line, UTF8 key, UTF8 val)
+ {
+ int pos;
+ if(keyCols_ == ALL_COLS) {
+ pos = -1;
+ } else {
+ pos = line.indexOf('\t');
+ }
+ if(pos == -1) {
+ key.set(line);
+ val.set("");
+ } else {
+ key.set(line.substring(0, pos));
+ val.set(line.substring(pos+1));
+ }
+ }
+
+ class MROutputThread extends Thread
+ {
+ MROutputThread(OutputCollector output, Reporter reporter)
+ {
+ setDaemon(true);
+ this.output = output;
+ this.reporter = reporter;
+ }
+ public void run() {
+ try {
+ try {
+ UTF8 EMPTY = new UTF8("");
+ UTF8 key = new UTF8();
+ UTF8 val = new UTF8();
+ // 3/4 Tool to Hadoop
+ while((answer = clientIn_.readLine()) != null) {
+ // 4/4 Hadoop out
+ if(optSideEffect_) {
+ sideEffectOut_.write(answer.getBytes());
+ sideEffectOut_.write('\n');
+ } else {
+ splitKeyVal(answer, key, val);
+ output.collect(key, val);
+ numRecWritten_++;
+ if(numRecWritten_ % 100 == 0) {
+ logprintln(numRecRead_+"/"+numRecWritten_);
+ logflush();
+ }
+ }
+ }
+ } catch(IOException io) {
+ io.printStackTrace(log_);
+ }
+ logprintln("MROutputThread done");
+ } finally {
+ outputDone_ = true;
+ synchronized(doneLock_) {
+ doneLock_.notifyAll();
+ }
+ }
+ }
+ OutputCollector output;
+ Reporter reporter;
+ String answer;
+ }
+
+ class MRErrorThread extends Thread
+ {
+ public MRErrorThread(Reporter reporter)
+ {
+ this.reporter = reporter;
+ setDaemon(true);
+ }
+ public void run()
+ {
+ String line;
+ try {
+ long num = 0;
+ int bucket = 100;
+ while((line=clientErr_.readLine()) != null) {
+ num++;
+ logprintln(line);
+ if(num < 10) {
+ String hline = "MRErr: " + line;
+ System.err.println(hline);
+ reporter.setStatus(hline);
+ }
+ }
+ } catch(IOException io) {
+ io.printStackTrace(log_);
+ } finally {
+ errorDone_ = true;
+ synchronized(doneLock_) {
+ doneLock_.notifyAll();
+ }
+ }
+ }
+ Reporter reporter;
+ }
+
+ public void mapRedFinished()
+ {
+ logprintln("mapRedFinished");
+ try {
+ if(!doPipe_) return;
+ try {
+ if(optSideEffect_) {
+ logprintln("closing " + sideEffectPath_);
+ sideEffectOut_.close();
+ logprintln("closed " + sideEffectPath_);
+ }
+ } catch(IOException io) {
+ io.printStackTrace();
+ }
+ try {
+ if(clientOut_ != null) {
+ clientOut_.close();
+ }
+ } catch(IOException io) {
+ }
+ if(outThread_ == null) {
+ // no input records: threads were never spawned
+ } else {
+ try {
+ while(!outputDone_ || !errorDone_) {
+ synchronized(doneLock_) {
+ doneLock_.wait();
+ }
+ }
+ } catch(InterruptedException ie) {
+ ie.printStackTrace();
+ }
+ }
+ sim.destroy();
+ } catch(RuntimeException e) {
+ e.printStackTrace(log_);
+ throw e;
+ }
+ }
+
+ void maybeLogRecord()
+ {
+ if(numRecRead_ >= nextRecReadLog_) {
+ String info = numRecInfo();
+ logprintln(info);
+ logflush();
+ System.err.println(info);
+ //nextRecReadLog_ *= 10;
+ nextRecReadLog_ += 100;
+ }
+ }
+
+ public String getContext()
+ {
+
+ String s = numRecInfo() + "\n";
+ s += "minRecWrittenToEnableSkip_=" + minRecWrittenToEnableSkip_ + " ";
+ s += "LOGNAME=" + LOGNAME + "\n";
+ s += envline("HOST");
+ s += envline("USER");
+ s += envline("HADOOP_USER");
+ //s += envline("PWD"); // =/home/crawler/hadoop/trunk
+ s += "last Hadoop input: |" + mapredKey_ + "|\n";
+ s += "last tool output: |" + outThread_.answer + "|\n";
+ s += "Date: " + new Date() + "\n";
+ // s += envline("HADOOP_HOME");
+ // s += envline("REMOTE_HOST");
+ return s;
+ }
+
+ String envline(String var)
+ {
+ return var + "=" + StreamUtil.env().get(var) + "\n";
+ }
+
+ String numRecInfo()
+ {
+ long elapsed = (System.currentTimeMillis() - startTime_)/1000;
+ long total = numRecRead_+numRecWritten_+numRecSkipped_;
+ return "R/W/S=" + numRecRead_+"/"+numRecWritten_+"/"+numRecSkipped_
+ + " in:" + safeDiv(numRecRead_, elapsed) + " [rec/s]"
+ + " out:" + safeDiv(numRecWritten_, elapsed) + " [rec/s]";
+ }
+ String safeDiv(long n, long d)
+ {
+ return (d==0) ? "NA" : ""+n/d + "=" + n + "/" + d;
+ }
+ String logFailure(Exception e)
+ {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ e.printStackTrace(pw);
+ String msg = "log:" + jobLog_ + "\n" + getContext() + sw + "\n";
+ logprintln(msg);
+ return msg;
+ }
+
+
+ long startTime_;
+ long numRecRead_ = 0;
+ long numRecWritten_ = 0;
+ long numRecSkipped_ = 0;
+ long nextRecReadLog_ = 1;
+
+ long minRecWrittenToEnableSkip_ = Long.MAX_VALUE;
+
+ int keyCols_;
+ final static int ALL_COLS = Integer.MAX_VALUE;
+
+ JobConf job_;
+
+ // generic MapRed parameters passed on by hadoopStreaming
+ String taskid_;
+ int reportPortPlusOne_;
+
+ boolean doPipe_;
+
+ Process sim;
+ Object doneLock_;
+ MROutputThread outThread_;
+ MRErrorThread errThread_;
+ boolean outputDone_;
+ boolean errorDone_;
+ DataOutputStream clientOut_;
+ DataInputStream clientErr_;
+ DataInputStream clientIn_;
+
+ String jobLog_;
+ // set in PipeMapper/PipeReducer subclasses
+ String mapredKey_;
+ int numExceptions_;
+
+ boolean optUseKey_ = true;
+
+ boolean optSideEffect_;
+ Path sideEffectPath_;
+ FSDataOutputStream sideEffectOut_;
+
+ String LOGNAME;
+ PrintStream log_;
+
+ /* curr. going to stderr so that it is preserved
+ { // instance initializer
+ try {
+ int id = (int)((System.currentTimeMillis()/2000) % 10);
+ String sid = id+ "." + StreamUtil.env().get("USER");
+ LOGNAME = "/tmp/PipeMapRed." + sid + ".log";
+ log_ = new PrintStream(new FileOutputStream(LOGNAME));
+ logprintln(new java.util.Date());
+ logflush();
+ } catch(IOException io) {
+ System.err.println("LOGNAME=" + LOGNAME);
+ io.printStackTrace();
+ } finally {
+ if(log_ == null) {
+ log_ = System.err;
+ }
+ }
+ }
+ */
+}
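To make the key/value framing above concrete: splitKeyVal() turns each line the external program writes into a (key, value) pair at the first TAB, and with splitKeyVal=ALL the whole line becomes the key with an empty value. A standalone sketch of that contract (this class is illustrative, not part of the commit):

    public class SplitKeyValSketch {
      public static void main(String[] args) {
        String line = "k1\tv1a\tv1b";
        int pos = line.indexOf('\t');           // default: one key column
        String key = (pos == -1) ? line : line.substring(0, pos);
        String val = (pos == -1) ? ""   : line.substring(pos + 1);
        System.out.println("key=|" + key + "| val=|" + val + "|");
        // prints key=|k1| val=|v1a\tv1b| -- the value keeps any further TABs
      }
    }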
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=421829&r1=421828&r2=421829&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Fri Jul 14 00:55:15 2006
@@ -1,121 +1,121 @@
-/**
- * Copyright 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.streaming;
-
-import java.io.*;
-import java.util.Iterator;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.OutputCollector;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
-
-/** A generic Mapper bridge.
- * It delegates operations to an external program via stdin and stdout.
- * @author Michel Tourn
- */
-public class PipeMapper extends PipeMapRed implements Mapper
-{
-
- String getPipeCommand(JobConf job)
- {
- return job.get("stream.map.streamprocessor");
- }
-
- String getKeyColPropName()
- {
- return "mapKeyCols";
- }
-
- boolean getUseSideEffect()
- {
- String reduce = job_.get("stream.reduce.streamprocessor");
- if(StreamJob.REDUCE_NONE.equals(reduce)) {
- return true;
- }
- return false;
- }
-
-
- // Do NOT declare default constructor
- // (MapRed creates it reflectively)
-
- public void map(WritableComparable key, Writable value,
- OutputCollector output, Reporter reporter)
- throws IOException
- {
- // init
- if(outThread_ == null) {
- startOutputThreads(output, reporter);
- }
- try {
- // 1/4 Hadoop in
- if(key instanceof BytesWritable) {
- mapredKey_ = new String(((BytesWritable)key).get(), "UTF-8");
- } else {
- mapredKey_ = key.toString();
- }
- numRecRead_++;
-
- maybeLogRecord();
-
- // 2/4 Hadoop to Tool
- if(numExceptions_==0) {
- String sval;
- if(value instanceof BytesWritable) {
- sval = new String(((BytesWritable)value).get(), "UTF-8");
- } else {
- sval = value.toString();
- }
- if(optUseKey_) {
- clientOut_.writeBytes(mapredKey_);
- clientOut_.writeBytes("\t");
- }
- clientOut_.writeBytes(sval);
- clientOut_.writeBytes("\n");
- clientOut_.flush();
- } else {
- numRecSkipped_++;
- }
- } catch(IOException io) {
- numExceptions_++;
- if(numExceptions_ > 1 || numRecWritten_ < minRecWrittenToEnableSkip_) {
- // terminate with failure
- String msg = logFailure(io);
- appendLogToJobLog("failure");
- throw new IOException(msg);
- } else {
- // terminate with success:
- // swallow input records although the stream processor failed/closed
- }
- }
- }
-
- public void close()
- {
- appendLogToJobLog("success");
- mapRedFinished();
- }
-
-}
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+
+/** A generic Mapper bridge.
+ * It delegates operations to an external program via stdin and stdout.
+ * @author Michel Tourn
+ */
+public class PipeMapper extends PipeMapRed implements Mapper
+{
+
+ String getPipeCommand(JobConf job)
+ {
+ return job.get("stream.map.streamprocessor");
+ }
+
+ String getKeyColPropName()
+ {
+ return "mapKeyCols";
+ }
+
+ boolean getUseSideEffect()
+ {
+ String reduce = job_.get("stream.reduce.streamprocessor");
+ if(StreamJob.REDUCE_NONE.equals(reduce)) {
+ return true;
+ }
+ return false;
+ }
+
+
+ // Do NOT declare default constructor
+ // (MapRed creates it reflectively)
+
+ public void map(WritableComparable key, Writable value,
+ OutputCollector output, Reporter reporter)
+ throws IOException
+ {
+ // init
+ if(outThread_ == null) {
+ startOutputThreads(output, reporter);
+ }
+ try {
+ // 1/4 Hadoop in
+ if(key instanceof BytesWritable) {
+ mapredKey_ = new String(((BytesWritable)key).get(), "UTF-8");
+ } else {
+ mapredKey_ = key.toString();
+ }
+ numRecRead_++;
+
+ maybeLogRecord();
+
+ // 2/4 Hadoop to Tool
+ if(numExceptions_==0) {
+ String sval;
+ if(value instanceof BytesWritable) {
+ sval = new String(((BytesWritable)value).get(), "UTF-8");
+ } else {
+ sval = value.toString();
+ }
+ if(optUseKey_) {
+ clientOut_.writeBytes(mapredKey_);
+ clientOut_.writeBytes("\t");
+ }
+ clientOut_.writeBytes(sval);
+ clientOut_.writeBytes("\n");
+ clientOut_.flush();
+ } else {
+ numRecSkipped_++;
+ }
+ } catch(IOException io) {
+ numExceptions_++;
+ if(numExceptions_ > 1 || numRecWritten_ < minRecWrittenToEnableSkip_) {
+ // terminate with failure
+ String msg = logFailure(io);
+ appendLogToJobLog("failure");
+ throw new IOException(msg);
+ } else {
+ // terminate with success:
+ // swallow input records although the stream processor failed/closed
+ }
+ }
+ }
+
+ public void close()
+ {
+ appendLogToJobLog("success");
+ mapRedFinished();
+ }
+
+}
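PipeMapper writes each input record to the child process as key, TAB, value, newline (the key is omitted when optUseKey_ is false) and expects the same line format back on stdout. A hypothetical external mapper in the spirit of the TrApp/UniqApp test programs added by this change, here a pure pass-through:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;

    public class IdentityMapApp {
      public static void main(String[] args) throws Exception {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String line;
        while ((line = in.readLine()) != null) {
          System.out.println(line);  // echo key\tvalue records unchanged
        }
      }
    }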
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?rev=421829&r1=421828&r2=421829&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Fri Jul 14 00:55:15 2006
@@ -1,86 +1,86 @@
-/**
- * Copyright 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.streaming;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.OutputCollector;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
-
-/** A generic Reducer bridge.
- * It delegates operations to an external program via stdin and stdout.
- * @author Michel Tourn
- */
-public class PipeReducer extends PipeMapRed implements Reducer
-{
-
- String getPipeCommand(JobConf job)
- {
- return job.get("stream.reduce.streamprocessor");
- }
-
- String getKeyColPropName()
- {
- return "reduceKeyCols";
- }
-
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector output, Reporter reporter)
- throws IOException {
-
- // init
- if(doPipe_ && outThread_ == null) {
- startOutputThreads(output, reporter);
- }
- try {
- while (values.hasNext()) {
- Writable val = (Writable)values.next();
- numRecRead_++;
- maybeLogRecord();
- if(doPipe_) {
- clientOut_.writeBytes(key.toString());
- clientOut_.writeBytes("\t");
- clientOut_.writeBytes(val.toString());
- clientOut_.writeBytes("\n");
- clientOut_.flush();
- } else {
- // "identity reduce"
- output.collect(key, val);
- }
- }
- } catch(IOException io) {
- appendLogToJobLog("failure");
- throw new IOException(getContext() + io.getMessage());
- }
- }
-
- public void close()
- {
- appendLogToJobLog("success");
- mapRedFinished();
- }
-
-}
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+
+/** A generic Reducer bridge.
+ * It delegates operations to an external program via stdin and stdout.
+ * @author Michel Tourn
+ */
+public class PipeReducer extends PipeMapRed implements Reducer
+{
+
+ String getPipeCommand(JobConf job)
+ {
+ return job.get("stream.reduce.streamprocessor");
+ }
+
+ String getKeyColPropName()
+ {
+ return "reduceKeyCols";
+ }
+
+ public void reduce(WritableComparable key, Iterator values,
+ OutputCollector output, Reporter reporter)
+ throws IOException {
+
+ // init
+ if(doPipe_ && outThread_ == null) {
+ startOutputThreads(output, reporter);
+ }
+ try {
+ while (values.hasNext()) {
+ Writable val = (Writable)values.next();
+ numRecRead_++;
+ maybeLogRecord();
+ if(doPipe_) {
+ clientOut_.writeBytes(key.toString());
+ clientOut_.writeBytes("\t");
+ clientOut_.writeBytes(val.toString());
+ clientOut_.writeBytes("\n");
+ clientOut_.flush();
+ } else {
+ // "identity reduce"
+ output.collect(key, val);
+ }
+ }
+ } catch(IOException io) {
+ appendLogToJobLog("failure");
+ throw new IOException(getContext() + io.getMessage());
+ }
+ }
+
+ public void close()
+ {
+ appendLogToJobLog("success");
+ mapRedFinished();
+ }
+
+}
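Since the reducer's stdin sees records sorted so that equal keys are adjacent (the same guarantee the PipeCombiner Javadoc describes), an external reducer can aggregate by watching for key changes. A hypothetical summing reducer, assuming numeric values (illustrative only):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;

    public class SumReduceApp {
      public static void main(String[] args) throws Exception {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String line, prevKey = null;
        long sum = 0;
        while ((line = in.readLine()) != null) {
          int tab = line.indexOf('\t');
          String key = (tab == -1) ? line : line.substring(0, tab);
          long val = (tab == -1) ? 0 : Long.parseLong(line.substring(tab + 1));
          if (prevKey != null && !prevKey.equals(key)) {
            System.out.println(prevKey + "\t" + sum);  // emit finished key
            sum = 0;
          }
          prevKey = key;
          sum += val;
        }
        if (prevKey != null) System.out.println(prevKey + "\t" + sum);
      }
    }

Because the output format matches the input format, the same program can serve as both -combiner and -reducer, which is exactly the multipass-aggregation pattern the PipeCombiner Javadoc recommends.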
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?rev=421829&r1=421828&r2=421829&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Fri Jul 14 00:55:15 2006
@@ -1,173 +1,173 @@
-/**
- * Copyright 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.streaming;
-
-import java.io.*;
-import java.lang.reflect.*;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.FSDataInputStream;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import org.apache.hadoop.mapred.*;
-
-/** An input format that performs globbing on DFS paths and
- * selects a RecordReader based on a JobConf property.
- * @author Michel Tourn
- */
-public class StreamInputFormat extends InputFormatBase
-{
-
- // an InputFormat should be public with the synthetic public default constructor
- // JobTracker's JobInProgress will instantiate with clazz.newInstance() (and a custom ClassLoader)
-
- protected static final Log LOG = LogFactory.getLog(StreamInputFormat.class.getName());
-
- static {
- //LOG.setLevel(Level.FINE);
- }
-
- /** This implementation always returns true. */
- public boolean[] areValidInputDirectories(FileSystem fileSys,
- Path[] inputDirs
- ) throws IOException {
- boolean[] b = new boolean[inputDirs.length];
- for(int i=0; i < inputDirs.length; ++i) {
- b[i] = true;
- }
- return b;
- }
-
-
- protected Path[] listPaths(FileSystem fs, JobConf job)
- throws IOException
- {
- Path[] globs = job.getInputPaths();
- ArrayList list = new ArrayList();
- int dsup = globs.length;
- for(int d=0; d<dsup; d++) {
- String leafName = globs[d].getName();
- LOG.info("StreamInputFormat: globs[" + d + "] leafName = " + leafName);
- Path[] paths; Path dir;
- PathFilter filter = new GlobFilter(fs, leafName);
- dir = new Path(globs[d].getParent().toString());
- if(dir == null) dir = new Path(".");
- paths = fs.listPaths(dir, filter);
- list.addAll(Arrays.asList(paths));
- }
- return (Path[])list.toArray(new Path[]{});
- }
-
- class GlobFilter implements PathFilter
- {
- public GlobFilter(FileSystem fs, String glob)
- {
- fs_ = fs;
- pat_ = Pattern.compile(globToRegexp(glob));
- }
- String globToRegexp(String glob)
- {
- String re = glob;
- re = re.replaceAll("\\.", "\\\\.");
- re = re.replaceAll("\\+", "\\\\+");
- re = re.replaceAll("\\*", ".*");
- re = re.replaceAll("\\?", ".");
- LOG.info("globToRegexp: |" + glob + "| -> |" + re + "|");
- return re;
- }
-
- public boolean accept(Path pathname)
- {
- boolean acc = !fs_.isChecksumFile(pathname);
- if(acc) {
- acc = pat_.matcher(pathname.getName()).matches();
- }
- LOG.info("matches " + pat_ + ", " + pathname + " = " + acc);
- return acc;
- }
-
- Pattern pat_;
- FileSystem fs_;
- }
-
- public RecordReader getRecordReader(FileSystem fs, final FileSplit split,
- JobConf job, Reporter reporter)
- throws IOException {
- LOG.info("getRecordReader start.....");
- reporter.setStatus(split.toString());
-
- final long start = split.getStart();
- final long end = start + split.getLength();
-
- String splitName = split.getFile() + ":" + start + "-" + end;
- final FSDataInputStream in = fs.open(split.getFile());
-
- // will open the file and seek to the start of the split
- // Factory dispatch based on available params..
- Class readerClass;
- String c = job.get("stream.recordreader.class");
- if(c == null) {
- readerClass = StreamLineRecordReader.class;
- } else {
- readerClass = StreamUtil.goodClassOrNull(c, null);
- if(readerClass == null) {
- throw new RuntimeException("Class not found: " + c);
- }
- }
-
- Constructor ctor;
- try {
- ctor = readerClass.getConstructor(new Class[]{
- FSDataInputStream.class, FileSplit.class, Reporter.class, JobConf.class, FileSystem.class});
- } catch(NoSuchMethodException nsm) {
- throw new RuntimeException(nsm);
- }
-
-
- StreamBaseRecordReader reader;
- try {
- reader = (StreamBaseRecordReader) ctor.newInstance(new Object[]{
- in, split, reporter, job, fs});
- } catch(Exception nsm) {
- throw new RuntimeException(nsm);
- }
-
- reader.init();
-
-
- if(reader instanceof StreamSequenceRecordReader) {
- // override k/v class types with types stored in SequenceFile
- StreamSequenceRecordReader ss = (StreamSequenceRecordReader)reader;
- job.setInputKeyClass(ss.rin_.getKeyClass());
- job.setInputValueClass(ss.rin_.getValueClass());
- }
-
-
- return reader;
- }
-
-}
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+import java.lang.reflect.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import org.apache.hadoop.mapred.*;
+
+/** An input format that performs globbing on DFS paths and
+ * selects a RecordReader based on a JobConf property.
+ * @author Michel Tourn
+ */
+public class StreamInputFormat extends InputFormatBase
+{
+
+ // an InputFormat should be public with the synthetic public default constructor
+ // JobTracker's JobInProgress will instantiate with clazz.newInstance() (and a custom ClassLoader)
+
+ protected static final Log LOG = LogFactory.getLog(StreamInputFormat.class.getName());
+
+ static {
+ //LOG.setLevel(Level.FINE);
+ }
+
+ /** This implementation always returns true. */
+ public boolean[] areValidInputDirectories(FileSystem fileSys,
+ Path[] inputDirs
+ ) throws IOException {
+ boolean[] b = new boolean[inputDirs.length];
+ for(int i=0; i < inputDirs.length; ++i) {
+ b[i] = true;
+ }
+ return b;
+ }
+
+
+ protected Path[] listPaths(FileSystem fs, JobConf job)
+ throws IOException
+ {
+ Path[] globs = job.getInputPaths();
+ ArrayList list = new ArrayList();
+ int dsup = globs.length;
+ for(int d=0; d<dsup; d++) {
+ String leafName = globs[d].getName();
+ LOG.info("StreamInputFormat: globs[" + d + "] leafName = " + leafName);
+ Path[] paths; Path dir;
+ PathFilter filter = new GlobFilter(fs, leafName);
+ dir = new Path(globs[d].getParent().toString());
+ if(dir == null) dir = new Path(".");
+ paths = fs.listPaths(dir, filter);
+ list.addAll(Arrays.asList(paths));
+ }
+ return (Path[])list.toArray(new Path[]{});
+ }
+
+ class GlobFilter implements PathFilter
+ {
+ public GlobFilter(FileSystem fs, String glob)
+ {
+ fs_ = fs;
+ pat_ = Pattern.compile(globToRegexp(glob));
+ }
+ String globToRegexp(String glob)
+ {
+ String re = glob;
+ re = re.replaceAll("\\.", "\\\\.");
+ re = re.replaceAll("\\+", "\\\\+");
+ re = re.replaceAll("\\*", ".*");
+ re = re.replaceAll("\\?", ".");
+ LOG.info("globToRegexp: |" + glob + "| -> |" + re + "|");
+ return re;
+ }
+
+ public boolean accept(Path pathname)
+ {
+ boolean acc = !fs_.isChecksumFile(pathname);
+ if(acc) {
+ acc = pat_.matcher(pathname.getName()).matches();
+ }
+ LOG.info("matches " + pat_ + ", " + pathname + " = " + acc);
+ return acc;
+ }
+
+ Pattern pat_;
+ FileSystem fs_;
+ }
+
+ public RecordReader getRecordReader(FileSystem fs, final FileSplit split,
+ JobConf job, Reporter reporter)
+ throws IOException {
+ LOG.info("getRecordReader start.....");
+ reporter.setStatus(split.toString());
+
+ final long start = split.getStart();
+ final long end = start + split.getLength();
+
+ String splitName = split.getFile() + ":" + start + "-" + end;
+ final FSDataInputStream in = fs.open(split.getFile());
+
+ // will open the file and seek to the start of the split
+ // Factory dispatch based on available params..
+ Class readerClass;
+ String c = job.get("stream.recordreader.class");
+ if(c == null) {
+ readerClass = StreamLineRecordReader.class;
+ } else {
+ readerClass = StreamUtil.goodClassOrNull(c, null);
+ if(readerClass == null) {
+ throw new RuntimeException("Class not found: " + c);
+ }
+ }
+
+ Constructor ctor;
+ try {
+ ctor = readerClass.getConstructor(new Class[]{
+ FSDataInputStream.class, FileSplit.class, Reporter.class, JobConf.class, FileSystem.class});
+ } catch(NoSuchMethodException nsm) {
+ throw new RuntimeException(nsm);
+ }
+
+
+ StreamBaseRecordReader reader;
+ try {
+ reader = (StreamBaseRecordReader) ctor.newInstance(new Object[]{
+ in, split, reporter, job, fs});
+ } catch(Exception nsm) {
+ throw new RuntimeException(nsm);
+ }
+
+ reader.init();
+
+
+ if(reader instanceof StreamSequenceRecordReader) {
+ // override k/v class types with types stored in SequenceFile
+ StreamSequenceRecordReader ss = (StreamSequenceRecordReader)reader;
+ job.setInputKeyClass(ss.rin_.getKeyClass());
+ job.setInputValueClass(ss.rin_.getValueClass());
+ }
+
+
+ return reader;
+ }
+
+}
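The globToRegexp() rewriting above is simple enough to check by hand: escape '.' and '+', then map '*' to '.*' and '?' to '.'. A standalone sketch of the same replaceAll chain (not part of the commit):

    import java.util.regex.Pattern;

    public class GlobSketch {
      public static void main(String[] args) {
        String glob = "part-*.txt";
        String re = glob.replaceAll("\\.", "\\\\.")
                        .replaceAll("\\+", "\\\\+")
                        .replaceAll("\\*", ".*")
                        .replaceAll("\\?", ".");
        System.out.println(re);                                    // part-.*\.txt
        System.out.println(Pattern.matches(re, "part-00000.txt")); // true
      }
    }

Note the escaping runs first, so the '.' in '.txt' stays literal while the '.*' introduced for '*' does not get escaped.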
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=421829&r1=421828&r2=421829&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Fri Jul 14 00:55:15 2006
@@ -34,30 +34,30 @@
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.RunningJob;
-/** All the client-side work happens here.
+/** All the client-side work happens here.
* (Jar packaging, MapRed job submission and monitoring)
* @author Michel Tourn
*/
public class StreamJob
{
- protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());
-
+ protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());
+
final static String REDUCE_NONE = "NONE";
-
+
public StreamJob(String[] argv, boolean mayExit)
{
argv_ = argv;
- mayExit_ = mayExit;
+ mayExit_ = mayExit;
}
-
+
public void go() throws IOException
{
init();
-
+
preProcessArgs();
parseArgv();
postProcessArgs();
-
+
setJobConf();
submitAndMonitorJob();
}
@@ -70,13 +70,13 @@
throw new RuntimeException(io);
}
}
-
+
void preProcessArgs()
{
verbose_ = false;
addTaskEnvironment_ = "";
}
-
+
void postProcessArgs() throws IOException
{
if(cluster_ == null) {
@@ -94,18 +94,19 @@
Iterator it = packageFiles_.iterator();
while(it.hasNext()) {
- File f = new File((String)it.next());
+ File f = new File((String)it.next());
if(f.isFile()) {
shippedCanonFiles_.add(f.getCanonicalPath());
}
}
msg("shippedCanonFiles_=" + shippedCanonFiles_);
-
+
// careful with class names..
mapCmd_ = unqualifyIfLocalPath(mapCmd_);
- redCmd_ = unqualifyIfLocalPath(redCmd_);
+ comCmd_ = unqualifyIfLocalPath(comCmd_);
+ redCmd_ = unqualifyIfLocalPath(redCmd_);
}
-
+
void validateNameEqValue(String neqv)
{
String[] nv = neqv.split("=", 2);
@@ -114,11 +115,11 @@
}
msg("Recording name=value: name=" + nv[0] + " value=" + nv[1]);
}
-
+
String unqualifyIfLocalPath(String cmd) throws IOException
{
if(cmd == null) {
- //
+ //
} else {
String prog = cmd;
String args = "";
@@ -131,8 +132,8 @@
boolean shipped = shippedCanonFiles_.contains(progCanon);
msg("shipped: " + shipped + " " + progCanon);
if(shipped) {
- // Change path to simple filename.
- // That way when PipeMapRed calls Runtime.exec(),
+ // Change path to simple filename.
+ // That way when PipeMapRed calls Runtime.exec(),
// it will look for the executable in Task's working dir.
// And this is where TaskRunner unjars our job jar.
prog = new File(prog).getName();
@@ -146,25 +147,25 @@
msg("cmd=" + cmd);
return cmd;
}
-
+
String getHadoopAliasConfFile()
{
return new File(getHadoopClientHome() + "/conf", hadoopAliasConf_).getAbsolutePath();
}
-
-
+
+
void parseArgv()
{
if(argv_.length==0) {
exitUsage(false);
}
- int i=0;
+ int i=0;
while(i < argv_.length) {
String s;
if(argv_[i].equals("-verbose")) {
- verbose_ = true;
+ verbose_ = true;
} else if(argv_[i].equals("-info")) {
- detailedUsage_ = true;
+ detailedUsage_ = true;
} else if(argv_[i].equals("-debug")) {
debug_++;
} else if((s = optionArg(argv_, i, "-input", false)) != null) {
@@ -176,6 +177,9 @@
} else if((s = optionArg(argv_, i, "-mapper", mapCmd_ != null)) != null) {
i++;
mapCmd_ = s;
+ } else if((s = optionArg(argv_, i, "-combiner", comCmd_ != null)) != null) {
+ i++;
+ comCmd_ = s;
} else if((s = optionArg(argv_, i, "-reducer", redCmd_ != null)) != null) {
i++;
redCmd_ = s;
@@ -218,7 +222,7 @@
exitUsage(true);
}
}
-
+
String optionArg(String[] args, int index, String arg, boolean argSet)
{
if(index >= args.length || ! args[index].equals(arg)) {
@@ -229,10 +233,10 @@
}
if(index >= args.length-1) {
throw new IllegalArgumentException("Expected argument after option " + args[index]);
- }
+ }
return args[index+1];
}
-
+
protected void msg(String msg)
{
if(verbose_) {
@@ -242,15 +246,15 @@
public void exitUsage(boolean detailed)
{
- // 1 2 3 4 5 6 7
+ // 1 2 3 4 5 6 7
//1234567890123456789012345678901234567890123456789012345678901234567890123456789
System.out.println("Usage: $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar [options]");
System.out.println("Options:");
System.out.println(" -input <path> DFS input file(s) for the Map step");
System.out.println(" -output <path> DFS output directory for the Reduce step");
System.out.println(" -mapper <cmd> The streaming command to run");
- System.out.println(" -combiner <cmd> Not implemented. But you can pipe the mapper output");
- System.out.println(" -reducer <cmd> The streaming command to run.");
+ System.out.println(" -combiner <cmd> The streaming command to run");
+ System.out.println(" -reducer <cmd> The streaming command to run");
System.out.println(" -file <file> File/dir to be shipped in the Job jar file");
System.out.println(" -cluster <name> Default uses hadoop-default.xml and hadoop-site.xml");
System.out.println(" -config <file> Optional. One or more paths to xml config files");
@@ -261,7 +265,7 @@
System.out.println(" -cmdenv <n>=<v> Optional. Pass env.var to streaming commands");
System.out.println(" -verbose");
System.out.println();
- if(!detailed) {
+ if(!detailed) {
System.out.println("For more details about these options:");
System.out.println("Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info");
fail("");
@@ -307,7 +311,7 @@
System.out.println(" Input files are all the daily logs for days in month 2006-04");
fail("");
}
-
+
public void fail(String message)
{
if(mayExit_) {
@@ -319,8 +323,8 @@
}
// --------------------------------------------
-
-
+
+
protected String getHadoopClientHome()
{
String h = env_.getProperty("HADOOP_HOME"); // standard Hadoop
@@ -342,11 +346,11 @@
}
return local;
}
- protected String getClusterNick()
- {
+ protected String getClusterNick()
+ {
return cluster_;
}
-
+
/** @return path to the created Jar file or null if no files are necessary.
*/
protected String packageJobJar() throws IOException
@@ -362,12 +366,12 @@
msg("Found runtime classes in: " + runtimeClasses);
}
if(isLocalHadoop()) {
- // don't package class files (they might get unpackaged in "." and then
+ // don't package class files (they might get unpackaged in "." and then
// hide the intended CLASSPATH entry)
- // we still package everything else (so that scripts and executable are found in
+ // we still package everything else (so that scripts and executables are found in
// Task workdir like distributed Hadoop)
} else {
- if(new File(runtimeClasses).isDirectory()) {
+ if(new File(runtimeClasses).isDirectory()) {
packageFiles_.add(runtimeClasses);
} else {
unjarFiles.add(runtimeClasses);
@@ -377,7 +381,7 @@
return null;
}
File jobJar = File.createTempFile("streamjob", ".jar");
- System.out.println("packageJobJar: " + packageFiles_ + " " + unjarFiles + " " + jobJar);
+ System.out.println("packageJobJar: " + packageFiles_ + " " + unjarFiles + " " + jobJar);
if(debug_ == 0) {
jobJar.deleteOnExit();
}
@@ -389,7 +393,7 @@
builder.merge(packageFiles_, unjarFiles, jobJarName);
return jobJarName;
}
-
+
protected void setJobConf() throws IOException
{
msg("hadoopAliasConf_ = " + hadoopAliasConf_);
@@ -403,29 +407,29 @@
while(it.hasNext()) {
String pathName = (String)it.next();
config_.addFinalResource(new Path(pathName));
- }
+ }
// general MapRed job properties
jobConf_ = new JobConf(config_);
for(int i=0; i<inputGlobs_.size(); i++) {
jobConf_.addInputDir(new File((String)inputGlobs_.get(i)));
}
-
+
jobConf_.setInputFormat(StreamInputFormat.class);
- // for SequenceFile, input classes may be overriden in getRecordReader
+ // for SequenceFile, input classes may be overridden in getRecordReader
jobConf_.setInputKeyClass(UTF8.class);
jobConf_.setInputValueClass(UTF8.class);
-
+
jobConf_.setOutputKeyClass(UTF8.class);
jobConf_.setOutputValueClass(UTF8.class);
//jobConf_.setCombinerClass();
jobConf_.setOutputDir(new File(output_));
jobConf_.setOutputFormat(StreamOutputFormat.class);
-
+
jobConf_.set("stream.addenvironment", addTaskEnvironment_);
-
+
String defaultPackage = this.getClass().getPackage().getName();
-
+
Class c = StreamUtil.goodClassOrNull(mapCmd_, defaultPackage);
if(c != null) {
jobConf_.setMapperClass(c);
@@ -434,6 +438,16 @@
jobConf_.set("stream.map.streamprocessor", mapCmd_);
}
+ if(comCmd_ != null) {
+ c = StreamUtil.goodClassOrNull(comCmd_, defaultPackage);
+ if(c != null) {
+ jobConf_.setCombinerClass(c);
+ } else {
+ jobConf_.setCombinerClass(PipeCombiner.class);
+ jobConf_.set("stream.combine.streamprocessor", comCmd_);
+ }
+ }
+
if(redCmd_ != null) {
c = StreamUtil.goodClassOrNull(redCmd_, defaultPackage);
if(c != null) {
@@ -443,13 +457,13 @@
jobConf_.set("stream.reduce.streamprocessor", redCmd_);
}
}
-
+
if(inReaderSpec_ != null) {
String[] args = inReaderSpec_.split(",");
String readerClass = args[0];
// this argument can only be a Java class
c = StreamUtil.goodClassOrNull(readerClass, defaultPackage);
- if(c != null) {
+ if(c != null) {
jobConf_.set("stream.recordreader.class", c.getName());
} else {
fail("-inputreader: class not found: " + readerClass);
@@ -461,13 +475,13 @@
jobConf_.set(k, v);
}
}
-
+
jar_ = packageJobJar();
if(jar_ != null) {
jobConf_.setJar(jar_);
}
- // last, allow user to override anything
+ // last, allow user to override anything
// (although typically used with properties we didn't touch)
it = userJobConfProps_.iterator();
while(it.hasNext()) {
@@ -475,39 +489,39 @@
String[] nv = prop.split("=", 2);
msg("xxxJobConf: set(" + nv[0] + ", " + nv[1]+")");
jobConf_.set(nv[0], nv[1]);
- }
+ }
msg("submitting to jobconf: " + getJobTrackerHostPort());
}
-
+
protected String getJobTrackerHostPort()
{
return jobConf_.get("mapred.job.tracker");
}
-
+
protected void jobInfo()
- {
+ {
if(isLocalHadoop()) {
- LOG.info("Job running in-process (local Hadoop)");
+ LOG.info("Job running in-process (local Hadoop)");
} else {
String hp = getJobTrackerHostPort();
- LOG.info("To kill this job, run:");
+ LOG.info("To kill this job, run:");
LOG.info(getHadoopClientHome() + "/bin/hadoop job -Dmapred.job.tracker=" + hp + " -kill " + jobId_);
//LOG.info("Job file: " + running_.getJobFile() );
LOG.info("Tracking URL: " + StreamUtil.qualifyHost(running_.getTrackingURL()));
}
}
-
+
// Based on JobClient
public void submitAndMonitorJob() throws IOException {
-
+
if(jar_ != null && isLocalHadoop()) {
// getAbs became required when shell and subvm have different working dirs...
File wd = new File(".").getAbsoluteFile();
StreamUtil.unJar(new File(jar_), wd);
}
-
- // if jobConf_ changes must recreate a JobClient
- jc_ = new JobClient(jobConf_);
+
+ // if jobConf_ changes must recreate a JobClient
+ jc_ = new JobClient(jobConf_);
boolean error = true;
running_ = null;
String lastReport = null;
@@ -515,8 +529,8 @@
running_ = jc_.submitJob(jobConf_);
jobId_ = running_.getJobID();
- LOG.info("getLocalDirs(): " + Arrays.asList(jobConf_.getLocalDirs()));
- LOG.info("Running job: " + jobId_);
+ LOG.info("getLocalDirs(): " + Arrays.asList(jobConf_.getLocalDirs()));
+ LOG.info("Running job: " + jobId_);
jobInfo();
while (!running_.isComplete()) {
@@ -548,7 +562,7 @@
jc_.close();
}
}
-
+
protected boolean mayExit_;
protected String[] argv_;
@@ -557,7 +571,7 @@
protected int debug_;
protected Environment env_;
-
+
protected String jar_;
protected boolean localHadoop_;
protected Configuration config_;
@@ -567,27 +581,28 @@
// command-line arguments
protected ArrayList inputGlobs_ = new ArrayList(); // <String>
protected ArrayList packageFiles_ = new ArrayList(); // <String>
- protected ArrayList shippedCanonFiles_= new ArrayList(); // <String>
+ protected ArrayList shippedCanonFiles_= new ArrayList(); // <String>
protected ArrayList userJobConfProps_ = new ArrayList(); // <String>
protected String output_;
protected String mapCmd_;
+ protected String comCmd_;
protected String redCmd_;
protected String cluster_;
protected ArrayList configPath_ = new ArrayList(); // <String>
protected String hadoopAliasConf_;
protected String inReaderSpec_;
-
+
// Use to communicate config to the external processes (ex env.var.HADOOP_USER)
// encoding "a=b c=d"
protected String addTaskEnvironment_;
-
+
protected boolean outputSingleNode_;
protected long minRecWrittenToEnableSkip_;
-
+
protected RunningJob running_;
protected String jobId_;
-
-
+
+
}
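
With the changes above, -combiner is dispatched like -mapper and -reducer: if the argument resolves to a Java class it is set directly via setCombinerClass(), otherwise it is wrapped in PipeCombiner and recorded as stream.combine.streamprocessor. A hypothetical invocation exercising all three (script names illustrative only):

$HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar \
  -input /logs/in \
  -output /logs/out \
  -mapper myMapper.sh \
  -combiner myCombiner.sh \
  -reducer myReducer.sh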
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java?rev=421829&r1=421828&r2=421829&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java Fri Jul 14 00:55:15 2006
@@ -1,73 +1,73 @@
-/**
- * Copyright 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.streaming;
-
-import java.io.IOException;
-import java.io.File;
-
-import org.apache.hadoop.mapred.*;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataOutputStream;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
-
-import org.apache.hadoop.util.Progressable;
-
-/** Similar to org.apache.hadoop.mapred.TextOutputFormat,
- * but delimits key and value with a TAB.
- * @author Michel Tourn
- */
-public class StreamOutputFormat implements OutputFormat {
-
- public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
- String name, Progressable progr) throws IOException {
-
- File file = new File(job.getOutputDir(), name);
-
- final FSDataOutputStream out = fs.create(file);
-
- return new RecordWriter() {
- public synchronized void write(WritableComparable key, Writable value)
- throws IOException {
- out.write(key.toString().getBytes("UTF-8"));
- out.writeByte('\t');
- out.write(value.toString().getBytes("UTF-8"));
- out.writeByte('\n');
- }
- public synchronized void close(Reporter reporter) throws IOException {
- out.close();
- }
- };
- }
-
-
- /** Check whether the output specification for a job is appropriate. Called
- * when a job is submitted. Typically checks that it does not already exist,
- * throwing an exception when it already exists, so that output is not
- * overwritten.
- *
- * @param job the job whose output will be written
- * @throws IOException when output should not be attempted
- */
- public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException
- {
- // allow existing data (for app-level restartability)
- }
-
-}
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+import java.io.File;
+
+import org.apache.hadoop.mapred.*;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.hadoop.util.Progressable;
+
+/** Similar to org.apache.hadoop.mapred.TextOutputFormat,
+ * but delimits key and value with a TAB.
+ * @author Michel Tourn
+ */
+public class StreamOutputFormat implements OutputFormat {
+
+ public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
+ String name, Progressable progr) throws IOException {
+
+ File file = new File(job.getOutputDir(), name);
+
+ final FSDataOutputStream out = fs.create(file);
+
+ return new RecordWriter() {
+ public synchronized void write(WritableComparable key, Writable value)
+ throws IOException {
+ out.write(key.toString().getBytes("UTF-8"));
+ out.writeByte('\t');
+ out.write(value.toString().getBytes("UTF-8"));
+ out.writeByte('\n');
+ }
+ public synchronized void close(Reporter reporter) throws IOException {
+ out.close();
+ }
+ };
+ }
+
+
+ /** Check whether the output specification for a job is appropriate. Called
+ * when a job is submitted. Typically checks that it does not already exist,
+ * throwing an exception when it already exists, so that output is not
+ * overwritten.
+ *
+ * @param job the job whose output will be written
+ * @throws IOException when output should not be attempted
+ */
+ public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException
+ {
+ // allow existing data (for app-level restartability)
+ }
+
+}
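
StreamOutputFormat's RecordWriter frames every record as key, TAB, value, NEWLINE in UTF-8. A self-contained sketch of that byte layout, substituting a plain DataOutputStream for FSDataOutputStream (demo class is illustrative only):

import java.io.*;

// Sketch of the record framing produced by StreamOutputFormat:
// key bytes, '\t', value bytes, '\n', all UTF-8.
public class RecordLayoutDemo {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.write("roses".getBytes("UTF-8"));
    out.writeByte('\t');
    out.write("red".getBytes("UTF-8"));
    out.writeByte('\n');
    out.close();
    System.out.print(bytes.toString("UTF-8")); // prints "roses<TAB>red"
  }
}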
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?rev=421829&r1=421828&r2=421829&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Fri Jul 14 00:55:15 2006
@@ -18,13 +18,13 @@
import junit.framework.TestCase;
import java.io.*;
+import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* This class tests hadoopStreaming in MapReduce local mode.
- * It requires the Unix utilities tr and uniq.
*/
public class TestStreaming extends TestCase
{
@@ -34,9 +34,12 @@
String INPUT_FILE = "input.txt";
String OUTPUT_DIR = "out";
String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
- String map = "/usr/bin/tr . \\n"; // split words into lines. Careful with spaces in args
- String reduce = "/usr/bin/uniq";
- String outputExpect = "are\t\nblue\t\nbunnies\t\npink\t\nred\t\nroses\t\nviolets\t\n";
+ // map behaves like "/usr/bin/tr . \\n"; (split words into lines)
+ String map = makeJavaCommand(TrApp.class, new String[]{".", "\\n"});
+ // combine, reduce behave like /usr/bin/uniq. But also prepend lines with C, R.
+ String combine = makeJavaCommand(UniqApp.class, new String[]{"C"});
+ String reduce = makeJavaCommand(UniqApp.class, new String[]{"R"});
+ String outputExpect = "RCare\t\nRCblue\t\nRCbunnies\t\nRCpink\t\nRCred\t\nRCroses\t\nRCviolets\t\n";
StreamJob job;
@@ -60,6 +63,7 @@
}
System.out.println("test.build.data=" + antTestDir);
}
+
void createInput() throws IOException
{
String path = new File(".", INPUT_FILE).getAbsolutePath();// needed from junit forked vm
@@ -68,6 +72,37 @@
out.close();
}
+ public String makeJavaCommand(Class main, String[] argv)
+ {
+ ArrayList vargs = new ArrayList();
+ File javaHomeBin = new File(System.getProperty("java.home"), "bin");
+ File jvm = new File(javaHomeBin, "java");
+ vargs.add(jvm.toString());
+ // copy parent classpath
+ vargs.add("-classpath");
+ vargs.add(System.getProperty("java.class.path"));
+
+ // Add main class and its arguments
+ vargs.add(main.getName());
+ for(int i=0; i<argv.length; i++) {
+ vargs.add(argv[i]);
+ }
+ return collate(vargs, " ");
+ }
+
+ String collate(ArrayList args, String sep)
+ {
+ StringBuffer buf = new StringBuffer();
+ Iterator it = args.iterator();
+ while(it.hasNext()) {
+ if(buf.length() > 0) {
+ buf.append(sep); // use the caller-supplied separator, not a hard-coded space
+ }
+ buf.append(it.next());
+ }
+ return buf.toString();
+ }
+
public void testCommandLine()
{
try {
@@ -80,11 +115,11 @@
"-input", INPUT_FILE,
"-output", OUTPUT_DIR,
"-mapper", map,
+ "-combiner", combine,
"-reducer", reduce,
/*"-debug",*/
"-verbose"
};
-
job = new StreamJob(argv, mayExit);
job.go();
File outFile = new File(".", OUTPUT_DIR + "/part-00000").getAbsoluteFile();
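
The makeJavaCommand helper above is what removes the Unix dependency: rather than shelling out to /usr/bin/tr and /usr/bin/uniq, the test launches a child JVM located via java.home and reuses the parent's classpath. A minimal sketch of the command line it assembles (demo class hypothetical; the exact path varies by machine):

import java.io.File;

// Sketch: build a platform-independent child-JVM command line,
// mirroring makeJavaCommand in the test above.
public class JavaCommandDemo {
  public static void main(String[] args) {
    File jvm = new File(new File(System.getProperty("java.home"), "bin"), "java");
    String cmd = jvm + " -classpath " + System.getProperty("java.class.path")
        + " org.apache.hadoop.streaming.TrApp . \\n";
    System.out.println(cmd);
  }
}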
Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java?rev=421829&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java Fri Jul 14 00:55:15 2006
@@ -0,0 +1,64 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+/** A minimal Java implementation of /usr/bin/tr.
+ Used to test the use of external applications without adding
+ platform-specific dependencies.
+*/
+public class TrApp
+{
+
+ public TrApp(char find, char replace)
+ {
+ this.find = find;
+ this.replace = replace;
+ }
+
+ public void go() throws IOException
+ {
+ BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+ String line;
+
+ while ((line = in.readLine()) != null) {
+ String out = line.replace(find, replace);
+ System.out.println(out);
+ }
+ }
+
+ public static void main(String[] args) throws IOException
+ {
+ args[0] = CUnescape(args[0]);
+ args[1] = CUnescape(args[1]);
+ TrApp app = new TrApp(args[0].charAt(0), args[1].charAt(0));
+ app.go();
+ }
+
+ public static String CUnescape(String s)
+ {
+ if(s.equals("\\n")) {
+ return "\n";
+ } else {
+ return s;
+ }
+ }
+ char find;
+ char replace;
+
+}
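
As a quick sanity check, TrApp can also be driven in-process by swapping System.in before calling its main method. This sketch (hypothetical test class, assuming TrApp is on the classpath) reproduces the tr-like word splitting used by the test:

import java.io.*;
import org.apache.hadoop.streaming.TrApp;

// Sketch: feed TrApp one line of the test input on stdin; it replaces
// '.' with '\n', printing roses, are, red on separate lines.
public class TrAppDemo {
  public static void main(String[] args) throws IOException {
    System.setIn(new ByteArrayInputStream("roses.are.red\n".getBytes("UTF-8")));
    TrApp.main(new String[]{".", "\\n"});
  }
}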