Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/09/05 23:55:21 UTC
svn commit: r440503 - in /lucene/hadoop/trunk: ./
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
Author: cutting
Date: Tue Sep 5 14:55:20 2006
New Revision: 440503
URL: http://svn.apache.org/viewvc?view=rev&rev=440503
Log:
HADOOP-499. Reduce the use of Strings in contrib/streaming, replacing them with Text for better performance. Contributed by Hairong.
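The win here is mostly allocation and decoding: java.lang.String forces a bytes-to-chars decode and a fresh object for every record, while Text wraps a reusable UTF-8 byte buffer. A minimal standalone sketch of the difference (illustrative code, not part of this commit; it assumes only the Text.set(byte[]) overload the patch itself uses):

import org.apache.hadoop.io.Text;

public class TextReuseSketch {
  public static void main(String[] args) throws Exception {
    byte[] line = "key1\tvalue1".getBytes("UTF-8");

    // String pattern: decodes bytes to chars and allocates per record.
    String s = new String(line, "UTF-8");

    // Text pattern: one instance reused across records; set() copies bytes
    // without any per-record character decoding.
    Text t = new Text();
    t.set(line);

    System.out.println(s.equals(t.toString()));  // prints: true
  }
}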
Added:
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=440503&r1=440502&r2=440503
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Sep 5 14:55:20 2006
@@ -110,6 +110,10 @@
27. HADOOP-501. Fix Configuration.toString() to handle URL resources.
(Thomas Friol via cutting)
+28. HADOOP-499. Reduce the use of Strings in contrib/streaming,
+ replacing them with Text for better performance.
+ (Hairong Kuang via cutting)
+
Release 0.5.0 - 2006-08-04
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?view=diff&rev=440503&r1=440502&r2=440503
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Tue Sep 5 14:55:20 2006
@@ -17,7 +17,7 @@
package org.apache.hadoop.streaming;
import java.io.*;
-import java.nio.channels.*;
+import java.nio.charset.CharacterCodingException;
import java.io.IOException;
import java.util.Date;
import java.util.Map;
@@ -30,17 +30,12 @@
import org.apache.commons.logging.*;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.fs.Path;
@@ -170,6 +165,9 @@
if(debug_) {
System.out.println("PipeMapRed: stream.debug=true");
}
+
+ joinDelay_ = job.getLong("stream.joindelay.milli", 0);
+
job_ = job;
// Currently: null is identity reduce. REDUCE_NONE is no-map-outputs.
@@ -228,7 +226,6 @@
clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));
clientIn_ = new DataInputStream(new BufferedInputStream(sim.getInputStream()));
clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));
- doneLock_ = new Object();
startTime_ = System.currentTimeMillis();
} catch(Exception e) {
@@ -265,7 +262,7 @@
if(log_ != null) {
log_.println(s);
} else {
- System.err.println(s); // or LOG.info()
+ LOG.info(s);
}
}
@@ -341,28 +338,49 @@
void startOutputThreads(OutputCollector output, Reporter reporter)
{
- outputDone_ = false;
- errorDone_ = false;
outThread_ = new MROutputThread(output, reporter);
outThread_.start();
errThread_ = new MRErrorThread(reporter);
errThread_.start();
}
+
+ void waitOutputThreads() {
+ try {
+ sim.waitFor();
+ if(outThread_ != null) {
+ outThread_.join(joinDelay_);
+ }
+ if(errThread_ != null) {
+ errThread_.join(joinDelay_);
+ }
+ } catch(InterruptedException e) {
+ //ignore
+ }
+ }
- void splitKeyVal(String line, UTF8 key, UTF8 val)
+ /**
+ * Split a line into key and value, assuming the delimiter is a tab.
+ * @param line a byte array containing one line of UTF-8 bytes
+ * @param key key of a record
+ * @param val value of a record
+ * @throws IOException
+ */
+ void splitKeyVal(byte [] line, Text key, Text val) throws IOException
{
- int pos;
- if(keyCols_ == ALL_COLS) {
- pos = -1;
- } else {
- pos = line.indexOf('\t');
+ int pos=-1;
+ if(keyCols_ != ALL_COLS) {
+ pos = UTF8ByteArrayUtils.findTab(line);
}
- if(pos == -1) {
- key.set(line);
- val.set("");
- } else {
- key.set(line.substring(0, pos));
- val.set(line.substring(pos+1));
+ try {
+ if(pos == -1) {
+ key.set(line);
+ val.set("");
+ } else {
+ UTF8ByteArrayUtils.splitKeyVal(line, key, val, pos);
+ }
+ } catch (CharacterCodingException e) {
+ LOG.warn(StringUtils.stringifyException(e));
}
}
@@ -375,41 +393,33 @@
this.reporter = reporter;
}
public void run() {
- try {
- try {
- UTF8 EMPTY = new UTF8("");
- UTF8 key = new UTF8();
- UTF8 val = new UTF8();
- // 3/4 Tool to Hadoop
- while((answer = clientIn_.readLine()) != null) {
+ try {
+ Text key = new Text();
+ Text val = new Text();
+ // 3/4 Tool to Hadoop
+ while((answer=UTF8ByteArrayUtils.readLine(clientIn_))!= null) {
// 4/4 Hadoop out
if(optSideEffect_) {
- sideEffectOut_.write(answer.getBytes());
- sideEffectOut_.write('\n');
+ sideEffectOut_.write(answer);
+ sideEffectOut_.write('\n');
} else {
- splitKeyVal(answer, key, val);
- output.collect(key, val);
- numRecWritten_++;
- if(numRecWritten_ % 100 == 0) {
+ splitKeyVal(answer, key, val);
+ output.collect(key, val);
+ }
+ numRecWritten_++;
+ if(numRecWritten_ % 100 == 0) {
logprintln(numRecRead_+"/"+numRecWritten_);
logflush();
- }
}
- }
- } catch(IOException io) {
- io.printStackTrace(log_);
}
- logprintln("MROutputThread done");
- } finally {
- outputDone_ = true;
- synchronized(doneLock_) {
- doneLock_.notifyAll();
- }
- }
+ } catch(IOException io) {
+ io.printStackTrace(log_);
+ }
+ logprintln("MROutputThread done");
}
OutputCollector output;
Reporter reporter;
- String answer;
+ byte [] answer;
}
class MRErrorThread extends Thread
@@ -421,26 +431,21 @@
}
public void run()
{
- String line;
+ byte [] line;
try {
long num = 0;
- int bucket = 100;
- while((line=clientErr_.readLine()) != null) {
+ while((line=UTF8ByteArrayUtils.readLine(clientErr_)) != null) {
num++;
- logprintln(line);
+ String lineStr = new String(line, "UTF-8");
+ logprintln(lineStr);
if(num < 10) {
- String hline = "MRErr: " + line;
+ String hline = "MRErr: " + lineStr;
System.err.println(hline);
reporter.setStatus(hline);
}
}
} catch(IOException io) {
io.printStackTrace(log_);
- } finally {
- errorDone_ = true;
- synchronized(doneLock_) {
- doneLock_.notifyAll();
- }
}
}
Reporter reporter;
@@ -448,42 +453,31 @@
public void mapRedFinished()
{
- logprintln("mapRedFinished");
- try {
- if(!doPipe_) return;
- try {
- if(optSideEffect_) {
- logprintln("closing " + sideEffectPath_);
- sideEffectOut_.close();
- logprintln("closed " + sideEffectPath_);
- }
- } catch(IOException io) {
- io.printStackTrace();
- }
- try {
- if(clientOut_ != null) {
- clientOut_.close();
- }
- } catch(IOException io) {
- }
- if(outThread_ == null) {
- // no input records: threads were never spawned
- } else {
+ logprintln("mapRedFinished");
+ if(!doPipe_) return;
+
try {
- while(!outputDone_ || !errorDone_) {
- synchronized(doneLock_) {
- doneLock_.wait();
+ try {
+ if(clientOut_ != null) {
+ clientOut_.close();
+ }
+ } catch(IOException io) {
}
- }
- } catch(InterruptedException ie) {
- ie.printStackTrace();
+ waitOutputThreads();
+ try {
+ if(optSideEffect_) {
+ logprintln("closing " + sideEffectPath_);
+ sideEffectOut_.close();
+ logprintln("closed " + sideEffectPath_);
+ }
+ } catch(IOException io) {
+ io.printStackTrace();
+ }
+ sim.destroy();
+ } catch(RuntimeException e) {
+ e.printStackTrace(log_);
+ throw e;
}
- }
- sim.destroy();
- } catch(RuntimeException e) {
- e.printStackTrace(log_);
- throw e;
- }
}
void maybeLogRecord()
@@ -543,7 +537,30 @@
return msg;
}
-
+ /**
+ * Write a writable value to the output stream as raw bytes, using UTF-8 for non-binary values.
+ * @param value output value
+ * @throws IOException
+ */
+ void write(Writable value) throws IOException {
+ byte[] bval;
+ int valSize;
+ if(value instanceof BytesWritable) {
+ BytesWritable val = (BytesWritable)value;
+ bval = val.get();
+ valSize = val.getSize();
+ } else if(value instanceof Text){
+ Text val = (Text)value;
+ bval = val.getBytes();
+ valSize = val.getLength();
+ } else {
+ String sval = value.toString();
+ bval = sval.getBytes("UTF-8");
+ valSize = bval.length;
+ }
+ clientOut_.write(bval, 0, valSize);
+ }
+
long startTime_;
long numRecRead_ = 0;
long numRecWritten_ = 0;
@@ -555,6 +572,7 @@
int keyCols_;
final static int ALL_COLS = Integer.MAX_VALUE;
+ long joinDelay_;
JobConf job_;
// generic MapRed parameters passed on by hadoopStreaming
@@ -565,16 +583,13 @@
boolean debug_;
Process sim;
- Object doneLock_;
MROutputThread outThread_;
+ String jobLog_;
MRErrorThread errThread_;
- boolean outputDone_;
- boolean errorDone_;
DataOutputStream clientOut_;
DataInputStream clientErr_;
DataInputStream clientIn_;
- String jobLog_;
// set in PipeMapper/PipeReducer subclasses
String mapredKey_;
int numExceptions_;
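The new write(Writable) helper above lets BytesWritable and Text payloads reach the client pipe as raw bytes, so only unrecognized Writable types pay for a toString() plus getBytes("UTF-8") round trip. The one subtlety is writing getLength() bytes rather than the whole backing array, since a reused Text buffer may be larger than its logical content. A hedged standalone sketch of that byte-level write (the stream and value are stand-ins, not code from this commit):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.io.Text;

public class WriteDispatchSketch {
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    DataOutputStream clientOut = new DataOutputStream(sink);

    Text val = new Text("value1");
    // Same idea as PipeMapRed.write(): hand the backing byte array to the
    // stream directly, bounded by getLength(), not getBytes().length.
    clientOut.write(val.getBytes(), 0, val.getLength());
    clientOut.write('\n');
    clientOut.flush();

    System.out.print(sink.toString("UTF-8"));  // prints: value1
  }
}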
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?view=diff&rev=440503&r1=440502&r2=440503
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Tue Sep 5 14:55:20 2006
@@ -17,17 +17,12 @@
package org.apache.hadoop.streaming;
import java.io.*;
-import java.util.Iterator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
@@ -71,34 +66,20 @@
}
try {
// 1/4 Hadoop in
- if(key instanceof BytesWritable) {
- BytesWritable bKey = (BytesWritable)key;
- mapredKey_ = new String(bKey.get(), 0, bKey.getSize(), "UTF-8");
- } else {
- mapredKey_ = key.toString();
- }
numRecRead_++;
-
maybeLogRecord();
// 2/4 Hadoop to Tool
if(numExceptions_==0) {
- String sval;
- if(value instanceof BytesWritable) {
- BytesWritable bVal = (BytesWritable)value;
- sval = new String(bVal.get(), 0, bVal.getSize(), "UTF-8");
- } else {
- sval = value.toString();
- }
- if(optUseKey_) {
- clientOut_.writeBytes(mapredKey_);
- clientOut_.writeBytes("\t");
- }
- clientOut_.writeBytes(sval);
- clientOut_.writeBytes("\n");
- clientOut_.flush();
+ if(optUseKey_) {
+ write(key);
+ clientOut_.write('\t');
+ }
+ write(value);
+ clientOut_.write('\n');
+ clientOut_.flush();
} else {
- numRecSkipped_++;
+ numRecSkipped_++;
}
} catch(IOException io) {
numExceptions_++;
@@ -106,6 +87,7 @@
// terminate with failure
String msg = logFailure(io);
appendLogToJobLog("failure");
+ mapRedFinished();
throw new IOException(msg);
} else {
// terminate with success:
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?view=diff&rev=440503&r1=440502&r2=440503
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Tue Sep 5 14:55:20 2006
@@ -20,13 +20,10 @@
import java.util.Iterator;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
@@ -61,10 +58,10 @@
numRecRead_++;
maybeLogRecord();
if(doPipe_) {
- clientOut_.writeBytes(key.toString());
- clientOut_.writeBytes("\t");
- clientOut_.writeBytes(val.toString());
- clientOut_.writeBytes("\n");
+ write(key);
+ clientOut_.write('\t');
+ write(val);
+ clientOut_.write('\n');
clientOut_.flush();
} else {
// "identity reduce"
@@ -73,6 +70,7 @@
}
} catch(IOException io) {
appendLogToJobLog("failure");
+ mapRedFinished();
throw new IOException(getContext() + io.getMessage());
}
}
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java?view=diff&rev=440503&r1=440502&r2=440503
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java Tue Sep 5 14:55:20 2006
@@ -18,7 +18,7 @@
import java.io.*;
-import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.fs.Path;
@@ -95,11 +95,11 @@
}
public WritableComparable createKey() {
- return new UTF8();
+ return new Text();
}
public Writable createValue() {
- return new UTF8();
+ return new Text();
}
/// StreamBaseRecordReader API
@@ -123,12 +123,14 @@
public abstract void seekNextRecordBoundary() throws IOException;
- void numRecStats(CharSequence record) throws IOException
+ void numRecStats(byte[] record, int start, int len) throws IOException
{
numRec_++;
if(numRec_ == nextStatusRec_) {
+ String recordStr = new String(record, start,
+ Math.min(len, statusMaxRecordChars_), "UTF-8");
nextStatusRec_ +=100;//*= 10;
- String status = getStatus(record);
+ String status = getStatus(recordStr);
LOG.info(status);
reporter_.setStatus(status);
}
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=440503&r1=440502&r2=440503
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Tue Sep 5 14:55:20 2006
@@ -28,7 +28,7 @@
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
@@ -417,19 +417,19 @@
// general MapRed job properties
jobConf_ = new JobConf(config_);
for(int i=0; i<inputGlobs_.size(); i++) {
- jobConf_.addInputDir(new File((String)inputGlobs_.get(i)));
+ jobConf_.addInputPath(new Path((String)inputGlobs_.get(i)));
}
jobConf_.setInputFormat(StreamInputFormat.class);
// for SequenceFile, input classes may be overriden in getRecordReader
- jobConf_.setInputKeyClass(UTF8.class);
- jobConf_.setInputValueClass(UTF8.class);
+ jobConf_.setInputKeyClass(Text.class);
+ jobConf_.setInputValueClass(Text.class);
- jobConf_.setOutputKeyClass(UTF8.class);
- jobConf_.setOutputValueClass(UTF8.class);
+ jobConf_.setOutputKeyClass(Text.class);
+ jobConf_.setOutputValueClass(Text.class);
//jobConf_.setCombinerClass();
- jobConf_.setOutputDir(new File(output_));
+ jobConf_.setOutputPath(new Path(output_));
jobConf_.setOutputFormat(StreamOutputFormat.class);
jobConf_.set("stream.addenvironment", addTaskEnvironment_);
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java?view=diff&rev=440503&r1=440502&r2=440503
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java Tue Sep 5 14:55:20 2006
@@ -17,17 +17,18 @@
package org.apache.hadoop.streaming;
import java.io.*;
+import java.nio.charset.MalformedInputException;
import java.util.zip.GZIPInputStream;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
/**
* Similar to org.apache.hadoop.mapred.TextRecordReader,
@@ -75,68 +76,49 @@
}
public synchronized boolean next(Writable key, Writable value)
- throws IOException {
- if(gzipped_) {
- // figure EOS from readLine
- } else {
- long pos = in_.getPos();
- if (pos >= end_)
- return false;
- }
-
- //((LongWritable)key).set(pos); // key is position
- //((UTF8)value).set(readLine(in)); // value is line
- String line = readLine(din_);
- if(line == null) {
- return false; // for gzipped_
- }
-
- // key is line up to TAB, value is rest
- final boolean NOVAL = false;
- if(NOVAL) {
- ((UTF8)key).set(line);
- ((UTF8)value).set("");
- } else {
- int tab = line.indexOf('\t');
- if(tab == -1) {
- ((UTF8)key).set(line);
- ((UTF8)value).set("");
- } else {
- ((UTF8)key).set(line.substring(0, tab));
- ((UTF8)value).set(line.substring(tab+1));
- }
+ throws IOException {
+ if(!(key instanceof Text)) {
+ throw new IllegalArgumentException(
+ "Key should be of type Text but: "+key.getClass().getName());
+ }
+ if(!(value instanceof Text)) {
+ throw new IllegalArgumentException(
+ "Value should be of type Text but: "+value.getClass().getName());
}
- numRecStats(line);
- return true;
- }
-
- // from TextInputFormat
- private static String readLine(InputStream in) throws IOException {
- StringBuffer buffer = new StringBuffer();
- boolean over = true;
- while (true) {
-
- int b = in.read();
- if (b == -1)
- break;
-
- over = false;
- char c = (char)b; // bug: this assumes eight-bit characters.
- if (c == '\r' || c == '\n') // TODO || c == '\t' here
- break;
-
- buffer.append(c);
- }
+ Text tKey = (Text)key;
+ Text tValue = (Text)value;
+ byte [] line;
- if(over) {
- return null;
- } else {
- return buffer.toString();
+ while (true) {
+ if(gzipped_) {
+ // figure EOS from readLine
+ } else {
+ long pos = in_.getPos();
+ if (pos >= end_)
+ return false;
+ }
+
+ line = UTF8ByteArrayUtils.readLine(in_);
+ if(line==null)
+ return false;
+ try {
+ int tab=UTF8ByteArrayUtils.findTab(line);
+ if(tab == -1) {
+ tKey.set(line);
+ tValue.set("");
+ } else {
+ UTF8ByteArrayUtils.splitKeyVal(line, tKey, tValue, tab);
+ }
+ break;
+ } catch (MalformedInputException e) {
+ LOG.warn(StringUtils.stringifyException(e));
+ }
}
-
+ numRecStats( line, 0, line.length );
+ return true;
}
-
boolean gzipped_;
GZIPInputStream zin_;
DataInputStream din_; // GZIP or plain
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java?view=diff&rev=440503&r1=440502&r2=440503
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java Tue Sep 5 14:55:20 2006
@@ -68,7 +68,8 @@
success = false;
}
} while(!success);
- numRecStats("");
+
+ numRecStats(new byte[0], 0, 0);
return more_;
}
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java?view=diff&rev=440503&r1=440502&r2=440503
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java Tue Sep 5 14:55:20 2006
@@ -19,14 +19,12 @@
import java.io.*;
import java.util.regex.*;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -76,18 +74,22 @@
return false;
}
- StringBuffer buf = new StringBuffer();
+ DataOutputBuffer buf = new DataOutputBuffer();
if(!readUntilMatchBegin()) {
return false;
}
if(!readUntilMatchEnd(buf)) {
return false;
}
- numRecStats(buf);
// There is only one elem..key/value splitting is not done here.
- ((UTF8)key).set(buf.toString());
- ((UTF8)value).set("");
+ byte [] record = new byte[buf.getLength()];
+ System.arraycopy(buf.getData(), 0, record, 0, record.length);
+
+ numRecStats(record, 0, record.length);
+
+ ((Text)key).set(record);
+ ((Text)value).set("");
/*if(numNext < 5) {
System.out.println("@@@ " + numNext + ". true next k=|" + key.toString().replaceAll("[\\r\\n]", " ")
@@ -111,7 +113,7 @@
}
}
- boolean readUntilMatchEnd(StringBuffer buf) throws IOException
+ private boolean readUntilMatchEnd(DataOutputBuffer buf) throws IOException
{
if(slowMatch_) {
return slowReadUntilMatch(endPat_, true, buf);
@@ -121,7 +123,8 @@
}
- boolean slowReadUntilMatch(Pattern markPattern, boolean includePat, StringBuffer outBufOrNull)
+ private boolean slowReadUntilMatch(Pattern markPattern, boolean includePat,
+ DataOutputBuffer outBufOrNull)
throws IOException
{
try {
@@ -131,7 +134,10 @@
boolean success = true;
in_.mark(lookAhead_ + 2);
read = in_.read(buf);
- String sbuf = new String(buf);
+ if( read == -1 )
+ return false;
+
+ String sbuf = new String(buf, 0, read, "UTF-8");
Matcher match = markPattern.matcher(sbuf);
firstMatchStart_ = NA;
@@ -176,16 +182,11 @@
if(matched) {
int endPos = includePat ? firstMatchEnd_ : firstMatchStart_;
//System.out.println("firstMatchStart_=" + firstMatchStart_ + " firstMatchEnd_=" + firstMatchEnd_);
- String snip = sbuf.substring(firstMatchStart_, firstMatchEnd_);
+ //String snip = sbuf.substring(firstMatchStart_, firstMatchEnd_);
//System.out.println(" match snip=|" + snip + "| markPattern=" + markPattern);
if(outBufOrNull != null) {
- buf = new byte[endPos];
in_.reset();
- read = in_.read(buf);
- if(read != endPos) {
- //System.out.println("@@@ BAD re-read less: " + read + " < " + endPos);
- }
- outBufOrNull.append(new String(buf));
+ outBufOrNull.write(in_,endPos);
} else {
//System.out.println("Skip to " + (inStart + endPos));
in_.seek(inStart + endPos);
@@ -255,10 +256,12 @@
- boolean fastReadUntilMatch(String textPat, boolean includePat, StringBuffer outBufOrNull) throws IOException
+ boolean fastReadUntilMatch(String textPat,
+ boolean includePat,
+ DataOutputBuffer outBufOrNull) throws IOException
{
//System.out.println("@@@BEGIN readUntilMatch inPos=" + in_.getPos());
- char[] cpat = textPat.toCharArray();
+ byte[] cpat = textPat.getBytes("UTF-8");
int m = 0;
boolean match = false;
long markPos = -1;
@@ -273,10 +276,7 @@
if (b == -1)
break;
- char c = (char)b; // this assumes eight-bit matching. OK with UTF-8
- if(outBufOrNull != null) {
- outBufOrNull.append(c);
- }
+ byte c = (byte)b; // this assumes eight-bit matching. OK with UTF-8
if (c == cpat[m]) {
m++;
if(m==msup) {
@@ -284,16 +284,20 @@
break;
}
} else {
+ if(outBufOrNull != null) {
+ outBufOrNull.write(cpat, 0, m);
+ outBufOrNull.write(c);
+ }
+
m = 0;
}
}
if(!includePat && match) {
- if(outBufOrNull != null) {
- outBufOrNull.setLength(outBufOrNull.length() - textPat.length());
- }
long pos = in_.getPos() - textPat.length();
in_.reset();
in_.seek(pos);
+ } else if(outBufOrNull != null){
+ outBufOrNull.write(cpat);
}
//System.out.println("@@@DONE readUntilMatch inPos=" + in_.getPos() + " includePat=" + includePat + " pat=" + textPat + ", buf=|" + outBufOrNull + "|");
return match;
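Replacing StringBuffer with DataOutputBuffer keeps the XML record matcher byte-oriented end to end. Since getData() exposes a backing array that may be larger than the logical content, next() above copies exactly getLength() bytes before handing the record to Text. A small standalone sketch of that accumulate-then-copy step (illustrative, not commit code):

import org.apache.hadoop.io.DataOutputBuffer;

public class BufferCopySketch {
  public static void main(String[] args) throws Exception {
    DataOutputBuffer buf = new DataOutputBuffer();
    buf.write("<record>data</record>".getBytes("UTF-8"));

    // getData() returns the (possibly oversized) backing array, so copy
    // exactly getLength() bytes.
    byte[] record = new byte[buf.getLength()];
    System.arraycopy(buf.getData(), 0, record, 0, record.length);

    System.out.println(new String(record, "UTF-8"));  // prints the record
  }
}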
Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java?view=auto&rev=440503
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java Tue Sep 5 14:55:20 2006
@@ -0,0 +1,107 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * General utilities for byte arrays containing UTF-8 encoded strings.
+ * @author hairong
+ */
+
+public class UTF8ByteArrayUtils {
+ /**
+ * Find the first occurrence of a tab in a UTF-8 encoded string.
+ * @param utf a byte array containing a UTF-8 encoded string
+ * @return position of the first tab, or -1 if none occurs
+ */
+ public static int findTab(byte [] utf) {
+ for(int i=0; i<utf.length; i++) {
+ if(utf[i]==(byte)'\t') {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ /**
+ * Split a UTF-8 byte array into key and value,
+ * assuming that the delimiter is at splitPos.
+ * @param utf UTF-8 encoded string
+ * @param key contains the key once the method returns
+ * @param val contains the value once the method returns
+ * @param splitPos the split position
+ * @throws IOException
+ */
+ public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos)
+ throws IOException {
+ if(splitPos<0 || splitPos >= utf.length)
+ throw new IllegalArgumentException(
+ "splitPos must be in the range [0, "+splitPos+"]: " +splitPos);
+ byte [] keyBytes = new byte[splitPos];
+ System.arraycopy(utf, 0, keyBytes, 0, splitPos);
+ int valLen = utf.length-splitPos-1;
+ byte [] valBytes = new byte[valLen];
+ System.arraycopy(utf,splitPos+1, valBytes, 0, valLen );
+ key.set(keyBytes);
+ val.set(valBytes);
+ }
+
+ /**
+ * Read a UTF-8 encoded line from a data input stream.
+ * @param in data input stream
+ * @return a byte array containing the line
+ * @throws IOException
+ */
+ public static byte[] readLine(DataInputStream in) throws IOException {
+ byte [] buf = new byte[128];
+ byte [] lineBuffer = buf;
+ int room = 128;
+ int offset = 0;
+ boolean isEOF = false;
+ while (true) {
+ int b = in.read();
+ if (b == -1) {
+ isEOF = true;
+ break;
+ }
+
+ char c = (char)b;
+ if (c == '\r' || c == '\n')
+ break;
+
+ if (--room < 0) {
+ buf = new byte[offset + 128];
+ room = buf.length - offset - 1;
+ System.arraycopy(lineBuffer, 0, buf, 0, offset);
+ lineBuffer = buf;
+ }
+ buf[offset++] = (byte) c;
+ }
+
+ if(isEOF && offset==0) {
+ return null;
+ } else {
+ lineBuffer = new byte[offset];
+ System.arraycopy(buf, 0, lineBuffer, 0, offset);
+ return lineBuffer;
+ }
+ }
+}
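Taken together, the three helpers reproduce the loop PipeMapRed and StreamLineRecordReader now run: read a raw line, locate the tab, split into reusable Text objects. A self-contained usage sketch (the input bytes are illustrative):

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.streaming.UTF8ByteArrayUtils;

public class Utf8UtilsDemo {
  public static void main(String[] args) throws Exception {
    byte[] input = "key1\tvalue1\nkey2\tvalue2\n".getBytes("UTF-8");
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(input));

    Text key = new Text();
    Text val = new Text();
    byte[] line;
    while ((line = UTF8ByteArrayUtils.readLine(in)) != null) {
      int tab = UTF8ByteArrayUtils.findTab(line);
      if (tab == -1) {
        key.set(line);
        val.set("");
      } else {
        UTF8ByteArrayUtils.splitKeyVal(line, key, val, tab);
      }
      System.out.println(key + " -> " + val);
    }
  }
}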
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java?view=diff&rev=440503&r1=440502&r2=440503
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java Tue Sep 5 14:55:20 2006
@@ -17,7 +17,6 @@
package org.apache.hadoop.streaming;
import java.io.*;
-import java.util.*;
import org.apache.hadoop.streaming.Environment;
/** A minimal Java implementation of /usr/bin/tr.
@@ -51,12 +50,12 @@
// property names have been escaped in PipeMapRed.safeEnvVarName()
expect("mapred_input_format_class", "org.apache.hadoop.streaming.StreamInputFormat");
expect("mapred_job_tracker", "local");
- expect("mapred_input_key_class", "org.apache.hadoop.io.UTF8");
- expect("mapred_input_value_class", "org.apache.hadoop.io.UTF8");
+ expect("mapred_input_key_class", "org.apache.hadoop.io.Text");
+ expect("mapred_input_value_class", "org.apache.hadoop.io.Text");
expect("mapred_local_dir", "build/test/mapred/local");
expect("mapred_output_format_class", "org.apache.hadoop.streaming.StreamOutputFormat");
- expect("mapred_output_key_class", "org.apache.hadoop.io.UTF8");
- expect("mapred_output_value_class", "org.apache.hadoop.io.UTF8");
+ expect("mapred_output_key_class", "org.apache.hadoop.io.Text");
+ expect("mapred_output_value_class", "org.apache.hadoop.io.Text");
expect("mapred_task_is_map", "true");
expect("mapred_reduce_tasks", "1");