Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/12/18 22:58:34 UTC
svn commit: r488438 - in /lucene/hadoop/trunk: ./
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/fs/s3/
src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
src/tes...
Author: cutting
Date: Mon Dec 18 13:58:33 2006
New Revision: 488438
URL: http://svn.apache.org/viewvc?view=rev&rev=488438
Log:
HADOOP-451. Add a Split interface. Contributed by Owen.
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputSplit.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InvalidInputException.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/build.xml
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/EmptyInputFormat.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Dec 18 13:58:33 2006
@@ -122,6 +122,12 @@
34. HADOOP-823. Fix problem starting datanode when not all configured
data directories exist. (Bryan Pendleton via cutting)
+35. HADOOP-451. Add a Split interface. CAUTION: This incompatibly
+ changes the InputFormat and RecordReader interfaces. Not only is
+ FileSplit replaced with Split, but a FileSystem parameter is no
+ longer passed in several methods, input validation has changed,
+ etc. (omalley via cutting)
+
Release 0.9.2 - 2006-12-15
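For readers tracking the API break, here are the before/after signatures in one
place (taken directly from the InputFormat diff below; nothing here is new):

    // Before HADOOP-451: splits were always files and every call threaded a
    // FileSystem through explicitly.
    boolean[] areValidInputDirectories(FileSystem fs, Path[] inputDirs) throws IOException;
    FileSplit[] getSplits(FileSystem fs, JobConf job, int numSplits) throws IOException;
    RecordReader getRecordReader(FileSystem fs, FileSplit split,
                                 JobConf job, Reporter reporter) throws IOException;

    // After: splits are an interface, input validation throws instead of
    // returning flags, and each path resolves its own FileSystem from the
    // JobConf.
    void validateInput(JobConf job) throws IOException;
    InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
    RecordReader getRecordReader(InputSplit split,
                                 JobConf job, Reporter reporter) throws IOException;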
Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Mon Dec 18 13:58:33 2006
@@ -366,7 +366,7 @@
<delete dir="${test.log.dir}"/>
<mkdir dir="${test.log.dir}"/>
<junit showoutput="${test.output}" printsummary="yes" haltonfailure="no"
- fork="yes" maxmemory="128m" dir="${basedir}"
+ fork="yes" maxmemory="256m" dir="${basedir}"
errorProperty="tests.failed" failureProperty="tests.failed">
<sysproperty key="test.build.data" value="${test.build.data}"/>
<sysproperty key="hadoop.log.dir" value="${test.log.dir}"/>
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java Mon Dec 18 13:58:33 2006
@@ -84,20 +84,20 @@
(and if there was, this index may need to be created for the first time
full file at a time... )
*/
- public FileSplit[] getSplits(FileSystem fs, JobConf job, int numSplits) throws IOException {
- checkReady(fs, job);
- return ((StreamInputFormat) primary_).getFullFileSplits(fs, job);
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ return ((StreamInputFormat) primary_).getFullFileSplits(job);
}
/**
*/
- public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter) throws IOException {
+ public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+ FileSystem fs = ((FileSplit) split).getPath().getFileSystem(job);
checkReady(fs, job);
reporter.setStatus(split.toString());
ArrayList readers = new ArrayList();
- String primary = split.getPath().toString();
+ String primary = ((FileSplit) split).getPath().toString();
CompoundDirSpec spec = CompoundDirSpec.findInputSpecForPrimary(primary, job);
if (spec == null) {
throw new IOException("Did not find -input spec in JobConf for primary:" + primary);
@@ -106,7 +106,7 @@
InputFormat f = (InputFormat) fmts_.get(i);
Path path = new Path(spec.getPaths()[i][0]);
FileSplit fsplit = makeFullFileSplit(path);
- RecordReader r = f.getRecordReader(fs, fsplit, job, reporter);
+ RecordReader r = f.getRecordReader(fsplit, job, reporter);
readers.add(r);
}
@@ -115,7 +115,7 @@
private FileSplit makeFullFileSplit(Path path) throws IOException {
long len = fs_.getLength(path);
- return new FileSplit(path, 0, len);
+ return new FileSplit(path, 0, len, job_);
}
/*
@@ -188,6 +188,14 @@
}
}
+ public float getProgress() throws IOException {
+ if (primaryClosed_) {
+ return 1.0f;
+ } else {
+ return primaryReader_.getProgress();
+ }
+ }
+
public void close() throws IOException {
IOException firstErr = null;
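The change above shows the migration pattern that recurs through the rest of
this commit: instead of receiving a FileSystem argument, a format downcasts the
InputSplit and asks the split's Path for its filesystem. A minimal sketch of
the convention, assuming a file-based format (the concrete reader here is
chosen only for illustration):

    public RecordReader getRecordReader(InputSplit split, JobConf job,
                                        Reporter reporter) throws IOException {
      FileSplit fileSplit = (FileSplit) split;  // file-based formats downcast
      // where a FileSystem is still needed, derive it from the path:
      FileSystem fs = fileSplit.getPath().getFileSystem(job);
      reporter.setStatus(fileSplit.toString() + " on " + fs.getName());
      return new SequenceFileRecordReader(job, fileSplit);
    }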
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Mon Dec 18 13:58:33 2006
@@ -189,7 +189,8 @@
if (split.getStart() == 0) {
return leaf;
} else {
- return new FileSplit(new Path(leaf), split.getStart(), split.getLength()).toString();
+ return new FileSplit(new Path(leaf), split.getStart(),
+ split.getLength(), job_).toString();
}
}
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java Mon Dec 18 13:58:33 2006
@@ -69,13 +69,7 @@
public abstract boolean next(Writable key, Writable value) throws IOException;
/** This implementation always returns true. */
- public boolean[] areValidInputDirectories(FileSystem fileSys, Path[] inputDirs) throws IOException {
- int n = inputDirs.length;
- boolean[] b = new boolean[n];
- for (int i = 0; i < n; i++) {
- b[i] = true;
- }
- return b;
+ public void validateInput(JobConf job) throws IOException {
}
/** Returns the current position in the input. */
@@ -88,6 +82,14 @@
in_.close();
}
+ public float getProgress() throws IOException {
+ if (end_ == start_) {
+ return 1.0f;
+ } else {
+ return (in_.getPos() - start_) / (float) (end_ - start_);
+ }
+ }
+
public WritableComparable createKey() {
return new Text();
}
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Mon Dec 18 13:58:33 2006
@@ -49,12 +49,7 @@
protected static final Log LOG = LogFactory.getLog(StreamInputFormat.class.getName());
/** This implementation always returns true. */
- public boolean[] areValidInputDirectories(FileSystem fileSys, Path[] inputDirs) throws IOException {
- boolean[] b = new boolean[inputDirs.length];
- for (int i = 0; i < inputDirs.length; ++i) {
- b[i] = true;
- }
- return b;
+ public void validateInput(JobConf job) throws IOException {
}
static boolean isGzippedInput(JobConf job) {
@@ -62,29 +57,29 @@
return "gzip".equals(val);
}
- public FileSplit[] getSplits(FileSystem fs, JobConf job, int numSplits) throws IOException {
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
if (isGzippedInput(job)) {
- return getFullFileSplits(fs, job);
+ return getFullFileSplits(job);
} else {
- return super.getSplits(fs, job, numSplits);
+ return super.getSplits(job, numSplits);
}
}
/** For the compressed-files case: override InputFormatBase to produce one split. */
- FileSplit[] getFullFileSplits(FileSystem fs, JobConf job) throws IOException {
- Path[] files = listPaths(fs, job);
+ FileSplit[] getFullFileSplits(JobConf job) throws IOException {
+ Path[] files = listPaths(job);
int numSplits = files.length;
ArrayList splits = new ArrayList(numSplits);
for (int i = 0; i < files.length; i++) {
Path file = files[i];
- long splitSize = fs.getLength(file);
- splits.add(new FileSplit(file, 0, splitSize));
+ long splitSize = file.getFileSystem(job).getLength(file);
+ splits.add(new FileSplit(file, 0, splitSize, job));
}
return (FileSplit[]) splits.toArray(new FileSplit[splits.size()]);
}
- protected Path[] listPaths(FileSystem fs, JobConf job) throws IOException {
+ protected Path[] listPaths(JobConf job) throws IOException {
Path[] globs = job.getInputPaths();
ArrayList list = new ArrayList();
int dsup = globs.length;
@@ -93,6 +88,7 @@
LOG.info("StreamInputFormat: globs[" + d + "] leafName = " + leafName);
Path[] paths;
Path dir;
+ FileSystem fs = globs[d].getFileSystem(job);
PathFilter filter = new GlobFilter(fs, leafName);
dir = new Path(globs[d].getParent().toString());
if (dir == null) dir = new Path(".");
@@ -132,8 +128,11 @@
FileSystem fs_;
}
- public RecordReader getRecordReader(FileSystem fs, final FileSplit split, JobConf job,
- Reporter reporter) throws IOException {
+ public RecordReader getRecordReader(final InputSplit genericSplit,
+ JobConf job,
+ Reporter reporter) throws IOException {
+ FileSplit split = (FileSplit) genericSplit;
+ FileSystem fs = split.getPath().getFileSystem(job);
LOG.info("getRecordReader start.....split=" + split);
reporter.setStatus(split.toString());
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java Mon Dec 18 13:58:33 2006
@@ -418,7 +418,7 @@
Path p = new Path(path);
long start = Long.parseLong(job.get("map.input.start"));
long length = Long.parseLong(job.get("map.input.length"));
- return new FileSplit(p, start, length);
+ return new FileSplit(p, start, length, job);
}
static class TaskId {
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java Mon Dec 18 13:58:33 2006
@@ -51,7 +51,8 @@
/**
*/
- public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter) throws IOException {
+ public RecordReader getRecordReader(InputSplit split, JobConf job,
+ Reporter reporter) throws IOException {
reporter.setStatus(split.toString());
@@ -82,6 +83,9 @@
return new UTF8();
}
+ public float getProgress() {
+ return 1.0f;
+ }
}
ArrayList/*<InputFormat>*/fmts_;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java Mon Dec 18 13:58:33 2006
@@ -44,7 +44,7 @@
public LocalFileSystem() {}
- /** @deprecated. */
+ /** @deprecated */
public LocalFileSystem(Configuration conf) throws IOException {
initialize(NAME, conf);
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/s3/package.html Mon Dec 18 13:58:33 2006
@@ -5,8 +5,10 @@
org.apache.hadoop.fs.FileSystem} that uses <a href="http://aws.amazon.com/s3">Amazon S3</a>.</p>
<p>
-Files are stored in S3 as blocks (represented by {@link Block}), which have an ID and a length.
-Block metadata is stored in S3 as a small record (represented by {@link INode}) using the URL-encoded
+Files are stored in S3 as blocks (represented by
+{@link org.apache.hadoop.fs.s3.Block}), which have an ID and a length.
+Block metadata is stored in S3 as a small record (represented by
+{@link org.apache.hadoop.fs.s3.INode}) using the URL-encoded
path string as a key. Inodes record the file type (regular file or directory) and the list of blocks.
This design makes it easy to seek to any given position in a file by reading the inode data to compute
which block to access, then using S3's support for
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java Mon Dec 18 13:58:33 2006
@@ -23,18 +23,18 @@
import java.io.DataOutput;
import java.io.File; // deprecated
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/** A section of an input file. Returned by {@link
- * InputFormat#getSplits(FileSystem, JobConf, int)} and passed to
- * {@link InputFormat#getRecordReader(FileSystem,FileSplit,JobConf,Reporter)}. */
-public class FileSplit implements Writable {
+ * InputFormat#getSplits(JobConf, int)} and passed to
+ * {@link InputFormat#getRecordReader(InputSplit,JobConf,Reporter)}. */
+public class FileSplit implements InputSplit {
private Path file;
private long start;
private long length;
+ private JobConf conf;
FileSplit() {}
@@ -44,10 +44,11 @@
* @param start the position of the first byte in the file to process
* @param length the number of bytes in the file to process
*/
- public FileSplit(Path file, long start, long length) {
+ public FileSplit(Path file, long start, long length, JobConf conf) {
this.file = file;
this.start = start;
this.length = length;
+ this.conf = conf;
}
/** @deprecated Call {@link #getPath()} instead. */
@@ -79,5 +80,13 @@
length = in.readLong();
}
-
+ public String[] getLocations() throws IOException {
+ String[][] hints = file.getFileSystem(conf).
+ getFileCacheHints(file, start, length);
+ if (hints != null && hints.length > 0) {
+ return hints[0];
+ }
+ return new String[]{};
+ }
+
}
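getLocations() is what the scheduler consumes to place a map task near its
data; the JobInProgress hunk below builds exactly this kind of host index. A
compact sketch of that consumer (variable names are illustrative):

    // Index splits by preferred host so a tracker can be offered a local
    // split first; mirrors the hostToMaps map in JobInProgress.
    Map<String, List<InputSplit>> byHost = new HashMap<String, List<InputSplit>>();
    for (int i = 0; i < splits.length; i++) {
      String[] hosts = splits[i].getLocations();
      for (int k = 0; k < hosts.length; k++) {
        List<InputSplit> local = byHost.get(hosts[k]);
        if (local == null) {
          local = new ArrayList<InputSplit>();
          byHost.put(hosts[k], local);
        }
        local.add(splits[i]);
      }
    }

Note that FileSplit returns only hints[0], the hosts of the first block in its
byte range; that is an approximation that holds when a split does not straddle
block boundaries.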
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java Mon Dec 18 13:58:33 2006
@@ -33,13 +33,10 @@
* Are the input directories valid? This method is used to test the input
* directories when a job is submitted so that the framework can fail early
* with a useful error message when the input directory does not exist.
- * @param fileSys the file system to check for the directories
- * @param inputDirs the list of input directories
- * @return is each inputDir valid?
- * @throws IOException
+ * @param job the job to check
+ * @throws InvalidInputException if the job does not have valid input
*/
- boolean[] areValidInputDirectories(FileSystem fileSys,
- Path[] inputDirs) throws IOException;
+ void validateInput(JobConf job) throws IOException;
/** Splits a set of input files. One split is created per map task.
*
@@ -47,17 +44,16 @@
* @param numSplits the desired number of splits
* @return the splits
*/
- FileSplit[] getSplits(FileSystem ignored, JobConf job, int numSplits)
- throws IOException;
+ InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
/** Construct a {@link RecordReader} for a {@link FileSplit}.
*
- * @param split the {@link FileSplit}
+ * @param split the {@link InputSplit}
* @param job the job that this split belongs to
* @return a {@link RecordReader}
*/
- RecordReader getRecordReader(FileSystem ignored, FileSplit split,
- JobConf job, Reporter reporter)
- throws IOException;
+ RecordReader getRecordReader(InputSplit split,
+ JobConf job,
+ Reporter reporter) throws IOException;
}
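Put together, a complete format against the revised interface needs only the
two methods plus the validateInput it can inherit. A hedged sketch, assuming
InputFormatBase as the base class (the class itself is hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.*;

    /** Hypothetical example: every input file becomes exactly one split. */
    public class WholeFileInputFormat extends InputFormatBase {

      public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
        Path[] files = listPaths(job);       // inherited; fails if no input dirs
        InputSplit[] splits = new InputSplit[files.length];
        for (int i = 0; i < files.length; i++) {
          long len = files[i].getFileSystem(job).getLength(files[i]);
          splits[i] = new FileSplit(files[i], 0, len, job); // note the JobConf arg
        }
        return splits;
      }

      public RecordReader getRecordReader(InputSplit split, JobConf job,
                                          Reporter reporter) throws IOException {
        reporter.setStatus(split.toString());
        return new SequenceFileRecordReader(job, (FileSplit) split);
      }
    }

validateInput comes along from InputFormatBase, so a missing input directory
now fails at submit time with the aggregated InvalidInputException added below.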
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java Mon Dec 18 13:58:33 2006
@@ -18,9 +18,11 @@
package org.apache.hadoop.mapred;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.List;
import org.apache.commons.logging.*;
@@ -52,8 +54,7 @@
return true;
}
- public abstract RecordReader getRecordReader(FileSystem fs,
- FileSplit split,
+ public abstract RecordReader getRecordReader(InputSplit split,
JobConf job,
Reporter reporter)
throws IOException;
@@ -66,15 +67,17 @@
* that is appended to all input dirs specified by job, and if the given fs
* lists those too, each is added to the returned array of Path.
*
- * @param fs
- * @param job
- * @return array of Path objects, never zero length.
+ * @param job the job to list input paths for
+ * @return array of Path objects
* @throws IOException if zero items.
*/
- protected Path[] listPaths(FileSystem ignored, JobConf job)
+ protected Path[] listPaths(JobConf job)
throws IOException {
- Path[] dirs = job.getInputPaths();
String subdir = job.get("mapred.input.subdir");
+ Path[] dirs = job.getInputPaths();
+ if (dirs.length == 0) {
+ throw new IOException("No input directories specified in job");
+ }
ArrayList result = new ArrayList();
for (int i = 0; i < dirs.length; i++) {
FileSystem fs = dirs[i].getFileSystem(job);
@@ -96,30 +99,34 @@
}
}
- if (result.size() == 0) {
- throw new IOException("No input directories specified in: "+job);
- }
return (Path[])result.toArray(new Path[result.size()]);
}
- // NOTE: should really pass a Configuration here, not a FileSystem
- public boolean[] areValidInputDirectories(FileSystem fs, Path[] inputDirs)
- throws IOException {
- boolean[] result = new boolean[inputDirs.length];
+ public void validateInput(JobConf job) throws IOException {
+ Path[] inputDirs = job.getInputPaths();
+ List<IOException> result = new ArrayList();
for(int i=0; i < inputDirs.length; ++i) {
- result[i] =
- inputDirs[i].getFileSystem(fs.getConf()).isDirectory(inputDirs[i]);
+ FileSystem fs = inputDirs[i].getFileSystem(job);
+ if (!fs.exists(inputDirs[i])) {
+ result.add(new FileNotFoundException("Input directory " +
+ inputDirs[i] +
+ " doesn't exist."));
+ } else if (!fs.isDirectory(inputDirs[i])) {
+ result.add(new InvalidFileTypeException
+ ("Invalid input path, expecting directory : " +
+ inputDirs[i]));
+ }
+ }
+ if (!result.isEmpty()) {
+ throw new InvalidInputException(result);
}
- return result;
}
- /** Splits files returned by {@link #listPaths(FileSystem,JobConf)} when
+ /** Splits files returned by {@link #listPaths(JobConf)} when
* they're too big.*/
- public FileSplit[] getSplits(FileSystem ignored, JobConf job, int numSplits)
+ public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
-
- Path[] files = listPaths(ignored, job);
-
+ Path[] files = listPaths(job);
long totalSize = 0; // compute total size
for (int i = 0; i < files.length; i++) { // check we have valid files
Path file = files[i];
@@ -145,16 +152,18 @@
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
- splits.add(new FileSplit(file, length-bytesRemaining, splitSize));
+ splits.add(new FileSplit(file, length-bytesRemaining, splitSize,
+ job));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
- splits.add(new FileSplit(file, length-bytesRemaining, bytesRemaining));
+ splits.add(new FileSplit(file, length-bytesRemaining,
+ bytesRemaining, job));
}
} else {
if (length != 0) {
- splits.add(new FileSplit(file, 0, length));
+ splits.add(new FileSplit(file, 0, length, job));
}
}
}
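For intuition about the SPLIT_SLOP loop in the hunk above, here is the same
arithmetic as a free-standing sketch, with the slop assumed to be the 1.1 this
class uses:

    // splitOffsets(98, 32) -> {0, 32, 64}: the final split is 34 bytes long,
    // since 34/32 = 1.06 is within the 1.1 slop. The last split may thus run
    // up to 10% over splitSize, which avoids scheduling a tiny map task for
    // a short tail.
    static long[] splitOffsets(long length, long splitSize) {
      final double SPLIT_SLOP = 1.1;           // assumed, as in InputFormatBase
      java.util.ArrayList offsets = new java.util.ArrayList();
      long bytesRemaining = length;
      while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
        offsets.add(new Long(length - bytesRemaining));
        bytesRemaining -= splitSize;
      }
      if (bytesRemaining != 0) {
        offsets.add(new Long(length - bytesRemaining)); // the tail split
      }
      long[] result = new long[offsets.size()];
      for (int i = 0; i < result.length; i++) {
        result[i] = ((Long) offsets.get(i)).longValue();
      }
      return result;
    }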
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputSplit.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputSplit.java?view=auto&rev=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputSplit.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputSplit.java Mon Dec 18 13:58:33 2006
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The description of the data for a single map task.
+ * @author Owen O'Malley
+ */
+public interface InputSplit extends Writable {
+
+ /**
+ * Get the list of hostnames where the input split is located.
+ * @return A list of preferred hostnames
+ * @throws IOException
+ */
+ String[] getLocations() throws IOException;
+}
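Because InputSplit only demands Writable plus getLocations(), splits no longer
have to be files at all. A sketch of a non-file split (the class and its
key-range semantics are entirely hypothetical):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.UTF8;
    import org.apache.hadoop.mapred.InputSplit;

    /** Hypothetical: a split naming a key range rather than a byte range. */
    public class KeyRangeSplit implements InputSplit {
      private String startKey;
      private String endKey;
      private String[] hosts;

      public KeyRangeSplit() {}               // Writable needs a no-arg ctor

      public KeyRangeSplit(String startKey, String endKey, String[] hosts) {
        this.startKey = startKey;
        this.endKey = endKey;
        this.hosts = hosts;
      }

      public String[] getLocations() { return hosts; }

      public void write(DataOutput out) throws IOException {
        UTF8.writeString(out, startKey);
        UTF8.writeString(out, endKey);
        out.writeInt(hosts.length);
        for (int i = 0; i < hosts.length; i++) {
          UTF8.writeString(out, hosts[i]);
        }
      }

      public void readFields(DataInput in) throws IOException {
        startKey = UTF8.readString(in);
        endKey = UTF8.readString(in);
        hosts = new String[in.readInt()];
        for (int i = 0; i < hosts.length; i++) {
          hosts[i] = UTF8.readString(in);
        }
      }
    }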
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InvalidInputException.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InvalidInputException.java?view=auto&rev=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InvalidInputException.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InvalidInputException.java Mon Dec 18 13:58:33 2006
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Iterator;
+
+/**
+ * This class wraps a list of problems with the input, so that the user
+ * can get a list of problems together instead of finding and fixing them one
+ * by one.
+ * @author Owen O'Malley
+ */
+public class InvalidInputException extends IOException {
+ private List<IOException> problems;
+
+ /**
+ * Create the exception with the given list.
+ * @param probs the list of problems to report. This list is not copied.
+ */
+ public InvalidInputException(List<IOException> probs) {
+ problems = probs;
+ }
+
+ /**
+ * Get the complete list of the problems reported.
+ * @return the list of problems, which must not be modified
+ */
+ public List<IOException> getProblems() {
+ return problems;
+ }
+
+ /**
+ * Get a summary message of the problems found.
+ * @return the concatenated messages from all of the problems.
+ */
+ public String getMessage() {
+ StringBuffer result = new StringBuffer();
+ Iterator<IOException> itr = problems.iterator();
+ while(itr.hasNext()) {
+ result.append(itr.next().getMessage());
+ if (itr.hasNext()) {
+ result.append("\n");
+ }
+ }
+ return result.toString();
+ }
+}
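At submit time the aggregation pays off: a job with several bad input paths
reports them all at once. A sketch of a caller, assuming only the two methods
introduced in this commit:

    // 'job' is an already-configured JobConf.
    void checkInput(JobConf job) throws IOException {
      try {
        job.getInputFormat().validateInput(job);
      } catch (InvalidInputException e) {
        for (IOException problem : e.getProblems()) {
          System.err.println("bad input: " + problem.getMessage());
        }
        throw e;                 // still fail the submission, as JobClient does
      }
    }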
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Mon Dec 18 13:58:33 2006
@@ -153,7 +153,8 @@
if (isMap) {
FileSplit split = new FileSplit(new Path(conf.get("map.input.file")),
conf.getLong("map.input.start", 0),
- conf.getLong("map.input.length", 0));
+ conf.getLong("map.input.length", 0),
+ conf);
task = new MapTask(jobId, jobFilename.toString(), conf.get("mapred.tip.id"),
taskId, partition, split);
} else {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Mon Dec 18 13:58:33 2006
@@ -313,30 +313,7 @@
}
// input paths should exist.
- boolean[] validDirs =
- job.getInputFormat().areValidInputDirectories(userFileSys, inputDirs);
- for(int i=0; i < validDirs.length; ++i) {
- if (!validDirs[i]) {
- String msg = null ;
- if( !userFileSys.exists(inputDirs[i]) ){
- msg = "Input directory " + inputDirs[i] +
- " doesn't exist in " + userFileSys.getName();
- LOG.error(msg);
- throw new FileNotFoundException(msg);
- }else if( !userFileSys.isDirectory(inputDirs[i])){
- msg = "Invalid input path, expecting directory : " + inputDirs[i] ;
- LOG.error(msg);
- throw new InvalidFileTypeException(msg);
- }else{
- // some other error
- msg = "Input directory " + inputDirs[i] +
- " in " + userFileSys.getName() + " is invalid.";
- LOG.error(msg);
- throw new IOException(msg);
- }
- }
- }
-
+ job.getInputFormat().validateInput(job);
// Check the output specification
job.getOutputFormat().checkOutputSpecs(fs, job);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon Dec 18 13:58:33 2006
@@ -122,7 +122,7 @@
}
InputFormat inputFormat = conf.getInputFormat();
- FileSplit[] splits = inputFormat.getSplits(fs, conf, numMapTasks);
+ InputSplit[] splits = inputFormat.getSplits(conf, numMapTasks);
//
// sort splits by decreasing length, to reduce job's tail
@@ -168,22 +168,16 @@
// Obtain some tasktracker-cache information for the map task splits.
//
for (int i = 0; i < maps.length; i++) {
- String hints[][] =
- fs.getFileCacheHints(splits[i].getPath(), splits[i].getStart(),
- splits[i].getLength());
-
- if (hints != null) {
- for (int k = 0; k < hints.length; k++) {
- for (int j = 0; j < hints[k].length; j++) {
- ArrayList hostMaps = (ArrayList)hostToMaps.get(hints[k][j]);
- if (hostMaps == null) {
- hostMaps = new ArrayList();
- hostToMaps.put(hints[k][j], hostMaps);
- }
- hostMaps.add(maps[i]);
- }
- }
+ String hints[] = splits[i].getLocations();
+ for (int k = 0; k < hints.length; k++) {
+ ArrayList hostMaps = (ArrayList)hostToMaps.get(hints[k]);
+ if (hostMaps == null) {
+ hostMaps = new ArrayList();
+ hostToMaps.put(hints[k], hostMaps);
}
+ hostMaps.add(maps[i]);
+
+ }
}
this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Mon Dec 18 13:58:33 2006
@@ -89,8 +89,8 @@
public void run() {
try {
// split input into minimum number of splits
- FileSplit[] splits;
- splits = job.getInputFormat().getSplits(fs, job, 1);
+ InputSplit[] splits;
+ splits = job.getInputFormat().getSplits(job, 1);
String jobId = profile.getJobId();
// run a map task for each split
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon Dec 18 13:58:33 2006
@@ -86,14 +86,14 @@
private MapTaskMetrics myMetrics = null;
- private FileSplit split;
+ private InputSplit split;
private MapOutputFile mapOutputFile = new MapOutputFile();
private JobConf conf;
public MapTask() {}
public MapTask(String jobId, String jobFile, String tipId, String taskId,
- int partition, FileSplit split) {
+ int partition, InputSplit split) {
super(jobId, jobFile, tipId, taskId, partition);
this.split = split;
myMetrics = new MapTaskMetrics(taskId);
@@ -103,18 +103,25 @@
return true;
}
- public void localizeConfiguration(JobConf conf) {
+ public void localizeConfiguration(JobConf conf) throws IOException {
super.localizeConfiguration(conf);
- conf.set("map.input.file", split.getPath().toString());
- conf.setLong("map.input.start", split.getStart());
- conf.setLong("map.input.length", split.getLength());
+ Path localSplit = new Path(new Path(getJobFile()).getParent(),
+ "split.dta");
+ DataOutputStream out = LocalFileSystem.get(conf).create(localSplit);
+ split.write(out);
+ out.close();
+ if (split instanceof FileSplit) {
+ conf.set("map.input.file", ((FileSplit) split).getPath().toString());
+ conf.setLong("map.input.start", ((FileSplit) split).getStart());
+ conf.setLong("map.input.length", ((FileSplit) split).getLength());
+ }
}
public TaskRunner createRunner(TaskTracker tracker) {
return new MapTaskRunner(this, tracker, this.conf);
}
- public FileSplit getSplit() { return split; }
+ public InputSplit getSplit() { return split; }
public void write(DataOutput out) throws IOException {
super.write(out);
@@ -139,11 +146,9 @@
MapOutputBuffer collector = new MapOutputBuffer(umbilical, job, reporter);
final RecordReader rawIn = // open input
- job.getInputFormat().getRecordReader
- (FileSystem.get(job), split, job, reporter);
+ job.getInputFormat().getRecordReader(split, job, reporter);
RecordReader in = new RecordReader() { // wrap in progress reporter
- private float perByte = 1.0f /(float)split.getLength();
public WritableComparable createKey() {
return rawIn.createKey();
@@ -156,9 +161,7 @@
public synchronized boolean next(Writable key, Writable value)
throws IOException {
- float progress = // compute progress
- (float)Math.min((rawIn.getPos()-split.getStart())*perByte, 1.0f);
- reportProgress(umbilical, progress);
+ reportProgress(umbilical, getProgress());
long beforePos = getPos();
boolean ret = rawIn.next(key, value);
myMetrics.mapInput(getPos() - beforePos);
@@ -166,6 +169,9 @@
}
public long getPos() throws IOException { return rawIn.getPos(); }
public void close() throws IOException { rawIn.close(); }
+ public float getProgress() throws IOException {
+ return rawIn.getProgress();
+ }
};
MapRunnable runner =
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java Mon Dec 18 13:58:33 2006
@@ -55,4 +55,10 @@
/** Close this to future operations.*/
public void close() throws IOException;
+ /**
+ * How far the reader has progressed through the input.
+ * @return progress from 0.0 to 1.0
+ * @throws IOException
+ */
+ float getProgress() throws IOException;
}
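The byte-range readers in this commit all satisfy the new method the same way.
A consolidated sketch (field names are generic; the readers in this commit
differ on whether an empty split reports 0.0 or 1.0, and 1.0 is chosen here):

    // start/end bound the split; pos is the reader's current byte offset.
    public float getProgress() throws IOException {
      if (end == start) {
        return 1.0f;                 // empty split: treat as already done
      }
      // cast before dividing: long/long arithmetic would truncate to zero
      return Math.min(1.0f, (pos - start) / (float) (end - start));
    }

Math.min guards against readers that run past the nominal split end to finish
the record in flight.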
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Mon Dec 18 13:58:33 2006
@@ -100,7 +100,7 @@
/**
* Localize the given JobConf to be specific for this task.
*/
- public void localizeConfiguration(JobConf conf) {
+ public void localizeConfiguration(JobConf conf) throws IOException {
super.localizeConfiguration(conf);
conf.setNumMapTasks(numMaps);
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFilter.java Mon Dec 18 13:58:33 2006
@@ -53,19 +53,18 @@
}
/** Create a record reader for the given split
- * @param fs file system where the file split is stored
* @param split file split
* @param job job configuration
* @param reporter reporter who sends report to task tracker
* @return RecordReader
*/
- public RecordReader getRecordReader(FileSystem fs, FileSplit split,
+ public RecordReader getRecordReader(InputSplit split,
JobConf job, Reporter reporter)
throws IOException {
reporter.setStatus(split.toString());
- return new FilterRecordReader(job, split);
+ return new FilterRecordReader(job, (FileSplit) split);
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileInputFormat.java Mon Dec 18 13:58:33 2006
@@ -33,26 +33,26 @@
setMinSplitSize(SequenceFile.SYNC_INTERVAL);
}
- protected Path[] listPaths(FileSystem fs, JobConf job)
+ protected Path[] listPaths(JobConf job)
throws IOException {
- Path[] files = super.listPaths(fs, job);
+ Path[] files = super.listPaths(job);
for (int i = 0; i < files.length; i++) {
Path file = files[i];
- if (fs.isDirectory(file)) { // it's a MapFile
+ if (file.getFileSystem(job).isDirectory(file)) { // it's a MapFile
files[i] = new Path(file, MapFile.DATA_FILE_NAME); // use the data file
}
}
return files;
}
- public RecordReader getRecordReader(FileSystem fs, FileSplit split,
+ public RecordReader getRecordReader(InputSplit split,
JobConf job, Reporter reporter)
throws IOException {
reporter.setStatus(split.toString());
- return new SequenceFileRecordReader(job, split);
+ return new SequenceFileRecordReader(job, (FileSplit) split);
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileRecordReader.java Mon Dec 18 13:58:33 2006
@@ -29,6 +29,7 @@
/** An {@link RecordReader} for {@link SequenceFile}s. */
public class SequenceFileRecordReader implements RecordReader {
private SequenceFile.Reader in;
+ private long start;
private long end;
private boolean more = true;
private Configuration conf;
@@ -93,6 +94,18 @@
protected synchronized void getCurrentValue(Writable value)
throws IOException {
in.getCurrentValue(value);
+ }
+
+ /**
+ * Return the progress within the input split
+ * @return 0.0 to 1.0 of the input byte range
+ */
+ public float getProgress() throws IOException {
+ if (end == start) {
+ return 0.0f;
+ } else {
+ return (in.getPosition() - start) / (float) (end - start);
+ }
}
public synchronized long getPos() throws IOException {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Mon Dec 18 13:58:33 2006
@@ -119,7 +119,7 @@
/**
* Localize the given JobConf to be specific for this task.
*/
- public void localizeConfiguration(JobConf conf) {
+ public void localizeConfiguration(JobConf conf) throws IOException {
conf.set("mapred.tip.id", tipId);
conf.set("mapred.task.id", taskId);
conf.setBoolean("mapred.task.is.map",isMapTask());
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Mon Dec 18 13:58:33 2006
@@ -52,7 +52,7 @@
// Defines the TIP
private String jobFile = null;
- private FileSplit split = null;
+ private InputSplit split = null;
private int numMaps;
private int partition;
private JobTracker jobtracker;
@@ -88,7 +88,7 @@
/**
* Constructor for MapTask
*/
- public TaskInProgress(String uniqueString, String jobFile, FileSplit split,
+ public TaskInProgress(String uniqueString, String jobFile, InputSplit split,
JobTracker jobtracker, JobConf conf,
JobInProgress job, int partition) {
this.jobFile = jobFile;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java Mon Dec 18 13:58:33 2006
@@ -40,6 +40,7 @@
}
protected static class LineRecordReader implements RecordReader {
+ private long start;
private long pos;
private long end;
private BufferedInputStream in;
@@ -61,6 +62,7 @@
public LineRecordReader(InputStream in, long offset, long endOffset) {
this.in = new BufferedInputStream(in);
+ this.start = offset;
this.pos = offset;
this.end = endOffset;
}
@@ -73,6 +75,17 @@
return new Text();
}
+ /**
+ * Get the progress within the split
+ */
+ public float getProgress() {
+ if (start == end) {
+ return 0.0f;
+ } else {
+ return (pos - start) / (float) (end - start);
+ }
+ }
+
/** Read a line. */
public synchronized boolean next(Writable key, Writable value)
throws IOException {
@@ -101,18 +114,19 @@
}
- public RecordReader getRecordReader(FileSystem fs, FileSplit split,
+ public RecordReader getRecordReader(InputSplit genericSplit,
JobConf job, Reporter reporter)
throws IOException {
- reporter.setStatus(split.toString());
-
+ reporter.setStatus(genericSplit.toString());
+ FileSplit split = (FileSplit) genericSplit;
long start = split.getStart();
long end = start + split.getLength();
final Path file = split.getPath();
final CompressionCodec codec = compressionCodecs.getCodec(file);
// open the file and seek to the start of the split
+ FileSystem fs = FileSystem.get(job);
FSDataInputStream fileIn = fs.open(split.getPath());
InputStream in = fileIn;
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/EmptyInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/EmptyInputFormat.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/EmptyInputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/EmptyInputFormat.java Mon Dec 18 13:58:33 2006
@@ -31,7 +31,7 @@
return new FileSplit[0];
}
- public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter) throws IOException {
- return new SequenceFileRecordReader(job, split);
+ public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+ return new SequenceFileRecordReader(job, (FileSplit) split);
}
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestEmptyJobWithDFS.java Mon Dec 18 13:58:33 2006
@@ -22,6 +22,8 @@
import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
@@ -35,6 +37,8 @@
* A JUnit test to test Map-Reduce empty jobs Mini-DFS.
*/
public class TestEmptyJobWithDFS extends TestCase {
+ private static final Log LOG =
+ LogFactory.getLog(TestEmptyJobWithDFS.class.getName());
/**
* Simple method running a MapReduce job with no input data. Used
@@ -58,6 +62,7 @@
FileSystem fs = FileSystem.getNamed(fileSys, conf);
fs.delete(outDir);
if (!fs.mkdirs(inDir)) {
+ LOG.warn("Can't create " + inDir);
return false;
}
@@ -88,6 +93,7 @@
}
}
// return job result
+ LOG.info("job is complete: " + runningJob.isSuccessful());
return (runningJob.isSuccessful());
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java Mon Dec 18 13:58:33 2006
@@ -80,13 +80,14 @@
numSplits =
random.nextInt(MAX_LENGTH/(SequenceFile.SYNC_INTERVAL/20))+1;
}
- FileSplit[] splits = format.getSplits(fs, job, numSplits);
+ InputSplit[] splits = format.getSplits(job, numSplits);
// check each split
int count = 0;
+ LOG.info("Generated " + splits.length + " splits.");
for (int j = 0; j < splits.length; j++) {
RecordReader reader =
- format.getRecordReader(fs, splits[j], job, reporter);
+ format.getRecordReader(splits[j], job, reporter);
try {
while (reader.next(key, value)) {
LOG.info("Accept record "+key.toString());
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java Mon Dec 18 13:58:33 2006
@@ -83,14 +83,14 @@
int numSplits =
random.nextInt(MAX_LENGTH/(SequenceFile.SYNC_INTERVAL/20))+1;
//LOG.info("splitting: requesting = " + numSplits);
- FileSplit[] splits = format.getSplits(fs, job, numSplits);
+ InputSplit[] splits = format.getSplits(job, numSplits);
//LOG.info("splitting: got = " + splits.length);
// check each split
BitSet bits = new BitSet(length);
for (int j = 0; j < splits.length; j++) {
RecordReader reader =
- format.getRecordReader(fs, splits[j], job, reporter);
+ format.getRecordReader(splits[j], job, reporter);
try {
int count = 0;
while (reader.next(key, value)) {
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java Mon Dec 18 13:58:33 2006
@@ -88,16 +88,15 @@
for (int i = 0; i < 3; i++) {
int numSplits = random.nextInt(MAX_LENGTH/20)+1;
LOG.debug("splitting: requesting = " + numSplits);
- FileSplit[] splits = format.getSplits(localFs, job, numSplits);
+ InputSplit[] splits = format.getSplits(job, numSplits);
LOG.debug("splitting: got = " + splits.length);
// check each split
BitSet bits = new BitSet(length);
for (int j = 0; j < splits.length; j++) {
- LOG.debug("split["+j+"]= " + splits[j].getStart() + "+" +
- splits[j].getLength());
+ LOG.debug("split["+j+"]= " + splits[j]);
RecordReader reader =
- format.getRecordReader(localFs, splits[j], job, reporter);
+ format.getRecordReader(splits[j], job, reporter);
try {
int count = 0;
while (reader.next(key, value)) {
@@ -186,10 +185,10 @@
private static final Reporter voidReporter = new VoidReporter();
private static List<Text> readSplit(InputFormat format,
- FileSplit split,
+ InputSplit split,
JobConf job) throws IOException {
List<Text> result = new ArrayList<Text>();
- RecordReader reader = format.getRecordReader(localFs, split, job,
+ RecordReader reader = format.getRecordReader(split, job,
voidReporter);
LongWritable key = (LongWritable) reader.createKey();
Text value = (Text) reader.createValue();
@@ -215,10 +214,10 @@
job.setInputPath(workDir);
TextInputFormat format = new TextInputFormat();
format.configure(job);
- FileSplit[] splits = format.getSplits(localFs, job, 100);
+ InputSplit[] splits = format.getSplits(job, 100);
assertEquals("compressed splits == 2", 2, splits.length);
- if (splits[0].getPath().getName().equals("part2.txt.gz")) {
- FileSplit tmp = splits[0];
+ FileSplit tmp = (FileSplit) splits[0];
+ if (tmp.getPath().getName().equals("part2.txt.gz")) {
splits[0] = splits[1];
splits[1] = tmp;
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java?view=diff&rev=488438&r1=488437&r2=488438
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/record/test/TestWritable.java Mon Dec 18 13:58:33 2006
@@ -26,7 +26,7 @@
import org.apache.hadoop.io.*;
import org.apache.hadoop.conf.*;
import org.apache.commons.logging.*;
-import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputFormatBase;
import org.apache.hadoop.mapred.JobConf;
@@ -93,14 +93,14 @@
int numSplits =
random.nextInt(MAX_LENGTH/(SequenceFile.SYNC_INTERVAL/20))+1;
//LOG.info("splitting: requesting = " + numSplits);
- FileSplit[] splits = format.getSplits(fs, job, numSplits);
+ InputSplit[] splits = format.getSplits(job, numSplits);
//LOG.info("splitting: got = " + splits.length);
// check each split
BitSet bits = new BitSet(length);
for (int j = 0; j < splits.length; j++) {
RecordReader reader =
- format.getRecordReader(fs, splits[j], job, reporter);
+ format.getRecordReader(splits[j], job, reporter);
try {
int count = 0;
while (reader.next(key, value)) {