Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/09/19 01:08:08 UTC
svn commit: r447626 [2/3] - in /lucene/hadoop/trunk: ./ src/contrib/
src/contrib/streaming/
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Mon Sep 18 16:08:06 2006
@@ -30,60 +30,55 @@
* It delegates operations to an external program via stdin and stdout.
* @author Michel Tourn
*/
-public class PipeMapper extends PipeMapRed implements Mapper
-{
+public class PipeMapper extends PipeMapRed implements Mapper {
- String getPipeCommand(JobConf job)
- {
+ String getPipeCommand(JobConf job) {
return job.get("stream.map.streamprocessor");
}
- String getKeyColPropName()
- {
+ String getKeyColPropName() {
return "mapKeyCols";
- }
+ }
- boolean getUseSideEffect()
- {
- String reduce = job_.get("stream.reduce.streamprocessor");
- if(StreamJob.REDUCE_NONE.equals(reduce)) {
- return true;
- }
- return false;
+ boolean getUseSideEffect() {
+ return StreamUtil.getUseMapSideEffect(job_);
+ }
+
+ boolean getDoPipe() {
+ return true;
}
-
// Do NOT declare default constructor
// (MapRed creates it reflectively)
- public void map(WritableComparable key, Writable value,
- OutputCollector output, Reporter reporter)
- throws IOException
- {
+ public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
// init
- if(outThread_ == null) {
+ if (outThread_ == null) {
startOutputThreads(output, reporter);
}
try {
// 1/4 Hadoop in
numRecRead_++;
maybeLogRecord();
+ if (debugFailDuring_ && numRecRead_ == 3) {
+ throw new IOException("debugFailDuring_");
+ }
// 2/4 Hadoop to Tool
- if(numExceptions_==0) {
- if(optUseKey_) {
- write(key);
- clientOut_.write('\t');
- }
- write(value);
- clientOut_.write('\n');
- clientOut_.flush();
+ if (numExceptions_ == 0) {
+ if (optUseKey_) {
+ write(key);
+ clientOut_.write('\t');
+ }
+ write(value);
+ clientOut_.write('\n');
+ clientOut_.flush();
} else {
- numRecSkipped_++;
+ numRecSkipped_++;
}
- } catch(IOException io) {
+ } catch (IOException io) {
numExceptions_++;
- if(numExceptions_ > 1 || numRecWritten_ < minRecWrittenToEnableSkip_) {
+ if (numExceptions_ > 1 || numRecWritten_ < minRecWrittenToEnableSkip_) {
// terminate with failure
String msg = logFailure(io);
appendLogToJobLog("failure");
@@ -95,11 +90,10 @@
}
}
}
-
- public void close()
- {
+
+ public void close() {
appendLogToJobLog("success");
mapRedFinished();
}
-
+
}
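
The map() hunk above shows the wire protocol between Hadoop and the streaming program: each record goes to the child process's stdin as the key, a TAB, the value, and a newline, followed by a flush. Below is a minimal standalone sketch of that contract, with /bin/cat standing in for a real mapper command; the class name and sample records are illustrative and not part of this commit:

    import java.io.*;

    public class PipeProtocolSketch {
      public static void main(String[] args) throws IOException {
        // /bin/cat echoes its stdin, standing in for a streaming mapper
        Process child = new ProcessBuilder("/bin/cat").start();
        BufferedWriter clientOut =
            new BufferedWriter(new OutputStreamWriter(child.getOutputStream(), "UTF-8"));
        BufferedReader clientIn =
            new BufferedReader(new InputStreamReader(child.getInputStream(), "UTF-8"));

        String[][] records = { { "k1", "v1" }, { "k2", "v2" } };
        for (String[] rec : records) {
          clientOut.write(rec[0]); // key part
          clientOut.write('\t');   // key/value delimiter, as in PipeMapper.map()
          clientOut.write(rec[1]); // value part
          clientOut.write('\n');   // one record per line
        }
        clientOut.flush();
        clientOut.close();         // EOF tells the child no more records are coming

        String line;
        while ((line = clientIn.readLine()) != null) {
          System.out.println("from child: " + line);
        }
      }
    }
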
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Mon Sep 18 16:08:06 2006
@@ -31,33 +31,35 @@
* It delegates operations to an external program via stdin and stdout.
* @author Michel Tourn
*/
-public class PipeReducer extends PipeMapRed implements Reducer
-{
+public class PipeReducer extends PipeMapRed implements Reducer {
- String getPipeCommand(JobConf job)
- {
+ String getPipeCommand(JobConf job) {
return job.get("stream.reduce.streamprocessor");
}
- String getKeyColPropName()
- {
+ boolean getDoPipe() {
+ String argv = getPipeCommand(job_);
+ // Currently: null is identity reduce. REDUCE_NONE is no-map-outputs.
+ return (argv != null) && !StreamJob.REDUCE_NONE.equals(argv);
+ }
+
+ String getKeyColPropName() {
return "reduceKeyCols";
- }
-
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector output, Reporter reporter)
- throws IOException {
+ }
+
+ public void reduce(WritableComparable key, Iterator values, OutputCollector output,
+ Reporter reporter) throws IOException {
// init
- if(doPipe_ && outThread_ == null) {
+ if (doPipe_ && outThread_ == null) {
startOutputThreads(output, reporter);
}
try {
while (values.hasNext()) {
- Writable val = (Writable)values.next();
+ Writable val = (Writable) values.next();
numRecRead_++;
maybeLogRecord();
- if(doPipe_) {
+ if (doPipe_) {
write(key);
clientOut_.write('\t');
write(val);
@@ -68,15 +70,14 @@
output.collect(key, val);
}
}
- } catch(IOException io) {
+ } catch (IOException io) {
appendLogToJobLog("failure");
mapRedFinished();
- throw new IOException(getContext() + io.getMessage());
+ throw new IOException(getContext() + io.getMessage());
}
}
- public void close()
- {
+ public void close() {
appendLogToJobLog("success");
mapRedFinished();
}
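
The new getDoPipe() in PipeReducer encodes two special reducer specs: a null command means identity reduce, and the literal NONE means the job has no reduce outputs at all; only any other value starts an external process. A self-contained sketch of that decision (the class name and sample commands are illustrative):

    public class DoPipeSketch {
      static final String REDUCE_NONE = "NONE";

      // Mirrors the getDoPipe() logic: pipe only when a real command is given.
      static boolean doPipe(String reduceCommand) {
        return (reduceCommand != null) && !REDUCE_NONE.equals(reduceCommand);
      }

      public static void main(String[] args) {
        System.out.println(doPipe(null));         // false: identity reduce
        System.out.println(doPipe("NONE"));       // false: no reduce outputs
        System.out.println(doPipe("/bin/wc -l")); // true: run the command
      }
    }
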
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java Mon Sep 18 16:08:06 2006
@@ -30,7 +30,6 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.commons.logging.*;
-
/**
* Shared functionality for hadoopStreaming formats.
* A custom reader can be defined to be a RecordReader with the constructor below
@@ -39,18 +38,15 @@
* @see StreamXmlRecordReader
* @author Michel Tourn
*/
-public abstract class StreamBaseRecordReader implements RecordReader
-{
-
+public abstract class StreamBaseRecordReader implements RecordReader {
+
protected static final Log LOG = LogFactory.getLog(StreamBaseRecordReader.class.getName());
-
+
// custom JobConf properties for this class are prefixed with this namespace
final static String CONF_NS = "stream.recordreader.";
- public StreamBaseRecordReader(
- FSDataInputStream in, FileSplit split, Reporter reporter, JobConf job, FileSystem fs)
- throws IOException
- {
+ public StreamBaseRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter,
+ JobConf job, FileSystem fs) throws IOException {
in_ = in;
split_ = split;
start_ = split_.getStart();
@@ -60,99 +56,90 @@
reporter_ = reporter;
job_ = job;
fs_ = fs;
-
+
statusMaxRecordChars_ = job_.getInt(CONF_NS + "statuschars", 200);
}
/// RecordReader API
-
+
/** Read a record. Implementation should call numRecStats at the end
- */
+ */
public abstract boolean next(Writable key, Writable value) throws IOException;
/** This implementation always returns true. */
- public boolean[] areValidInputDirectories(FileSystem fileSys,
- Path[] inputDirs) throws IOException
- {
+ public boolean[] areValidInputDirectories(FileSystem fileSys, Path[] inputDirs) throws IOException {
int n = inputDirs.length;
boolean[] b = new boolean[n];
- for(int i=0; i<n; i++) {
+ for (int i = 0; i < n; i++) {
b[i] = true;
}
return b;
}
/** Returns the current position in the input. */
- public synchronized long getPos() throws IOException
- {
- return in_.getPos();
+ public synchronized long getPos() throws IOException {
+ return in_.getPos();
}
/** Close this to future operations.*/
- public synchronized void close() throws IOException
- {
- in_.close();
+ public synchronized void close() throws IOException {
+ in_.close();
}
public WritableComparable createKey() {
return new Text();
}
-
+
public Writable createValue() {
return new Text();
}
-
+
/// StreamBaseRecordReader API
- public void init() throws IOException
- {
- LOG.info("StreamBaseRecordReader.init: " +
- " start_=" + start_ + " end_=" + end_ + " length_=" + length_ +
- " start_ > in_.getPos() ="
- + (start_ > in_.getPos()) + " " + start_
- + " > " + in_.getPos() );
+ public void init() throws IOException {
+ LOG.info("StreamBaseRecordReader.init: " + " start_=" + start_ + " end_=" + end_ + " length_="
+ + length_ + " start_ > in_.getPos() =" + (start_ > in_.getPos()) + " " + start_ + " > "
+ + in_.getPos());
if (start_ > in_.getPos()) {
in_.seek(start_);
- }
+ }
seekNextRecordBoundary();
}
-
+
/** Implementation should seek forward in_ to the first byte of the next record.
* The initial byte offset in the stream is arbitrary.
*/
public abstract void seekNextRecordBoundary() throws IOException;
-
-
- void numRecStats(byte[] record, int start, int len) throws IOException
- {
- numRec_++;
- if(numRec_ == nextStatusRec_) {
- String recordStr = new String(record, start,
- Math.min(len, statusMaxRecordChars_), "UTF-8");
- nextStatusRec_ +=100;//*= 10;
+
+ void numRecStats(byte[] record, int start, int len) throws IOException {
+ numRec_++;
+ if (numRec_ == nextStatusRec_) {
+ String recordStr = new String(record, start, Math.min(len, statusMaxRecordChars_), "UTF-8");
+ nextStatusRec_ += 100;//*= 10;
String status = getStatus(recordStr);
LOG.info(status);
reporter_.setStatus(status);
}
}
- long lastMem =0;
- String getStatus(CharSequence record)
- {
+ long lastMem = 0;
+
+ String getStatus(CharSequence record) {
long pos = -1;
- try {
+ try {
pos = getPos();
- } catch(IOException io) {
+ } catch (IOException io) {
}
String recStr;
- if(record.length() > statusMaxRecordChars_) {
- recStr = record.subSequence(0, statusMaxRecordChars_) + "...";
+ if (record.length() > statusMaxRecordChars_) {
+ recStr = record.subSequence(0, statusMaxRecordChars_) + "...";
} else {
- recStr = record.toString();
+ recStr = record.toString();
}
- String unqualSplit = split_.getFile().getName() + ":" + split_.getStart() + "+" + split_.getLength();
- String status = "HSTR " + StreamUtil.HOST + " " + numRec_ + ". pos=" + pos
- + " " + unqualSplit + " Processing record=" + recStr;
+ String unqualSplit = split_.getFile().getName() + ":" + split_.getStart() + "+"
+ + split_.getLength();
+ String status = "HSTR " + StreamUtil.HOST + " " + numRec_ + ". pos=" + pos + " " + unqualSplit
+ + " Processing record=" + recStr;
status += " " + splitName_;
return status;
}
@@ -169,5 +156,5 @@
int numRec_ = 0;
int nextStatusRec_ = 1;
int statusMaxRecordChars_;
-
+
}
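
numRecStats() above throttles task status updates: a status line is produced only when the record counter reaches the next threshold, the threshold then advances by 100, and the record text is clipped to the stream.recordreader.statuschars limit (default 200). A standalone sketch of that throttling, assuming plain String records for simplicity:

    public class RecStatsSketch {
      int numRec = 0;
      int nextStatusRec = 1;
      final int statusMaxRecordChars = 200; // default of the statuschars knob

      void numRecStats(String record) {
        numRec++;
        if (numRec == nextStatusRec) {
          nextStatusRec += 100; // same stepping as the committed code
          String recStr = (record.length() > statusMaxRecordChars)
              ? record.substring(0, statusMaxRecordChars) + "..."
              : record;
          System.out.println("record " + numRec + ": " + recStr);
        }
      }

      public static void main(String[] args) {
        RecStatsSketch s = new RecStatsSketch();
        for (int i = 1; i <= 250; i++) {
          s.numRecStats("sample record " + i);
        }
        // only records 1, 101 and 201 produce a status line
      }
    }
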
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Mon Sep 18 16:08:06 2006
@@ -39,49 +39,38 @@
* selects a RecordReader based on a JobConf property.
* @author Michel Tourn
*/
-public class StreamInputFormat extends InputFormatBase
-{
+public class StreamInputFormat extends InputFormatBase {
// an InputFormat should be public with the synthetic public default constructor
// JobTracker's JobInProgress will instantiate with clazz.newInstance() (and a custom ClassLoader)
protected static final Log LOG = LogFactory.getLog(StreamInputFormat.class.getName());
- static {
- //LOG.setLevel(Level.FINE);
- }
-
/** This implementation always returns true. */
- public boolean[] areValidInputDirectories(FileSystem fileSys,
- Path[] inputDirs
- ) throws IOException {
+ public boolean[] areValidInputDirectories(FileSystem fileSys, Path[] inputDirs) throws IOException {
boolean[] b = new boolean[inputDirs.length];
- for(int i=0; i < inputDirs.length; ++i) {
+ for (int i = 0; i < inputDirs.length; ++i) {
b[i] = true;
}
return b;
}
- static boolean isGzippedInput(JobConf job)
- {
+ static boolean isGzippedInput(JobConf job) {
String val = job.get(StreamBaseRecordReader.CONF_NS + "compression");
return "gzip".equals(val);
}
- public FileSplit[] getSplits(FileSystem fs, JobConf job, int numSplits)
- throws IOException {
-
- if(isGzippedInput(job)) {
+ public FileSplit[] getSplits(FileSystem fs, JobConf job, int numSplits) throws IOException {
+
+ if (isGzippedInput(job)) {
return getFullFileSplits(fs, job);
} else {
return super.getSplits(fs, job, numSplits);
- }
+ }
}
-
+
/** For the compressed-files case: override InputFormatBase to produce one split. */
- FileSplit[] getFullFileSplits(FileSystem fs, JobConf job)
- throws IOException
- {
+ FileSplit[] getFullFileSplits(FileSystem fs, JobConf job) throws IOException {
Path[] files = listPaths(fs, job);
int numSplits = files.length;
ArrayList splits = new ArrayList(numSplits);
@@ -90,37 +79,35 @@
long splitSize = fs.getLength(file);
splits.add(new FileSplit(file, 0, splitSize));
}
- return (FileSplit[])splits.toArray(new FileSplit[splits.size()]);
+ return (FileSplit[]) splits.toArray(new FileSplit[splits.size()]);
}
- protected Path[] listPaths(FileSystem fs, JobConf job)
- throws IOException
- {
+ protected Path[] listPaths(FileSystem fs, JobConf job) throws IOException {
Path[] globs = job.getInputPaths();
ArrayList list = new ArrayList();
int dsup = globs.length;
- for(int d=0; d<dsup; d++) {
+ for (int d = 0; d < dsup; d++) {
String leafName = globs[d].getName();
LOG.info("StreamInputFormat: globs[" + d + "] leafName = " + leafName);
- Path[] paths; Path dir;
+ Path[] paths;
+ Path dir;
PathFilter filter = new GlobFilter(fs, leafName);
dir = new Path(globs[d].getParent().toString());
- if(dir == null) dir = new Path(".");
+ if (dir == null) dir = new Path(".");
paths = fs.listPaths(dir, filter);
list.addAll(Arrays.asList(paths));
}
- return (Path[])list.toArray(new Path[]{});
+ return (Path[]) list.toArray(new Path[] {});
}
- class GlobFilter implements PathFilter
- {
- public GlobFilter(FileSystem fs, String glob)
- {
+ class GlobFilter implements PathFilter {
+
+ public GlobFilter(FileSystem fs, String glob) {
fs_ = fs;
pat_ = Pattern.compile(globToRegexp(glob));
}
- String globToRegexp(String glob)
- {
+
+ String globToRegexp(String glob) {
String re = glob;
re = re.replaceAll("\\.", "\\\\.");
re = re.replaceAll("\\+", "\\\\+");
@@ -130,11 +117,10 @@
return re;
}
- public boolean accept(Path pathname)
- {
+ public boolean accept(Path pathname) {
boolean acc = !fs_.isChecksumFile(pathname);
- if(acc) {
- acc = pat_.matcher(pathname.getName()).matches();
+ if (acc) {
+ acc = pat_.matcher(pathname.getName()).matches();
}
LOG.info("matches " + pat_ + ", " + pathname + " = " + acc);
return acc;
@@ -144,10 +130,9 @@
FileSystem fs_;
}
- public RecordReader getRecordReader(FileSystem fs, final FileSplit split,
- JobConf job, Reporter reporter)
- throws IOException {
- LOG.info("getRecordReader start.....");
+ public RecordReader getRecordReader(FileSystem fs, final FileSplit split, JobConf job,
+ Reporter reporter) throws IOException {
+ LOG.info("getRecordReader start.....split=" + split);
reporter.setStatus(split.toString());
final long start = split.getStart();
@@ -160,45 +145,41 @@
// Factory dispatch based on available params..
Class readerClass;
String c = job.get("stream.recordreader.class");
- if(c == null) {
+ if (c == null) {
readerClass = StreamLineRecordReader.class;
} else {
readerClass = StreamUtil.goodClassOrNull(c, null);
- if(readerClass == null) {
+ if (readerClass == null) {
throw new RuntimeException("Class not found: " + c);
}
}
Constructor ctor;
try {
- ctor = readerClass.getConstructor(new Class[]{
- FSDataInputStream.class, FileSplit.class, Reporter.class, JobConf.class, FileSystem.class});
- } catch(NoSuchMethodException nsm) {
+ ctor = readerClass.getConstructor(new Class[] { FSDataInputStream.class, FileSplit.class,
+ Reporter.class, JobConf.class, FileSystem.class });
+ } catch (NoSuchMethodException nsm) {
throw new RuntimeException(nsm);
}
-
StreamBaseRecordReader reader;
try {
- reader = (StreamBaseRecordReader) ctor.newInstance(new Object[]{
- in, split, reporter, job, fs});
- } catch(Exception nsm) {
+ reader = (StreamBaseRecordReader) ctor.newInstance(new Object[] { in, split, reporter, job,
+ fs });
+ } catch (Exception nsm) {
throw new RuntimeException(nsm);
}
reader.init();
-
- if(reader instanceof StreamSequenceRecordReader) {
+ if (reader instanceof StreamSequenceRecordReader) {
// override k/v class types with types stored in SequenceFile
- StreamSequenceRecordReader ss = (StreamSequenceRecordReader)reader;
+ StreamSequenceRecordReader ss = (StreamSequenceRecordReader) reader;
job.setInputKeyClass(ss.rin_.getKeyClass());
job.setInputValueClass(ss.rin_.getValueClass());
}
-
return reader;
}
-
}
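
GlobFilter turns a shell-style leaf-name glob into a regular expression before matching candidate paths. The hunk only shows the '.' and '+' escaping lines of globToRegexp(), so the wildcard rules below ('*' to '.*', '?' to '.') are an assumption about the elided remainder, not a quote of it:

    import java.util.regex.Pattern;

    public class GlobSketch {
      static String globToRegexp(String glob) {
        String re = glob;
        re = re.replaceAll("\\.", "\\\\."); // literal dot, as in the commit
        re = re.replaceAll("\\+", "\\\\+"); // literal plus, as in the commit
        re = re.replaceAll("\\*", ".*");    // assumed: '*' matches any sequence
        re = re.replaceAll("\\?", ".");     // assumed: '?' matches any one char
        return re;
      }

      public static void main(String[] args) {
        Pattern pat = Pattern.compile(globToRegexp("part-*.gz"));
        System.out.println(pat.matcher("part-00000.gz").matches());  // true
        System.out.println(pat.matcher("part-00000.txt").matches()); // false
      }
    }
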
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Mon Sep 18 16:08:06 2006
@@ -18,11 +18,14 @@
import java.io.File;
import java.io.IOException;
-import java.net.URL;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
import org.apache.commons.logging.*;
@@ -39,20 +42,19 @@
* (Jar packaging, MapRed job submission and monitoring)
* @author Michel Tourn
*/
-public class StreamJob
-{
- protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());
+public class StreamJob {
+ protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());
final static String REDUCE_NONE = "NONE";
- public StreamJob(String[] argv, boolean mayExit)
- {
+ private boolean reducerNone_;
+
+ public StreamJob(String[] argv, boolean mayExit) {
argv_ = argv;
mayExit_ = mayExit;
}
- public void go() throws IOException
- {
+ public void go() throws IOException {
init();
preProcessArgs();
@@ -63,40 +65,37 @@
submitAndMonitorJob();
}
- protected void init()
- {
- try {
- env_ = new Environment();
- } catch(IOException io) {
- throw new RuntimeException(io);
- }
+ protected void init() {
+ try {
+ env_ = new Environment();
+ } catch (IOException io) {
+ throw new RuntimeException(io);
+ }
}
- void preProcessArgs()
- {
+ void preProcessArgs() {
verbose_ = false;
addTaskEnvironment_ = "";
}
- void postProcessArgs() throws IOException
- {
- if(cluster_ == null) {
- // hadoop-default.xml is standard, hadoop-local.xml is not.
- cluster_ = "default";
+ void postProcessArgs() throws IOException {
+ if (cluster_ == null) {
+ // hadoop-default.xml is standard, hadoop-local.xml is not.
+ cluster_ = "default";
}
hadoopAliasConf_ = "hadoop-" + getClusterNick() + ".xml";
- if(inputGlobs_.size() == 0) {
- fail("Required argument: -input <name>");
+ if (inputSpecs_.size() == 0) {
+ fail("Required argument: -input <name>");
}
- if(output_ == null) {
- fail("Required argument: -output ");
+ if (output_ == null) {
+ fail("Required argument: -output ");
}
msg("addTaskEnvironment=" + addTaskEnvironment_);
Iterator it = packageFiles_.iterator();
- while(it.hasNext()) {
- File f = new File((String)it.next());
- if(f.isFile()) {
+ while (it.hasNext()) {
+ File f = new File((String) it.next());
+ if (f.isFile()) {
shippedCanonFiles_.add(f.getCanonicalPath());
}
}
@@ -108,37 +107,40 @@
redCmd_ = unqualifyIfLocalPath(redCmd_);
}
- void validateNameEqValue(String neqv)
- {
+ void validateNameEqValue(String neqv) {
String[] nv = neqv.split("=", 2);
- if(nv.length < 2) {
- fail("Invalid name=value spec: " + neqv);
+ if (nv.length < 2) {
+ fail("Invalid name=value spec: " + neqv);
}
msg("Recording name=value: name=" + nv[0] + " value=" + nv[1]);
}
- String unqualifyIfLocalPath(String cmd) throws IOException
- {
- if(cmd == null) {
+ String unqualifyIfLocalPath(String cmd) throws IOException {
+ if (cmd == null) {
//
} else {
String prog = cmd;
String args = "";
int s = cmd.indexOf(" ");
- if(s != -1) {
+ if (s != -1) {
prog = cmd.substring(0, s);
- args = cmd.substring(s+1);
+ args = cmd.substring(s + 1);
+ }
+ String progCanon;
+ try {
+ progCanon = new File(prog).getCanonicalPath();
+ } catch (IOException io) {
+ progCanon = prog;
}
- String progCanon = new File(prog).getCanonicalPath();
boolean shipped = shippedCanonFiles_.contains(progCanon);
msg("shipped: " + shipped + " " + progCanon);
- if(shipped) {
+ if (shipped) {
// Change path to simple filename.
// That way when PipeMapRed calls Runtime.exec(),
// it will look for the executable in Task's working dir.
// And this is where TaskRunner unjars our job jar.
prog = new File(prog).getName();
- if(args.length() > 0) {
+ if (args.length() > 0) {
cmd = prog + " " + args;
} else {
cmd = prog;
@@ -149,68 +151,70 @@
return cmd;
}
- String getHadoopAliasConfFile()
- {
+ String getHadoopAliasConfFile() {
return new File(getHadoopClientHome() + "/conf", hadoopAliasConf_).getAbsolutePath();
}
-
- void parseArgv()
- {
- if(argv_.length==0) {
+ void parseArgv() {
+ if (argv_.length == 0) {
exitUsage(false);
}
- int i=0;
- while(i < argv_.length) {
+ int i = 0;
+ while (i < argv_.length) {
String s;
- if(argv_[i].equals("-verbose")) {
+ if (argv_[i].equals("-verbose")) {
verbose_ = true;
- } else if(argv_[i].equals("-info")) {
+ } else if (argv_[i].equals("-info")) {
detailedUsage_ = true;
- } else if(argv_[i].equals("-debug")) {
+ } else if (argv_[i].equals("-debug")) {
debug_++;
- } else if((s = optionArg(argv_, i, "-input", false)) != null) {
+ } else if ((s = optionArg(argv_, i, "-input", false)) != null) {
i++;
- inputGlobs_.add(s);
- } else if((s = optionArg(argv_, i, "-output", output_ != null)) != null) {
+ inputSpecs_.add(s);
+ } else if (argv_[i].equals("-inputtagged")) {
+ inputTagged_ = true;
+ } else if ((s = optionArg(argv_, i, "-output", output_ != null)) != null) {
i++;
output_ = s;
- } else if((s = optionArg(argv_, i, "-mapper", mapCmd_ != null)) != null) {
+ } else if ((s = optionArg(argv_, i, "-mapsideoutput", mapsideoutURI_ != null)) != null) {
+ i++;
+ mapsideoutURI_ = s;
+ } else if ((s = optionArg(argv_, i, "-mapper", mapCmd_ != null)) != null) {
i++;
mapCmd_ = s;
- } else if((s = optionArg(argv_, i, "-combiner", comCmd_ != null)) != null) {
+ } else if ((s = optionArg(argv_, i, "-combiner", comCmd_ != null)) != null) {
i++;
comCmd_ = s;
- } else if((s = optionArg(argv_, i, "-reducer", redCmd_ != null)) != null) {
+ } else if ((s = optionArg(argv_, i, "-reducer", redCmd_ != null)) != null) {
i++;
redCmd_ = s;
- } else if((s = optionArg(argv_, i, "-file", false)) != null) {
+ } else if ((s = optionArg(argv_, i, "-file", false)) != null) {
i++;
packageFiles_.add(s);
- } else if((s = optionArg(argv_, i, "-cluster", cluster_ != null)) != null) {
+ } else if ((s = optionArg(argv_, i, "-cluster", cluster_ != null)) != null) {
i++;
cluster_ = s;
- } else if((s = optionArg(argv_, i, "-config", false)) != null) {
+ } else if ((s = optionArg(argv_, i, "-config", false)) != null) {
i++;
configPath_.add(s);
- } else if((s = optionArg(argv_, i, "-dfs", false)) != null) {
+ } else if ((s = optionArg(argv_, i, "-dfs", false)) != null) {
i++;
- userJobConfProps_.add("fs.default.name="+s);
- } else if((s = optionArg(argv_, i, "-jt", false)) != null) {
+ userJobConfProps_.add("fs.default.name=" + s);
+ } else if ((s = optionArg(argv_, i, "-jt", false)) != null) {
i++;
- userJobConfProps_.add("mapred.job.tracker="+s);
- } else if((s = optionArg(argv_, i, "-jobconf", false)) != null) {
+ userJobConfProps_.add("mapred.job.tracker=" + s);
+ } else if ((s = optionArg(argv_, i, "-jobconf", false)) != null) {
i++;
validateNameEqValue(s);
userJobConfProps_.add(s);
- } else if((s = optionArg(argv_, i, "-cmdenv", false)) != null) {
+ } else if ((s = optionArg(argv_, i, "-cmdenv", false)) != null) {
i++;
validateNameEqValue(s);
- if(addTaskEnvironment_.length() > 0) {
- addTaskEnvironment_ += " ";
+ if (addTaskEnvironment_.length() > 0) {
+ addTaskEnvironment_ += " ";
}
addTaskEnvironment_ += s;
- } else if((s = optionArg(argv_, i, "-inputreader", inReaderSpec_ != null)) != null) {
+ } else if ((s = optionArg(argv_, i, "-inputreader", inReaderSpec_ != null)) != null) {
i++;
inReaderSpec_ = s;
} else {
@@ -219,37 +223,35 @@
}
i++;
}
- if(detailedUsage_) {
- exitUsage(true);
+ if (detailedUsage_) {
+ exitUsage(true);
}
}
- String optionArg(String[] args, int index, String arg, boolean argSet)
- {
- if(index >= args.length || ! args[index].equals(arg)) {
+ String optionArg(String[] args, int index, String arg, boolean argSet) {
+ if (index >= args.length || !args[index].equals(arg)) {
return null;
}
- if(argSet) {
+ if (argSet) {
throw new IllegalArgumentException("Can only have one " + arg + " option");
}
- if(index >= args.length-1) {
+ if (index >= args.length - 1) {
throw new IllegalArgumentException("Expected argument after option " + args[index]);
}
- return args[index+1];
+ return args[index + 1];
}
- protected void msg(String msg)
- {
- if(verbose_) {
+ protected void msg(String msg) {
+ if (verbose_) {
System.out.println("STREAM: " + msg);
}
}
- public void exitUsage(boolean detailed)
- {
- // 1 2 3 4 5 6 7
- //1234567890123456789012345678901234567890123456789012345678901234567890123456789
- System.out.println("Usage: $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar [options]");
+ public void exitUsage(boolean detailed) {
+ // 1 2 3 4 5 6 7
+ //1234567890123456789012345678901234567890123456789012345678901234567890123456789
+ System.out.println("Usage: $HADOOP_HOME/bin/hadoop [--config dir] jar \\");
+ System.out.println(" $HADOOP_HOME/hadoop-streaming.jar [options]");
System.out.println("Options:");
System.out.println(" -input <path> DFS input file(s) for the Map step");
System.out.println(" -output <path> DFS output directory for the Reduce step");
@@ -257,58 +259,82 @@
System.out.println(" -combiner <cmd> The streaming command to run");
System.out.println(" -reducer <cmd> The streaming command to run");
System.out.println(" -file <file> File/dir to be shipped in the Job jar file");
- System.out.println(" -cluster <name> Default uses hadoop-default.xml and hadoop-site.xml");
- System.out.println(" -config <file> Optional. One or more paths to xml config files");
- System.out.println(" -dfs <h:p> Optional. Override DFS configuration");
- System.out.println(" -jt <h:p> Optional. Override JobTracker configuration");
+ //Only advertise the standard way: [--config dir] in our launcher
+ //System.out.println(" -cluster <name> Default uses hadoop-default.xml and hadoop-site.xml");
+ //System.out.println(" -config <file> Optional. One or more paths to xml config files");
+ System.out.println(" -dfs <h:p>|local Optional. Override DFS configuration");
+ System.out.println(" -jt <h:p>|local Optional. Override JobTracker configuration");
System.out.println(" -inputreader <spec> Optional.");
- System.out.println(" -jobconf <n>=<v> Optional.");
+ System.out.println(" -jobconf <n>=<v> Optional. Add or override a JobConf property");
System.out.println(" -cmdenv <n>=<v> Optional. Pass env.var to streaming commands");
System.out.println(" -verbose");
System.out.println();
- if(!detailed) {
- System.out.println("For more details about these options:");
- System.out.println("Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info");
- fail("");
+ if (!detailed) {
+ System.out.println("For more details about these options:");
+ System.out.println("Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info");
+ fail("");
}
System.out.println("In -input: globbing on <path> is supported and can have multiple -input");
System.out.println("Default Map input format: a line is a record in UTF-8");
System.out.println(" the key part ends at first TAB, the rest of the line is the value");
System.out.println("Custom Map input format: -inputreader package.MyRecordReader,n=v,n=v ");
- System.out.println(" comma-separated name-values can be specified to configure the InputFormat");
+ System.out
+ .println(" comma-separated name-values can be specified to configure the InputFormat");
System.out.println(" Ex: -inputreader 'StreamXmlRecordReader,begin=<doc>,end=</doc>'");
System.out.println("Map output format, reduce input/output format:");
- System.out.println(" Format defined by what mapper command outputs. Line-oriented");
+ System.out.println(" Format defined by what the mapper command outputs. Line-oriented");
System.out.println();
- System.out.println("Use -cluster <name> to switch between \"local\" Hadoop and one or more remote ");
- System.out.println(" Hadoop clusters. ");
- System.out.println(" The default is to use the normal hadoop-default.xml and hadoop-site.xml");
- System.out.println(" Else configuration will use $HADOOP_HOME/conf/hadoop-<name>.xml");
+ System.out.println("The files or directories named in the -file argument[s] end up in the");
+ System.out.println(" working directory when the mapper and reducer are run.");
+ System.out.println(" The location of this working directory is unspecified.");
System.out.println();
- System.out.println("To skip the shuffle/sort/reduce step:" );
+ //System.out.println("Use -cluster <name> to switch between \"local\" Hadoop and one or more remote ");
+ //System.out.println(" Hadoop clusters. ");
+ //System.out.println(" The default is to use the normal hadoop-default.xml and hadoop-site.xml");
+ //System.out.println(" Else configuration will use $HADOOP_HOME/conf/hadoop-<name>.xml");
+ //System.out.println();
+ System.out.println("To skip the sort/combine/shuffle/sort/reduce step:");
System.out.println(" Use -reducer " + REDUCE_NONE);
- System.out.println(" This preserves the map input order and speeds up processing");
+ System.out
+ .println(" A Task's Map output then becomes a 'side-effect output' rather than a reduce input");
+ System.out
+ .println(" This speeds up processing, This also feels more like \"in-place\" processing");
+ System.out.println(" because the input filename and the map input order are preserved");
+ System.out.println("To specify a single side-effect output file");
+ System.out.println(" -mapsideoutput [file:/C:/win|file:/unix/|socket://host:port]");//-output for side-effects will be soon deprecated
+ System.out.println(" If the jobtracker is local this is a local file");
+ System.out.println(" This currently requires -reducer NONE");
System.out.println();
System.out.println("To set the number of reduce tasks (num. of output files):");
System.out.println(" -jobconf mapred.reduce.tasks=10");
- System.out.println("To name the job (appears in the JobTrack Web UI):");
+ System.out.println("To speed up the last reduces:");
+ System.out.println(" -jobconf mapred.speculative.execution=true");
+ System.out.println(" Do not use this along -reducer " + REDUCE_NONE);
+ System.out.println("To name the job (appears in the JobTracker Web UI):");
System.out.println(" -jobconf mapred.job.name='My Job' ");
System.out.println("To specify that line-oriented input is in gzip format:");
- System.out.println("(at this time ALL input files must be gzipped and this is not recognized based on file extension)");
+ System.out
+ .println("(at this time ALL input files must be gzipped and this is not recognized based on file extension)");
System.out.println(" -jobconf stream.recordreader.compression=gzip ");
System.out.println("To change the local temp directory:");
- System.out.println(" -jobconf dfs.data.dir=/tmp");
+ System.out.println(" -jobconf dfs.data.dir=/tmp/dfs");
+ System.out.println(" -jobconf stream.tmpdir=/tmp/streaming");
System.out.println("Additional local temp directories with -cluster local:");
System.out.println(" -jobconf mapred.local.dir=/tmp/local");
System.out.println(" -jobconf mapred.system.dir=/tmp/system");
System.out.println(" -jobconf mapred.temp.dir=/tmp/temp");
+ System.out.println("Use a custom hadoopStreaming build along a standard hadoop install:");
+ System.out.println(" $HADOOP_HOME/bin/hadoop jar /path/my-hadoop-streaming.jar [...]\\");
+ System.out
+ .println(" [...] -jobconf stream.shipped.hadoopstreaming=/path/my-hadoop-streaming.jar");
System.out.println("For more details about jobconf parameters see:");
System.out.println(" http://wiki.apache.org/lucene-hadoop/JobConfFile");
System.out.println("To set an environement variable in a streaming command:");
System.out.println(" -cmdenv EXAMPLE_DIR=/home/example/dictionaries/");
System.out.println();
- System.out.println("Shortcut to run from any directory:");
- System.out.println(" setenv HSTREAMING \"$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/build/hadoop-streaming.jar\"");
+ System.out.println("Shortcut:");
+ System.out
+ .println(" setenv HSTREAMING \"$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar\"");
System.out.println();
System.out.println("Example: $HSTREAMING -mapper \"/usr/local/bin/perl5 filter.pl\"");
System.out.println(" -file /local/filter.pl -input \"/logs/0604*/*\" [...]");
@@ -318,81 +344,87 @@
fail("");
}
- public void fail(String message)
- {
- if(mayExit_) {
- System.err.println(message);
- System.exit(1);
+ public void fail(String message) {
+ if (mayExit_) {
+ System.err.println(message);
+ System.exit(1);
} else {
- throw new IllegalArgumentException(message);
+ throw new IllegalArgumentException(message);
}
}
// --------------------------------------------
-
- protected String getHadoopClientHome()
- {
+ protected String getHadoopClientHome() {
String h = env_.getProperty("HADOOP_HOME"); // standard Hadoop
- if(h == null) {
+ if (h == null) {
//fail("Missing required environment variable: HADOOP_HOME");
h = "UNDEF";
}
return h;
}
-
- protected boolean isLocalHadoop()
- {
+ protected boolean isLocalHadoop() {
boolean local;
- if(jobConf_ == null) {
- local = getClusterNick().equals("local");
+ if (jobConf_ == null) {
+ local = getClusterNick().equals("local");
} else {
- local = jobConf_.get("mapred.job.tracker", "").equals("local");
+ local = StreamUtil.isLocalJobTracker(jobConf_);
}
return local;
}
- protected String getClusterNick()
- {
+
+ protected String getClusterNick() {
return cluster_;
}
/** @return path to the created Jar file or null if no files are necessary.
- */
- protected String packageJobJar() throws IOException
- {
+ */
+ protected String packageJobJar() throws IOException {
ArrayList unjarFiles = new ArrayList();
// Runtime code: ship same version of code as self (job submitter code)
// usually found in: build/contrib or build/hadoop-<version>-dev-streaming.jar
- String runtimeClasses = StreamUtil.findInClasspath(StreamJob.class.getName());
- if(runtimeClasses == null) {
- throw new IOException("runtime classes not found: " + getClass().getPackage());
+
+ // First try an explicit spec: it's too hard to find our own location in this case:
+ // $HADOOP_HOME/bin/hadoop jar /not/first/on/classpath/custom-hadoop-streaming.jar
+ // where findInClasspath() would find the version of hadoop-streaming.jar in $HADOOP_HOME
+ String runtimeClasses = jobConf_.get("stream.shipped.hadoopstreaming"); // jar or class dir
+
+ if (runtimeClasses == null) {
+ runtimeClasses = StreamUtil.findInClasspath(StreamJob.class.getName());
+ }
+ if (runtimeClasses == null) {
+ throw new IOException("runtime classes not found: " + getClass().getPackage());
} else {
- msg("Found runtime classes in: " + runtimeClasses);
+ msg("Found runtime classes in: " + runtimeClasses);
}
- if(isLocalHadoop()) {
+ if (isLocalHadoop()) {
// don't package class files (they might get unpackaged in "." and then
// hide the intended CLASSPATH entry)
// we still package everything else (so that scripts and executable are found in
// Task workdir like distributed Hadoop)
} else {
- if(new File(runtimeClasses).isDirectory()) {
- packageFiles_.add(runtimeClasses);
+ if (new File(runtimeClasses).isDirectory()) {
+ packageFiles_.add(runtimeClasses);
} else {
- unjarFiles.add(runtimeClasses);
+ unjarFiles.add(runtimeClasses);
}
}
- if(packageFiles_.size() + unjarFiles.size()==0) {
+ if (packageFiles_.size() + unjarFiles.size() == 0) {
return null;
}
- File jobJar = File.createTempFile("streamjob", ".jar");
- System.out.println("packageJobJar: " + packageFiles_ + " " + unjarFiles + " " + jobJar);
- if(debug_ == 0) {
+ String tmp = jobConf_.get("stream.tmpdir"); //, "/tmp/${user.name}/"
+ File tmpDir = (tmp == null) ? null : new File(tmp);
+ // tmpDir=null means OS default tmp dir
+ File jobJar = File.createTempFile("streamjob", ".jar", tmpDir);
+ System.out.println("packageJobJar: " + packageFiles_ + " " + unjarFiles + " " + jobJar
+ + " tmpDir=" + tmpDir);
+ if (debug_ == 0) {
jobJar.deleteOnExit();
}
JarBuilder builder = new JarBuilder();
- if(verbose_) {
+ if (verbose_) {
builder.setVerbose(true);
}
String jobJarName = jobJar.getAbsolutePath();
@@ -400,53 +432,81 @@
return jobJarName;
}
- protected void setJobConf() throws IOException
- {
+ protected void setUserJobConfProps(boolean doEarlyProps) {
+ Iterator it = userJobConfProps_.iterator();
+ while (it.hasNext()) {
+ String prop = (String) it.next();
+ String[] nv = prop.split("=", 2);
+ if (doEarlyProps == nv[0].equals("fs.default.name")) {
+ msg("xxxJobConf: set(" + nv[0] + ", " + nv[1] + ") early=" + doEarlyProps);
+ jobConf_.set(nv[0], nv[1]);
+ }
+ }
+ }
+
+ protected void setJobConf() throws IOException {
msg("hadoopAliasConf_ = " + hadoopAliasConf_);
config_ = new Configuration();
- if(!cluster_.equals("default")) {
- config_.addFinalResource(new Path(getHadoopAliasConfFile()));
+ if (!cluster_.equals("default")) {
+ config_.addFinalResource(new Path(getHadoopAliasConfFile()));
} else {
// use only defaults: hadoop-default.xml and hadoop-site.xml
}
Iterator it = configPath_.iterator();
- while(it.hasNext()) {
- String pathName = (String)it.next();
- config_.addFinalResource(new Path(pathName));
+ while (it.hasNext()) {
+ String pathName = (String) it.next();
+ config_.addFinalResource(new Path(pathName));
}
+
+ testMerge_ = (-1 != userJobConfProps_.toString().indexOf("stream.testmerge"));
+
// general MapRed job properties
jobConf_ = new JobConf(config_);
- for(int i=0; i<inputGlobs_.size(); i++) {
- jobConf_.addInputPath(new Path((String)inputGlobs_.get(i)));
+
+ setUserJobConfProps(true);
+
+ // The correct FS must be set before this is called!
+ // (to resolve local vs. dfs drive letter differences)
+ // (mapred.working.dir will be lazily initialized ONCE and depends on FS)
+ for (int i = 0; i < inputSpecs_.size(); i++) {
+ addInputSpec((String) inputSpecs_.get(i), i);
+ }
+ jobConf_.setBoolean("stream.inputtagged", inputTagged_);
+ jobConf_.set("stream.numinputspecs", "" + inputSpecs_.size());
+
+ Class fmt;
+ if (testMerge_ && false == hasSimpleInputSpecs_) {
+ // this ignores -inputreader
+ fmt = MergerInputFormat.class;
+ } else {
+ // need to keep this case to support custom -inputreader
+ // and their parameters ,n=v,n=v
+ fmt = StreamInputFormat.class;
}
+ jobConf_.setInputFormat(fmt);
- jobConf_.setInputFormat(StreamInputFormat.class);
// for SequenceFile, input classes may be overriden in getRecordReader
jobConf_.setInputKeyClass(Text.class);
jobConf_.setInputValueClass(Text.class);
jobConf_.setOutputKeyClass(Text.class);
jobConf_.setOutputValueClass(Text.class);
- //jobConf_.setCombinerClass();
-
- jobConf_.setOutputPath(new Path(output_));
- jobConf_.setOutputFormat(StreamOutputFormat.class);
jobConf_.set("stream.addenvironment", addTaskEnvironment_);
String defaultPackage = this.getClass().getPackage().getName();
Class c = StreamUtil.goodClassOrNull(mapCmd_, defaultPackage);
- if(c != null) {
+ if (c != null) {
jobConf_.setMapperClass(c);
} else {
jobConf_.setMapperClass(PipeMapper.class);
jobConf_.set("stream.map.streamprocessor", mapCmd_);
}
- if(comCmd_ != null) {
+ if (comCmd_ != null) {
c = StreamUtil.goodClassOrNull(comCmd_, defaultPackage);
- if(c != null) {
+ if (c != null) {
jobConf_.setCombinerClass(c);
} else {
jobConf_.setCombinerClass(PipeCombiner.class);
@@ -454,9 +514,11 @@
}
}
- if(redCmd_ != null) {
+ reducerNone_ = false;
+ if (redCmd_ != null) {
+ reducerNone_ = redCmd_.equals(REDUCE_NONE);
c = StreamUtil.goodClassOrNull(redCmd_, defaultPackage);
- if(c != null) {
+ if (c != null) {
jobConf_.setReducerClass(c);
} else {
jobConf_.setReducerClass(PipeReducer.class);
@@ -464,66 +526,165 @@
}
}
- if(inReaderSpec_ != null) {
- String[] args = inReaderSpec_.split(",");
- String readerClass = args[0];
- // this argument can only be a Java class
- c = StreamUtil.goodClassOrNull(readerClass, defaultPackage);
- if(c != null) {
- jobConf_.set("stream.recordreader.class", c.getName());
- } else {
- fail("-inputreader: class not found: " + readerClass);
- }
- for(int i=1; i<args.length; i++) {
- String[] nv = args[i].split("=", 2);
- String k = "stream.recordreader." + nv[0];
- String v = (nv.length>1) ? nv[1] : "";
- jobConf_.set(k, v);
- }
+ if (inReaderSpec_ != null) {
+ String[] args = inReaderSpec_.split(",");
+ String readerClass = args[0];
+ // this argument can only be a Java class
+ c = StreamUtil.goodClassOrNull(readerClass, defaultPackage);
+ if (c != null) {
+ jobConf_.set("stream.recordreader.class", c.getName());
+ } else {
+ fail("-inputreader: class not found: " + readerClass);
+ }
+ for (int i = 1; i < args.length; i++) {
+ String[] nv = args[i].split("=", 2);
+ String k = "stream.recordreader." + nv[0];
+ String v = (nv.length > 1) ? nv[1] : "";
+ jobConf_.set(k, v);
+ }
}
- jar_ = packageJobJar();
- if(jar_ != null) {
- jobConf_.setJar(jar_);
+ // output setup is done late so we can customize for reducerNone_
+ //jobConf_.setOutputDir(new File(output_));
+ setOutputSpec();
+ if (testMerge_) {
+ fmt = MuxOutputFormat.class;
+ } else {
+ fmt = StreamOutputFormat.class;
}
+ jobConf_.setOutputFormat(fmt);
// last, allow user to override anything
// (although typically used with properties we didn't touch)
- it = userJobConfProps_.iterator();
- while(it.hasNext()) {
- String prop = (String)it.next();
- String[] nv = prop.split("=", 2);
- msg("xxxJobConf: set(" + nv[0] + ", " + nv[1]+")");
- jobConf_.set(nv[0], nv[1]);
+ setUserJobConfProps(false);
+
+ jar_ = packageJobJar();
+ if (jar_ != null) {
+ jobConf_.setJar(jar_);
}
+
+ if(verbose_) {
+ listJobConfProperties();
+ }
+
msg("submitting to jobconf: " + getJobTrackerHostPort());
}
- protected String getJobTrackerHostPort()
+ protected void listJobConfProperties()
{
+ msg("==== JobConf properties:");
+ Iterator it = jobConf_.entries();
+ TreeMap sorted = new TreeMap();
+ while(it.hasNext()) {
+ Map.Entry en = (Map.Entry)it.next();
+ sorted.put(en.getKey(), en.getValue());
+ }
+ it = sorted.entrySet().iterator();
+ while(it.hasNext()) {
+ Map.Entry en = (Map.Entry)it.next();
+ msg(en.getKey() + "=" + en.getValue());
+ }
+ msg("====");
+ }
+
+ /** InputSpec-s encode: a glob pattern x additional column files x additional joins */
+ protected void addInputSpec(String inSpec, int index) {
+ if (!testMerge_) {
+ jobConf_.addInputPath(new Path(inSpec));
+ } else {
+ CompoundDirSpec spec = new CompoundDirSpec(inSpec, true);
+ msg("Parsed -input:\n" + spec.toTableString());
+ if (index == 0) {
+ hasSimpleInputSpecs_ = (spec.paths_.length == 0);
+ msg("hasSimpleInputSpecs_=" + hasSimpleInputSpecs_);
+ }
+ String primary = spec.primarySpec();
+ if (!seenPrimary_.add(primary)) {
+ // this won't detect glob overlaps and noncanonical path variations
+ fail("Primary used in multiple -input spec: " + primary);
+ }
+ jobConf_.addInputPath(new Path(primary));
+ // during Job execution, will reparse into a CompoundDirSpec
+ jobConf_.set("stream.inputspecs." + index, inSpec);
+ }
+ }
+
+ /** uses output_ and mapsideoutURI_ */
+ protected void setOutputSpec() throws IOException {
+ CompoundDirSpec spec = new CompoundDirSpec(output_, false);
+ msg("Parsed -output:\n" + spec.toTableString());
+ String primary = spec.primarySpec();
+ String channel0;
+ // TODO simplify cases, encapsulate in a StreamJobConf
+ if (!reducerNone_) {
+ channel0 = primary;
+ } else {
+ if (mapsideoutURI_ != null) {
+ // user can override in case this is in a different filesystem.
+ try {
+ URI uri = new URI(mapsideoutURI_);
+ if (uri.getScheme() == null || uri.getScheme().equals("file")) { // || uri.getScheme().equals("hdfs")
+ if (!new Path(uri.getSchemeSpecificPart()).isAbsolute()) {
+ fail("Must be absolute: " + mapsideoutURI_);
+ }
+ } else if (uri.getScheme().equals("socket")) {
+ // ok
+ } else {
+ fail("Invalid scheme: " + uri.getScheme() + " for -mapsideoutput " + mapsideoutURI_);
+ }
+ } catch (URISyntaxException e) {
+ throw (IOException) new IOException().initCause(e);
+ }
+ } else {
+ mapsideoutURI_ = primary;
+ }
+ // an empty reduce output named "part-00002" will go here and not collide.
+ channel0 = primary + ".NONE";
+ // the side-effect of the first split of an input named "part-00002"
+ // will go in this directory
+ jobConf_.set("stream.sideoutput.dir", primary);
+ // oops if user overrides low-level this isn't set yet :-(
+ boolean localjt = StreamUtil.isLocalJobTracker(jobConf_);
+ // just a guess user may prefer remote..
+ jobConf_.setBoolean("stream.sideoutput.localfs", localjt);
+ }
+ // a path in fs.name.default filesystem
+ System.out.println(channel0);
+ System.out.println(new Path(channel0));
+ jobConf_.setOutputPath(new Path(channel0));
+ // will reparse remotely
+ jobConf_.set("stream.outputspec", output_);
+ if (null != mapsideoutURI_) {
+ // a path in "jobtracker's filesystem"
+ // overrides sideoutput.dir
+ jobConf_.set("stream.sideoutput.uri", mapsideoutURI_);
+ }
+ }
+
+ protected String getJobTrackerHostPort() {
return jobConf_.get("mapred.job.tracker");
}
- protected void jobInfo()
- {
- if(isLocalHadoop()) {
+ protected void jobInfo() {
+ if (isLocalHadoop()) {
LOG.info("Job running in-process (local Hadoop)");
} else {
String hp = getJobTrackerHostPort();
LOG.info("To kill this job, run:");
- LOG.info(getHadoopClientHome() + "/bin/hadoop job -Dmapred.job.tracker=" + hp + " -kill " + jobId_);
+ LOG.info(getHadoopClientHome() + "/bin/hadoop job -Dmapred.job.tracker=" + hp + " -kill "
+ + jobId_);
//LOG.info("Job file: " + running_.getJobFile() );
- LOG.info("Tracking URL: " + StreamUtil.qualifyHost(running_.getTrackingURL()));
+ LOG.info("Tracking URL: " + StreamUtil.qualifyHost(running_.getTrackingURL()));
}
}
// Based on JobClient
public void submitAndMonitorJob() throws IOException {
- if(jar_ != null && isLocalHadoop()) {
- // getAbs became required when shell and subvm have different working dirs...
- File wd = new File(".").getAbsoluteFile();
- StreamUtil.unJar(new File(jar_), wd);
+ if (jar_ != null && isLocalHadoop()) {
+ // getAbs became required when shell and subvm have different working dirs...
+ File wd = new File(".").getAbsoluteFile();
+ StreamUtil.unJar(new File(jar_), wd);
}
// if jobConf_ changes must recreate a JobClient
@@ -542,11 +703,12 @@
while (!running_.isComplete()) {
try {
Thread.sleep(1000);
- } catch (InterruptedException e) {}
+ } catch (InterruptedException e) {
+ }
running_ = jc_.getJob(jobId_);
String report = null;
- report = " map "+Math.round(running_.mapProgress()*100)
- +"% reduce " + Math.round(running_.reduceProgress()*100)+"%";
+ report = " map " + Math.round(running_.mapProgress() * 100) + "% reduce "
+ + Math.round(running_.reduceProgress() * 100) + "%";
if (!report.equals(lastReport)) {
LOG.info(report);
@@ -569,7 +731,6 @@
}
}
-
protected boolean mayExit_;
protected String[] argv_;
protected boolean verbose_;
@@ -585,11 +746,15 @@
protected JobClient jc_;
// command-line arguments
- protected ArrayList inputGlobs_ = new ArrayList(); // <String>
- protected ArrayList packageFiles_ = new ArrayList(); // <String>
- protected ArrayList shippedCanonFiles_= new ArrayList(); // <String>
- protected ArrayList userJobConfProps_ = new ArrayList(); // <String>
+ protected ArrayList inputSpecs_ = new ArrayList(); // <String>
+ protected boolean inputTagged_ = false;
+ protected TreeSet seenPrimary_ = new TreeSet(); // <String>
+ protected boolean hasSimpleInputSpecs_;
+ protected ArrayList packageFiles_ = new ArrayList(); // <String>
+ protected ArrayList shippedCanonFiles_ = new ArrayList(); // <String>
+ protected ArrayList userJobConfProps_ = new ArrayList(); // <String> name=value
protected String output_;
+ protected String mapsideoutURI_;
protected String mapCmd_;
protected String comCmd_;
protected String redCmd_;
@@ -598,6 +763,7 @@
protected String hadoopAliasConf_;
protected String inReaderSpec_;
+ protected boolean testMerge_;
// Use to communicate config to the external processes (ex env.var.HADOOP_USER)
// encoding "a=b c=d"
@@ -609,6 +775,4 @@
protected RunningJob running_;
protected String jobId_;
-
}
-
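
Several of the options handled above (-jobconf, -cmdenv, and the new setUserJobConfProps) rely on the same name=value convention: split("=", 2) keeps any further '=' characters inside the value, and a spec without a value is rejected when the arguments are parsed rather than at job submission time. A small sketch of that convention (the class name and sample property are illustrative):

    public class NameEqValueSketch {
      static String[] parseNameEqValue(String neqv) {
        String[] nv = neqv.split("=", 2); // split at the first '=' only
        if (nv.length < 2) {
          throw new IllegalArgumentException("Invalid name=value spec: " + neqv);
        }
        return nv;
      }

      public static void main(String[] args) {
        String[] nv = parseNameEqValue("mapred.job.name=My Job=v2");
        System.out.println(nv[0]); // mapred.job.name
        System.out.println(nv[1]); // My Job=v2 -- the value keeps its '='
      }
    }
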
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java Mon Sep 18 16:08:06 2006
@@ -18,7 +18,8 @@
import java.io.*;
import java.nio.charset.MalformedInputException;
-import java.util.zip.GZIPInputStream;
+import java.util.Arrays;
+import java.util.zip.GZIPInputStream;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.Text;
@@ -35,25 +36,21 @@
* but delimits key and value with a TAB.
* @author Michel Tourn
*/
-public class StreamLineRecordReader extends StreamBaseRecordReader
-{
+public class StreamLineRecordReader extends StreamBaseRecordReader {
- public StreamLineRecordReader(
- FSDataInputStream in, FileSplit split, Reporter reporter, JobConf job, FileSystem fs)
- throws IOException
- {
+ public StreamLineRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter,
+ JobConf job, FileSystem fs) throws IOException {
super(in, split, reporter, job, fs);
gzipped_ = StreamInputFormat.isGzippedInput(job);
- if(gzipped_) {
+ if (gzipped_) {
din_ = new DataInputStream(new GZIPInputStream(in_));
} else {
din_ = in_;
}
}
- public void seekNextRecordBoundary() throws IOException
- {
- if(gzipped_) {
+ public void seekNextRecordBoundary() throws IOException {
+ if (gzipped_) {
// no skipping: use din_ as-is
// assumes splitter created only one split per file
return;
@@ -63,7 +60,7 @@
in_.seek(start_ - 1);
// scan to the next newline in the file
while (in_.getPos() < end_) {
- char c = (char)in_.read();
+ char c = (char) in_.read();
bytesSkipped++;
if (c == '\r' || c == '\n') {
break;
@@ -75,51 +72,54 @@
}
}
- public synchronized boolean next(Writable key, Writable value)
- throws IOException {
- if(!(key instanceof Text)) {
- throw new IllegalArgumentException(
- "Key should be of type Text but: "+key.getClass().getName());
+ public synchronized boolean next(Writable key, Writable value) throws IOException {
+ if (!(key instanceof Text)) {
+ throw new IllegalArgumentException("Key should be of type Text but: "
+ + key.getClass().getName());
}
- if(!(value instanceof Text)) {
- throw new IllegalArgumentException(
- "Value should be of type Text but: "+value.getClass().getName());
+ if (!(value instanceof Text)) {
+ throw new IllegalArgumentException("Value should be of type Text but: "
+ + value.getClass().getName());
}
- Text tKey = (Text)key;
- Text tValue = (Text)value;
- byte [] line;
-
+ Text tKey = (Text) key;
+ Text tValue = (Text) value;
+ byte[] line;
+
while (true) {
- if(gzipped_) {
- // figure EOS from readLine
+ if (gzipped_) {
+ // figure EOS from readLine
+ } else {
+ long pos = in_.getPos();
+ if (pos >= end_) return false;
+ }
+
+ line = UTF8ByteArrayUtils.readLine((InputStream) in_);
+ try {
+ Text.validateUTF8(line);
+ } catch (MalformedInputException m) {
+ System.err.println("line=" + line + "|" + new Text(line));
+ System.out.flush();
+ }
+ if (line == null) return false;
+ try {
+ int tab = UTF8ByteArrayUtils.findTab(line);
+ if (tab == -1) {
+ tKey.set(line);
+ tValue.set("");
} else {
- long pos = in_.getPos();
- if (pos >= end_)
- return false;
- }
-
- line = UTF8ByteArrayUtils.readLine((InputStream)in_);
- if(line==null)
- return false;
- try {
- int tab=UTF8ByteArrayUtils.findTab(line);
- if(tab == -1) {
- tKey.set(line);
- tValue.set("");
- } else {
- UTF8ByteArrayUtils.splitKeyVal(line, tKey, tValue, tab);
- }
- break;
- } catch (MalformedInputException e) {
- LOG.warn(e);
- StringUtils.stringifyException(e);
+ UTF8ByteArrayUtils.splitKeyVal(line, tKey, tValue, tab);
}
+ break;
+ } catch (MalformedInputException e) {
+ LOG.warn(StringUtils.stringifyException(e));
+ }
}
- numRecStats( line, 0, line.length );
+ numRecStats(line, 0, line.length);
return true;
}
+
boolean gzipped_;
GZIPInputStream zin_;
- DataInputStream din_; // GZIP or plain
+ DataInputStream din_; // GZIP or plain
}
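
The rewritten next() applies the documented default record format: the key ends at the first TAB, the rest of the line is the value, and a line without a TAB becomes a key with an empty value. A standalone sketch of that rule, using plain String operations in place of UTF8ByteArrayUtils:

    public class LineSplitSketch {
      static String[] splitKeyVal(String line) {
        int tab = line.indexOf('\t'); // findTab equivalent
        if (tab == -1) {
          return new String[] { line, "" }; // no TAB: empty value
        }
        return new String[] { line.substring(0, tab), line.substring(tab + 1) };
      }

      public static void main(String[] args) {
        String[] kv = splitKeyVal("apple\t3\tred");
        System.out.println("key=" + kv[0]);   // apple
        System.out.println("value=" + kv[1]); // 3<TAB>red: only the first TAB splits
        kv = splitKeyVal("no-tab-line");
        System.out.println("key=" + kv[0] + " value='" + kv[1] + "'");
      }
    }
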
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java Mon Sep 18 16:08:06 2006
@@ -35,28 +35,27 @@
*/
public class StreamOutputFormat implements OutputFormat {
- public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
- String name, Progressable progr) throws IOException {
-
+ public RecordWriter getRecordWriter(FileSystem fs, JobConf job, String name, Progressable progr) throws IOException {
+
File file = new File(job.getOutputDir(), name);
final FSDataOutputStream out = fs.create(file);
return new RecordWriter() {
- public synchronized void write(WritableComparable key, Writable value)
- throws IOException {
- out.write(key.toString().getBytes("UTF-8"));
- out.writeByte('\t');
- out.write(value.toString().getBytes("UTF-8"));
- out.writeByte('\n');
- }
- public synchronized void close(Reporter reporter) throws IOException {
- out.close();
- }
- };
+
+ public synchronized void write(WritableComparable key, Writable value) throws IOException {
+ out.write(key.toString().getBytes("UTF-8"));
+ out.writeByte('\t');
+ out.write(value.toString().getBytes("UTF-8"));
+ out.writeByte('\n');
+ }
+
+ public synchronized void close(Reporter reporter) throws IOException {
+ out.close();
+ }
+ };
}
-
-
+
/** Check whether the output specification for a job is appropriate. Called
* when a job is submitted. Typically checks that it does not already exist,
* throwing an exception when it already exists, so that output is not
@@ -65,9 +64,8 @@
* @param job the job whose output will be written
* @throws IOException when output should not be attempted
*/
- public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException
- {
+ public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
// allow existing data (for app-level restartability)
}
-
+
}
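
The RecordWriter above emits one line per pair: the UTF-8 bytes of the key, a tab byte, the UTF-8 bytes of the value, and a newline. A minimal sketch of that output format against a local temp file (the real writer goes through the Hadoop FileSystem API, so the target here is illustrative only):

    import java.io.*;

    public class LineOutputSketch {
      public static void main(String[] args) throws IOException {
        File file = File.createTempFile("part-", ".txt");
        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(file))) {
          writeRecord(out, "apple", "3");
          writeRecord(out, "banana", "7");
        }
        System.out.println("wrote " + file);
      }

      static void writeRecord(DataOutputStream out, String key, String value)
          throws IOException {
        out.write(key.getBytes("UTF-8"));  // key as UTF-8 bytes
        out.writeByte('\t');               // single tab delimiter
        out.write(value.getBytes("UTF-8"));
        out.writeByte('\n');               // line-oriented records
      }
    }
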
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java Mon Sep 18 16:08:06 2006
@@ -29,22 +29,16 @@
import org.apache.hadoop.util.ReflectionUtils;
-public class StreamSequenceRecordReader extends StreamBaseRecordReader
-{
+public class StreamSequenceRecordReader extends StreamBaseRecordReader {
- public StreamSequenceRecordReader (
- FSDataInputStream in, FileSplit split, Reporter reporter, JobConf job, FileSystem fs)
- throws IOException
- {
+ public StreamSequenceRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter,
+ JobConf job, FileSystem fs) throws IOException {
super(in, split, reporter, job, fs);
numFailed_ = 0;
// super.in_ ignored, using rin_ instead
}
-
- public synchronized boolean next(Writable key, Writable value)
- throws IOException
- {
+ public synchronized boolean next(Writable key, Writable value) throws IOException {
boolean success;
do {
if (!more_) return false;
@@ -58,29 +52,26 @@
more_ = eof;
}
success = true;
- } catch(IOException io) {
+ } catch (IOException io) {
numFailed_++;
- if(numFailed_ < 100 || numFailed_ % 100 == 0) {
- err_.println("StreamSequenceRecordReader: numFailed_/numRec_="
- + numFailed_+ "/" + numRec_);
+ if (numFailed_ < 100 || numFailed_ % 100 == 0) {
+ err_.println("StreamSequenceRecordReader: numFailed_/numRec_=" + numFailed_ + "/"
+ + numRec_);
}
io.printStackTrace(err_);
success = false;
}
- } while(!success);
-
+ } while (!success);
+
numRecStats(new byte[0], 0, 0);
return more_;
}
-
- public void seekNextRecordBoundary() throws IOException
- {
+ public void seekNextRecordBoundary() throws IOException {
rin_ = new SequenceFile.Reader(fs_, split_.getPath(), job_);
end_ = split_.getStart() + split_.getLength();
- if (split_.getStart() > rin_.getPosition())
- rin_.sync(split_.getStart()); // sync to start
+ if (split_.getStart() > rin_.getPosition()) rin_.sync(split_.getStart()); // sync to start
more_ = rin_.getPosition() < end_;
@@ -90,14 +81,13 @@
}
public WritableComparable createKey() {
- return (WritableComparable)
- ReflectionUtils.newInstance(rin_.getKeyClass(), null);
+ return (WritableComparable) ReflectionUtils.newInstance(rin_.getKeyClass(), null);
}
-
+
public Writable createValue() {
return (Writable) ReflectionUtils.newInstance(rin_.getValueClass(), null);
}
-
+
boolean more_;
SequenceFile.Reader rin_;
int numFailed_;
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java Mon Sep 18 16:08:06 2006
@@ -19,69 +19,81 @@
import java.text.DecimalFormat;
import java.io.*;
import java.net.*;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.List;
import java.util.jar.*;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+
/** Utilities not available elsewhere in Hadoop.
*
*/
-public class StreamUtil
-{
+public class StreamUtil {
- public static Class goodClassOrNull(String className, String defaultPackage)
- {
- if(className.indexOf('.') == -1 && defaultPackage != null) {
- className = defaultPackage + "." + className;
+ /** It may seem strange to silently switch behaviour when a String
+ * is not a classname; the reason is the simplified usage:<pre>
+ * -mapper [classname | program]
+ * instead of the more explicit:
+ * [-mapper program | -javamapper classname], where -mapper and -javamapper would be mutually exclusive.
+ * (and likewise for -reducer and -combiner) </pre>
+ */
+ public static Class goodClassOrNull(String className, String defaultPackage) {
+ if (className.indexOf('.') == -1 && defaultPackage != null) {
+ className = defaultPackage + "." + className;
}
Class clazz = null;
try {
- clazz = Class.forName(className);
- } catch(ClassNotFoundException cnf) {
- } catch(LinkageError cnf) {
+ clazz = Class.forName(className);
+ } catch (ClassNotFoundException cnf) {
+ } catch (LinkageError cnf) {
}
return clazz;
}
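
This is the switch the javadoc above describes: if the argument resolves to a class it is run as Java code, otherwise it is treated as an external program. A hedged illustration (the program path is hypothetical):

    Class c1 = StreamUtil.goodClassOrNull("java.util.HashMap", null); // resolves: run as a Java class
    Class c2 = StreamUtil.goodClassOrNull("/bin/wc", null);           // null: treated as a program
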
-
- /** @return a jar file path or a base directory or null if not found.
+
+ public static String findInClasspath(String className) {
+ return findInClasspath(className, StreamUtil.class.getClassLoader());
+ }
+
+ /** @return a jar file path or a base directory or null if not found.
*/
- public static String findInClasspath(String className)
- {
+ public static String findInClasspath(String className, ClassLoader loader) {
String relPath = className;
- if (!relPath.startsWith("/")) {
- relPath = "/" + relPath;
- }
relPath = relPath.replace('.', '/');
relPath += ".class";
-
- java.net.URL classUrl = StreamUtil.class.getResource(relPath);
+ java.net.URL classUrl = loader.getResource(relPath);
String codePath;
if (classUrl != null) {
- boolean inJar = classUrl.getProtocol().equals("jar");
- codePath = classUrl.toString();
- if(codePath.startsWith("jar:")) {
- codePath = codePath.substring("jar:".length());
- }
- if(codePath.startsWith("file:")) { // can have both
- codePath = codePath.substring("file:".length());
- }
- if(inJar) {
- // A jar spec: remove class suffix in /path/my.jar!/package/Class
- int bang = codePath.lastIndexOf('!');
- codePath = codePath.substring(0, bang);
- } else {
- // A class spec: remove the /my/package/Class.class portion
- int pos = codePath.lastIndexOf(relPath);
- if(pos == -1) {
- throw new IllegalArgumentException(
- "invalid codePath: className=" + className + " codePath=" + codePath);
- }
- codePath = codePath.substring(0, pos);
+ boolean inJar = classUrl.getProtocol().equals("jar");
+ codePath = classUrl.toString();
+ if (codePath.startsWith("jar:")) {
+ codePath = codePath.substring("jar:".length());
+ }
+ if (codePath.startsWith("file:")) { // can have both
+ codePath = codePath.substring("file:".length());
+ }
+ if (inJar) {
+ // A jar spec: remove class suffix in /path/my.jar!/package/Class
+ int bang = codePath.lastIndexOf('!');
+ codePath = codePath.substring(0, bang);
+ } else {
+ // A class spec: remove the /my/package/Class.class portion
+ int pos = codePath.lastIndexOf(relPath);
+ if (pos == -1) {
+ throw new IllegalArgumentException("invalid codePath: className=" + className
+ + " codePath=" + codePath);
}
+ codePath = codePath.substring(0, pos);
+ }
} else {
- codePath = null;
+ codePath = null;
}
return codePath;
}
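
A usage sketch for findInClasspath: given a class visible on the classpath, it returns either the enclosing jar path or the classes base directory (the return values shown are hypothetical):

    String where = StreamUtil.findInClasspath("org.apache.hadoop.streaming.StreamUtil");
    // e.g. "/path/to/hadoop-streaming.jar" when loaded from a jar,
    // or "/path/to/build/classes/" when loaded from a directory
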
@@ -92,7 +104,7 @@
try {
Enumeration entries = jar.entries();
while (entries.hasMoreElements()) {
- JarEntry entry = (JarEntry)entries.nextElement();
+ JarEntry entry = (JarEntry) entries.nextElement();
if (!entry.isDirectory()) {
InputStream in = jar.getInputStream(entry);
try {
@@ -117,9 +129,7 @@
jar.close();
}
}
-
-
final static long KB = 1024L * 1;
final static long MB = 1024L * KB;
final static long GB = 1024L * MB;
@@ -128,64 +138,61 @@
static DecimalFormat dfm = new DecimalFormat("####.000");
static DecimalFormat ifm = new DecimalFormat("###,###,###,###,###");
-
- public static String dfmt(double d)
- {
+
+ public static String dfmt(double d) {
return dfm.format(d);
}
- public static String ifmt(double d)
- {
+
+ public static String ifmt(double d) {
return ifm.format(d);
}
-
- public static String formatBytes(long numBytes)
- {
+
+ public static String formatBytes(long numBytes) {
StringBuffer buf = new StringBuffer();
- boolean bDetails = true;
+ boolean bDetails = true;
double num = numBytes;
-
- if(numBytes < KB) {
+
+ if (numBytes < KB) {
buf.append(numBytes + " B");
bDetails = false;
- } else if(numBytes < MB) {
- buf.append(dfmt(num/KB) + " KB");
- } else if(numBytes < GB) {
- buf.append(dfmt(num/MB) + " MB");
- } else if(numBytes < TB) {
- buf.append(dfmt(num/GB) + " GB");
- } else if(numBytes < PB) {
- buf.append(dfmt(num/TB) + " TB");
+ } else if (numBytes < MB) {
+ buf.append(dfmt(num / KB) + " KB");
+ } else if (numBytes < GB) {
+ buf.append(dfmt(num / MB) + " MB");
+ } else if (numBytes < TB) {
+ buf.append(dfmt(num / GB) + " GB");
+ } else if (numBytes < PB) {
+ buf.append(dfmt(num / TB) + " TB");
} else {
- buf.append(dfmt(num/PB) + " PB");
+ buf.append(dfmt(num / PB) + " PB");
}
- if(bDetails) {
+ if (bDetails) {
buf.append(" (" + ifmt(numBytes) + " bytes)");
}
return buf.toString();
}
- public static String formatBytes2(long numBytes)
- {
+ public static String formatBytes2(long numBytes) {
StringBuffer buf = new StringBuffer();
long u = 0;
- if(numBytes >= TB) {
- u = numBytes/TB;
- numBytes -= u*TB;
+ if (numBytes >= TB) {
+ u = numBytes / TB;
+ numBytes -= u * TB;
buf.append(u + " TB ");
}
- if(numBytes >= GB) {
- u = numBytes/GB;
- numBytes -= u*GB;
+ if (numBytes >= GB) {
+ u = numBytes / GB;
+ numBytes -= u * GB;
buf.append(u + " GB ");
}
- if(numBytes >= MB) {
- u = numBytes/MB;
- numBytes -= u*MB;
+ if (numBytes >= MB) {
+ u = numBytes / MB;
+ numBytes -= u * MB;
buf.append(u + " MB ");
}
- if(numBytes >= KB) {
- u = numBytes/KB;
- numBytes -= u*KB;
+ if (numBytes >= KB) {
+ u = numBytes / KB;
+ numBytes -= u * KB;
buf.append(u + " KB ");
}
buf.append(u + " B"); //even if zero
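
Worked examples for the two formatters above, computed from the definitions (with the final append emitting the sub-KB remainder), so treat them as illustrative: formatBytes reports one scaled unit plus the exact count, while formatBytes2 decomposes into successive units:

    StreamUtil.formatBytes(1536);   // "1.500 KB (1,536 bytes)"
    StreamUtil.formatBytes2(1536);  // "1 KB 512 B"
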
@@ -194,125 +201,295 @@
static Environment env;
static String HOST;
-
+
static {
try {
env = new Environment();
HOST = env.getHost();
- } catch(IOException io) {
+ } catch (IOException io) {
io.printStackTrace();
}
}
- static class StreamConsumer extends Thread
- {
- StreamConsumer(InputStream in, OutputStream out)
- {
- this.bin = new LineNumberReader(
- new BufferedReader(new InputStreamReader(in)));
- if(out != null) {
+ static class StreamConsumer extends Thread {
+
+ StreamConsumer(InputStream in, OutputStream out) {
+ this.bin = new LineNumberReader(new BufferedReader(new InputStreamReader(in)));
+ if (out != null) {
this.bout = new DataOutputStream(out);
}
}
- public void run()
- {
+
+ public void run() {
try {
String line;
- while((line=bin.readLine()) != null) {
- if(bout != null) {
+ while ((line = bin.readLine()) != null) {
+ if (bout != null) {
bout.writeUTF(line); //writeChars
bout.writeChar('\n');
}
}
if (bout != null) bout.flush(); // bout may be null when no output stream was given
- } catch(IOException io) {
+ } catch (IOException io) {
}
}
+
LineNumberReader bin;
DataOutputStream bout;
}
- static void exec(String arg, PrintStream log)
- {
- exec( new String[] {arg}, log );
+ static void exec(String arg, PrintStream log) {
+ exec(new String[] { arg }, log);
}
-
- static void exec(String[] args, PrintStream log)
- {
- try {
- log.println("Exec: start: " + Arrays.asList(args));
- Process proc = Runtime.getRuntime().exec(args);
- new StreamConsumer(proc.getErrorStream(), log).start();
- new StreamConsumer(proc.getInputStream(), log).start();
- int status = proc.waitFor();
- //if status != 0
- log.println("Exec: status=" + status + ": " + Arrays.asList(args));
- } catch(InterruptedException in) {
- in.printStackTrace();
- } catch(IOException io) {
- io.printStackTrace();
- }
+
+ static void exec(String[] args, PrintStream log) {
+ try {
+ log.println("Exec: start: " + Arrays.asList(args));
+ Process proc = Runtime.getRuntime().exec(args);
+ new StreamConsumer(proc.getErrorStream(), log).start();
+ new StreamConsumer(proc.getInputStream(), log).start();
+ int status = proc.waitFor();
+ //if status != 0
+ log.println("Exec: status=" + status + ": " + Arrays.asList(args));
+ } catch (InterruptedException in) {
+ in.printStackTrace();
+ } catch (IOException io) {
+ io.printStackTrace();
+ }
}
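
The two StreamConsumer threads above drain the child's stdout and stderr concurrently, which keeps the child process from blocking on a full pipe buffer before waitFor() returns. A usage sketch of the package-private helper (the command is hypothetical):

    exec(new String[] { "ls", "-l", "/tmp" }, System.err);
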
-
- static String qualifyHost(String url)
- {
+
+ static String qualifyHost(String url) {
try {
- return qualifyHost(new URL(url)).toString();
- } catch(IOException io) {
- return url;
+ return qualifyHost(new URL(url)).toString();
+ } catch (IOException io) {
+ return url;
}
}
-
- static URL qualifyHost(URL url)
- {
+
+ static URL qualifyHost(URL url) {
try {
InetAddress a = InetAddress.getByName(url.getHost());
String qualHost = a.getCanonicalHostName();
URL q = new URL(url.getProtocol(), qualHost, url.getPort(), url.getFile());
return q;
- } catch(IOException io) {
+ } catch (IOException io) {
return url;
}
}
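
qualifyHost rewrites a URL's host to its canonical, fully-qualified form, falling back to the input on any resolution failure. An illustration (host names are hypothetical):

    qualifyHost("http://node1:50030/status");
    // e.g. "http://node1.example.com:50030/status" when DNS resolves node1,
    // or the original URL unchanged on failure
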
-
+
static final String regexpSpecials = "[]()?*+|.!^-\\~@";
-
- public static String regexpEscape(String plain)
- {
+
+ public static String regexpEscape(String plain) {
StringBuffer buf = new StringBuffer();
char[] ch = plain.toCharArray();
int csup = ch.length;
- for(int c=0; c<csup; c++) {
- if(regexpSpecials.indexOf(ch[c]) != -1) {
- buf.append("\\");
+ for (int c = 0; c < csup; c++) {
+ if (regexpSpecials.indexOf(ch[c]) != -1) {
+ buf.append("\\");
}
buf.append(ch[c]);
}
return buf.toString();
}
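
regexpEscape backslash-escapes every character listed in regexpSpecials so a literal string can be embedded in a pattern, e.g.:

    StreamUtil.regexpEscape("a.b*c");  // yields a\.b\*c ("a\\.b\\*c" as a Java literal)
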
-
- static String slurp(File f) throws IOException
- {
+
+ public static String safeGetCanonicalPath(File f) {
+ try {
+ String s = f.getCanonicalPath();
+ return (s == null) ? f.toString() : s;
+ } catch (IOException io) {
+ return f.toString();
+ }
+ }
+
+ static String slurp(File f) throws IOException {
+ int len = (int) f.length();
+ byte[] buf = new byte[len];
FileInputStream in = new FileInputStream(f);
- int len = (int)f.length();
+ String contents = null;
+ try {
+ in.read(buf, 0, len);
+ contents = new String(buf, "UTF-8");
+ } finally {
+ in.close();
+ }
+ return contents;
+ }
+
+ static String slurpHadoop(Path p, FileSystem fs) throws IOException {
+ int len = (int) fs.getLength(p);
byte[] buf = new byte[len];
- in.read(buf, 0, len);
- return new String(buf);
+ InputStream in = fs.open(p);
+ String contents = null;
+ try {
+ in.read(buf, 0, len);
+ contents = new String(buf, "UTF-8");
+ } finally {
+ in.close();
+ }
+ return contents;
}
-
+
+ public static String rjustify(String s, int width) {
+ if (s == null) s = "null";
+ if (width > s.length()) {
+ s = getSpace(width - s.length()) + s;
+ }
+ return s;
+ }
+
+ public static String ljustify(String s, int width) {
+ if (s == null) s = "null";
+ if (width > s.length()) {
+ s = s + getSpace(width - s.length());
+ }
+ return s;
+ }
+
+ static char[] space;
+ static {
+ space = new char[300];
+ Arrays.fill(space, '\u0020');
+ }
+
+ public static String getSpace(int len) {
+ if (len > space.length) {
+ space = new char[Math.max(len, 2 * space.length)];
+ Arrays.fill(space, '\u0020');
+ }
+ return new String(space, 0, len);
+ }
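
Padding examples for the three helpers above:

    StreamUtil.rjustify("42", 5);  // "   42"
    StreamUtil.ljustify("42", 5);  // "42   "
    StreamUtil.getSpace(3);        // "   "
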
+
static private Environment env_;
-
- static Environment env()
- {
- if(env_ != null) {
+
+ static Environment env() {
+ if (env_ != null) {
return env_;
}
try {
env_ = new Environment();
- } catch(IOException io) {
+ } catch (IOException io) {
io.printStackTrace();
}
return env_;
}
+
+ public static String makeJavaCommand(Class main, String[] argv) {
+ ArrayList vargs = new ArrayList();
+ File javaHomeBin = new File(System.getProperty("java.home"), "bin");
+ File jvm = new File(javaHomeBin, "java");
+ vargs.add(jvm.toString());
+ // copy parent classpath
+ vargs.add("-classpath");
+ vargs.add("\"" + System.getProperty("java.class.path") + "\"");
+
+ // Add main class and its arguments
+ vargs.add(main.getName());
+ for (int i = 0; i < argv.length; i++) {
+ vargs.add(argv[i]);
+ }
+ return collate(vargs, " ");
+ }
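
makeJavaCommand reuses the current JVM binary and classpath to build a child-JVM command line. A sketch of its output (MyTool is a placeholder class and the paths are hypothetical):

    String cmd = StreamUtil.makeJavaCommand(MyTool.class, new String[] { "-v" });
    // e.g.: /usr/java/jdk/bin/java -classpath "..." org.example.MyTool -v
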
+
+ public static String collate(Object[] args, String sep) {
+ return collate(Arrays.asList(args), sep);
+ }
+
+ public static String collate(List args, String sep) {
+ StringBuffer buf = new StringBuffer();
+ Iterator it = args.iterator();
+ while (it.hasNext()) {
+ if (buf.length() > 0) {
+ buf.append(" ");
+ }
+ buf.append(it.next());
+ }
+ return buf.toString();
+ }
+
+ // JobConf helpers
+
+ public static FileSplit getCurrentSplit(JobConf job) {
+ String path = job.get("map.input.file");
+ if (path == null) {
+ return null;
+ }
+ Path p = new Path(path);
+ long start = Long.parseLong(job.get("map.input.start"));
+ long length = Long.parseLong(job.get("map.input.length"));
+ return new FileSplit(p, start, length);
+ }
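
getCurrentSplit reconstructs the map task's input split from the three per-task job properties Hadoop sets. A hedged illustration (the sample path and offsets are hypothetical):

    // map.input.file   = /user/data/input/part-00000
    // map.input.start  = 0
    // map.input.length = 67108864
    FileSplit split = StreamUtil.getCurrentSplit(job); // Path, start offset, length
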
+
+ static class TaskId {
+
+ boolean mapTask;
+ String jobid;
+ int taskid;
+ int execid;
+ }
+
+ public static boolean isLocalJobTracker(JobConf job) {
+ return job.get("mapred.job.tracker", "local").equals("local");
+ }
+
+ public static TaskId getTaskInfo(JobConf job) {
+ TaskId res = new TaskId();
+
+ String id = job.get("mapred.task.id");
+ if (isLocalJobTracker(job)) {
+ // the local job tracker uses different naming
+ res.mapTask = job.getBoolean("mapred.task.is.map", true);
+ res.jobid = "0";
+ res.taskid = 0;
+ res.execid = 0;
+ } else {
+ String[] e = id.split("_");
+ res.mapTask = e[2].equals("m");
+ res.jobid = e[1];
+ res.taskid = Integer.parseInt(e[3]);
+ res.execid = Integer.parseInt(e[4]);
+ }
+ return res;
+ }
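
getTaskInfo parses a task id of the form task_<jobid>_<m|r>_<taskid>_<execid>. A worked example against a hypothetical id:

    // mapred.task.id = "task_0001_m_000003_0" (hypothetical)
    // => mapTask = true, jobid = "0001", taskid = 3, execid = 0
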
+
+ static boolean getUseMapSideEffect(JobConf job) {
+ String reduce = job.get("stream.reduce.streamprocessor");
+ return StreamJob.REDUCE_NONE.equals(reduce);
+ }
+
+ public static void touch(File file) throws IOException {
+ file = file.getAbsoluteFile();
+ FileOutputStream out = new FileOutputStream(file);
+ out.close();
+ if (!file.exists()) {
+ throw new IOException("touch failed: " + file);
+ }
+ }
+
+ public static boolean isCygwin() {
+ String OS = System.getProperty("os.name");
+ return (OS.indexOf("Windows") > -1);
+ }
+
+ public static String localizeBin(String path) {
+ if (isCygwin()) {
+ path = "C:/cygwin/" + path;
+ }
+ return path;
+ }
+ /** @param name an Ant property name, as in <junit><sysproperty key="foo" value="${foo}"/>.
+ * If foo is undefined then Ant sets the value to the unevaluated String "${foo}".
+ * Take this into account when setting defaultVal. */
+ public static String getBoundAntProperty(String name, String defaultVal) {
+ String val = System.getProperty(name);
+ if (val != null && val.indexOf("${") >= 0) {
+ val = null;
+ }
+ if (val == null) {
+ val = defaultVal;
+ }
+ return val;
+ }
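
An illustration of the Ant behaviour described above (the property name is hypothetical): when <sysproperty key="foo" value="${foo}"/> is forwarded for an undefined foo, the JVM sees the literal "${foo}" and the default wins:

    // System property foo = "${foo}"  (unevaluated by Ant)
    StreamUtil.getBoundAntProperty("foo", "fallback");  // "fallback"
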
+
}