You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2009/08/07 13:54:25 UTC

svn commit: r801959 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/lib/ src/java/org/apache/hadoop/mapreduce/lib/input/ src/java/org/apache/hadoop/mapreduce/lib/output/ src/test/mapred/org/apache/h...

Author: ddas
Date: Fri Aug  7 11:54:25 2009
New Revision: 801959

URL: http://svn.apache.org/viewvc?rev=801959&view=rev
Log:
MAPREDUCE-375. Change org.apache.hadoop.mapred.lib.NLineInputFormat and org.apache.hadoop.mapred.MapFileOutputFormat to use new api. Contributed by Amareshwari Sriramadasu.

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduce.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=801959&r1=801958&r2=801959&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Aug  7 11:54:25 2009
@@ -171,6 +171,10 @@
     MAPREDUCE-670. Creates ant target for 10 mins patch test build.
     (Jothi Padmanabhan via gkesavan)
 
+    MAPREDUCE-375. Change org.apache.hadoop.mapred.lib.NLineInputFormat 
+    and org.apache.hadoop.mapred.MapFileOutputFormat to use new api.
+    (Amareshwari Sriramadasu via ddas)
+
   BUG FIXES
     MAPREDUCE-703. Sqoop requires dependency on hsqldb in ivy.
     (Aaron Kimball via matei)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java?rev=801959&r1=801958&r2=801959&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java Fri Aug  7 11:54:25 2009
@@ -22,7 +22,6 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 
-import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.fs.Path;
 
 /** A section of an input file.  Returned by {@link
@@ -34,12 +33,10 @@
 @Deprecated
 public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit 
                        implements InputSplit {
-  private Path file;
-  private long start;
-  private long length;
-  private String[] hosts;
-  
-  FileSplit() {}
+  org.apache.hadoop.mapreduce.lib.input.FileSplit fs; 
+  FileSplit() {
+    fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit();
+  }
 
   /** Constructs a split.
    * @deprecated
@@ -60,45 +57,38 @@
    * @param hosts the list of hosts containing the block, possibly null
    */
   public FileSplit(Path file, long start, long length, String[] hosts) {
-    this.file = file;
-    this.start = start;
-    this.length = length;
-    this.hosts = hosts;
+    fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit(file, start,
+           length, hosts);
+  }
+  
+  public FileSplit(org.apache.hadoop.mapreduce.lib.input.FileSplit fs) {
+    this.fs = fs;
   }
 
   /** The file containing this split's data. */
-  public Path getPath() { return file; }
+  public Path getPath() { return fs.getPath(); }
   
   /** The position of the first byte in the file to process. */
-  public long getStart() { return start; }
+  public long getStart() { return fs.getStart(); }
   
   /** The number of bytes in the file to process. */
-  public long getLength() { return length; }
+  public long getLength() { return fs.getLength(); }
 
