Posted to common-commits@hadoop.apache.org by to...@apache.org on 2007/04/16 21:35:18 UTC
svn commit: r529378 - in /lucene/hadoop/trunk: ./
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Author: tomwhite
Date: Mon Apr 16 12:35:15 2007
New Revision: 529378
URL: http://svn.apache.org/viewvc?view=rev&rev=529378
Log:
HADOOP-1214. Replace streaming classes with new counterparts from Hadoop core. Contributed by Runping Qi.
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsTextInputFormat.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java
lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java
lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java
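For orientation before the per-file diffs: a minimal sketch of what the replacement means at the JobConf level, assuming the class mapping given by the file lists above (the driver class itself is illustrative, not part of the commit).

  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.KeyValueTextInputFormat;
  import org.apache.hadoop.mapred.TextOutputFormat;

  public class NewCounterpartsSketch {
    public static void main(String[] args) {
      JobConf job = new JobConf();
      // streaming's StreamInputFormat line handling is now backed by the
      // core KeyValueTextInputFormat (TAB-separated key/value per line):
      job.setInputFormat(KeyValueTextInputFormat.class);
      // streaming's StreamOutputFormat becomes a deprecated alias for:
      job.setOutputFormat(TextOutputFormat.class);
    }
  }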
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=529378&r1=529377&r2=529378
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Apr 16 12:35:15 2007
@@ -201,6 +201,9 @@
60. HADOOP-1256. Fix NameNode so that multiple DataNodeDescriptors
can no longer be created on startup. (Hairong Kuang via cutting)
+61. HADOOP-1214. Replace streaming classes with new counterparts
+ from Hadoop core. (Runping Qi via tomwhite)
+
Release 0.12.3 - 2007-04-06
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?view=diff&rev=529378&r1=529377&r2=529378
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Mon Apr 16 12:35:15 2007
@@ -20,85 +20,39 @@
import java.io.*;
import java.lang.reflect.*;
-import java.util.ArrayList;
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.*;
-/** An input format that performs globbing on DFS paths and
- * selects a RecordReader based on a JobConf property.
- * @author Michel Tourn
+/** An input format that selects a RecordReader based on a JobConf property.
+ * This should be used only for non-standard record readers such as
+ * StreamXmlRecordReader. For all other standard
+ * record readers, the appropriate input format classes should be used.
*/
-public class StreamInputFormat extends TextInputFormat {
-
- // an InputFormat should be public with the synthetic public default constructor
- // JobTracker's JobInProgress will instantiate with clazz.newInstance() (and a custom ClassLoader)
-
- protected static final Log LOG = LogFactory.getLog(StreamInputFormat.class.getName());
-
- static boolean isGzippedInput(JobConf job) {
- String val = job.get(StreamBaseRecordReader.CONF_NS + "compression");
- return "gzip".equals(val);
- }
+public class StreamInputFormat extends KeyValueTextInputFormat {
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-
- if (isGzippedInput(job)) {
- return getFullFileSplits(job);
- } else {
- return super.getSplits(job, numSplits);
- }
- }
- /** For the compressed-files case: override InputFormatBase to produce one split. */
- FileSplit[] getFullFileSplits(JobConf job) throws IOException {
- Path[] files = listPaths(job);
- int numSplits = files.length;
- ArrayList splits = new ArrayList(numSplits);
- for (int i = 0; i < files.length; i++) {
- Path file = files[i];
- long splitSize = file.getFileSystem(job).getLength(file);
- splits.add(new FileSplit(file, 0, splitSize, job));
+ public RecordReader getRecordReader(final InputSplit genericSplit,
+ JobConf job, Reporter reporter) throws IOException {
+ String c = job.get("stream.recordreader.class");
+ if (c == null || c.indexOf("LineRecordReader") >= 0) {
+ return super.getRecordReader(genericSplit, job, reporter);
}
- return (FileSplit[]) splits.toArray(new FileSplit[splits.size()]);
- }
- public RecordReader getRecordReader(final InputSplit genericSplit,
- JobConf job,
- Reporter reporter) throws IOException {
+ // handling non-standard record reader (likely StreamXmlRecordReader)
FileSplit split = (FileSplit) genericSplit;
LOG.info("getRecordReader start.....split=" + split);
reporter.setStatus(split.toString());
- long start = split.getStart();
- long length = split.getLength();
-
// Open the file and seek to the start of the split
FileSystem fs = split.getPath().getFileSystem(job);
FSDataInputStream in = fs.open(split.getPath());
- if (isGzippedInput(job)) {
- length = Long.MAX_VALUE;
- } else if (start != 0) {
- in.seek(start-1);
- LineRecordReader.readLine(in, null);
- long oldStart = start;
- start = in.getPos();
- length -= (start - oldStart);
- }
- // Ugly hack!
- split = new FileSplit(split.getPath(), start, length, job);
// Factory dispatch based on available params..
Class readerClass;
- String c = job.get("stream.recordreader.class");
- if (c == null) {
- readerClass = StreamLineRecordReader.class;
- } else {
+
+ {
readerClass = StreamUtil.goodClassOrNull(c, null);
if (readerClass == null) {
throw new RuntimeException("Class not found: " + c);
@@ -107,27 +61,19 @@
Constructor ctor;
try {
- ctor = readerClass.getConstructor(new Class[] { FSDataInputStream.class, FileSplit.class,
- Reporter.class, JobConf.class, FileSystem.class });
+ ctor = readerClass.getConstructor(new Class[] { FSDataInputStream.class,
+ FileSplit.class, Reporter.class, JobConf.class, FileSystem.class });
} catch (NoSuchMethodException nsm) {
throw new RuntimeException(nsm);
}
RecordReader reader;
try {
- reader = (RecordReader) ctor.newInstance(new Object[] { in, split, reporter, job,
- fs });
+ reader = (RecordReader) ctor.newInstance(new Object[] { in, split,
+ reporter, job, fs });
} catch (Exception nsm) {
throw new RuntimeException(nsm);
}
-
- if (reader instanceof StreamSequenceRecordReader) {
- // override k/v class types with types stored in SequenceFile
- StreamSequenceRecordReader ss = (StreamSequenceRecordReader) reader;
- job.setInputKeyClass(ss.rin_.getKeyClass());
- job.setInputValueClass(ss.rin_.getValueClass());
- }
-
return reader;
}
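A sketch of how the reflective dispatch above is driven from job configuration. StreamXmlRecordReader is the non-standard reader the new javadoc names; the five-argument constructor requirement comes from the getConstructor call in this diff. The driver class is illustrative.

  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.streaming.StreamInputFormat;

  public class ReaderDispatchSketch {
    public static void main(String[] args) {
      JobConf job = new JobConf();
      // Unset, or any name containing "LineRecordReader", falls through to
      // KeyValueTextInputFormat's reader. Anything else is instantiated
      // reflectively and must offer a (FSDataInputStream, FileSplit,
      // Reporter, JobConf, FileSystem) constructor.
      job.set("stream.recordreader.class",
              "org.apache.hadoop.streaming.StreamXmlRecordReader");
      job.setInputFormat(StreamInputFormat.class);
    }
  }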
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=529378&r1=529377&r2=529378
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Mon Apr 16 12:35:15 2007
@@ -59,7 +59,11 @@
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.filecache.*;
import org.apache.hadoop.util.*;
import org.apache.log4j.helpers.OptionConverter;
@@ -235,6 +239,10 @@
userJobConfProps_.put("fs.default.name", jt);
}
+ additionalConfSpec_ = (String)cmdLine.getValue("-additionalconfspec");
+ inputFormatSpec_ = (String)cmdLine.getValue("-inputformat");
+ outputFormatSpec_ = (String)cmdLine.getValue("-outputformat");
+ partitionerSpec_ = (String)cmdLine.getValue("-partitioner");
inReaderSpec_ = (String)cmdLine.getValue("-inputreader");
List<String> car = cmdLine.getValues("-cacheArchive");
@@ -381,6 +389,14 @@
"Optional. Override DFS configuration", "<h:p>|local", 1, false);
Option jt = createOption("jt",
"Optional. Override JobTracker configuration", "<h:p>|local",1, false);
+ Option additionalconfspec = createOption("additionalconfspec",
+ "Optional.", "spec",1, false );
+ Option inputformat = createOption("inputformat",
+ "Optional.", "spec",1, false );
+ Option outputformat = createOption("outputformat",
+ "Optional.", "spec",1, false );
+ Option partitioner = createOption("partitioner",
+ "Optional.", "spec",1, false );
Option inputreader = createOption("inputreader",
"Optional.", "spec",1, false );
Option cacheFile = createOption("cacheFile",
@@ -405,6 +421,10 @@
withOption(file).
withOption(dfs).
withOption(jt).
+ withOption(additionalconfspec).
+ withOption(inputformat).
+ withOption(outputformat).
+ withOption(partitioner).
withOption(inputreader).
withOption(jobconf).
withOption(cmdenv).
@@ -438,6 +458,10 @@
//System.out.println(" -config <file> Optional. One or more paths to xml config files");
System.out.println(" -dfs <h:p>|local Optional. Override DFS configuration");
System.out.println(" -jt <h:p>|local Optional. Override JobTracker configuration");
+ System.out.println(" -additionalconfspec specfile Optional.");
+ System.out.println(" -inputformat KeyValueTextInputFormat(default)|SequenceFileInputFormat|XmlTextInputFormat Optional.");
+ System.out.println(" -outputformat specfile Optional.");
+ System.out.println(" -partitioner specfile Optional.");
System.out.println(" -inputreader <spec> Optional.");
System.out.println(" -jobconf <n>=<v> Optional. Add or override a JobConf property");
System.out.println(" -cmdenv <n>=<v> Optional. Pass env.var to streaming commands");
@@ -645,6 +669,10 @@
} else {
// use only defaults: hadoop-default.xml and hadoop-site.xml
}
+ System.out.println("additionalConfSpec_:" + additionalConfSpec_);
+ if (additionalConfSpec_ != null) {
+ config_.addDefaultResource(new Path(additionalConfSpec_));
+ }
Iterator it = configPath_.iterator();
while (it.hasNext()) {
String pathName = (String) it.next();
@@ -670,29 +698,53 @@
jobConf_.setBoolean("stream.inputtagged", inputTagged_);
jobConf_.set("stream.numinputspecs", "" + inputSpecs_.size());
- Class fmt;
- if (testMerge_ && false == hasSimpleInputSpecs_) {
- // this ignores -inputreader
- fmt = MergerInputFormat.class;
- } else {
- // need to keep this case to support custom -inputreader
- // and their parameters ,n=v,n=v
- fmt = StreamInputFormat.class;
+ String defaultPackage = this.getClass().getPackage().getName();
+ Class c;
+ Class fmt = null;
+ if (inReaderSpec_ == null && inputFormatSpec_ == null) {
+ fmt = KeyValueTextInputFormat.class;
+ } else if (inputFormatSpec_ != null) {
+ if ((inputFormatSpec_.compareToIgnoreCase("KeyValueTextInputFormat") == 0)
+ || (inputFormatSpec_
+ .compareToIgnoreCase("org.apache.hadoop.mapred.KeyValueTextInputFormat") == 0)) {
+ fmt = KeyValueTextInputFormat.class;
+ } else if ((inputFormatSpec_
+ .compareToIgnoreCase("SequenceFileInputFormat") == 0)
+ || (inputFormatSpec_
+ .compareToIgnoreCase("org.apache.hadoop.mapred.SequenceFileInputFormat") == 0)) {
+ fmt = SequenceFileInputFormat.class;
+ } else if ((inputFormatSpec_
+ .compareToIgnoreCase("SequenceFileToLineInputFormat") == 0)
+ || (inputFormatSpec_
+ .compareToIgnoreCase("org.apache.hadoop.mapred.SequenceFileToLineInputFormat") == 0)) {
+ fmt = SequenceFileAsTextInputFormat.class;
+ } else {
+ c = StreamUtil.goodClassOrNull(inputFormatSpec_, defaultPackage);
+ if (c != null) {
+ fmt = c;
+ } else {
+
+ }
+ }
+ }
+ if (fmt == null) {
+ if (testMerge_ && false == hasSimpleInputSpecs_) {
+ // this ignores -inputreader
+ fmt = MergerInputFormat.class;
+ } else {
+ // need to keep this case to support custom -inputreader
+ // and their parameters ,n=v,n=v
+ fmt = StreamInputFormat.class;
+ }
}
- jobConf_.setInputFormat(fmt);
- // for SequenceFile, input classes may be overriden in getRecordReader
- jobConf_.setInputKeyClass(Text.class);
- jobConf_.setInputValueClass(Text.class);
+ jobConf_.setInputFormat(fmt);
jobConf_.setOutputKeyClass(Text.class);
jobConf_.setOutputValueClass(Text.class);
jobConf_.set("stream.addenvironment", addTaskEnvironment_);
- String defaultPackage = this.getClass().getPackage().getName();
-
- Class c;
if (mapCmd_ != null) {
c = StreamUtil.goodClassOrNull(mapCmd_, defaultPackage);
if (c != null) {
@@ -748,13 +800,29 @@
// output setup is done late so we can customize for reducerNone_
//jobConf_.setOutputDir(new File(output_));
setOutputSpec();
- if (testMerge_) {
- fmt = MuxOutputFormat.class;
- } else {
- fmt = StreamOutputFormat.class;
+ fmt = null;
+ if (outputFormatSpec_!= null) {
+ c = StreamUtil.goodClassOrNull(outputFormatSpec_, defaultPackage);
+ if (c != null) {
+ fmt = c;
+ }
+ }
+ if (fmt == null) {
+ if (testMerge_) {
+ fmt = MuxOutputFormat.class;
+ } else {
+ fmt = TextOutputFormat.class;
+ }
}
jobConf_.setOutputFormat(fmt);
+ if (partitionerSpec_!= null) {
+ c = StreamUtil.goodClassOrNull(partitionerSpec_, defaultPackage);
+ if (c != null) {
+ jobConf_.setPartitionerClass(c);
+ }
+ }
+
// last, allow user to override anything
// (although typically used with properties we didn't touch)
@@ -1042,6 +1110,10 @@
protected ArrayList configPath_ = new ArrayList(); // <String>
protected String hadoopAliasConf_;
protected String inReaderSpec_;
+ protected String inputFormatSpec_;
+ protected String outputFormatSpec_;
+ protected String partitionerSpec_;
+ protected String additionalConfSpec_;
protected boolean testMerge_;
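The new -inputformat/-outputformat/-partitioner options resolve to classes via the dispatch above. A sketch of the JobConf state an invocation like "-inputformat SequenceFileToLineInputFormat -partitioner org.apache.hadoop.mapred.lib.HashPartitioner" would produce, assuming no -outputformat is given (so TextOutputFormat, the new default, applies):

  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat;
  import org.apache.hadoop.mapred.TextOutputFormat;
  import org.apache.hadoop.mapred.lib.HashPartitioner;

  public class StreamJobOptionsSketch {
    public static void main(String[] args) {
      JobConf job = new JobConf();
      // "SequenceFileToLineInputFormat" maps to this class in the dispatch:
      job.setInputFormat(SequenceFileAsTextInputFormat.class);
      // Default output format when -outputformat is absent:
      job.setOutputFormat(TextOutputFormat.class);
      // -partitioner <class> ends up here via StreamUtil.goodClassOrNull:
      job.setPartitionerClass(HashPartitioner.class);
    }
  }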
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java?view=diff&rev=529378&r1=529377&r2=529378
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java Mon Apr 16 12:35:15 2007
@@ -18,128 +18,21 @@
package org.apache.hadoop.streaming;
-import java.io.*;
+import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.mapred.LineRecordReader;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.KeyValueLineRecordReader;
import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.JobConf;
/**
- * Similar to org.apache.hadoop.mapred.TextRecordReader,
- * but delimits key and value with a TAB.
- * @author Michel Tourn
+ * Same as org.apache.hadoop.mapred.KeyValueLineRecordReader.
+ *
+ * @deprecated
*/
-public class StreamLineRecordReader extends LineRecordReader {
-
- private String splitName;
- private Reporter reporter;
- private FileSplit split;
- private int numRec = 0;
- private int nextStatusRec = 1;
- private int statusMaxRecordChars;
- protected static final Log LOG = LogFactory.getLog(StreamLineRecordReader.class);
- // base class uses LongWritable as key, use this.
- private WritableComparable dummyKey = super.createKey();
- private Text innerValue = (Text)super.createValue();
+public class StreamLineRecordReader extends KeyValueLineRecordReader {
- public StreamLineRecordReader(FSDataInputStream in, FileSplit split,
- Reporter reporter,
- JobConf job, FileSystem fs) throws IOException {
- super(createStream(in, job), split.getStart(),
- (split.getStart() + split.getLength()));
- this.split = split ;
- this.reporter = reporter ;
- }
-
- private static InputStream createStream(FSDataInputStream in, JobConf job)
- throws IOException{
- InputStream finalStream = in ;
- boolean gzipped = StreamInputFormat.isGzippedInput(job);
- if ( gzipped ) {
- GzipCodec codec = new GzipCodec();
- codec.setConf(job);
- finalStream = codec.createInputStream(in);
- }
- return finalStream;
- }
-
- public WritableComparable createKey() {
- return new Text();
- }
-
- public Writable createValue() {
- return new Text();
- }
-
- public synchronized boolean next(Writable key, Writable value) throws IOException {
- if (!(key instanceof Text)) {
- throw new IllegalArgumentException("Key should be of type Text but: "
- + key.getClass().getName());
- }
- if (!(value instanceof Text)) {
- throw new IllegalArgumentException("Value should be of type Text but: "
- + value.getClass().getName());
- }
-
- Text tKey = (Text) key;
- Text tValue = (Text) value;
- byte[] line = null ;
- int lineLen = -1;
- if( super.next(dummyKey, innerValue) ){
- line = innerValue.getBytes();
- lineLen = innerValue.getLength();
- }else{
- return false;
- }
- if (line == null) return false;
- int tab = UTF8ByteArrayUtils.findTab(line, 0, lineLen);
- if (tab == -1) {
- tKey.set(line, 0, lineLen);
- tValue.set("");
- } else {
- UTF8ByteArrayUtils.splitKeyVal(line, 0, lineLen, tKey, tValue, tab);
- }
- numRecStats(line, 0, lineLen);
- return true;
- }
-
- private void numRecStats(byte[] record, int start, int len) throws IOException {
- numRec++;
- if (numRec == nextStatusRec) {
- String recordStr = new String(record, start, Math.min(len, statusMaxRecordChars), "UTF-8");
- nextStatusRec += 100;//*= 10;
- String status = getStatus(recordStr);
- LOG.info(status);
- reporter.setStatus(status);
- }
- }
-
- private String getStatus(CharSequence record) {
- long pos = -1;
- try {
- pos = getPos();
- } catch (IOException io) {
- }
- String recStr;
- if (record.length() > statusMaxRecordChars) {
- recStr = record.subSequence(0, statusMaxRecordChars) + "...";
- } else {
- recStr = record.toString();
- }
- String unqualSplit = split.getFile().getName() + ":" + split.getStart() + "+"
- + split.getLength();
- String status = "HSTR " + StreamUtil.HOST + " " + numRec + ". pos=" + pos + " " + unqualSplit
- + " Processing record=" + recStr;
- status += " " + splitName;
- return status;
+ public StreamLineRecordReader(Configuration job, FileSplit split)
+ throws IOException {
+ super(job, split);
}
}
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java?view=diff&rev=529378&r1=529377&r2=529378
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamOutputFormat.java Mon Apr 16 12:35:15 2007
@@ -18,56 +18,11 @@
package org.apache.hadoop.streaming;
-import java.io.IOException;
+import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.mapred.*;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
-
-import org.apache.hadoop.util.Progressable;
-
-/** Similar to org.apache.hadoop.mapred.TextOutputFormat,
- * but delimits key and value with a TAB.
- * @author Michel Tourn
+/** Same as org.apache.hadoop.mapred.TextOutputFormat.
+ * @deprecated
*/
-public class StreamOutputFormat implements OutputFormat {
-
- public RecordWriter getRecordWriter(FileSystem fs, JobConf job, String name, Progressable progr) throws IOException {
-
- Path file = new Path(job.getOutputPath(), name);
-
- final FSDataOutputStream out = fs.create(file);
-
- return new RecordWriter() {
-
- public synchronized void write(WritableComparable key, Writable value) throws IOException {
- out.write(key.toString().getBytes("UTF-8"));
- out.writeByte('\t');
- out.write(value.toString().getBytes("UTF-8"));
- out.writeByte('\n');
- }
-
- public synchronized void close(Reporter reporter) throws IOException {
- out.close();
- }
- };
- }
-
- /** Check whether the output specification for a job is appropriate. Called
- * when a job is submitted. Typically checks that it does not already exist,
- * throwing an exception when it already exists, so that output is not
- * overwritten.
- *
- * @param job the job whose output will be written
- * @throws IOException when output should not be attempted
- */
- public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
- // allow existing data (for app-level restartability)
- }
+public class StreamOutputFormat extends TextOutputFormat {
}
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java?view=diff&rev=529378&r1=529377&r2=529378
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java Mon Apr 16 12:35:15 2007
@@ -20,80 +20,19 @@
import java.io.*;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
-import org.apache.hadoop.util.ReflectionUtils;
-
-public class StreamSequenceRecordReader extends StreamBaseRecordReader {
-
- public StreamSequenceRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter,
- JobConf job, FileSystem fs) throws IOException {
- super(in, split, reporter, job, fs);
- numFailed_ = 0;
- seekNextRecordBoundary();
- // super.in_ ignored, using rin_ instead
- }
-
- public synchronized boolean next(Writable key, Writable value) throws IOException {
- boolean success;
- do {
- if (!more_) return false;
- success = false;
- try {
- long pos = rin_.getPosition();
- boolean eof = rin_.next(key, value);
- if (pos >= end_ && rin_.syncSeen()) {
- more_ = false;
- } else {
- more_ = eof;
- }
- success = true;
- } catch (IOException io) {
- numFailed_++;
- if (numFailed_ < 100 || numFailed_ % 100 == 0) {
- err_.println("StreamSequenceRecordReader: numFailed_/numRec_=" + numFailed_ + "/"
- + numRec_);
- }
- io.printStackTrace(err_);
- success = false;
- }
- } while (!success);
-
- numRecStats(new byte[0], 0, 0);
- return more_;
+/**
+ * Same as org.apache.hadoop.mapred.SequenceFileRecordReader.
+ *
+ * @deprecated
+ */
+public class StreamSequenceRecordReader extends SequenceFileRecordReader {
+
+ public StreamSequenceRecordReader(Configuration conf, FileSplit split)
+ throws IOException {
+ super(conf, split);
}
-
- public void seekNextRecordBoundary() throws IOException {
- rin_ = new SequenceFile.Reader(fs_, split_.getPath(), job_);
- end_ = split_.getStart() + split_.getLength();
-
- if (split_.getStart() > rin_.getPosition()) rin_.sync(split_.getStart()); // sync to start
-
- more_ = rin_.getPosition() < end_;
-
- reporter_.setStatus(split_.toString());
-
- //return new SequenceFileRecordReader(job_, split_);
- }
-
- public WritableComparable createKey() {
- return (WritableComparable) ReflectionUtils.newInstance(rin_.getKeyClass(), null);
- }
-
- public Writable createValue() {
- return (Writable) ReflectionUtils.newInstance(rin_.getValueClass(), null);
- }
-
- boolean more_;
- SequenceFile.Reader rin_;
- int numFailed_;
- PrintStream err_ = System.err;
-
}
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java?view=diff&rev=529378&r1=529377&r2=529378
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java Mon Apr 16 12:35:15 2007
@@ -18,6 +18,7 @@
package org.apache.hadoop.streaming;
+import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;
@@ -29,6 +30,7 @@
{
public TestGzipInput() throws IOException {
+ INPUT_FILE = new File("input.txt.gz");
}
protected void createInput() throws IOException
@@ -38,6 +40,7 @@
out.write(input.getBytes("UTF-8"));
out.close();
}
+
protected String[] genArgs() {
return new String[] {
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?view=diff&rev=529378&r1=529377&r2=529378
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Mon Apr 16 12:35:15 2007
@@ -77,6 +77,11 @@
public void testCommandLine()
{
try {
+ try {
+ OUTPUT_DIR.getAbsoluteFile().delete();
+ } catch (Exception e) {
+ }
+
createInput();
boolean mayExit = false;
@@ -93,8 +98,10 @@
} catch(Exception e) {
failTrace(e);
} finally {
+ File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
INPUT_FILE.delete();
- OUTPUT_DIR.delete();
+ outFileCRC.delete();
+ OUTPUT_DIR.getAbsoluteFile().delete();
}
}
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java?view=diff&rev=529378&r1=529377&r2=529378
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java Mon Apr 16 12:35:15 2007
@@ -79,7 +79,6 @@
};
fileSys.delete(new Path(OUTPUT_DIR));
- fileSys.mkdirs(new Path(OUTPUT_DIR));
DataOutputStream file = fileSys.create(new Path(INPUT_FILE));
file.writeBytes(mapString);
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java?view=diff&rev=529378&r1=529377&r2=529378
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java Mon Apr 16 12:35:15 2007
@@ -41,13 +41,11 @@
// test that some JobConf properties are exposed as expected
// Note the dots translated to underscore:
// property names have been escaped in PipeMapRed.safeEnvVarName()
- expect("mapred_input_format_class", "org.apache.hadoop.streaming.StreamInputFormat");
+ expect("mapred_input_format_class", "org.apache.hadoop.mapred.KeyValueTextInputFormat");
expect("mapred_job_tracker", "local");
- expect("mapred_input_key_class", "org.apache.hadoop.io.Text");
- expect("mapred_input_value_class", "org.apache.hadoop.io.Text");
//expect("mapred_local_dir", "build/test/mapred/local");
expectDefined("mapred_local_dir");
- expect("mapred_output_format_class", "org.apache.hadoop.streaming.StreamOutputFormat");
+ expect("mapred_output_format_class", "org.apache.hadoop.mapred.TextOutputFormat");
expect("mapred_output_key_class", "org.apache.hadoop.io.Text");
expect("mapred_output_value_class", "org.apache.hadoop.io.Text");
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java?view=auto&rev=529378
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java Mon Apr 16 12:35:15 2007
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * This class treats a line in the input as a key/value pair separated by a
+ * separator character. The separator can be specified in the config file
+ * under the attribute name key.value.separator.in.input.line. The default
+ * separator is the tab character ('\t').
+ *
+ */
+public class KeyValueLineRecordReader extends LineRecordReader {
+
+ private byte separator = (byte) '\t';
+
+ private WritableComparable dummyKey = super.createKey();
+
+ private Text innerValue = (Text) super.createValue();
+
+ public Class getKeyClass() { return Text.class; }
+
+ public Text createKey() {
+ return new Text();
+ }
+
+ public KeyValueLineRecordReader(Configuration job, FileSplit split)
+ throws IOException {
+ super(job, split);
+ String sepStr = job.get("key.value.separator.in.input.line", "\t");
+ this.separator = (byte) sepStr.charAt(0);
+ }
+
+ public static int findSeparator(byte[] utf, int start, int length, byte sep) {
+ for (int i = start; i < (start + length); i++) {
+ if (utf[i] == sep) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ /** Reads a key/value pair from a line. */
+ public synchronized boolean next(Writable key, Writable value)
+ throws IOException {
+ Text tKey = (Text) key;
+ Text tValue = (Text) value;
+ byte[] line = null;
+ int lineLen = -1;
+ if (super.next(dummyKey, innerValue)) {
+ line = innerValue.getBytes();
+ lineLen = innerValue.getLength();
+ } else {
+ return false;
+ }
+ if (line == null)
+ return false;
+ int pos = findSeparator(line, 0, lineLen, this.separator);
+ if (pos == -1) {
+ tKey.set(line, 0, lineLen);
+ tValue.set("");
+ } else {
+ int keyLen = pos;
+ byte[] keyBytes = new byte[keyLen];
+ System.arraycopy(line, 0, keyBytes, 0, keyLen);
+ int valLen = lineLen - keyLen - 1;
+ byte[] valBytes = new byte[valLen];
+ System.arraycopy(line, pos + 1, valBytes, 0, valLen);
+ tKey.set(keyBytes);
+ tValue.set(valBytes);
+ }
+ return true;
+ }
+}
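A short sketch of the separator behavior just defined: the separator is the first character of the configured string, and a line splits at the first separator byte only, so later occurrences stay in the value. The input line and driver class are illustrative.

  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.KeyValueLineRecordReader;

  public class SeparatorSketch {
    public static void main(String[] args) {
      JobConf job = new JobConf();
      job.set("key.value.separator.in.input.line", ","); // default is "\t"

      // findSeparator returns the offset of the FIRST separator byte.
      Text line = new Text("key1,v1,v2");
      int pos = KeyValueLineRecordReader.findSeparator(
          line.getBytes(), 0, line.getLength(), (byte) ',');
      System.out.println(pos); // 4 -> key="key1", value="v1,v2"
    }
  }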
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java?view=auto&rev=529378
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java Mon Apr 16 12:35:15 2007
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+/**
+ * An {@link InputFormat} for plain text files. Files are broken into lines.
+ * Either linefeed or carriage-return is used to signal end of line. Each line
+ * is divided into key and value parts by a separator byte. If no such byte
+ * exists, the key will be the entire line and the value will be empty.
+ */
+public class KeyValueTextInputFormat extends TextInputFormat {
+
+ public RecordReader getRecordReader(InputSplit genericSplit, JobConf job,
+ Reporter reporter) throws IOException {
+ reporter.setStatus(genericSplit.toString());
+ return new KeyValueLineRecordReader(job, (FileSplit) genericSplit);
+ }
+
+}
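A minimal read loop against the new format, mirroring the test added below; the "in" directory of TAB-separated text files is hypothetical.

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.InputSplit;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.KeyValueTextInputFormat;
  import org.apache.hadoop.mapred.RecordReader;
  import org.apache.hadoop.mapred.Reporter;

  public class ReadKeyValueSketch {
    public static void main(String[] args) throws Exception {
      JobConf job = new JobConf();
      job.setInputPath(new Path("in")); // hypothetical input directory
      KeyValueTextInputFormat format = new KeyValueTextInputFormat();
      format.configure(job);
      for (InputSplit split : format.getSplits(job, 1)) {
        RecordReader reader = format.getRecordReader(split, job, Reporter.NULL);
        Text key = (Text) reader.createKey();
        Text value = (Text) reader.createValue();
        while (reader.next(key, value)) {
          System.out.println(key + " => " + value); // split at first TAB
        }
        reader.close();
      }
    }
  }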
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java?view=auto&rev=529378
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextInputFormat.java Mon Apr 16 12:35:15 2007
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+/**
+ * This class is similar to SequenceFileInputFormat, except that it generates
+ * SequenceFileAsTextRecordReader, which converts the input keys and values to
+ * their String forms by calling the toString() method.
+ *
+ */
+public class SequenceFileAsTextInputFormat extends SequenceFileInputFormat {
+
+ public SequenceFileAsTextInputFormat() {
+ super();
+ }
+
+ public RecordReader getRecordReader(InputSplit split, JobConf job,
+ Reporter reporter) throws IOException {
+
+ reporter.setStatus(split.toString());
+
+ return new SequenceFileAsTextRecordReader(job, (FileSplit) split);
+ }
+}
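A sketch of the intended use: the SequenceFile may store any Writable key/value types, and the mapper sees their toString() forms as Text. The job setup is illustrative.

  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat;

  public class AsTextSketch {
    public static void main(String[] args) {
      JobConf job = new JobConf();
      // e.g. an IntWritable/LongWritable file is presented to the mapper
      // as Text/Text pairs such as "7" / "70".
      job.setInputFormat(SequenceFileAsTextInputFormat.class);
    }
  }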
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java?view=auto&rev=529378
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileAsTextRecordReader.java Mon Apr 16 12:35:15 2007
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * This class converts the input keys and values to their String forms by
+ * calling the toString() method. This class is to SequenceFileAsTextInputFormat
+ * as LineRecordReader is to TextInputFormat.
+ *
+ */
+public class SequenceFileAsTextRecordReader extends SequenceFileRecordReader {
+
+ private Writable innerKey = super.createKey();
+ private Writable innerValue = super.createValue();
+
+ public SequenceFileAsTextRecordReader(Configuration conf, FileSplit split)
+ throws IOException {
+ super(conf, split);
+ }
+
+ public WritableComparable createKey() {
+ return new Text();
+ }
+
+ public Writable createValue() {
+ return new Text();
+ }
+
+ /** Reads a key/value pair from a line. */
+ public synchronized boolean next(Writable key, Writable value)
+ throws IOException {
+ Text tKey = (Text) key;
+ Text tValue = (Text) value;
+ if (!super.next(innerKey, innerValue)) {
+ return false;
+ }
+ tKey.set(innerKey.toString());
+ tValue.set(innerValue.toString());
+ return true;
+ }
+}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java?view=diff&rev=529378&r1=529377&r2=529378
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java Mon Apr 16 12:35:15 2007
@@ -32,7 +32,7 @@
private long start;
private long end;
private boolean more = true;
- private Configuration conf;
+ protected Configuration conf;
public SequenceFileRecordReader(Configuration conf, FileSplit split)
throws IOException {
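The private-to-protected change lets subclasses consult the job configuration; SequenceFileAsTextRecordReader above is such a subclass. A hypothetical illustration (MyReader and the property check are not part of the commit):

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapred.FileSplit;
  import org.apache.hadoop.mapred.SequenceFileRecordReader;

  public class MyReader extends SequenceFileRecordReader {
    public MyReader(Configuration conf, FileSplit split) throws IOException {
      super(conf, split);
    }
    // conf is now visible to subclasses:
    boolean isLocalJob() {
      return "local".equals(conf.get("mapred.job.tracker"));
    }
  }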
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java?view=auto&rev=529378
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java Mon Apr 16 12:35:15 2007
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class TestKeyValueTextInputFormat extends TestCase {
+ private static final Log LOG =
+ LogFactory.getLog(TestKeyValueTextInputFormat.class.getName());
+
+ private static int MAX_LENGTH = 10000;
+
+ private static JobConf defaultConf = new JobConf();
+ private static FileSystem localFs = null;
+ static {
+ try {
+ localFs = FileSystem.getLocal(defaultConf);
+ } catch (IOException e) {
+ throw new RuntimeException("init failure", e);
+ }
+ }
+ private static Path workDir =
+ new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+ "TestKeyValueTextInputFormat");
+
+ public void testFormat() throws Exception {
+ JobConf job = new JobConf();
+ Path file = new Path(workDir, "test.txt");
+
+ // A reporter that does nothing
+ Reporter reporter = Reporter.NULL;
+
+ int seed = new Random().nextInt();
+ LOG.info("seed = "+seed);
+ Random random = new Random(seed);
+
+ localFs.delete(workDir);
+ job.setInputPath(workDir);
+
+ // for a variety of lengths
+ for (int length = 0; length < MAX_LENGTH;
+ length+= random.nextInt(MAX_LENGTH/10)+1) {
+
+ LOG.debug("creating; entries = " + length);
+
+ // create a file with length entries
+ Writer writer = new OutputStreamWriter(localFs.create(file));
+ try {
+ for (int i = 0; i < length; i++) {
+ writer.write(Integer.toString(i*2));
+ writer.write("\t");
+ writer.write(Integer.toString(i));
+ writer.write("\n");
+ }
+ } finally {
+ writer.close();
+ }
+
+ // try splitting the file in a variety of sizes
+ TextInputFormat format = new KeyValueTextInputFormat();
+ format.configure(job);
+ for (int i = 0; i < 3; i++) {
+ int numSplits = random.nextInt(MAX_LENGTH/20)+1;
+ LOG.debug("splitting: requesting = " + numSplits);
+ InputSplit[] splits = format.getSplits(job, numSplits);
+ LOG.debug("splitting: got = " + splits.length);
+
+ // check each split
+ BitSet bits = new BitSet(length);
+ for (int j = 0; j < splits.length; j++) {
+ LOG.debug("split["+j+"]= " + splits[j]);
+ RecordReader reader =
+ format.getRecordReader(splits[j], job, reporter);
+ Class readerClass = reader.getClass();
+ assertEquals("reader class is KeyValueLineRecordReader.", KeyValueLineRecordReader.class, readerClass);
+
+ Writable key = reader.createKey();
+ Class keyClass = key.getClass();
+ Writable value = reader.createValue();
+ Class valueClass = value.getClass();
+ assertEquals("Key class is Text.", Text.class, keyClass);
+ assertEquals("Value class is Text.", Text.class, valueClass);
+ try {
+ int count = 0;
+ while (reader.next(key, value)) {
+ int v = Integer.parseInt(value.toString());
+ LOG.debug("read " + v);
+ if (bits.get(v)) {
+ LOG.warn("conflict with " + v +
+ " in split " + j +
+ " at position "+reader.getPos());
+ }
+ assertFalse("Key in multiple partitions.", bits.get(v));
+ bits.set(v);
+ count++;
+ }
+ LOG.debug("splits["+j+"]="+splits[j]+" count=" + count);
+ } finally {
+ reader.close();
+ }
+ }
+ assertEquals("Some keys in no partition.", length, bits.cardinality());
+ }
+
+ }
+ }
+
+ private InputStream makeStream(String str) throws IOException {
+ Text text = new Text(str);
+ return new ByteArrayInputStream(text.getBytes(), 0, text.getLength());
+ }
+
+ public void testUTF8() throws Exception {
+ InputStream in = makeStream("abcd\u20acbdcd\u20ac");
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ LineRecordReader.readLine(in, out);
+ Text line = new Text();
+ line.set(out.toByteArray());
+ assertEquals("readLine changed utf8 characters",
+ "abcd\u20acbdcd\u20ac", line.toString());
+ in = makeStream("abc\u200axyz");
+ out.reset();
+ LineRecordReader.readLine(in, out);
+ line.set(out.toByteArray());
+ assertEquals("split on fake newline", "abc\u200axyz", line.toString());
+ }
+
+ public void testNewLines() throws Exception {
+ InputStream in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ LineRecordReader.readLine(in, out);
+ assertEquals("line1 length", 1, out.size());
+ out.reset();
+ LineRecordReader.readLine(in, out);
+ assertEquals("line2 length", 2, out.size());
+ out.reset();
+ LineRecordReader.readLine(in, out);
+ assertEquals("line3 length", 0, out.size());
+ out.reset();
+ LineRecordReader.readLine(in, out);
+ assertEquals("line4 length", 3, out.size());
+ out.reset();
+ LineRecordReader.readLine(in, out);
+ assertEquals("line5 length", 4, out.size());
+ out.reset();
+ LineRecordReader.readLine(in, out);
+ assertEquals("line5 length", 5, out.size());
+ assertEquals("end of file", 0, LineRecordReader.readLine(in, out));
+ }
+
+ private static void writeFile(FileSystem fs, Path name,
+ CompressionCodec codec,
+ String contents) throws IOException {
+ OutputStream stm;
+ if (codec == null) {
+ stm = fs.create(name);
+ } else {
+ stm = codec.createOutputStream(fs.create(name));
+ }
+ stm.write(contents.getBytes());
+ stm.close();
+ }
+
+ private static final Reporter voidReporter = Reporter.NULL;
+
+ private static List<Text> readSplit(InputFormat format,
+ InputSplit split,
+ JobConf job) throws IOException {
+ List<Text> result = new ArrayList<Text>();
+ RecordReader reader = format.getRecordReader(split, job,
+ voidReporter);
+ Text key = (Text) reader.createKey();
+ Text value = (Text) reader.createValue();
+ while (reader.next(key, value)) {
+ result.add(value);
+ value = (Text) reader.createValue();
+ }
+ return result;
+ }
+
+ /**
+ * Test using the gzip codec for reading
+ */
+ public static void testGzip() throws IOException {
+ JobConf job = new JobConf();
+ CompressionCodec gzip = new GzipCodec();
+ ReflectionUtils.setConf(gzip, job);
+ localFs.delete(workDir);
+ writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip,
+ "line-1\tthe quick\nline-2\tbrown\nline-3\tfox jumped\nline-4\tover\nline-5\t the lazy\nline-6\t dog\n");
+ writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
+ "line-1\tthis is a test\nline-1\tof gzip\n");
+ job.setInputPath(workDir);
+ KeyValueTextInputFormat format = new KeyValueTextInputFormat();
+ format.configure(job);
+ InputSplit[] splits = format.getSplits(job, 100);
+ assertEquals("compressed splits == 2", 2, splits.length);
+ FileSplit tmp = (FileSplit) splits[0];
+ if (tmp.getPath().getName().equals("part2.txt.gz")) {
+ splits[0] = splits[1];
+ splits[1] = tmp;
+ }
+ List<Text> results = readSplit(format, splits[0], job);
+ assertEquals("splits[0] length", 6, results.size());
+ assertEquals("splits[0][5]", " dog", results.get(5).toString());
+ results = readSplit(format, splits[1], job);
+ assertEquals("splits[1] length", 2, results.size());
+ assertEquals("splits[1][0]", "this is a test",
+ results.get(0).toString());
+ assertEquals("splits[1][1]", "of gzip",
+ results.get(1).toString());
+ }
+
+ public static void main(String[] args) throws Exception {
+ new TestKeyValueTextInputFormat().testFormat();
+ }
+}
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsTextInputFormat.java?view=auto&rev=529378
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsTextInputFormat.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsTextInputFormat.java Mon Apr 16 12:35:15 2007
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.conf.*;
+
+public class TestSequenceFileAsTextInputFormat extends TestCase {
+ private static final Log LOG = InputFormatBase.LOG;
+
+ private static int MAX_LENGTH = 10000;
+ private static Configuration conf = new Configuration();
+
+ public void testFormat() throws Exception {
+ JobConf job = new JobConf(conf);
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
+ Path file = new Path(dir, "test.seq");
+
+ Reporter reporter = Reporter.NULL;
+
+ int seed = new Random().nextInt();
+ //LOG.info("seed = "+seed);
+ Random random = new Random(seed);
+
+ fs.delete(dir);
+
+ job.setInputPath(dir);
+
+ // for a variety of lengths
+ for (int length = 0; length < MAX_LENGTH;
+ length+= random.nextInt(MAX_LENGTH/10)+1) {
+
+ //LOG.info("creating; entries = " + length);
+
+ // create a file with length entries
+ SequenceFile.Writer writer =
+ SequenceFile.createWriter(fs, conf, file,
+ IntWritable.class, LongWritable.class);
+ try {
+ for (int i = 0; i < length; i++) {
+ IntWritable key = new IntWritable(i);
+ LongWritable value = new LongWritable(10 * i);
+ writer.append(key, value);
+ }
+ } finally {
+ writer.close();
+ }
+
+ // try splitting the file in a variety of sizes
+ InputFormat format = new SequenceFileAsTextInputFormat();
+
+ for (int i = 0; i < 3; i++) {
+ int numSplits =
+ random.nextInt(MAX_LENGTH/(SequenceFile.SYNC_INTERVAL/20))+1;
+ //LOG.info("splitting: requesting = " + numSplits);
+ InputSplit[] splits = format.getSplits(job, numSplits);
+ //LOG.info("splitting: got = " + splits.length);
+
+ // check each split
+ BitSet bits = new BitSet(length);
+ for (int j = 0; j < splits.length; j++) {
+ RecordReader reader =
+ format.getRecordReader(splits[j], job, reporter);
+ Class readerClass = reader.getClass();
+ assertEquals("reader class is SequenceFileAsTextRecordReader.", SequenceFileAsTextRecordReader.class, readerClass);
+ Text value = (Text)reader.createValue();
+ Text key = (Text)reader.createKey();
+ try {
+ int count = 0;
+ while (reader.next(key, value)) {
+ // if (bits.get(key.get())) {
+ // LOG.info("splits["+j+"]="+splits[j]+" : " + key.get());
+ // LOG.info("@"+reader.getPos());
+ // }
+ int keyInt = Integer.parseInt(key.toString());
+ assertFalse("Key in multiple partitions.", bits.get(keyInt));
+ bits.set(keyInt);
+ count++;
+ }
+ //LOG.info("splits["+j+"]="+splits[j]+" count=" + count);
+ } finally {
+ reader.close();
+ }
+ }
+ assertEquals("Some keys in no partition.", length, bits.cardinality());
+ }
+
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ new TestSequenceFileAsTextInputFormat().testFormat();
+ }
+}