Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/07/11 10:58:06 UTC
svn commit: r420768 - in /lucene/hadoop/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/
Author: cutting
Date: Tue Jul 11 01:58:05 2006
New Revision: 420768
URL: http://svn.apache.org/viewvc?rev=420768&view=rev
Log:
HADOOP-355. Updates to streaming contrib module. Contributed by Michel.
Added:
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=420768&r1=420767&r2=420768&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jul 11 01:58:05 2006
@@ -14,6 +14,10 @@
not yet complete, i.e., that are queued or running.
(Mahadev Konar via cutting)
+ 4. HADOOP-355. Updates to the streaming contrib module, including
+ API fixes, making reduce optional, and adding an input type for
+ StreamSequenceRecordReader. (Michel Tourn via cutting)
+
Release 0.4.0 - 2006-06-28
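
A note on the map-only mode introduced here: it is requested with "-reducer NONE" on the streaming command line (see the StreamJob changes below), and PipeMapRed treats that sentinel differently from a missing reducer. A minimal sketch of the check, with the wrapper class and example commands being illustrative:

    public class ReduceNoneCheck {
        // Sentinel reducer command that disables the shuffle/sort/reduce step.
        static final String REDUCE_NONE = "NONE";

        // Mirrors the PipeMapRed logic below: null means identity reduce,
        // "NONE" means no map outputs at all, anything else is piped.
        static boolean shouldPipe(String streamProcessor) {
            return streamProcessor != null && !REDUCE_NONE.equals(streamProcessor);
        }

        public static void main(String[] args) {
            System.out.println(shouldPipe("/bin/wc")); // true: run the command
            System.out.println(shouldPipe(null));      // false: identity reduce
            System.out.println(shouldPipe("NONE"));    // false: map-only job
        }
    }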
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=420768&r1=420767&r2=420768&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Tue Jul 11 01:58:05 2006
@@ -35,11 +35,16 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
/** Shared functionality for PipeMapper, PipeReducer.
* @author Michel Tourn
@@ -56,7 +61,13 @@
*/
abstract String getKeyColPropName();
-
+ /** Write output as side-effect files rather than as map outputs.
+ This is useful to do "Map" tasks rather than "MapReduce" tasks. */
+ boolean getUseSideEffect()
+ {
+ return false;
+ }
+
/**
* @returns how many TABS before the end of the key part
* usually: 1 or "ALL"
@@ -154,7 +165,10 @@
String argv = getPipeCommand(job);
keyCols_ = getKeyColsFromPipeCommand(argv);
- doPipe_ = (argv != null);
+ job_ = job;
+
+ // Currently: null is identity reduce. REDUCE_NONE is no-map-outputs.
+ doPipe_ = (argv != null) && !StreamJob.REDUCE_NONE.equals(argv);
if(!doPipe_) return;
setStreamJobDetails(job);
@@ -169,29 +183,46 @@
new MustangFile(prog).setExecutable(true, true);
}
+
+ if(job_.getInputValueClass().equals(BytesWritable.class)) {
+ // TODO expose as separate config:
+ // job or semistandard inputformat property
+ optUseKey_ = false;
+ }
+
+ optSideEffect_ = getUseSideEffect();
+
+ if(optSideEffect_) {
+ String fileName = job_.get("mapred.task.id");
+ sideEffectPath_ = new Path(job_.getOutputPath(), fileName);
+ FileSystem fs = FileSystem.get(job_);
+ sideEffectOut_ = fs.create(sideEffectPath_);
+ }
+
// argvSplit[0]:
// An absolute path should be a preexisting valid path on all TaskTrackers
// A relative path should match in the unjarred Job data
// In this case, force an absolute path to make sure exec finds it.
argvSplit[0] = new File(argvSplit[0]).getAbsolutePath();
- log_.println("PipeMapRed exec " + Arrays.asList(argvSplit));
-
+ logprintln("PipeMapRed exec " + Arrays.asList(argvSplit));
+ logprintln("sideEffectPath_=" + sideEffectPath_);
Environment childEnv = (Environment)StreamUtil.env().clone();
- addEnvironment(childEnv, job.get("stream.addenvironment"));
+ addEnvironment(childEnv, job_.get("stream.addenvironment"));
sim = Runtime.getRuntime().exec(argvSplit, childEnv.toArray());
/* // This way required jdk1.5
ProcessBuilder processBuilder = new ProcessBuilder(argvSplit);
Map<String, String> env = processBuilder.environment();
- addEnvironment(env, job.get("stream.addenvironment"));
+ addEnvironment(env, job_.get("stream.addenvironment"));
sim = processBuilder.start();
*/
clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));
- clientIn_ = new BufferedReader(new InputStreamReader(sim.getInputStream()));
- clientErr_ = new DataInputStream(sim.getErrorStream());
+ clientIn_ = new DataInputStream(new BufferedInputStream(sim.getInputStream()));
+ clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));
doneLock_ = new Object();
+ startTime_ = System.currentTimeMillis();
} catch(Exception e) {
e.printStackTrace();
@@ -205,7 +236,7 @@
String s = job.get("stream.minRecWrittenToEnableSkip_");
if(s != null) {
minRecWrittenToEnableSkip_ = Long.parseLong(s);
- log_.println("JobConf set minRecWrittenToEnableSkip_ =" + minRecWrittenToEnableSkip_);
+ logprintln("JobConf set minRecWrittenToEnableSkip_ =" + minRecWrittenToEnableSkip_);
}
}
@@ -222,6 +253,22 @@
}
+ void logprintln(String s)
+ {
+ if(log_ != null) {
+ log_.println(s);
+ } else {
+ System.err.println(s); // or LOG.info()
+ }
+ }
+
+ void logflush()
+ {
+ if(log_ != null) {
+ log_.flush();
+ }
+ }
+
void addEnvironment(Properties env, String nameVals)
{
// encoding "a=b c=d" from StreamJob
@@ -230,9 +277,9 @@
for(int i=0; i<nv.length; i++) {
String[] pair = nv[i].split("=", 2);
if(pair.length != 2) {
- log_.println("Skip ev entry:" + nv[i]);
+ logprintln("Skip ev entry:" + nv[i]);
} else {
- log_.println("Add ev entry:" + nv[i]);
+ logprintln("Add ev entry:" + nv[i]);
env.put(pair[0], pair[1]);
}
}
@@ -293,18 +340,23 @@
// 3/4 Tool to Hadoop
while((answer = clientIn_.readLine()) != null) {
// 4/4 Hadoop out
- splitKeyVal(answer, key, val);
- output.collect(key, val);
- numRecWritten_++;
- if(numRecWritten_ % 100 == 0) {
- log_.println(numRecRead_+"/"+numRecWritten_);
- log_.flush();
+ if(optSideEffect_) {
+ sideEffectOut_.write(answer.getBytes());
+ sideEffectOut_.write('\n');
+ } else {
+ splitKeyVal(answer, key, val);
+ output.collect(key, val);
+ numRecWritten_++;
+ if(numRecWritten_ % 100 == 0) {
+ logprintln(numRecRead_+"/"+numRecWritten_);
+ logflush();
+ }
}
}
} catch(IOException io) {
io.printStackTrace(log_);
}
- log_.println("MROutputThread done");
+ logprintln("MROutputThread done");
} finally {
outputDone_ = true;
synchronized(doneLock_) {
@@ -332,7 +384,7 @@
int bucket = 100;
while((line=clientErr_.readLine()) != null) {
num++;
- log_.println(line);
+ logprintln(line);
if(num < 10) {
String hline = "MRErr: " + line;
System.err.println(hline);
@@ -353,10 +405,19 @@
public void mapRedFinished()
{
- log_.println("mapRedFinished");
+ logprintln("mapRedFinished");
try {
if(!doPipe_) return;
try {
+ if(optSideEffect_) {
+ logprintln("closing " + sideEffectPath_);
+ sideEffectOut_.close();
+ logprintln("closed " + sideEffectPath_);
+ }
+ } catch(IOException io) {
+ io.printStackTrace();
+ }
+ try {
if(clientOut_ != null) {
clientOut_.close();
}
@@ -385,10 +446,12 @@
void maybeLogRecord()
{
if(numRecRead_ >= nextRecReadLog_) {
- log_.println(numRecInfo());
- log_.flush();
- nextRecReadLog_ *= 10;
- //nextRecReadLog_ += 1000;
+ String info = numRecInfo();
+ logprintln(info);
+ logflush();
+ System.err.println(info);
+ //nextRecReadLog_ *= 10;
+ nextRecReadLog_ += 100;
}
}
@@ -417,7 +480,15 @@
String numRecInfo()
{
- return "R/W/S=" + numRecRead_+"/"+numRecWritten_+"/"+numRecSkipped_;
+ long elapsed = (System.currentTimeMillis() - startTime_)/1000;
+ long total = numRecRead_+numRecWritten_+numRecSkipped_;
+ return "R/W/S=" + numRecRead_+"/"+numRecWritten_+"/"+numRecSkipped_
+ + " in:" + safeDiv(numRecRead_, elapsed) + " [rec/s]"
+ + " out:" + safeDiv(numRecWritten_, elapsed) + " [rec/s]";
+ }
+ String safeDiv(long n, long d)
+ {
+ return (d==0) ? "NA" : ""+n/d + "=" + n + "/" + d;
}
String logFailure(Exception e)
{
@@ -425,15 +496,15 @@
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
String msg = "log:" + jobLog_ + "\n" + getContext() + sw + "\n";
- log_.println(msg);
+ logprintln(msg);
return msg;
}
+ long startTime_;
long numRecRead_ = 0;
long numRecWritten_ = 0;
long numRecSkipped_ = 0;
-
long nextRecReadLog_ = 1;
long minRecWrittenToEnableSkip_ = Long.MAX_VALUE;
@@ -441,6 +512,8 @@
int keyCols_;
final static int ALL_COLS = Integer.MAX_VALUE;
+ JobConf job_;
+
// generic MapRed parameters passed on by hadoopStreaming
String taskid_;
int reportPortPlusOne_;
@@ -455,24 +528,31 @@
boolean errorDone_;
DataOutputStream clientOut_;
DataInputStream clientErr_;
- BufferedReader clientIn_;
+ DataInputStream clientIn_;
String jobLog_;
// set in PipeMapper/PipeReducer subclasses
String mapredKey_;
int numExceptions_;
+ boolean optUseKey_ = true;
+
+ boolean optSideEffect_;
+ Path sideEffectPath_;
+ FSDataOutputStream sideEffectOut_;
+
String LOGNAME;
PrintStream log_;
+ /* curr. going to stderr so that it is preserved
{ // instance initializer
try {
int id = (int)((System.currentTimeMillis()/2000) % 10);
String sid = id+ "." + StreamUtil.env().get("USER");
LOGNAME = "/tmp/PipeMapRed." + sid + ".log";
log_ = new PrintStream(new FileOutputStream(LOGNAME));
- log_.println(new java.util.Date());
- log_.flush();
+ logprintln(new java.util.Date());
+ logflush();
} catch(IOException io) {
System.err.println("LOGNAME=" + LOGNAME);
io.printStackTrace();
@@ -482,5 +562,5 @@
}
}
}
-
+ */
}
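
The headline change above is the side-effect output path: with reduce disabled, every line the child process writes is appended to a per-task file rather than collected as map output. A compact sketch of how that file is opened, assuming the 0.4-era API and using only the calls visible in the diff (the wrapper class is illustrative):

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class SideEffectSketch {
        // One file per task attempt, named after the task id and placed in the
        // job output directory, so concurrent tasks never clobber each other.
        static FSDataOutputStream openSideEffectFile(JobConf job) throws IOException {
            String fileName = job.get("mapred.task.id");
            Path sideEffectPath = new Path(job.getOutputPath(), fileName);
            FileSystem fs = FileSystem.get(job);
            return fs.create(sideEffectPath);
        }
    }

The stream is closed in mapRedFinished(), before the child's streams are torn down.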
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=420768&r1=420767&r2=420768&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Tue Jul 11 01:58:05 2006
@@ -25,6 +25,7 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableComparable;
@@ -47,6 +48,15 @@
return "mapKeyCols";
}
+ boolean getUseSideEffect()
+ {
+ String reduce = job_.get("stream.reduce.streamprocessor");
+ if(StreamJob.REDUCE_NONE.equals(reduce)) {
+ return true;
+ }
+ return false;
+ }
+
// Do NOT declare default constructor
// (MapRed creates it reflectively)
@@ -61,16 +71,28 @@
}
try {
// 1/4 Hadoop in
- mapredKey_ = key.toString();
+ if(key instanceof BytesWritable) {
+ mapredKey_ = new String(((BytesWritable)key).get(), "UTF-8");
+ } else {
+ mapredKey_ = key.toString();
+ }
numRecRead_++;
maybeLogRecord();
// 2/4 Hadoop to Tool
if(numExceptions_==0) {
- clientOut_.writeBytes(mapredKey_);
- clientOut_.writeBytes("\t");
- clientOut_.writeBytes(value.toString());
+ String sval;
+ if(value instanceof BytesWritable) {
+ sval = new String(((BytesWritable)value).get(), "UTF-8");
+ } else {
+ sval = value.toString();
+ }
+ if(optUseKey_) {
+ clientOut_.writeBytes(mapredKey_);
+ clientOut_.writeBytes("\t");
+ }
+ clientOut_.writeBytes(sval);
clientOut_.writeBytes("\n");
clientOut_.flush();
} else {
@@ -90,11 +112,10 @@
}
}
-
public void close()
{
appendLogToJobLog("success");
mapRedFinished();
}
-
+
}
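
PipeMapper can now feed BytesWritable records to the child and, when optUseKey_ is cleared (PipeMapRed does this when the input value class is BytesWritable), omit the key entirely. A sketch of the decoding rule used above:

    import java.io.UnsupportedEncodingException;

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Writable;

    public class DecodeSketch {
        // Mirrors PipeMapper.map(): BytesWritable payloads are decoded as
        // UTF-8; any other Writable falls back to its toString() form.
        static String asText(Writable w) throws UnsupportedEncodingException {
            if (w instanceof BytesWritable) {
                return new String(((BytesWritable) w).get(), "UTF-8");
            }
            return w.toString();
        }
    }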
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java?rev=420768&r1=420767&r2=420768&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java Tue Jul 11 01:58:05 2006
@@ -20,6 +20,7 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.mapred.Reporter;
@@ -46,17 +47,18 @@
final String CONF_NS = "stream.recordreader.";
public StreamBaseRecordReader(
- FSDataInputStream in, long start, long end,
- String splitName, Reporter reporter, JobConf job)
+ FSDataInputStream in, FileSplit split, Reporter reporter, JobConf job, FileSystem fs)
throws IOException
{
in_ = in;
- start_ = start;
- end_ = end;
- length_ = end_ - start_;
- splitName_ = splitName;
+ split_ = split;
+ start_ = split_.getStart();
+ length_ = split_.getLength();
+ end_ = start_ + length_;
+ splitName_ = split_.getFile().getName();
reporter_ = reporter;
job_ = job;
+ fs_ = fs;
statusMaxRecordChars_ = job_.getInt(CONF_NS + "statuschars", 200);
}
@@ -67,6 +69,18 @@
*/
public abstract boolean next(Writable key, Writable value) throws IOException;
+ /** This implementation always returns true. */
+ public boolean[] areValidInputDirectories(FileSystem fileSys,
+ Path[] inputDirs) throws IOException
+ {
+ int n = inputDirs.length;
+ boolean[] b = new boolean[n];
+ for(int i=0; i<n; i++) {
+ b[i] = true;
+ }
+ return b;
+ }
+
/** Returns the current position in the input. */
public synchronized long getPos() throws IOException
{
@@ -125,18 +139,22 @@
} else {
recStr = record.toString();
}
- String status = "HSTR " + StreamUtil.HOST + " " + numRec_ + ". pos=" + pos + " Processing record=" + recStr;
+ String unqualSplit = split_.getFile().getName() + ":" + split_.getStart() + "+" + split_.getLength();
+ String status = "HSTR " + StreamUtil.HOST + " " + numRec_ + ". pos=" + pos
+ + " " + unqualSplit + " Processing record=" + recStr;
status += " " + splitName_;
return status;
}
FSDataInputStream in_;
+ FileSplit split_;
long start_;
long end_;
long length_;
String splitName_;
Reporter reporter_;
JobConf job_;
+ FileSystem fs_;
int numRec_ = 0;
int nextStatusRec_ = 1;
int statusMaxRecordChars_;
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?rev=420768&r1=420767&r2=420768&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Tue Jul 11 01:58:05 2006
@@ -51,6 +51,18 @@
//LOG.setLevel(Level.FINE);
}
+ /** This implementation always returns true. */
+ public boolean[] areValidInputDirectories(FileSystem fileSys,
+ Path[] inputDirs
+ ) throws IOException {
+ boolean[] b = new boolean[inputDirs.length];
+ for(int i=0; i < inputDirs.length; ++i) {
+ b[i] = true;
+ }
+ return b;
+ }
+
+
protected Path[] listPaths(FileSystem fs, JobConf job)
throws IOException
{
@@ -129,9 +141,8 @@
Constructor ctor;
try {
- // reader = new StreamLineRecordReader(in, start, end, splitName, reporter, job);
ctor = readerClass.getConstructor(new Class[]{
- FSDataInputStream.class, long.class, long.class, String.class, Reporter.class, JobConf.class});
+ FSDataInputStream.class, FileSplit.class, Reporter.class, JobConf.class, FileSystem.class});
} catch(NoSuchMethodException nsm) {
throw new RuntimeException(nsm);
}
@@ -140,12 +151,21 @@
StreamBaseRecordReader reader;
try {
reader = (StreamBaseRecordReader) ctor.newInstance(new Object[]{
- in, new Long(start), new Long(end), splitName, reporter, job});
+ in, split, reporter, job, fs});
} catch(Exception nsm) {
throw new RuntimeException(nsm);
}
reader.init();
+
+
+ if(reader instanceof StreamSequenceRecordReader) {
+ // override k/v class types with types stored in SequenceFile
+ StreamSequenceRecordReader ss = (StreamSequenceRecordReader)reader;
+ job.setInputKeyClass(ss.rin_.getKeyClass());
+ job.setInputValueClass(ss.rin_.getValueClass());
+ }
+
return reader;
}
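
getRecordReader now resolves the concrete reader through a five-argument constructor (FSDataInputStream, FileSplit, Reporter, JobConf, FileSystem) and, for SequenceFile input, pushes the key/value classes stored in the file back into the JobConf. The reflective construction pattern, reduced to its essentials (class and method names illustrative; the demo uses StringBuffer so it stays self-contained):

    import java.lang.reflect.Constructor;

    public class ReflectSketch {
        // Look the constructor up once by exact signature, then instantiate;
        // either step throws if the subclass does not conform.
        static Object construct(Class cls, Class[] signature, Object[] args)
                throws Exception {
            Constructor ctor = cls.getConstructor(signature);
            return ctor.newInstance(args);
        }

        public static void main(String[] args) throws Exception {
            Object s = construct(StringBuffer.class,
                    new Class[]{String.class}, new Object[]{"hello"});
            System.out.println(s); // hello
        }
    }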
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=420768&r1=420767&r2=420768&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Tue Jul 11 01:58:05 2006
@@ -42,6 +42,8 @@
{
protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());
+ final static String REDUCE_NONE = "NONE";
+
public StreamJob(String[] argv, boolean mayExit)
{
argv_ = argv;
@@ -248,7 +250,7 @@
System.out.println(" -output <path> DFS output directory for the Reduce step");
System.out.println(" -mapper <cmd> The streaming command to run");
System.out.println(" -combiner <cmd> Not implemented. But you can pipe the mapper output");
- System.out.println(" -reducer <cmd> The streaming command to run");
+ System.out.println(" -reducer <cmd> The streaming command to run.");
System.out.println(" -file <file> File/dir to be shipped in the Job jar file");
System.out.println(" -cluster <name> Default uses hadoop-default.xml and hadoop-site.xml");
System.out.println(" -config <file> Optional. One or more paths to xml config files");
@@ -278,6 +280,10 @@
System.out.println(" The default is to use the normal hadoop-default.xml and hadoop-site.xml");
System.out.println(" Else configuration will use $HADOOP_HOME/conf/hadoop-<name>.xml");
System.out.println();
+ System.out.println("To skip the shuffle/sort/reduce step:" );
+ System.out.println(" Use -reducer " + REDUCE_NONE);
+ System.out.println(" This preserves the map input order and speeds up processing");
+ System.out.println();
System.out.println("To set the number of reduce tasks (num. of output files):");
System.out.println(" -jobconf mapred.reduce.tasks=10");
System.out.println("To change the local temp directory:");
@@ -405,8 +411,10 @@
}
jobConf_.setInputFormat(StreamInputFormat.class);
+ // for SequenceFile, input classes may be overriden in getRecordReader
jobConf_.setInputKeyClass(UTF8.class);
jobConf_.setInputValueClass(UTF8.class);
+
jobConf_.setOutputKeyClass(UTF8.class);
jobConf_.setOutputValueClass(UTF8.class);
//jobConf_.setCombinerClass();
@@ -465,10 +473,10 @@
while(it.hasNext()) {
String prop = (String)it.next();
String[] nv = prop.split("=", 2);
- msg("JobConf: set(" + nv[0] + ", " + nv[1]+")");
+ msg("xxxJobConf: set(" + nv[0] + ", " + nv[1]+")");
jobConf_.set(nv[0], nv[1]);
}
-
+ msg("submitting to jobconf: " + getJobTrackerHostPort());
}
protected String getJobTrackerHostPort()
@@ -532,7 +540,7 @@
LOG.info("Job complete: " + jobId_);
LOG.info("Output: " + output_);
error = false;
- } finally {
+ } finally {
if (error && (running_ != null)) {
LOG.info("killJob...");
running_.killJob();
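
On the -jobconf handling visible in this hunk's context: properties are split on the first '=' only, so values may themselves contain '='. That matters for settings like stream.addenvironment, whose value encodes name=value pairs. A one-line illustration:

    public class JobconfSplitSketch {
        public static void main(String[] args) {
            // split("=", 2) keeps any '=' inside the value intact.
            String[] nv = "stream.addenvironment=A=B".split("=", 2);
            System.out.println(nv[0]); // stream.addenvironment
            System.out.println(nv[1]); // A=B
        }
    }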
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java?rev=420768&r1=420767&r2=420768&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java Tue Jul 11 01:58:05 2006
@@ -37,11 +37,10 @@
{
public StreamLineRecordReader(
- FSDataInputStream in, long start, long end,
- String splitName, Reporter reporter, JobConf job)
+ FSDataInputStream in, FileSplit split, Reporter reporter, JobConf job, FileSystem fs)
throws IOException
{
- super(in, start, end, splitName, reporter, job);
+ super(in, split, reporter, job, fs);
}
public void seekNextRecordBoundary() throws IOException
@@ -59,7 +58,7 @@
}
}
- System.out.println("getRecordReader start="+start_ + " end=" + end_ + " bytesSkipped"+bytesSkipped);
+ //System.out.println("getRecordReader start="+start_ + " end=" + end_ + " bytesSkipped"+bytesSkipped);
}
public synchronized boolean next(Writable key, Writable value)
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java?rev=420768&r1=420767&r2=420768&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java Tue Jul 11 01:58:05 2006
@@ -27,6 +27,8 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
+
/** Similar to org.apache.hadoop.mapred.TextOutputFormat,
* but delimits key and value with a TAB.
* @author Michel Tourn
@@ -34,8 +36,8 @@
public class StreamOutputFormat implements OutputFormat {
public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
- String name) throws IOException {
-
+ String name, Progressable progr) throws IOException {
+
File file = new File(job.getOutputDir(), name);
final FSDataOutputStream out = fs.create(file);
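
The getRecordWriter signature gains a Progressable parameter, presumably tracking an OutputFormat interface change elsewhere in this release. The record framing itself is what the class comment describes: key, a TAB, the value, a newline; the same framing PipeMapper uses toward the child process. A self-contained illustration (names illustrative):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class TabRecordSketch {
        // One record per line: key TAB value NEWLINE.
        static void writeRecord(DataOutputStream out, String key, String value)
                throws IOException {
            out.writeBytes(key);
            out.writeBytes("\t");
            out.writeBytes(value);
            out.writeBytes("\n");
        }

        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            writeRecord(new DataOutputStream(buf), "k1", "v1");
            System.out.print(buf); // k1<TAB>v1 plus a trailing newline
        }
    }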
Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java?rev=420768&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java Tue Jul 11 01:58:05 2006
@@ -0,0 +1,94 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+
+
+public class StreamSequenceRecordReader extends StreamBaseRecordReader
+{
+
+ public StreamSequenceRecordReader (
+ FSDataInputStream in, FileSplit split, Reporter reporter, JobConf job, FileSystem fs)
+ throws IOException
+ {
+ super(in, split, reporter, job, fs);
+ numFailed_ = 0;
+ // super.in_ ignored, using rin_ instead
+ }
+
+
+ public synchronized boolean next(Writable key, Writable value)
+ throws IOException
+ {
+ boolean success;
+ do {
+ if (!more_) return false;
+ success = false;
+ try {
+ long pos = rin_.getPosition();
+ boolean eof = rin_.next(key, value);
+ if (pos >= end_ && rin_.syncSeen()) {
+ more_ = false;
+ } else {
+ more_ = eof;
+ }
+ success = true;
+ } catch(IOException io) {
+ numFailed_++;
+ if(numFailed_ < 100 || numFailed_ % 100 == 0) {
+ err_.println("StreamSequenceRecordReader: numFailed_/numRec_="
+ + numFailed_+ "/" + numRec_);
+ }
+ io.printStackTrace(err_);
+ success = false;
+ }
+ } while(!success);
+ numRecStats("");
+ return more_;
+ }
+
+
+ public void seekNextRecordBoundary() throws IOException
+ {
+ rin_ = new SequenceFile.Reader(fs_, split_.getPath(), job_);
+ end_ = split_.getStart() + split_.getLength();
+
+ if (split_.getStart() > rin_.getPosition())
+ rin_.sync(split_.getStart()); // sync to start
+
+ more_ = rin_.getPosition() < end_;
+
+ reporter_.setStatus(split_.toString());
+
+ //return new SequenceFileRecordReader(job_, split_);
+ }
+
+ boolean more_;
+ SequenceFile.Reader rin_;
+ int numFailed_;
+ PrintStream err_ = System.err;
+
+}
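
The new reader relies on SequenceFile sync markers to honor split boundaries: seekNextRecordBoundary() syncs forward to the split start, and next() keeps reading until the first sync seen at or past the split end, so a record straddling a boundary is consumed by exactly one task. The boundary condition from next(), isolated (the values in main are illustrative):

    public class SplitBoundarySketch {
        // Mirrors the check in next() above: stop once the position sampled
        // before the read has reached end_ AND a sync marker has been seen.
        static boolean keepReading(long pos, long end, boolean syncSeen) {
            return !(pos >= end && syncSeen);
        }

        public static void main(String[] args) {
            System.out.println(keepReading(100, 512, false)); // true: inside the split
            System.out.println(keepReading(600, 512, false)); // true: no sync marker yet
            System.out.println(keepReading(600, 512, true));  // false: next task's record
        }
    }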
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java?rev=420768&r1=420767&r2=420768&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java Tue Jul 11 01:58:05 2006
@@ -47,11 +47,10 @@
public class StreamXmlRecordReader extends StreamBaseRecordReader
{
public StreamXmlRecordReader(
- FSDataInputStream in, long start, long end,
- String splitName, Reporter reporter, JobConf job)
+ FSDataInputStream in, FileSplit split, Reporter reporter, JobConf job, FileSystem fs)
throws IOException
{
- super(in, start, end, splitName, reporter, job);
+ super(in, split, reporter, job, fs);
beginMark_ = checkJobGet(CONF_NS + "begin");
endMark_ = checkJobGet(CONF_NS + "end");