-  public String toString() { return file + ":" + start + "+" + length; }
+  public String toString() { return fs.toString(); }
 
   ////////////////////////////////////////////
   // Writable methods
   ////////////////////////////////////////////
 
   public void write(DataOutput out) throws IOException {
-    UTF8.writeString(out, file.toString());
-    out.writeLong(start);
-    out.writeLong(length);
+    fs.write(out);
   }
   public void readFields(DataInput in) throws IOException {
-    file = new Path(UTF8.readString(in));
-    start = in.readLong();
-    length = in.readLong();
-    hosts = null;
+    fs.readFields(in);
   }
 
   public String[] getLocations() throws IOException {
-    if (this.hosts == null) {
-      return new String[]{};
-    } else {
-      return this.hosts;
-    }
+    return fs.getLocations();
   }
   
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java?rev=801959&r1=801958&r2=801959&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java Fri Aug  7 11:54:25 2009
@@ -19,11 +19,9 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.util.Arrays;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileUtil;
 
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.WritableComparable;
@@ -35,7 +33,11 @@
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-/** An {@link OutputFormat} that writes {@link MapFile}s. */
+/** An {@link OutputFormat} that writes {@link MapFile}s.
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat} instead
+ */
+@Deprecated
 public class MapFileOutputFormat 
 extends FileOutputFormat<WritableComparable, Writable> {
 
@@ -81,18 +83,9 @@
   /** Open the output generated by this format. */
   public static MapFile.Reader[] getReaders(FileSystem ignored, Path dir,
                                             Configuration conf)
-    throws IOException {
-    FileSystem fs = dir.getFileSystem(conf);
-    Path[] names = FileUtil.stat2Paths(fs.listStatus(dir));
-
-    // sort names, so that hash partitioning works
-    Arrays.sort(names);
-    
-    MapFile.Reader[] parts = new MapFile.Reader[names.length];
-    for (int i = 0; i < names.length; i++) {
-      parts[i] = new MapFile.Reader(fs, names[i].toString(), conf);
-    }
-    return parts;
+      throws IOException {
+    return org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat.
+      getReaders(dir, conf);
   }
     
   /** Get an entry from output generated by this class. */

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java?rev=801959&r1=801958&r2=801959&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java Fri Aug  7 11:54:25 2009
@@ -21,10 +21,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -35,7 +32,6 @@
 import org.apache.hadoop.mapred.LineRecordReader;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.LineReader;
 
 /**
  * NLineInputFormat which splits N lines of input as one split.
@@ -54,8 +50,10 @@
  * a value to one map task, and key is the offset.
  * i.e. (k,v) is (LongWritable, Text).
  * The location hints will span the whole mapred cluster.
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.input.NLineInputFormat} instead
  */
-
+@Deprecated
 public class NLineInputFormat extends FileInputFormat<LongWritable, Text> 
                               implements JobConfigurable { 
   private int N = 1;
@@ -79,46 +77,10 @@
   throws IOException {
     ArrayList<FileSplit> splits = new ArrayList<FileSplit>();
     for (FileStatus status : listStatus(job)) {
-      Path fileName = status.getPath();
-      if (status.isDir()) {
-        throw new IOException("Not a file: " + fileName);
-      }
-      FileSystem  fs = fileName.getFileSystem(job);
-      LineReader lr = null;
-      try {
-        FSDataInputStream in  = fs.open(fileName);
-        lr = new LineReader(in, job);
-        Text line = new Text();
-        int numLines = 0;
-        long begin = 0;
-        long length = 0;
-        int num = -1;
-        while ((num = lr.readLine(line)) > 0) {
-          numLines++;
-          length += num;
-          if (numLines == N) {
-            // NLineInputFormat uses LineRecordReader, which always reads (and consumes) 
-            //at least one character out of its upper split boundary. So to make sure that
-            //each mapper gets N lines, we move back the upper split limits of each split 
-            //by one character here.
-            if (begin == 0) {
-              splits.add(new FileSplit(fileName, begin, length - 1, new String[] {}));
-            } else {
-              splits.add(new FileSplit(fileName, begin - 1, length, new String[] {}));
-            }
-            begin += length;
-            length = 0;
-            numLines = 0;
-          }
-        }
-        if (numLines != 0) {
-          splits.add(new FileSplit(fileName, begin, length, new String[]{}));
-        }
-   
-      } finally {
-        if (lr != null) {
-          lr.close();
-        }
+      for (org.apache.hadoop.mapreduce.lib.input.FileSplit split : 
+          org.apache.hadoop.mapreduce.lib.input.
+          NLineInputFormat.getSplitsForFile(status, job, N)) {
+        splits.add(new FileSplit(split));
       }
     }
     return splits.toArray(new FileSplit[splits.size()]);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java?rev=801959&r1=801958&r2=801959&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java Fri Aug  7 11:54:25 2009
@@ -38,7 +38,7 @@
   private long length;
   private String[] hosts;
 
-  FileSplit() {}
+  public FileSplit() {}
 
   /** Constructs a split with host information
    *

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java?rev=801959&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/NLineInputFormat.java Fri Aug  7 11:54:25 2009
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.LineReader;
+
+/**
+ * NLineInputFormat which splits N lines of input as one split.
+ *
+ * In many "pleasantly" parallel applications, each process/mapper 
+ * processes the same input file(s), but the computations are 
+ * controlled by different parameters (referred to as "parameter sweeps").
+ * One way to achieve this, is to specify a set of parameters 
+ * (one set per line) as input in a control file 
+ * (which is the input path to the map-reduce application,
+ * where as the input dataset is specified 
+ * via a config variable in JobConf.).
+ * 
+ * The NLineInputFormat can be used in such applications, that splits 
+ * the input file such that by default, one line is fed as
+ * a value to one map task, and key is the offset.
+ * i.e. (k,v) is (LongWritable, Text).
+ * The location hints will span the whole mapred cluster.
+ */
+
+public class NLineInputFormat extends FileInputFormat<LongWritable, Text> { 
+
+  /**
+   * Creates the reader used to iterate over the records of a split.
+   * The split's string form is first published as the task status.
+   *
+   * @param genericSplit the split the task is about to process
+   * @param context the task attempt whose status is updated
+   * @return a new {@link LineRecordReader}
+   * @throws IOException declared by the signature
+   */
+  public RecordReader<LongWritable, Text> createRecordReader(
+      InputSplit genericSplit, TaskAttemptContext context)
+      throws IOException {
+    String splitDescription = genericSplit.toString();
+    context.setStatus(splitDescription);
+    RecordReader<LongWritable, Text> lineReader = new LineRecordReader();
+    return lineReader;
+  }
+
+  /**
+   * Builds the splits for every input file of the job, grouping the
+   * configured number of lines of a file into each split.
+   *
+   * @param job the job context supplying the input files and configuration
+   * @return the splits of all input files, in the order the files are listed
+   * @throws IOException if listing the inputs or splitting a file fails
+   * @see FileInputFormat#getSplits(JobContext)
+   */
+  public List<InputSplit> getSplits(JobContext job)
+  throws IOException {
+    final List<InputSplit> allSplits = new ArrayList<InputSplit>();
+    final int linesPerSplit = getNumLinesPerSplit(job);
+    for (FileStatus inputFile : listStatus(job)) {
+      Configuration conf = job.getConfiguration();
+      List<FileSplit> fileSplits =
+        getSplitsForFile(inputFile, conf, linesPerSplit);
+      allSplits.addAll(fileSplits);
+    }
+    return allSplits;
+  }
+  
+  /**
+   * Splits one input file into pieces of <code>numLinesPerSplit</code> lines.
+   * The file is read once, line by line, to find the byte range of each
+   * group of lines; a final, shorter split covers any leftover lines.
+   *
+   * @param status the file to split; must not be a directory
+   * @param conf configuration used to resolve the file system and to
+   *        construct the line reader
+   * @param numLinesPerSplit the number of lines to place in each split
+   * @return the splits of the file, in file order
+   * @throws IOException if <code>status</code> is a directory or the file
+   *         cannot be read
+   */
+  public static List<FileSplit> getSplitsForFile(FileStatus status,
+      Configuration conf, int numLinesPerSplit) throws IOException {
+    List<FileSplit> splits = new ArrayList<FileSplit> ();
+    Path fileName = status.getPath();
+    if (status.isDir()) {
+      throw new IOException("Not a file: " + fileName);
+    }
+    FileSystem  fs = fileName.getFileSystem(conf);
+    LineReader lr = null;
+    try {
+      FSDataInputStream in  = fs.open(fileName);
+      lr = new LineReader(in, conf);
+      Text line = new Text();
+      int numLines = 0;
+      long begin = 0;
+      long length = 0;
+      int num = -1;
+      while ((num = lr.readLine(line)) > 0) {
+        numLines++;
+        length += num;
+        if (numLines == numLinesPerSplit) {
+          splits.add(createFileSplit(fileName, begin, length));
+          begin += length;
+          length = 0;
+          numLines = 0;
+        }
+      }
+      // Leftover lines get the same boundary adjustment as full splits;
+      // an unadjusted trailing split would not line up with the end of
+      // the split before it.
+      if (numLines != 0) {
+        splits.add(createFileSplit(fileName, begin, length));
+      }
+    } finally {
+      if (lr != null) {
+        lr.close();
+      }
+    }
+    return splits; 
+  }
+
+  /**
+   * Builds the split for <code>length</code> bytes of lines starting at byte
+   * <code>begin</code> of the file.
+   *
+   * NLineInputFormat uses LineRecordReader, which always reads (and
+   * consumes) at least one character out of its upper split boundary.
+   * So to make sure that each mapper gets exactly its own lines, the upper
+   * limit of every split is moved back by one character: the first split
+   * of a file is shortened by one, every later split starts one earlier.
+   */
+  private static FileSplit createFileSplit(Path fileName, long begin,
+      long length) {
+    if (begin == 0) {
+      return new FileSplit(fileName, begin, length - 1, new String[] {});
+    }
+    return new FileSplit(fileName, begin - 1, length, new String[] {});
+  }
+  
+  /**
+   * Stores the number of lines per split in the job's configuration,
+   * under the key <code>mapred.line.input.format.linespermap</code>.
+   * @param job the job to modify
+   * @param numLines the number of lines per split
+   */
+  public static void setNumLinesPerSplit(Job job, int numLines) {
+    Configuration jobConf = job.getConfiguration();
+    jobConf.setInt("mapred.line.input.format.linespermap", numLines);
+  }
+
+  /**
+   * Reads the number of lines per split from the job's configuration
+   * (key <code>mapred.line.input.format.linespermap</code>).
+   * @param job the job
+   * @return the configured number of lines per split, or 1 if none is set
+   */
+  public static int getNumLinesPerSplit(JobContext job) {
+    Configuration jobConf = job.getConfiguration();
+    int linesPerSplit =
+      jobConf.getInt("mapred.line.input.format.linespermap", 1);
+    return linesPerSplit;
+  }
+}

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java?rev=801959&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java Fri Aug  7 11:54:25 2009
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileUtil;
+
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/** 
+ * An {@link org.apache.hadoop.mapreduce.OutputFormat} that writes 
+ * {@link MapFile}s.
+ */
+public class MapFileOutputFormat 
+    extends FileOutputFormat<WritableComparable<?>, Writable> {
+
+  /**
+   * Creates a writer that appends the task's records to a {@link MapFile}
+   * placed at the task's default work file.
+   *
+   * @param context the task attempt supplying the configuration, the
+   *        output key/value classes and the compression settings
+   * @return a record writer backed by a {@link MapFile.Writer}
+   * @throws IOException if the file system or the MapFile cannot be opened
+   */
+  public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(
+      TaskAttemptContext context) throws IOException {
+    final Configuration conf = context.getConfiguration();
+
+    // without output compression: CompressionType.NONE and no codec
+    CompressionType compression = CompressionType.NONE;
+    CompressionCodec compressor = null;
+    if (getCompressOutput(context)) {
+      // the kind of compression is taken from the SequenceFile settings
+      compression = SequenceFileOutputFormat.getOutputCompressionType(context);
+
+      // the codec class falls back to DefaultCodec
+      Class<?> compressorClass =
+        getOutputCompressorClass(context, DefaultCodec.class);
+      compressor =
+        (CompressionCodec) ReflectionUtils.newInstance(compressorClass, conf);
+    }
+
+    final Path workFile = getDefaultWorkFile(context, "");
+    final FileSystem fs = workFile.getFileSystem(conf);
+    // the task context itself is handed to the writer as its last argument
+    final MapFile.Writer mapFileWriter =
+      new MapFile.Writer(conf, fs, workFile.toString(),
+        context.getOutputKeyClass().asSubclass(WritableComparable.class),
+        context.getOutputValueClass().asSubclass(Writable.class),
+        compression, compressor, context);
+
+    RecordWriter<WritableComparable<?>, Writable> recordWriter =
+      new RecordWriter<WritableComparable<?>, Writable>() {
+        public void write(WritableComparable<?> key, Writable value)
+            throws IOException {
+          mapFileWriter.append(key, value);
+        }
+
+        public void close(TaskAttemptContext context) throws IOException {
+          mapFileWriter.close();
+        }
+      };
+    return recordWriter;
+  }
+
+  /**
+   * Open the output generated by this format: one reader per entry listed
+   * directly under <code>dir</code>.
+   *
+   * @param dir the directory holding the generated MapFiles
+   * @param conf configuration used to resolve the file system and passed
+   *        to each reader
+   * @return the readers, ordered by sorted path
+   * @throws IOException if the directory cannot be listed or a reader
+   *         cannot be opened
+   */
+  public static MapFile.Reader[] getReaders(Path dir,
+      Configuration conf) throws IOException {
+    FileSystem fs = dir.getFileSystem(conf);
+    Path[] partPaths = FileUtil.stat2Paths(fs.listStatus(dir));
+
+    // paths are sorted so that hash partitioning works
+    Arrays.sort(partPaths);
+
+    MapFile.Reader[] readers = new MapFile.Reader[partPaths.length];
+    int index = 0;
+    for (Path partPath : partPaths) {
+      readers[index++] = new MapFile.Reader(fs, partPath.toString(), conf);
+    }
+    return readers;
+  }
+    
+  /**
+   * Get an entry from output generated by this class. The partitioner
+   * picks which reader to consult, using the number of readers as the
+   * number of partitions.
+   *
+   * @param readers the readers to choose from
+   * @param partitioner computes the partition of the key/value pair
+   * @param key the key to look up
+   * @param value the value object handed to the partitioner and the reader
+   * @return whatever the selected reader's <code>get</code> returns
+   * @throws IOException if the lookup fails
+   */
+  public static <K extends WritableComparable<?>, V extends Writable>
+      Writable getEntry(MapFile.Reader[] readers, 
+      Partitioner<K, V> partitioner, K key, V value) throws IOException {
+    int partition = partitioner.getPartition(key, value, readers.length);
+    MapFile.Reader partitionReader = readers[partition];
+    return partitionReader.get(key, value);
+  }
+}
+

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduce.java?rev=801959&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduce.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduce.java Fri Aug  7 11:54:25 2009
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.Iterator;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+/**********************************************************
+ * MapredLoadTest generates a bunch of work that exercises
+ * a Hadoop Map-Reduce system (and DFS, too).  It goes through
+ * the following steps:
+ *
+ * 1) Take inputs 'range' and 'counts'.
+ * 2) Generate 'counts' random integers between 0 and range-1.
+ * 3) Create a file that lists each integer between 0 and range-1,
+ *    and lists the number of times that integer was generated.
+ * 4) Emit a (very large) file that contains all the integers
+ *    in the order generated.
+ * 5) After the file has been generated, read it back and count
+ *    how many times each int was generated.
+ * 6) Compare this big count-map against the original one.  If
+ *    they match, then SUCCESS!  Otherwise, FAILURE!
+ *
+ * OK, that's how we can think about it.  What are the map-reduce
+ * steps that get the job done?
+ *
+ * 1) In a non-mapred thread, take the inputs 'range' and 'counts'.
+ * 2) In a non-mapred thread, generate the answer-key and write to disk.
+ * 3) In a mapred job, divide the answer key into K jobs.
+ * 4) A mapred 'generator' task consists of K map jobs.  Each reads
+ *    an individual "sub-key", and generates integers according
+ *    to it (though with a random ordering).
+ * 5) The generator's reduce task agglomerates all of those files
+ *    into a single one.
+ * 6) A mapred 'reader' task consists of M map jobs.  The output
+ *    file is cut into M pieces. Each of the M jobs counts the 
+ *    individual ints in its chunk and creates a map of all seen ints.
+ * 7) A mapred job integrates all the count files into a single one.
+ *
+ **********************************************************/
+public class TestMapReduce extends TestCase {
+  
+  // Local file system shared by the test; null if it could not be obtained.
+  private static FileSystem fs;
+  
+  static {
+    try {
+       fs = FileSystem.getLocal(new Configuration());
+    } catch (IOException ioe) {
+      // The failure is swallowed here: fs stays null, so the first use of
+      // fs in the test fails with a NullPointerException instead of
+      // reporting this IOException.
+      fs = null;
+    }
+  }
+  
+  /**
+   * Modified to make it a junit test.
+   * The RandomGen Job does the actual work of creating
+   * a huge file of assorted numbers.  It receives instructions
+   * as to how many times each number should be counted.  Then
+   * it emits those numbers in a crazy order.
+   *
+   * The map() function takes a key/val pair that describes
+   * a value-to-be-emitted (the key) and how many times it 
+   * should be emitted (the value), aka "numtimes".  map() then
+   * emits a series of intermediate key/val pairs.  It emits
+   * 'numtimes' of these.  The key is a random number and the
+   * value is the 'value-to-be-emitted'.
+   *
+   * The system collates and merges these pairs according to
+   * the random number.  reduce() function takes in a key/value
+   * pair that consists of a crazy random number and a series
+   * of values that should be emitted.  The random number key
+   * is now dropped, and reduce() emits a pair for every intermediate value.
+   * The emitted key is an intermediate value.  The emitted value
+   * is just a blank string.  Thus, we've created a huge file
+   * of numbers in random order, but where each number appears
+   * as many times as we were instructed.
+   */
+  static class RandomGenMapper
+      extends Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {
+
+    /**
+     * Writes the input key's number <code>val</code> times, each time
+     * under a new key drawn from the enclosing class's random generator.
+     */
+    public void map(IntWritable key, IntWritable val,
+        Context context) throws IOException, InterruptedException {
+      final int valueToEmit = key.get();
+      final int timesToEmit = val.get();
+
+      int emitted = 0;
+      while (emitted < timesToEmit) {
+        IntWritable randomKey = new IntWritable(Math.abs(r.nextInt()));
+        context.write(randomKey, new IntWritable(valueToEmit));
+        emitted++;
+      }
+    }
+  }
+  /**
+   * Drops the incoming key and writes every grouped value as an output
+   * key, paired with a null output value.
+   */
+  static class RandomGenReducer
+      extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
+
+    public void reduce(IntWritable key, Iterable<IntWritable> it,
+        Context context) throws IOException, InterruptedException {
+      Iterator<IntWritable> values = it.iterator();
+      while (values.hasNext()) {
+        context.write(values.next(), null);
+      }
+    }
+  }
+
+  /**
+   * The RandomCheck Job does a lot of our work.  It takes
+   * in a num/string keyspace, and transforms it into a
+   * key/count(int) keyspace.
+   *
+   * The map() function just emits a num/1 pair for every
+   * num/string input pair.
+   *
+   * The reduce() function sums up all the 1s that were
+   * emitted for a single key.  It then emits the key/total
+   * pair.
+   *
+   * This is used to regenerate the random number "answer key".
+   * Each key here is a random number, and the count is the
+   * number of times the number was emitted.
+   */
+  static class RandomCheckMapper
+      extends Mapper<WritableComparable<?>, Text, IntWritable, IntWritable> {
+
+    /**
+     * Parses the trimmed text value as an int and writes it with a count
+     * of 1; the input key is not used.
+     */
+    public void map(WritableComparable<?> key, Text val,
+        Context context) throws IOException, InterruptedException {
+      String digits = val.toString().trim();
+      IntWritable number = new IntWritable(Integer.parseInt(digits));
+      context.write(number, new IntWritable(1));
+    }
+  }
+  
+  /**
+   * Counts how many values were grouped under a key and writes the
+   * key/count pair.
+   */
+  static class RandomCheckReducer
+      extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
+    public void reduce(IntWritable key, Iterable<IntWritable> it,
+        Context context) throws IOException, InterruptedException {
+      // read the key before walking the values, as the original order does
+      final int number = key.get();
+      int occurrences = 0;
+      Iterator<IntWritable> values = it.iterator();
+      while (values.hasNext()) {
+        values.next();
+        occurrences++;
+      }
+      context.write(new IntWritable(number), new IntWritable(occurrences));
+    }
+  }
+
+  /**
+   * The Merge Job is a really simple one.  It takes in
+   * an int/int key-value set, and emits the same set.
+   * But it merges identical keys by adding their values.
+   *
+   * Thus, the map() function is just the identity function
+   * and reduce() just sums.  Nothing to see here!
+   */
+  static class MergeMapper
+      extends Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {
+
+    /** Identity map: writes fresh copies of the incoming key and value. */
+    public void map(IntWritable key, IntWritable val,
+        Context context) throws IOException, InterruptedException {
+      IntWritable keyCopy = new IntWritable(key.get());
+      IntWritable valueCopy = new IntWritable(val.get());
+      context.write(keyCopy, valueCopy);
+    }
+  }
+  
+  /**
+   * Adds up all values grouped under a key and writes one key/total pair.
+   */
+  static class MergeReducer
+      extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
+    // The values must be taken as an Iterable, like the other reducers in
+    // this class do: with any other parameter type this method is only an
+    // overload of Reducer.reduce, which the framework never calls, and no
+    // summing would take place.
+    @Override
+    public void reduce(IntWritable key, Iterable<IntWritable> it,
+        Context context) throws IOException, InterruptedException {
+      int keyint = key.get();
+      int total = 0;
+      for (IntWritable iw : it) {
+        total += iw.get();
+      }
+      context.write(new IntWritable(keyint), new IntWritable(total));
+    }
+  }
+
+  private static int range = 10;
+  private static int counts = 100;
+  private static Random r = new Random();
+
+  /** JUnit test method: runs the whole scenario implemented by launch(). */
+  public void testMapred() throws Exception {
+    launch();
+  }
+
+  /**
+   * Runs the whole test pipeline:
+   * builds a random distribution of ints (the answer key), writes it
+   * to a sequence file, runs the generate, check and merge jobs in turn,
+   * and finally compares the merged output against the answer key.
+   * The outcome is written to a "results" file and asserted.
+   *
+   * Each job must complete successfully before the next one is started;
+   * a failed job aborts the test immediately instead of surfacing later
+   * as a confusing read error on missing output.
+   *
+   * @throws Exception if a directory cannot be created, any I/O fails,
+   *         or a job cannot be run
+   */
+  private static void launch() throws Exception {
+    //
+    // Generate distribution of ints.  This is the answer key.
+    //
+    Configuration conf = new Configuration();
+    int countsToGo = counts;
+    int dist[] = new int[range];
+    for (int i = 0; i < range; i++) {
+      double avgInts = (1.0 * countsToGo) / (range - i);
+      dist[i] = (int) Math.max(0, Math.round(avgInts + 
+        (Math.sqrt(avgInts) * r.nextGaussian())));
+      countsToGo -= dist[i];
+    }
+    // Whatever has not been handed out yet goes to the last bucket.
+    if (countsToGo > 0) {
+      dist[dist.length-1] += countsToGo;
+    }
+
+    //
+    // Write the answer key to a file.  
+    //
+    Path testdir = new Path("mapred.loadtest");
+    if (!fs.mkdirs(testdir)) {
+      throw new IOException("Mkdirs failed to create " + testdir.toString());
+    }
+
+    Path randomIns = new Path(testdir, "genins");
+    if (!fs.mkdirs(randomIns)) {
+      throw new IOException("Mkdirs failed to create " + randomIns.toString());
+    }
+
+    Path answerkey = new Path(randomIns, "answer.key");
+    SequenceFile.Writer out = 
+      SequenceFile.createWriter(fs, conf, answerkey, IntWritable.class,
+                                IntWritable.class, 
+                                SequenceFile.CompressionType.NONE);
+    try {
+      for (int i = 0; i < range; i++) {
+        out.append(new IntWritable(i), new IntWritable(dist[i]));
+      }
+    } finally {
+      out.close();
+    }
+    
+    printFiles(randomIns, conf);
+
+    //
+    // Now we need to generate the random numbers according to
+    // the above distribution.
+    //
+    // We create a lot of map tasks, each of which takes at least
+    // one "line" of the distribution.  (That is, a certain number
+    // X is to be generated Y number of times.)
+    //
+    // A map task emits Y key/val pairs.  The val is X.  The key
+    // is a randomly-generated number.
+    //
+    // The reduce task gets its input sorted by key.  That is, sorted
+    // in random order.  It then emits a single line of text
+    // for each of the given values.  It does not emit the key.
+    //
+    // Because there's just one reduce task, we emit a single big
+    // file of random numbers.
+    //
+    Path randomOuts = new Path(testdir, "genouts");
+    fs.delete(randomOuts, true);
+
+
+    Job genJob = new Job(conf);
+    FileInputFormat.setInputPaths(genJob, randomIns);
+    genJob.setInputFormatClass(SequenceFileInputFormat.class);
+    genJob.setMapperClass(RandomGenMapper.class);
+
+    FileOutputFormat.setOutputPath(genJob, randomOuts);
+    genJob.setOutputKeyClass(IntWritable.class);
+    genJob.setOutputValueClass(IntWritable.class);
+    genJob.setReducerClass(RandomGenReducer.class);
+    genJob.setNumReduceTasks(1);
+
+    // Stop right here if the job failed; later stages need its output.
+    assertTrue("generate job failed", genJob.waitForCompletion(true));
+    printFiles(randomOuts, conf);
+
+    //
+    // Next, we read the big file in and regenerate the 
+    // original map.  It's split into a number of parts.
+    // (That number is 'intermediateReduces'.)
+    //
+    // We have many map tasks, each of which read at least one
+    // of the output numbers.  For each number read in, the
+    // map task emits a key/value pair where the key is the
+    // number and the value is "1".
+    //
+    // We have a single reduce task, which receives its input
+    // sorted by the key emitted above.  For each key, there will
+    // be a certain number of "1" values.  The reduce task sums
+    // these values to compute how many times the given key was
+    // emitted.
+    //
+    // The reduce task then emits a key/val pair where the key
+    // is the number in question, and the value is the number of
+    // times the key was emitted.  This is the same format as the
+    // original answer key (except that numbers emitted zero times
+    // will not appear in the regenerated key.)  The answer set
+    // is split into a number of pieces.  A final MapReduce job
+    // will merge them.
+    //
+    // There's not really a need to go to 10 reduces here 
+    // instead of 1.  But we want to test what happens when
+    // you have multiple reduces at once.
+    //
+    int intermediateReduces = 10;
+    Path intermediateOuts = new Path(testdir, "intermediateouts");
+    fs.delete(intermediateOuts, true);
+    Job checkJob = new Job(conf);
+    FileInputFormat.setInputPaths(checkJob, randomOuts);
+    checkJob.setMapperClass(RandomCheckMapper.class);
+
+    FileOutputFormat.setOutputPath(checkJob, intermediateOuts);
+    checkJob.setOutputKeyClass(IntWritable.class);
+    checkJob.setOutputValueClass(IntWritable.class);
+    checkJob.setOutputFormatClass(MapFileOutputFormat.class);
+    checkJob.setReducerClass(RandomCheckReducer.class);
+    checkJob.setNumReduceTasks(intermediateReduces);
+    assertTrue("check job failed", checkJob.waitForCompletion(true));
+    printFiles(intermediateOuts, conf); 
+
+    //
+    // OK, now we take the output from the last job and
+    // merge it down to a single file.  The map() and reduce()
+    // functions don't really do anything except reemit tuples.
+    // But by having a single reduce task here, we end up merging
+    // all the files.
+    //
+    Path finalOuts = new Path(testdir, "finalouts");
+    fs.delete(finalOuts, true);
+    Job mergeJob = new Job(conf);
+    FileInputFormat.setInputPaths(mergeJob, intermediateOuts);
+    mergeJob.setInputFormatClass(SequenceFileInputFormat.class);
+    mergeJob.setMapperClass(MergeMapper.class);
+        
+    FileOutputFormat.setOutputPath(mergeJob, finalOuts);
+    mergeJob.setOutputKeyClass(IntWritable.class);
+    mergeJob.setOutputValueClass(IntWritable.class);
+    mergeJob.setOutputFormatClass(SequenceFileOutputFormat.class);
+    mergeJob.setReducerClass(MergeReducer.class);
+    mergeJob.setNumReduceTasks(1);
+        
+    assertTrue("merge job failed", mergeJob.waitForCompletion(true));
+    printFiles(finalOuts, conf); 
+ 
+    //
+    // Finally, we compare the reconstructed answer key with the
+    // original one.  Remember, we need to ignore zero-count items
+    // in the original key.
+    //
+    boolean success = true;
+    Path recomputedkey = new Path(finalOuts, "part-r-00000");
+    SequenceFile.Reader in = new SequenceFile.Reader(fs, recomputedkey, conf);
+    int totalseen = 0;
+    try {
+      IntWritable key = new IntWritable();
+      IntWritable val = new IntWritable();            
+      for (int i = 0; i < range; i++) {
+        if (dist[i] == 0) {
+          continue;
+        }
+        if (!in.next(key, val)) {
+          System.err.println("Cannot read entry " + i);
+          success = false;
+          break;
+        } else {
+          if (!((key.get() == i) && (val.get() == dist[i]))) {
+            System.err.println("Mismatch!  Pos=" + key.get() + ", i=" + i + 
+                               ", val=" + val.get() + ", dist[i]=" + dist[i]);
+            success = false;
+          }
+          totalseen += val.get();
+        }
+      }
+      // Every expected entry matched; make sure nothing extra follows.
+      if (success) {
+        if (in.next(key, val)) {
+          System.err.println("Unnecessary lines in recomputed key!");
+          success = false;
+        }
+      }
+    } finally {
+      in.close();
+    }
+    int originalTotal = 0;
+    for (int i = 0; i < dist.length; i++) {
+      originalTotal += dist[i];
+    }
+    System.out.println("Original sum: " + originalTotal);
+    System.out.println("Recomputed sum: " + totalseen);
+
+    //
+    // Write to "results" whether the test succeeded or not.
+    //
+    Path resultFile = new Path(testdir, "results");
+    BufferedWriter bw = new BufferedWriter(
+      new OutputStreamWriter(fs.create(resultFile)));
+    try {
+      bw.write("Success=" + success + "\n");
+      System.out.println("Success=" + success);
+    } finally {
+      bw.close();
+    }
+    assertTrue("testMapRed failed", success);
+    // The working directory is only removed after a successful run.
+    fs.delete(testdir, true);
+  }
+
+  /**
+   * Prints every line of a text file to stdout, each prefixed
+   * with "  Row: ".
+   *
+   * @param fs file system holding the file
+   * @param p  file to print
+   * @throws IOException if the file cannot be opened or read
+   */
+  private static void printTextFile(FileSystem fs, Path p) throws IOException {
+    BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(p)));
+    try {
+      String line;
+      while ((line = in.readLine()) != null) {
+        System.out.println("  Row: " + line);
+      }
+    } finally {
+      // Close even when reading fails, so the stream is not leaked.
+      in.close();
+    }
+  }
+
+  /**
+   * Prints every key/value record of a sequence file to stdout, each
+   * prefixed with "  Row: ".
+   *
+   * @param fs   file system holding the file
+   * @param p    sequence file to print
+   * @param conf configuration handed to the reader
+   * @throws IOException if the file cannot be opened or read
+   */
+  private static void printSequenceFile(FileSystem fs, Path p, 
+      Configuration conf) throws IOException {
+    SequenceFile.Reader r = new SequenceFile.Reader(fs, p, conf);
+    try {
+      Object key = null;
+      Object value = null;
+      // next(key) returns null once the end of the file is reached.
+      while ((key = r.next(key)) != null) {
+        value = r.getCurrentValue(value);
+        System.out.println("  Row: " + key + ", " + value);
+      }
+    } finally {
+      // Close even when reading fails, so the reader is not leaked.
+      r.close();
+    }
+  }
+
+  /**
+   * Tells whether a file starts with the three bytes "SEQ".
+   *
+   * @param fs file system holding the file
+   * @param f  file to probe
+   * @return true if the first three bytes read from f are "SEQ",
+   *         false otherwise (including when the file is shorter)
+   * @throws IOException if the file cannot be opened or read
+   */
+  private static boolean isSequenceFile(FileSystem fs,
+                                        Path f) throws IOException {
+    DataInputStream in = fs.open(f);
+    try {
+      byte[] seq = "SEQ".getBytes();
+      for(int i=0; i < seq.length; ++i) {
+        // read() yields -1 at end of file, which never equals a magic byte.
+        if (seq[i] != in.read()) {
+          return false;
+        }
+      }
+      return true;
+    } finally {
+      // The stream was previously left open on both return paths.
+      in.close();
+    }
+  }
+
+  /**
+   * Dumps the contents of every entry directly under dir to stdout.
+   * A sub-directory is treated as a map file (its "data" file is printed
+   * as a sequence file); a plain file is printed as a sequence file or
+   * as text, depending on what isSequenceFile() reports.
+   *
+   * @param dir  directory whose entries are printed
+   * @param conf configuration used to resolve the file system
+   * @throws IOException if listing or reading fails
+   */
+  private static void printFiles(Path dir, 
+                                 Configuration conf) throws IOException {
+    FileSystem dirFs = dir.getFileSystem(conf);
+    FileStatus[] entries = dirFs.listStatus(dir);
+    for (int idx = 0; idx < entries.length; idx++) {
+      FileStatus entry = entries[idx];
+      Path entryPath = entry.getPath();
+      System.out.println("Reading " + entryPath + ": ");
+      if (entry.isDir()) {
+        System.out.println("  it is a map file.");
+        printSequenceFile(dirFs, new Path(entryPath, "data"), conf);
+        continue;
+      }
+      if (isSequenceFile(dirFs, entryPath)) {
+        System.out.println("  it is a sequence file.");
+        printSequenceFile(dirFs, entryPath, conf);
+        continue;
+      }
+      System.out.println("  it is a text file.");
+      printTextFile(dirFs, entryPath);
+    }
+  }
+
+  /**
+   * Command-line entry point.  Expects two arguments, range and
+   * counts, stores them and then launches all the tasks in order.
+   * With fewer than two arguments it only prints a usage message.
+   */
+  public static void main(String[] argv) throws Exception {
+    if (argv.length >= 2) {
+      range = Integer.parseInt(argv[0]);
+      counts = Integer.parseInt(argv[1]);
+      launch();
+      return;
+    }
+
+    // Not enough arguments: explain the expected usage and do nothing.
+    System.err.println("Usage: TestMapReduce <range> <counts>");
+    System.err.println();
+    System.err.println("Note: a good test will have a <counts> value" +
+      " that is substantially larger than the <range>");
+  }
+}

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java?rev=801959&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestNLineInputFormat.java Fri Aug  7 11:54:25 2009
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+
+public class TestNLineInputFormat extends TestCase {
+  private static int MAX_LENGTH = 200;
+  
+  private static Configuration conf = new Configuration();
+  private static FileSystem localFs = null; 
+
+  static {
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+
+  private static Path workDir = 
+    new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+             "TestNLineInputFormat");
+  
+  public void testFormat() throws Exception {
+    Job job = new Job(conf);
+    Path file = new Path(workDir, "test.txt");
+
+    int seed = new Random().nextInt();
+    Random random = new Random(seed);
+
+    localFs.delete(workDir, true);
+    FileInputFormat.setInputPaths(job, workDir);
+    int numLinesPerMap = 5;
+    NLineInputFormat.setNumLinesPerSplit(job, numLinesPerMap);
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length += random.nextInt(MAX_LENGTH / 10) + 1) {
+      // create a file with length entries
+      Writer writer = new OutputStreamWriter(localFs.create(file));
+      try {
+        for (int i = 0; i < length; i++) {
+          writer.write(Integer.toString(i));
+          writer.write("\n");
+        }
+      } finally {
+        writer.close();
+      }
+      checkFormat(job, numLinesPerMap);
+    }
+  }
+
+  void checkFormat(Job job, int expectedN) 
+      throws IOException, InterruptedException {
+    NLineInputFormat format = new NLineInputFormat();
+    List<InputSplit> splits = format.getSplits(job);
+    // check all splits except last one
+    int count = 0;
+    for (int i = 0; i < splits.size() -1; i++) {
+      assertEquals("There are no split locations", 0,
+                   splits.get(i).getLocations().length);
+      TaskAttemptContext context = MapReduceTestUtil.
+        createDummyMapTaskAttemptContext(job.getConfiguration());
+      RecordReader<LongWritable, Text> reader = format.createRecordReader(
+        splits.get(i), context);
+      Class<?> clazz = reader.getClass();
+      assertEquals("reader class is LineRecordReader.", 
+        LineRecordReader.class, clazz);
+      MapContext<LongWritable, Text, LongWritable, Text> mcontext = 
+        new MapContext<LongWritable, Text, LongWritable, Text>(
+          job.getConfiguration(), context.getTaskAttemptID(), reader, null,
+          null, MapReduceTestUtil.createDummyReporter(), splits.get(i));
+      reader.initialize(splits.get(i), mcontext);
+         
+      try {
+        count = 0;
+        while (reader.nextKeyValue()) {
+          count++;
+        }
+      } finally {
+        reader.close();
+      }
+      assertEquals("number of lines in split is " + expectedN ,
+                   expectedN, count);
+    }
+  }
+  
+  public static void main(String[] args) throws Exception {
+    new TestNLineInputFormat().testFormat();
+  }
+}