Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2009/08/07 13:54:25 UTC
svn commit: r801959 - in /hadoop/mapreduce/trunk: ./
src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/lib/
src/java/org/apache/hadoop/mapreduce/lib/input/
src/java/org/apache/hadoop/mapreduce/lib/output/
src/test/mapred/org/apache/h...
Author: ddas
Date: Fri Aug 7 11:54:25 2009
New Revision: 801959
URL: http://svn.apache.org/viewvc?rev=801959&view=rev
Log:
MAPREDUCE-375. Change org.apache.hadoop.mapred.lib.NLineInputFormat and org.apache.hadoop.mapred.MapFileOutputFormat to use new api. Contributed by Amareshwari Sriramadasu.
Added:
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduce.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java
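
As a usage sketch only (not part of this patch; the paths are placeholders and no mapper or reducer is set, so the identity classes run), a job that reads a control file through the new-API NLineInputFormat and writes MapFile output through the new-API MapFileOutputFormat could be wired up like this:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;

public class NLineToMapFileSketch {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration());
    // Feed five lines of the control file to each map task.
    job.setInputFormatClass(NLineInputFormat.class);
    NLineInputFormat.setNumLinesPerSplit(job, 5);
    // Write each reducer's output as a MapFile; keys must be WritableComparable.
    job.setOutputFormatClass(MapFileOutputFormat.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
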
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=801959&r1=801958&r2=801959&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Aug 7 11:54:25 2009
@@ -171,6 +171,10 @@
MAPREDUCE-670. Creates ant target for 10 mins patch test build.
(Jothi Padmanabhan via gkesavan)
+ MAPREDUCE-375. Change org.apache.hadoop.mapred.lib.NLineInputFormat
+ and org.apache.hadoop.mapred.MapFileOutputFormat to use new api.
+ (Amareshwari Sriramadasu via ddas)
+
BUG FIXES
MAPREDUCE-703. Sqoop requires dependency on hsqldb in ivy.
(Aaron Kimball via matei)
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java?rev=801959&r1=801958&r2=801959&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java Fri Aug 7 11:54:25 2009
@@ -22,7 +22,6 @@
import java.io.DataInput;
import java.io.DataOutput;
-import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.fs.Path;
/** A section of an input file. Returned by {@link
@@ -34,12 +33,10 @@
@Deprecated
public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit
implements InputSplit {
- private Path file;
- private long start;
- private long length;
- private String[] hosts;
-
- FileSplit() {}
+ org.apache.hadoop.mapreduce.lib.input.FileSplit fs;
+ FileSplit() {
+ fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit();
+ }
/** Constructs a split.
* @deprecated
@@ -60,45 +57,38 @@
* @param hosts the list of hosts containing the block, possibly null
*/
public FileSplit(Path file, long start, long length, String[] hosts) {
- this.file = file;
- this.start = start;
- this.length = length;
- this.hosts = hosts;
+ fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit(file, start,
+ length, hosts);
+ }
+
+ public FileSplit(org.apache.hadoop.mapreduce.lib.input.FileSplit fs) {
+ this.fs = fs;
}
/** The file containing this split's data. */
- public Path getPath() { return file; }
+ public Path getPath() { return fs.getPath(); }
/** The position of the first byte in the file to process. */
- public long getStart() { return start; }
+ public long getStart() { return fs.getStart(); }
/** The number of bytes in the file to process. */
- public long getLength() { return length; }
+ public long getLength() { return fs.getLength(); }
- public String toString() { return file + ":" + start + "+" + length; }
+ public String toString() { return fs.toString(); }
////////////////////////////////////////////
// Writable methods
////////////////////////////////////////////
public void write(DataOutput out) throws IOException {
- UTF8.writeString(out, file.toString());
- out.writeLong(start);
- out.writeLong(length);
+ fs.write(out);
}
public void readFields(DataInput in) throws IOException {
- file = new Path(UTF8.readString(in));
- start = in.readLong();
- length = in.readLong();
- hosts = null;
+ fs.readFields(in);
}
public String[] getLocations() throws IOException {
- if (this.hosts == null) {
- return new String[]{};
- } else {
- return this.hosts;
- }
+ return fs.getLocations();
}
}
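
With this change the deprecated mapred.FileSplit is a thin wrapper over the new-API split, and its getters, toString and Writable methods all delegate. A small hypothetical sketch (not part of the patch; the path and host are made up) of the wrapping constructor added above:

import org.apache.hadoop.fs.Path;

public class FileSplitWrapSketch {
  public static void main(String[] args) throws Exception {
    // A split described with the new API...
    org.apache.hadoop.mapreduce.lib.input.FileSplit newSplit =
        new org.apache.hadoop.mapreduce.lib.input.FileSplit(
            new Path("/data/input.txt"), 0L, 1024L, new String[] { "host1" });
    // ...wrapped for code that still expects the old mapred API.
    org.apache.hadoop.mapred.FileSplit oldSplit =
        new org.apache.hadoop.mapred.FileSplit(newSplit);
    // All accessors delegate to the wrapped split.
    System.out.println(oldSplit.getPath() + ":" + oldSplit.getStart()
        + "+" + oldSplit.getLength());
  }
}
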
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java?rev=801959&r1=801958&r2=801959&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java Fri Aug 7 11:54:25 2009
@@ -19,11 +19,9 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
-import java.util.Arrays;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.WritableComparable;
@@ -35,7 +33,11 @@
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
-/** An {@link OutputFormat} that writes {@link MapFile}s. */
+/** An {@link OutputFormat} that writes {@link MapFile}s.
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat} instead
+ */
+@Deprecated
public class MapFileOutputFormat
extends FileOutputFormat<WritableComparable, Writable> {
@@ -81,18 +83,9 @@
/** Open the output generated by this format. */
public static MapFile.Reader[] getReaders(FileSystem ignored, Path dir,
Configuration conf)
- throws IOException {
- FileSystem fs = dir.getFileSystem(conf);
- Path[] names = FileUtil.stat2Paths(fs.listStatus(dir));
-
- // sort names, so that hash partitioning works
- Arrays.sort(names);
-
- MapFile.Reader[] parts = new MapFile.Reader[names.length];
- for (int i = 0; i < names.length; i++) {
- parts[i] = new MapFile.Reader(fs, names[i].toString(), conf);
- }
- return parts;
+ throws IOException {
+ return org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat.
+ getReaders(dir, conf);
}
/** Get an entry from output generated by this class. */
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java?rev=801959&r1=801958&r2=801959&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java Fri Aug 7 11:54:25 2009
@@ -21,10 +21,7 @@
import java.io.IOException;
import java.util.ArrayList;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
@@ -35,7 +32,6 @@
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.LineReader;
/**
* NLineInputFormat which splits N lines of input as one split.
@@ -54,8 +50,10 @@
* a value to one map task, and key is the offset.
* i.e. (k,v) is (LongWritable, Text).
* The location hints will span the whole mapred cluster.
+ * @deprecated Use
+ * {@link org.apache.hadoop.mapreduce.lib.input.NLineInputFormat} instead
*/
-
+@Deprecated
public class NLineInputFormat extends FileInputFormat<LongWritable, Text>
implements JobConfigurable {
private int N = 1;
@@ -79,46 +77,10 @@
throws IOException {
ArrayList<FileSplit> splits = new ArrayList<FileSplit>();
for (FileStatus status : listStatus(job)) {
- Path fileName = status.getPath();
- if (status.isDir()) {
- throw new IOException("Not a file: " + fileName);
- }
- FileSystem fs = fileName.getFileSystem(job);
- LineReader lr = null;
- try {
- FSDataInputStream in = fs.open(fileName);
- lr = new LineReader(in, job);
- Text line = new Text();
- int numLines = 0;
- long begin = 0;
- long length = 0;
- int num = -1;
- while ((num = lr.readLine(line)) > 0) {
- numLines++;
- length += num;
- if (numLines == N) {
- // NLineInputFormat uses LineRecordReader, which always reads (and consumes)
- //at least one character out of its upper split boundary. So to make sure that
- //each mapper gets N lines, we move back the upper split limits of each split
- //by one character here.
- if (begin == 0) {
- splits.add(new FileSplit(fileName, begin, length - 1, new String[] {}));
- } else {
- splits.add(new FileSplit(fileName, begin - 1, length, new String[] {}));
- }
- begin += length;
- length = 0;
- numLines = 0;
- }
- }
- if (numLines != 0) {
- splits.add(new FileSplit(fileName, begin, length, new String[]{}));
- }
-
- } finally {
- if (lr != null) {
- lr.close();
- }
+ for (org.apache.hadoop.mapreduce.lib.input.FileSplit split :
+ org.apache.hadoop.mapreduce.lib.input.
+ NLineInputFormat.getSplitsForFile(status, job, N)) {
+ splits.add(new FileSplit(split));
}
}
return splits.toArray(new FileSplit[splits.size()]);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java?rev=801959&r1=801958&r2=801959&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java Fri Aug 7 11:54:25 2009
@@ -38,7 +38,7 @@
private long length;
private String[] hosts;
- FileSplit() {}
+ public FileSplit() {}
/** Constructs a split with host information
*
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java?rev=801959&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java Fri Aug 7 11:54:25 2009
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.LineReader;
+
+/**
+ * NLineInputFormat which splits N lines of input as one split.
+ *
+ * In many "pleasantly" parallel applications, each process/mapper
+ * processes the same input file (s), but with computations are
+ * controlled by different parameters.(Referred to as "parameter sweeps").
+ * One way to achieve this, is to specify a set of parameters
+ * (one set per line) as input in a control file
+ * (which is the input path to the map-reduce application,
+ * where as the input dataset is specified
+ * via a config variable in JobConf.).
+ *
+ * The NLineInputFormat can be used in such applications, that splits
+ * the input file such that by default, one line is fed as
+ * a value to one map task, and key is the offset.
+ * i.e. (k,v) is (LongWritable, Text).
+ * The location hints will span the whole mapred cluster.
+ */
+
+public class NLineInputFormat extends FileInputFormat<LongWritable, Text> {
+
+ public RecordReader<LongWritable, Text> createRecordReader(
+ InputSplit genericSplit, TaskAttemptContext context)
+ throws IOException {
+ context.setStatus(genericSplit.toString());
+ return new LineRecordReader();
+ }
+
+ /**
+ * Logically splits the set of input files for the job, splits N lines
+ * of the input as one split.
+ *
+ * @see FileInputFormat#getSplits(JobContext)
+ */
+ public List<InputSplit> getSplits(JobContext job)
+ throws IOException {
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ int numLinesPerSplit = getNumLinesPerSplit(job);
+ for (FileStatus status : listStatus(job)) {
+ splits.addAll(getSplitsForFile(status,
+ job.getConfiguration(), numLinesPerSplit));
+ }
+ return splits;
+ }
+
+ public static List<FileSplit> getSplitsForFile(FileStatus status,
+ Configuration conf, int numLinesPerSplit) throws IOException {
+ List<FileSplit> splits = new ArrayList<FileSplit> ();
+ Path fileName = status.getPath();
+ if (status.isDir()) {
+ throw new IOException("Not a file: " + fileName);
+ }
+ FileSystem fs = fileName.getFileSystem(conf);
+ LineReader lr = null;
+ try {
+ FSDataInputStream in = fs.open(fileName);
+ lr = new LineReader(in, conf);
+ Text line = new Text();
+ int numLines = 0;
+ long begin = 0;
+ long length = 0;
+ int num = -1;
+ while ((num = lr.readLine(line)) > 0) {
+ numLines++;
+ length += num;
+ if (numLines == numLinesPerSplit) {
+ // NLineInputFormat uses LineRecordReader, which always reads
+ // (and consumes) at least one character out of its upper split
+ // boundary. So to make sure that each mapper gets N lines, we
+ // move back the upper split limits of each split
+ // by one character here.
+ if (begin == 0) {
+ splits.add(new FileSplit(fileName, begin, length - 1,
+ new String[] {}));
+ } else {
+ splits.add(new FileSplit(fileName, begin - 1, length,
+ new String[] {}));
+ }
+ begin += length;
+ length = 0;
+ numLines = 0;
+ }
+ }
+ if (numLines != 0) {
+ splits.add(new FileSplit(fileName, begin, length, new String[]{}));
+ }
+ } finally {
+ if (lr != null) {
+ lr.close();
+ }
+ }
+ return splits;
+ }
+
+ /**
+ * Set the number of lines per split
+ * @param job the job to modify
+ * @param numLines the number of lines per split
+ */
+ public static void setNumLinesPerSplit(Job job, int numLines) {
+ job.getConfiguration().setInt(
+ "mapred.line.input.format.linespermap", numLines);
+ }
+
+ /**
+ * Get the number of lines per split
+ * @param job the job
+ * @return the number of lines per split
+ */
+ public static int getNumLinesPerSplit(JobContext job) {
+ return job.getConfiguration().getInt(
+ "mapred.line.input.format.linespermap", 1);
+ }
+}
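
To make the one-character boundary adjustment in getSplitsForFile concrete, the following hypothetical sketch (not part of the patch) writes four 4-byte lines to a local file and asks for splits of two lines each; the expected result is .../lines.txt:0+7 followed by .../lines.txt:7+8, where the second split starts one byte early on the newline that closes the first:

import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

public class NLineSplitSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path file = new Path("nline-sketch", "lines.txt");
    Writer w = new OutputStreamWriter(fs.create(file));
    try {
      w.write("aaa\nbbb\nccc\nddd\n");   // four lines of four bytes each
    } finally {
      w.close();
    }
    // Two lines per split: expect .../lines.txt:0+7 and .../lines.txt:7+8.
    List<FileSplit> splits =
        NLineInputFormat.getSplitsForFile(fs.getFileStatus(file), conf, 2);
    for (FileSplit split : splits) {
      System.out.println(split);
    }
  }
}
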
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java?rev=801959&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java Fri Aug 7 11:54:25 2009
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileUtil;
+
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * An {@link org.apache.hadoop.mapreduce.OutputFormat} that writes
+ * {@link MapFile}s.
+ */
+public class MapFileOutputFormat
+ extends FileOutputFormat<WritableComparable<?>, Writable> {
+
+ public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(
+ TaskAttemptContext context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ CompressionCodec codec = null;
+ CompressionType compressionType = CompressionType.NONE;
+ if (getCompressOutput(context)) {
+ // find the kind of compression to do
+ compressionType = SequenceFileOutputFormat.getOutputCompressionType(context);
+
+ // find the right codec
+ Class<?> codecClass = getOutputCompressorClass(context,
+ DefaultCodec.class);
+ codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
+ }
+
+ Path file = getDefaultWorkFile(context, "");
+ FileSystem fs = file.getFileSystem(conf);
+ // ignore the progress parameter, since MapFile is local
+ final MapFile.Writer out =
+ new MapFile.Writer(conf, fs, file.toString(),
+ context.getOutputKeyClass().asSubclass(WritableComparable.class),
+ context.getOutputValueClass().asSubclass(Writable.class),
+ compressionType, codec, context);
+
+ return new RecordWriter<WritableComparable<?>, Writable>() {
+ public void write(WritableComparable<?> key, Writable value)
+ throws IOException {
+ out.append(key, value);
+ }
+
+ public void close(TaskAttemptContext context) throws IOException {
+ out.close();
+ }
+ };
+ }
+
+ /** Open the output generated by this format. */
+ public static MapFile.Reader[] getReaders(Path dir,
+ Configuration conf) throws IOException {
+ FileSystem fs = dir.getFileSystem(conf);
+ Path[] names = FileUtil.stat2Paths(fs.listStatus(dir));
+
+ // sort names, so that hash partitioning works
+ Arrays.sort(names);
+
+ MapFile.Reader[] parts = new MapFile.Reader[names.length];
+ for (int i = 0; i < names.length; i++) {
+ parts[i] = new MapFile.Reader(fs, names[i].toString(), conf);
+ }
+ return parts;
+ }
+
+ /** Get an entry from output generated by this class. */
+ public static <K extends WritableComparable<?>, V extends Writable>
+ Writable getEntry(MapFile.Reader[] readers,
+ Partitioner<K, V> partitioner, K key, V value) throws IOException {
+ int part = partitioner.getPartition(key, value, readers.length);
+ return readers[part].get(key, value);
+ }
+}
+
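
On the read side, getReaders opens one sorted reader per part file and a partitioner picks which part holds a key; getEntry above is exactly those two steps. A hypothetical sketch (not part of the patch; the output directory and key value are placeholders, and the partitioner must match the one the writing job used):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class MapFileLookupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path outDir = new Path("job-output");   // placeholder output directory

    // One sorted reader per part file, so hash partitioning lines up.
    MapFile.Reader[] readers = MapFileOutputFormat.getReaders(outDir, conf);

    IntWritable key = new IntWritable(42);
    IntWritable value = new IntWritable();
    // Same two steps that getEntry performs: pick the part, then look up.
    int part = new HashPartitioner<IntWritable, IntWritable>()
        .getPartition(key, value, readers.length);
    Writable found = readers[part].get(key, value);
    System.out.println(found == null ? "not found" : key + " -> " + value);

    for (MapFile.Reader reader : readers) {
      reader.close();
    }
  }
}
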
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduce.java?rev=801959&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduce.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduce.java Fri Aug 7 11:54:25 2009
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.Iterator;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+/**********************************************************
+ * MapredLoadTest generates a bunch of work that exercises
+ * a Hadoop Map-Reduce system (and DFS, too). It goes through
+ * the following steps:
+ *
+ * 1) Take inputs 'range' and 'counts'.
+ * 2) Generate 'counts' random integers between 0 and range-1.
+ * 3) Create a file that lists each integer between 0 and range-1,
+ * and lists the number of times that integer was generated.
+ * 4) Emit a (very large) file that contains all the integers
+ * in the order generated.
+ * 5) After the file has been generated, read it back and count
+ * how many times each int was generated.
+ * 6) Compare this big count-map against the original one. If
+ * they match, then SUCCESS! Otherwise, FAILURE!
+ *
+ * OK, that's how we can think about it. What are the map-reduce
+ * steps that get the job done?
+ *
+ * 1) In a non-mapred thread, take the inputs 'range' and 'counts'.
+ * 2) In a non-mapred thread, generate the answer-key and write to disk.
+ * 3) In a mapred job, divide the answer key into K jobs.
+ * 4) A mapred 'generator' task consists of K map jobs. Each reads
+ * an individual "sub-key", and generates integers according
+ * to it (though with a random ordering).
+ * 5) The generator's reduce task agglomerates all of those files
+ * into a single one.
+ * 6) A mapred 'reader' task consists of M map jobs. The output
+ * file is cut into M pieces. Each of the M jobs counts the
+ * individual ints in its chunk and creates a map of all seen ints.
+ * 7) A mapred job integrates all the count files into a single one.
+ *
+ **********************************************************/
+public class TestMapReduce extends TestCase {
+
+ private static FileSystem fs;
+
+ static {
+ try {
+ fs = FileSystem.getLocal(new Configuration());
+ } catch (IOException ioe) {
+ fs = null;
+ }
+ }
+
+ /**
+ * Modified to make it a junit test.
+ * The RandomGen Job does the actual work of creating
+ * a huge file of assorted numbers. It receives instructions
+ * as to how many times each number should be counted. Then
+ * it emits those numbers in a crazy order.
+ *
+ * The map() function takes a key/val pair that describes
+ * a value-to-be-emitted (the key) and how many times it
+ * should be emitted (the value), aka "numtimes". map() then
+ * emits a series of intermediate key/val pairs. It emits
+ * 'numtimes' of these. The key is a random number and the
+ * value is the 'value-to-be-emitted'.
+ *
+ * The system collates and merges these pairs according to
+ * the random number. reduce() function takes in a key/value
+ * pair that consists of a crazy random number and a series
+ * of values that should be emitted. The random number key
+ * is now dropped, and reduce() emits a pair for every intermediate value.
+ * The emitted key is an intermediate value. The emitted value
+ * is just a blank string. Thus, we've created a huge file
+ * of numbers in random order, but where each number appears
+ * as many times as we were instructed.
+ */
+ static class RandomGenMapper
+ extends Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {
+
+ public void map(IntWritable key, IntWritable val,
+ Context context) throws IOException, InterruptedException {
+ int randomVal = key.get();
+ int randomCount = val.get();
+
+ for (int i = 0; i < randomCount; i++) {
+ context.write(new IntWritable(Math.abs(r.nextInt())),
+ new IntWritable(randomVal));
+ }
+ }
+ }
+ /**
+ */
+ static class RandomGenReducer
+ extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
+
+ public void reduce(IntWritable key, Iterable<IntWritable> it,
+ Context context) throws IOException, InterruptedException {
+ for (IntWritable iw : it) {
+ context.write(iw, null);
+ }
+ }
+ }
+
+ /**
+ * The RandomCheck Job does a lot of our work. It takes
+ * in a num/string keyspace, and transforms it into a
+ * key/count(int) keyspace.
+ *
+ * The map() function just emits a num/1 pair for every
+ * num/string input pair.
+ *
+ * The reduce() function sums up all the 1s that were
+ * emitted for a single key. It then emits the key/total
+ * pair.
+ *
+ * This is used to regenerate the random number "answer key".
+ * Each key here is a random number, and the count is the
+ * number of times the number was emitted.
+ */
+ static class RandomCheckMapper
+ extends Mapper<WritableComparable<?>, Text, IntWritable, IntWritable> {
+
+ public void map(WritableComparable<?> key, Text val,
+ Context context) throws IOException, InterruptedException {
+ context.write(new IntWritable(
+ Integer.parseInt(val.toString().trim())), new IntWritable(1));
+ }
+ }
+
+ /**
+ */
+ static class RandomCheckReducer
+ extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
+ public void reduce(IntWritable key, Iterable<IntWritable> it,
+ Context context) throws IOException, InterruptedException {
+ int keyint = key.get();
+ int count = 0;
+ for (IntWritable iw : it) {
+ count++;
+ }
+ context.write(new IntWritable(keyint), new IntWritable(count));
+ }
+ }
+
+ /**
+ * The Merge Job is a really simple one. It takes in
+ * an int/int key-value set, and emits the same set.
+ * But it merges identical keys by adding their values.
+ *
+ * Thus, the map() function is just the identity function
+ * and reduce() just sums. Nothing to see here!
+ */
+ static class MergeMapper
+ extends Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {
+
+ public void map(IntWritable key, IntWritable val,
+ Context context) throws IOException, InterruptedException {
+ int keyint = key.get();
+ int valint = val.get();
+ context.write(new IntWritable(keyint), new IntWritable(valint));
+ }
+ }
+
+ static class MergeReducer
+ extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
+ public void reduce(IntWritable key, Iterable<IntWritable> it,
+ Context context) throws IOException, InterruptedException {
+ int keyint = key.get();
+ int total = 0;
+ for (IntWritable iw : it) {
+ total += iw.get();
+ }
+ context.write(new IntWritable(keyint), new IntWritable(total));
+ }
+ }
+
+ private static int range = 10;
+ private static int counts = 100;
+ private static Random r = new Random();
+
+ public void testMapred() throws Exception {
+ launch();
+ }
+
+ private static void launch() throws Exception {
+ //
+ // Generate distribution of ints. This is the answer key.
+ //
+ Configuration conf = new Configuration();
+ int countsToGo = counts;
+ int dist[] = new int[range];
+ for (int i = 0; i < range; i++) {
+ double avgInts = (1.0 * countsToGo) / (range - i);
+ dist[i] = (int) Math.max(0, Math.round(avgInts +
+ (Math.sqrt(avgInts) * r.nextGaussian())));
+ countsToGo -= dist[i];
+ }
+ if (countsToGo > 0) {
+ dist[dist.length-1] += countsToGo;
+ }
+
+ //
+ // Write the answer key to a file.
+ //
+ Path testdir = new Path("mapred.loadtest");
+ if (!fs.mkdirs(testdir)) {
+ throw new IOException("Mkdirs failed to create " + testdir.toString());
+ }
+
+ Path randomIns = new Path(testdir, "genins");
+ if (!fs.mkdirs(randomIns)) {
+ throw new IOException("Mkdirs failed to create " + randomIns.toString());
+ }
+
+ Path answerkey = new Path(randomIns, "answer.key");
+ SequenceFile.Writer out =
+ SequenceFile.createWriter(fs, conf, answerkey, IntWritable.class,
+ IntWritable.class,
+ SequenceFile.CompressionType.NONE);
+ try {
+ for (int i = 0; i < range; i++) {
+ out.append(new IntWritable(i), new IntWritable(dist[i]));
+ }
+ } finally {
+ out.close();
+ }
+
+ printFiles(randomIns, conf);
+
+ //
+ // Now we need to generate the random numbers according to
+ // the above distribution.
+ //
+ // We create a lot of map tasks, each of which takes at least
+ // one "line" of the distribution. (That is, a certain number
+ // X is to be generated Y number of times.)
+ //
+ // A map task emits Y key/val pairs. The val is X. The key
+ // is a randomly-generated number.
+ //
+ // The reduce task gets its input sorted by key. That is, sorted
+ // in random order. It then emits a single line of text
+ // for each of the given values. It does not emit the key.
+ //
+ // Because there's just one reduce task, we emit a single big
+ // file of random numbers.
+ //
+ Path randomOuts = new Path(testdir, "genouts");
+ fs.delete(randomOuts, true);
+
+
+ Job genJob = new Job(conf);
+ FileInputFormat.setInputPaths(genJob, randomIns);
+ genJob.setInputFormatClass(SequenceFileInputFormat.class);
+ genJob.setMapperClass(RandomGenMapper.class);
+
+ FileOutputFormat.setOutputPath(genJob, randomOuts);
+ genJob.setOutputKeyClass(IntWritable.class);
+ genJob.setOutputValueClass(IntWritable.class);
+ genJob.setReducerClass(RandomGenReducer.class);
+ genJob.setNumReduceTasks(1);
+
+ genJob.waitForCompletion(true);
+ printFiles(randomOuts, conf);
+
+ //
+ // Next, we read the big file in and regenerate the
+ // original map. It's split into a number of parts.
+ // (That number is 'intermediateReduces'.)
+ //
+ // We have many map tasks, each of which read at least one
+ // of the output numbers. For each number read in, the
+ // map task emits a key/value pair where the key is the
+ // number and the value is "1".
+ //
+ // We have a single reduce task, which receives its input
+ // sorted by the key emitted above. For each key, there will
+ // be a certain number of "1" values. The reduce task sums
+ // these values to compute how many times the given key was
+ // emitted.
+ //
+ // The reduce task then emits a key/val pair where the key
+ // is the number in question, and the value is the number of
+ // times the key was emitted. This is the same format as the
+ // original answer key (except that numbers emitted zero times
+ // will not appear in the regenerated key.) The answer set
+ // is split into a number of pieces. A final MapReduce job
+ // will merge them.
+ //
+ // There's not really a need to go to 10 reduces here
+ // instead of 1. But we want to test what happens when
+ // you have multiple reduces at once.
+ //
+ int intermediateReduces = 10;
+ Path intermediateOuts = new Path(testdir, "intermediateouts");
+ fs.delete(intermediateOuts, true);
+ Job checkJob = new Job(conf);
+ FileInputFormat.setInputPaths(checkJob, randomOuts);
+ checkJob.setMapperClass(RandomCheckMapper.class);
+
+ FileOutputFormat.setOutputPath(checkJob, intermediateOuts);
+ checkJob.setOutputKeyClass(IntWritable.class);
+ checkJob.setOutputValueClass(IntWritable.class);
+ checkJob.setOutputFormatClass(MapFileOutputFormat.class);
+ checkJob.setReducerClass(RandomCheckReducer.class);
+ checkJob.setNumReduceTasks(intermediateReduces);
+ checkJob.waitForCompletion(true);
+ printFiles(intermediateOuts, conf);
+
+ //
+ // OK, now we take the output from the last job and
+ // merge it down to a single file. The map() and reduce()
+ // functions don't really do anything except reemit tuples.
+ // But by having a single reduce task here, we end up merging
+ // all the files.
+ //
+ Path finalOuts = new Path(testdir, "finalouts");
+ fs.delete(finalOuts, true);
+ Job mergeJob = new Job(conf);
+ FileInputFormat.setInputPaths(mergeJob, intermediateOuts);
+ mergeJob.setInputFormatClass(SequenceFileInputFormat.class);
+ mergeJob.setMapperClass(MergeMapper.class);
+
+ FileOutputFormat.setOutputPath(mergeJob, finalOuts);
+ mergeJob.setOutputKeyClass(IntWritable.class);
+ mergeJob.setOutputValueClass(IntWritable.class);
+ mergeJob.setOutputFormatClass(SequenceFileOutputFormat.class);
+ mergeJob.setReducerClass(MergeReducer.class);
+ mergeJob.setNumReduceTasks(1);
+
+ mergeJob.waitForCompletion(true);
+ printFiles(finalOuts, conf);
+
+ //
+ // Finally, we compare the reconstructed answer key with the
+ // original one. Remember, we need to ignore zero-count items
+ // in the original key.
+ //
+ boolean success = true;
+ Path recomputedkey = new Path(finalOuts, "part-r-00000");
+ SequenceFile.Reader in = new SequenceFile.Reader(fs, recomputedkey, conf);
+ int totalseen = 0;
+ try {
+ IntWritable key = new IntWritable();
+ IntWritable val = new IntWritable();
+ for (int i = 0; i < range; i++) {
+ if (dist[i] == 0) {
+ continue;
+ }
+ if (!in.next(key, val)) {
+ System.err.println("Cannot read entry " + i);
+ success = false;
+ break;
+ } else {
+ if (!((key.get() == i) && (val.get() == dist[i]))) {
+ System.err.println("Mismatch! Pos=" + key.get() + ", i=" + i +
+ ", val=" + val.get() + ", dist[i]=" + dist[i]);
+ success = false;
+ }
+ totalseen += val.get();
+ }
+ }
+ if (success) {
+ if (in.next(key, val)) {
+ System.err.println("Unnecessary lines in recomputed key!");
+ success = false;
+ }
+ }
+ } finally {
+ in.close();
+ }
+ int originalTotal = 0;
+ for (int i = 0; i < dist.length; i++) {
+ originalTotal += dist[i];
+ }
+ System.out.println("Original sum: " + originalTotal);
+ System.out.println("Recomputed sum: " + totalseen);
+
+ //
+ // Write to "results" whether the test succeeded or not.
+ //
+ Path resultFile = new Path(testdir, "results");
+ BufferedWriter bw = new BufferedWriter(
+ new OutputStreamWriter(fs.create(resultFile)));
+ try {
+ bw.write("Success=" + success + "\n");
+ System.out.println("Success=" + success);
+ } finally {
+ bw.close();
+ }
+ assertTrue("testMapRed failed", success);
+ fs.delete(testdir, true);
+ }
+
+ private static void printTextFile(FileSystem fs, Path p) throws IOException {
+ BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(p)));
+ String line;
+ while ((line = in.readLine()) != null) {
+ System.out.println(" Row: " + line);
+ }
+ in.close();
+ }
+
+ private static void printSequenceFile(FileSystem fs, Path p,
+ Configuration conf) throws IOException {
+ SequenceFile.Reader r = new SequenceFile.Reader(fs, p, conf);
+ Object key = null;
+ Object value = null;
+ while ((key = r.next(key)) != null) {
+ value = r.getCurrentValue(value);
+ System.out.println(" Row: " + key + ", " + value);
+ }
+ r.close();
+ }
+
+ private static boolean isSequenceFile(FileSystem fs,
+ Path f) throws IOException {
+ DataInputStream in = fs.open(f);
+ byte[] seq = "SEQ".getBytes();
+ for(int i=0; i < seq.length; ++i) {
+ if (seq[i] != in.read()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static void printFiles(Path dir,
+ Configuration conf) throws IOException {
+ FileSystem fs = dir.getFileSystem(conf);
+ for(FileStatus f: fs.listStatus(dir)) {
+ System.out.println("Reading " + f.getPath() + ": ");
+ if (f.isDir()) {
+ System.out.println(" it is a map file.");
+ printSequenceFile(fs, new Path(f.getPath(), "data"), conf);
+ } else if (isSequenceFile(fs, f.getPath())) {
+ System.out.println(" it is a sequence file.");
+ printSequenceFile(fs, f.getPath(), conf);
+ } else {
+ System.out.println(" it is a text file.");
+ printTextFile(fs, f.getPath());
+ }
+ }
+ }
+
+ /**
+ * Launches all the tasks in order.
+ */
+ public static void main(String[] argv) throws Exception {
+ if (argv.length < 2) {
+ System.err.println("Usage: TestMapReduce <range> <counts>");
+ System.err.println();
+ System.err.println("Note: a good test will have a <counts> value" +
+ " that is substantially larger than the <range>");
+ return;
+ }
+
+ int i = 0;
+ range = Integer.parseInt(argv[i++]);
+ counts = Integer.parseInt(argv[i++]);
+ launch();
+ }
+}
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java?rev=801959&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java Fri Aug 7 11:54:25 2009
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+
+public class TestNLineInputFormat extends TestCase {
+ private static int MAX_LENGTH = 200;
+
+ private static Configuration conf = new Configuration();
+ private static FileSystem localFs = null;
+
+ static {
+ try {
+ localFs = FileSystem.getLocal(conf);
+ } catch (IOException e) {
+ throw new RuntimeException("init failure", e);
+ }
+ }
+
+ private static Path workDir =
+ new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+ "TestNLineInputFormat");
+
+ public void testFormat() throws Exception {
+ Job job = new Job(conf);
+ Path file = new Path(workDir, "test.txt");
+
+ int seed = new Random().nextInt();
+ Random random = new Random(seed);
+
+ localFs.delete(workDir, true);
+ FileInputFormat.setInputPaths(job, workDir);
+ int numLinesPerMap = 5;
+ NLineInputFormat.setNumLinesPerSplit(job, numLinesPerMap);
+ // for a variety of lengths
+ for (int length = 0; length < MAX_LENGTH;
+ length += random.nextInt(MAX_LENGTH / 10) + 1) {
+ // create a file with length entries
+ Writer writer = new OutputStreamWriter(localFs.create(file));
+ try {
+ for (int i = 0; i < length; i++) {
+ writer.write(Integer.toString(i));
+ writer.write("\n");
+ }
+ } finally {
+ writer.close();
+ }
+ checkFormat(job, numLinesPerMap);
+ }
+ }
+
+ void checkFormat(Job job, int expectedN)
+ throws IOException, InterruptedException {
+ NLineInputFormat format = new NLineInputFormat();
+ List<InputSplit> splits = format.getSplits(job);
+ // check all splits except last one
+ int count = 0;
+ for (int i = 0; i < splits.size() -1; i++) {
+ assertEquals("There are no split locations", 0,
+ splits.get(i).getLocations().length);
+ TaskAttemptContext context = MapReduceTestUtil.
+ createDummyMapTaskAttemptContext(job.getConfiguration());
+ RecordReader<LongWritable, Text> reader = format.createRecordReader(
+ splits.get(i), context);
+ Class<?> clazz = reader.getClass();
+ assertEquals("reader class is LineRecordReader.",
+ LineRecordReader.class, clazz);
+ MapContext<LongWritable, Text, LongWritable, Text> mcontext =
+ new MapContext<LongWritable, Text, LongWritable, Text>(
+ job.getConfiguration(), context.getTaskAttemptID(), reader, null,
+ null, MapReduceTestUtil.createDummyReporter(), splits.get(i));
+ reader.initialize(splits.get(i), mcontext);
+
+ try {
+ count = 0;
+ while (reader.nextKeyValue()) {
+ count++;
+ }
+ } finally {
+ reader.close();
+ }
+ assertEquals("number of lines in split is " + expectedN ,
+ expectedN, count);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ new TestNLineInputFormat().testFormat();
+ }
+}