Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/06/18 23:59:38 UTC

svn commit: r548505 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/io/compress/ src/java/org/apache/hadoop/io/compress/zlib/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/io/compress/ src/test/or...

Author: cutting
Date: Mon Jun 18 14:59:36 2007
New Revision: 548505

URL: http://svn.apache.org/viewvc?view=rev&rev=548505
Log:
HADOOP-1193.  Pool allocation of compression codecs.  Contributed by Arun.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/BigMapOutput.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Jun 18 14:59:36 2007
@@ -164,6 +164,11 @@
      files, so that disk space, writability, etc. is considered.
      (Dhruba Borthakur via cutting)
 
+ 52. HADOOP-1193.  Pool allocation of compression codecs.  This
+     eliminates a memory leak that could cause OutOfMemoryException,
+     and also substantially improves performance.
+     (Arun C Murthy via cutting)
+
 
 Release 0.13.0 - 2007-06-08
 

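The crux of the patch is the small, type-keyed CodecPool<T> added to SequenceFile (see the diff below). A minimal sketch of the borrow/return pattern it enables, assuming a Configuration named conf, a raw output stream named rawOut, and a pool named compressorPool standing in for the private static pool in SequenceFile.Writer:

    DefaultCodec codec = new DefaultCodec();
    codec.setConf(conf);                                      // codec needs a Configuration
    Compressor compressor = compressorPool.getCodec(codec.getCompressorType());
    if (compressor == null) {
      compressor = codec.createCompressor();                  // pool was empty: construct one
    }
    CompressionOutputStream out = codec.createOutputStream(rawOut, compressor);
    // ... write uncompressed data to 'out'; it is compressed on the way through ...
    out.close();
    compressorPool.returnCodec(compressor);                   // recycle instead of leaking

Reusing the (possibly native) Compressor this way is what removes both the per-file allocation cost and the leak noted in the CHANGES.txt entry above.
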
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Mon Jun 18 14:59:36 2007
@@ -21,8 +21,6 @@
 import java.io.*;
 import java.util.*;
 import java.net.InetAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.rmi.server.UID;
 import java.security.MessageDigest;
 import org.apache.commons.logging.*;
@@ -30,6 +28,8 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
@@ -293,7 +293,7 @@
                  Class keyClass, Class valClass, boolean compress, boolean blockCompress,
                  CompressionCodec codec, Metadata metadata)
     throws IOException {
-    if ((codec instanceof GzipCodec) && 
+    if (codec != null && (codec instanceof GzipCodec) && 
         !NativeCodeLoader.isNativeCodeLoaded() && 
         !ZlibFactory.isNativeZlibLoaded()) {
       throw new IllegalArgumentException("SequenceFile doesn't work with " +
@@ -315,25 +315,47 @@
 
   /**
    * Construct the preferred type of 'raw' SequenceFile Writer.
-   * @param out The stream on top which the writer is to be constructed.
+   * @param fs The configured filesystem. 
+   * @param conf The configuration.
+   * @param file The name of the file. 
    * @param keyClass The 'key' type.
    * @param valClass The 'value' type.
    * @param compress Compress data?
    * @param blockCompress Compress blocks?
+   * @param codec The compression codec.
+   * @param progress
+   * @param metadata The metadata of the file.
    * @return Returns the handle to the constructed SequenceFile Writer.
    * @throws IOException
    */
   private static Writer
-    createWriter(Configuration conf, FSDataOutputStream out, 
-                 Class keyClass, Class valClass, boolean compress, boolean blockCompress,
-                 CompressionCodec codec)
-    throws IOException {
-    Writer writer = createWriter(conf, out, keyClass, valClass, compress, 
-                                 blockCompress, codec, new Metadata());
-    return writer;
+  createWriter(FileSystem fs, Configuration conf, Path file, 
+               Class keyClass, Class valClass, 
+               boolean compress, boolean blockCompress,
+               CompressionCodec codec, Progressable progress, Metadata metadata)
+  throws IOException {
+  if (codec != null && (codec instanceof GzipCodec) && 
+      !NativeCodeLoader.isNativeCodeLoaded() && 
+      !ZlibFactory.isNativeZlibLoaded()) {
+    throw new IllegalArgumentException("SequenceFile doesn't work with " +
+                                       "GzipCodec without native-hadoop code!");
   }
 
+  Writer writer = null;
+
+  if (!compress) {
+    writer = new Writer(fs, conf, file, keyClass, valClass, progress, metadata);
+  } else if (compress && !blockCompress) {
+    writer = new RecordCompressWriter(fs, conf, file, keyClass, valClass, 
+                                      codec, progress, metadata);
+  } else {
+    writer = new BlockCompressWriter(fs, conf, file, keyClass, valClass, 
+                                     codec, progress, metadata);
+  }
   
+  return writer;
+}
+
   /**
    * Construct the preferred type of 'raw' SequenceFile Writer.
    * @param conf The configuration.
@@ -598,8 +620,16 @@
   
   /** Write key/value pairs to a sequence-format file. */
   public static class Writer {
+    /**
+     * A global compressor pool used to save the expensive 
+     * construction/destruction of (possibly native) compression codecs.
+     */
+    private static final CodecPool<Compressor> compressorPool = 
+      new CodecPool<Compressor>();
+    
     Configuration conf;
     FSDataOutputStream out;
+    boolean ownOutputStream = true;
     DataOutputBuffer buffer = new DataOutputBuffer();
 
     Class keyClass;
@@ -610,7 +640,8 @@
     CompressionOutputStream deflateFilter = null;
     DataOutputStream deflateOut = null;
     Metadata metadata = null;
-
+    Compressor compressor = null;
+    
     // Insert a globally unique 16-byte value every few entries, so that one
     // can seek into the middle of a file and then synchronize with record
     // starts and ends by scanning for this value.
@@ -651,6 +682,7 @@
     private Writer(Configuration conf, FSDataOutputStream out, 
                    Class keyClass, Class valClass, Metadata metadata)
       throws IOException {
+      this.ownOutputStream = false;
       init(null, conf, out, keyClass, valClass, false, null, metadata);
       
       initializeFileHeader();
@@ -703,7 +735,11 @@
       this.metadata = metadata;
       if (this.codec != null) {
         ReflectionUtils.setConf(this.codec, this.conf);
-        this.deflateFilter = this.codec.createOutputStream(buffer);
+        compressor = compressorPool.getCodec(this.codec.getCompressorType());
+        if (compressor == null) {
+          compressor = this.codec.createCompressor();
+        }
+        this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
         this.deflateOut = 
           new DataOutputStream(new BufferedOutputStream(deflateFilter));
       }
@@ -727,8 +763,15 @@
     
     /** Close the file. */
     public synchronized void close() throws IOException {
+      compressorPool.returnCodec(compressor);
+
       if (out != null) {
-        out.close();
+        out.flush();
+        
+        // Close the underlying stream iff we own it...
+        if (ownOutputStream) {
+          out.close();
+        }
         out = null;
       }
     }
@@ -849,6 +892,7 @@
     private RecordCompressWriter(Configuration conf, FSDataOutputStream out,
                                  Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
       throws IOException {
+      this.ownOutputStream = false;
       super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
       
       initializeFileHeader();
@@ -967,6 +1011,7 @@
     private BlockCompressWriter(Configuration conf, FSDataOutputStream out,
                                 Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
       throws IOException {
+      this.ownOutputStream = false;
       super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
       init(1000000);
       
@@ -1035,9 +1080,8 @@
     public synchronized void close() throws IOException {
       if (out != null) {
         writeBlock();
-        out.close();
-        out = null;
       }
+      super.close();
     }
 
     public void sync() throws IOException {
@@ -1107,6 +1151,13 @@
   
   /** Reads key/value pairs from a sequence-format file. */
   public static class Reader {
+    /**
+     * A global decompressor pool used to save the expensive 
+     * construction/destruction of (possibly native) decompression codecs.
+     */
+    private static final CodecPool<Decompressor> decompressorPool = 
+      new CodecPool<Decompressor>();
+    
     private Path file;
     private FSDataInputStream in;
     private DataOutputBuffer outBuf = new DataOutputBuffer();
@@ -1142,43 +1193,62 @@
     private DataInputBuffer keyLenBuffer = null;
     private CompressionInputStream keyLenInFilter = null;
     private DataInputStream keyLenIn = null;
+    private Decompressor keyLenDecompressor = null;
     private DataInputBuffer keyBuffer = null;
     private CompressionInputStream keyInFilter = null;
     private DataInputStream keyIn = null;
+    private Decompressor keyDecompressor = null;
 
     private DataInputBuffer valLenBuffer = null;
     private CompressionInputStream valLenInFilter = null;
     private DataInputStream valLenIn = null;
+    private Decompressor valLenDecompressor = null;
     private DataInputBuffer valBuffer = null;
     private CompressionInputStream valInFilter = null;
     private DataInputStream valIn = null;
+    private Decompressor valDecompressor = null;
 
     /** Open the named file. */
     public Reader(FileSystem fs, Path file, Configuration conf)
       throws IOException {
-      this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf);
+      this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, false);
     }
 
-    private Reader(FileSystem fs, Path name, int bufferSize,
-                   Configuration conf) throws IOException {
-      this.file = name;
-      this.in = fs.open(file, bufferSize);
-      this.end = fs.getLength(file);
-      this.conf = conf;
-      init();
+    private Reader(FileSystem fs, Path file, int bufferSize,
+                   Configuration conf, boolean tempReader) throws IOException {
+      this(fs, file, bufferSize, 0, fs.getLength(file), conf, tempReader);
     }
     
     private Reader(FileSystem fs, Path file, int bufferSize, long start,
-                   long length, Configuration conf) throws IOException {
+                   long length, Configuration conf, boolean tempReader) 
+    throws IOException {
       this.file = file;
       this.in = fs.open(file, bufferSize);
       this.conf = conf;
       seek(start);
       this.end = in.getPos() + length;
-      init();
+      init(tempReader);
+    }
+    
+    private Decompressor getPooledOrNewDecompressor() {
+      Decompressor decompressor = null;
+      decompressor = decompressorPool.getCodec(codec.getDecompressorType());
+      if (decompressor == null) {
+        decompressor = codec.createDecompressor();
+      }
+      return decompressor;
     }
     
-    private void init() throws IOException {
+
+    /**
+     * Initialize the {@link Reader}
+     * @param tmpReader <code>true</code> if we are constructing a temporary
+     *                  reader {@link SequenceFile.Sorter.cloneFileAttributes}, 
+     *                  and hence do not initialize every component; 
+     *                  <code>false</code> otherwise.
+     * @throws IOException
+     */
+    private void init(boolean tempReader) throws IOException {
       byte[] versionBlock = new byte[VERSION.length];
       in.readFully(versionBlock);
 
@@ -1245,33 +1315,48 @@
         in.readFully(sync);                       // read sync bytes
       }
       
-      // Initialize
-      valBuffer = new DataInputBuffer();
-      if (decompress) {
-        valInFilter = this.codec.createInputStream(valBuffer);
-        valIn = new DataInputStream(valInFilter);
-      } else {
-        valIn = valBuffer;
-      }
-      
-      if (blockCompressed) {
-        keyLenBuffer = new DataInputBuffer();
-        keyBuffer = new DataInputBuffer();
-        valLenBuffer = new DataInputBuffer();
-        
-        keyLenInFilter = this.codec.createInputStream(keyLenBuffer);
-        keyLenIn = new DataInputStream(keyLenInFilter);
-
-        keyInFilter = this.codec.createInputStream(keyBuffer);
-        keyIn = new DataInputStream(keyInFilter);
+      // Initialize... *not* if this we are constructing a temporary Reader
+      if (!tempReader) {
+        valBuffer = new DataInputBuffer();
+        if (decompress) {
+          valDecompressor = getPooledOrNewDecompressor();
+          valInFilter = codec.createInputStream(valBuffer, valDecompressor);
+          valIn = new DataInputStream(valInFilter);
+        } else {
+          valIn = valBuffer;
+        }
 
-        valLenInFilter = this.codec.createInputStream(valLenBuffer);
-        valLenIn = new DataInputStream(valLenInFilter);
+        if (blockCompressed) {
+          keyLenBuffer = new DataInputBuffer();
+          keyBuffer = new DataInputBuffer();
+          valLenBuffer = new DataInputBuffer();
+
+          keyLenDecompressor = getPooledOrNewDecompressor();
+          keyLenInFilter = codec.createInputStream(keyLenBuffer, 
+                                                   keyLenDecompressor);
+          keyLenIn = new DataInputStream(keyLenInFilter);
+
+          keyDecompressor = getPooledOrNewDecompressor();
+          keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
+          keyIn = new DataInputStream(keyInFilter);
+
+          valLenDecompressor = getPooledOrNewDecompressor();
+          valLenInFilter = codec.createInputStream(valLenBuffer, 
+                                                   valLenDecompressor);
+          valLenIn = new DataInputStream(valLenInFilter);
+        }
       }
     }
     
     /** Close the file. */
     public synchronized void close() throws IOException {
+      // Return the decompressors to the pool
+      decompressorPool.returnCodec(keyLenDecompressor);
+      decompressorPool.returnCodec(keyDecompressor);
+      decompressorPool.returnCodec(valLenDecompressor);
+      decompressorPool.returnCodec(valDecompressor);
+      
+      // Close the input-stream
       in.close();
     }
 
@@ -1755,6 +1840,49 @@
 
   }
 
+  private static class CodecPool<T> {
+
+    private Map<Class, List<T>> pool = new HashMap<Class, List<T>>();
+    
+    public T getCodec(Class codecClass) {
+      T codec = null;
+      
+      // Check if an appropriate codec is available
+      synchronized (pool) {
+        if (pool.containsKey(codecClass)) {
+          List<T> codecList = pool.get(codecClass);
+          
+          if (codecList != null) {
+            synchronized (codecList) {
+              if (!codecList.isEmpty()) {
+                codec = codecList.remove(0);
+              }
+            }
+          }
+        }
+      }
+      
+      return codec;
+    }
+
+    public void returnCodec(T codec) {
+      if (codec != null) {
+        Class codecClass = codec.getClass();
+        synchronized (pool) {
+          if (!pool.containsKey(codecClass)) {
+            pool.put(codecClass, new ArrayList<T>());
+          }
+
+          List<T> codecList = pool.get(codecClass);
+          synchronized (codecList) {
+            codecList.add(codec);
+          }
+        }
+      }
+    }
+
+  }
+  
   /** Sorts key/value pairs in a sequence-format file.
    *
    * <p>For best performance, applications should make sure that the {@link
@@ -1951,7 +2079,7 @@
               }
               continue;
             }
-            //int length = buffer.getLength() - start;
+
             int keyLength = rawKeys.getLength() - keyOffset;
 
             if (count == keyOffsets.length)
@@ -2026,7 +2154,8 @@
 
         long segmentStart = out.getPos();
         Writer writer = createWriter(conf, out, keyClass, valClass, 
-                                     isCompressed, isBlockCompressed, codec);
+                                     isCompressed, isBlockCompressed, codec, 
+                                     new Metadata());
         
         if (!done) {
           writer.sync = null;                     // disable sync on temp files
@@ -2036,14 +2165,12 @@
           int p = pointers[i];
           writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
         }
-        writer.sync();
-        writer.out.flush();
-        
+        writer.close();
         
         if (!done) {
           // Save the segment length
           WritableUtils.writeVLong(indexOut, segmentStart);
-          WritableUtils.writeVLong(indexOut, (writer.out.getPos()-segmentStart));
+          WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart));
           indexOut.flush();
         }
       }
@@ -2179,24 +2306,6 @@
     /**
      * Clones the attributes (like compression of the input file and creates a 
      * corresponding Writer
-     * @param ignoredFileSys the (ignored) FileSystem object
-     * @param inputFile the path of the input file whose attributes should be 
-     * cloned 
-     * @param outputFile the path of the output file 
-     * @param prog the Progressable to report status during the file write
-     * @return Writer
-     * @throws IOException
-     * @deprecated call  #cloneFileAttributes(Path,Path,Progressable) instead
-     */
-    public Writer cloneFileAttributes(FileSystem ignoredFileSys,
-                                      Path inputFile, Path outputFile, Progressable prog) 
-      throws IOException {
-      return cloneFileAttributes(inputFile, outputFile, prog);
-    }
-
-    /**
-     * Clones the attributes (like compression of the input file and creates a 
-     * corresponding Writer
      * @param inputFile the path of the input file whose attributes should be 
      * cloned
      * @param outputFile the path of the output file 
@@ -2205,24 +2314,19 @@
      * @throws IOException
      */
     public Writer cloneFileAttributes(Path inputFile, Path outputFile, 
-                                      Progressable prog) throws IOException {
+                                      Progressable prog) 
+    throws IOException {
       FileSystem srcFileSys = inputFile.getFileSystem(conf);
-      Reader reader = new Reader(srcFileSys, inputFile, 4096, conf);
+      Reader reader = new Reader(srcFileSys, inputFile, 4096, conf, true);
       boolean compress = reader.isCompressed();
       boolean blockCompress = reader.isBlockCompressed();
       CompressionCodec codec = reader.getCompressionCodec();
       reader.close();
-      
-      FileSystem dstFileSys = outputFile.getFileSystem(conf);
-      FSDataOutputStream out;
-      if (prog != null)
-        out = dstFileSys.create(outputFile, true, 
-                                conf.getInt("io.file.buffer.size", 4096), prog);
-      else
-        out = dstFileSys.create(outputFile, true, 
-                                conf.getInt("io.file.buffer.size", 4096));
-      Writer writer = createWriter(conf, out, keyClass, valClass, compress, 
-                                   blockCompress, codec);
+
+      Writer writer = createWriter(outputFile.getFileSystem(conf), conf, 
+                                   outputFile, keyClass, valClass, compress, 
+                                   blockCompress, codec, prog,
+                                   new Metadata());
       return writer;
     }
 
@@ -2457,7 +2561,7 @@
             Path outputFile =  lDirAlloc.getLocalPathForWrite(
                                                 tmpFilename.toString(),
                                                 approxOutputSize, conf);
-            LOG.info("writing intermediate results to " + outputFile);
+            LOG.debug("writing intermediate results to " + outputFile);
             Writer writer = cloneFileAttributes(
                                                 fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 
                                                 fs.makeQualified(outputFile), null);
@@ -2590,7 +2694,7 @@
           }
           Reader reader = new Reader(fs, segmentPathName, 
                                      bufferSize, segmentOffset, 
-                                     segmentLength, conf);
+                                     segmentLength, conf, false);
         
           //sometimes we ignore syncs especially for temp merge files
           if (ignoreSync) reader.sync = null;

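Because the Reader now borrows its decompressors from a shared pool in init() and only hands them back in close(), it matters that readers are closed promptly. A hedged sketch of the expected usage (the path and the key/value types are assumptions for the example):

    SequenceFile.Reader reader =
      new SequenceFile.Reader(fs, new Path("/tmp/example.seq"), conf);
    try {
      Text key = new Text();
      Text value = new Text();
      while (reader.next(key, value)) {
        // ... process the record ...
      }
    } finally {
      reader.close();   // returns the key/value (and length) decompressors to the pool
    }

The same discipline applies to Writer.close(), which now returns its Compressor and, via the new ownOutputStream flag, only closes the underlying FSDataOutputStream when the Writer created it itself.
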
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/CompressionCodec.java Mon Jun 18 14:59:36 2007
@@ -29,19 +29,79 @@
 public interface CompressionCodec {
 
   /**
-   * Create a stream compressor that will write to the given output stream.
+   * Create a {@link CompressionOutputStream} that will write to the given 
+   * {@link OutputStream}.
+   * 
    * @param out the location for the final output stream
-   * @return a stream the user can write uncompressed data to
+   * @return a stream the user can write uncompressed data to have it compressed
+   * @throws IOException
    */
   CompressionOutputStream createOutputStream(OutputStream out) 
-    throws IOException;
+  throws IOException;
+  
+  /**
+   * Create a {@link CompressionOutputStream} that will write to the given 
+   * {@link OutputStream} with the given {@link Compressor}.
+   * 
+   * @param out the location for the final output stream
+   * @param compressor compressor to use
+   * @return a stream the user can write uncompressed data to have it compressed
+   * @throws IOException
+   */
+  CompressionOutputStream createOutputStream(OutputStream out, 
+                                             Compressor compressor) 
+  throws IOException;
+
+  /**
+   * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
+   * 
+   * @return the type of compressor needed by this codec.
+   */
+  Class getCompressorType();
+  
+  /**
+   * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
+   * 
+   * @return a new compressor for use by this codec
+   */
+  Compressor createCompressor();
   
   /**
    * Create a stream decompressor that will read from the given input stream.
+   * 
    * @param in the stream to read compressed bytes from
    * @return a stream to read uncompressed bytes from
+   * @throws IOException
    */
   CompressionInputStream createInputStream(InputStream in) throws IOException;
+  
+  /**
+   * Create a {@link CompressionInputStream} that will read from the given 
+   * {@link InputStream} with the given {@link Decompressor}.
+   * 
+   * @param in the stream to read compressed bytes from
+   * @param decompressor decompressor to use
+   * @return a stream to read uncompressed bytes from
+   * @throws IOException
+   */
+  CompressionInputStream createInputStream(InputStream in, 
+                                           Decompressor decompressor) 
+  throws IOException;
+
+
+  /**
+   * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
+   * 
+   * @return the type of decompressor needed by this codec.
+   */
+  Class getDecompressorType();
+  
+  /**
+   * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
+   * 
+   * @return a new decompressor for use by this codec
+   */
+  Decompressor createDecompressor();
   
   /**
    * Get the default filename extension for this kind of compression.

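The new interface methods separate construction of a codec's (de)compressor from construction of its streams, so a caller can own the expensive object and merely lend it to each stream. A rough read-side sketch, assuming a DefaultCodec and an already-open InputStream named rawIn:

    DefaultCodec codec = new DefaultCodec();
    codec.setConf(conf);
    Decompressor decompressor = codec.createDecompressor();        // caller owns it
    CompressionInputStream in = codec.createInputStream(rawIn, decompressor);
    // ... read uncompressed bytes from 'in' ...
    in.close();
    // 'decompressor' outlives the stream and can be handed to a pool for the next file
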
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/DefaultCodec.java Mon Jun 18 14:59:36 2007
@@ -29,7 +29,7 @@
 public class DefaultCodec implements Configurable, CompressionCodec {
   
   Configuration conf;
-  
+
   public void setConf(Configuration conf) {
     this.conf = conf;
   }
@@ -38,33 +38,50 @@
     return conf;
   }
   
-  /**
-   * Create a stream compressor that will write to the given output stream.
-   * @param out the location for the final output stream
-   * @return a stream the user can write uncompressed data to
-   */
   public CompressionOutputStream createOutputStream(OutputStream out) 
-    throws IOException {
-    return new CompressorStream(out, ZlibFactory.getZlibCompressor(), 
+  throws IOException {
+    return new CompressorStream(out, createCompressor(), 
                                 conf.getInt("io.file.buffer.size", 4*1024));
   }
-  
-  /**
-   * Create a stream decompressor that will read from the given input stream.
-   * @param in the stream to read compressed bytes from
-   * @return a stream to read uncompressed bytes from
-   */
+
+  public CompressionOutputStream createOutputStream(OutputStream out, 
+                                                    Compressor compressor) 
+  throws IOException {
+    return new CompressorStream(out, compressor, 
+                                conf.getInt("io.file.buffer.size", 4*1024));
+  }
+
+  public Class getCompressorType() {
+    return ZlibFactory.getZlibCompressorType();
+  }
+
+  public Compressor createCompressor() {
+    return ZlibFactory.getZlibCompressor();
+  }
+
   public CompressionInputStream createInputStream(InputStream in) 
-    throws IOException {
-    return new DecompressorStream(in, ZlibFactory.getZlibDecompressor(),
+  throws IOException {
+    return new DecompressorStream(in, createDecompressor(),
+                                  conf.getInt("io.file.buffer.size", 4*1024));
+  }
+
+  public CompressionInputStream createInputStream(InputStream in, 
+                                                  Decompressor decompressor) 
+  throws IOException {
+    return new DecompressorStream(in, decompressor, 
                                   conf.getInt("io.file.buffer.size", 4*1024));
   }
+
+  public Class getDecompressorType() {
+    return ZlibFactory.getZlibDecompressorType();
+  }
+
+  public Decompressor createDecompressor() {
+    return ZlibFactory.getZlibDecompressor();
+  }
   
-  /**
-   * Get the default filename extension for this kind of compression.
-   * @return the extension including the '.'
-   */
   public String getDefaultExtension() {
     return ".deflate";
   }
+
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java Mon Jun 18 14:59:36 2007
@@ -135,58 +135,68 @@
     }
   }  
   
-  /**
-   * Create a stream compressor that will write to the given output stream.
-   * @param out the location for the final output stream
-   * @return a stream the user can write uncompressed data to
-   */
   public CompressionOutputStream createOutputStream(OutputStream out) 
     throws IOException {
-    CompressionOutputStream compOutStream = null;
-    
-    if (ZlibFactory.isNativeZlibLoaded()) {
-      Compressor compressor = 
-        new ZlibCompressor(ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION,
-                           ZlibCompressor.CompressionStrategy.DEFAULT_STRATEGY,
-                           ZlibCompressor.CompressionHeader.GZIP_FORMAT,
-                           64*1024); 
-     
-      compOutStream = new CompressorStream(out, compressor,
-                                           conf.getInt("io.file.buffer.size", 4*1024)); 
-    } else {
-      compOutStream = new GzipOutputStream(out);
-    }
-    
-    return compOutStream;
+    return (ZlibFactory.isNativeZlibLoaded()) ?
+               new CompressorStream(out, createCompressor(),
+                                    conf.getInt("io.file.buffer.size", 4*1024)) :
+               new GzipOutputStream(out);
   }
   
-  /**
-   * Create a stream decompressor that will read from the given input stream.
-   * @param in the stream to read compressed bytes from
-   * @return a stream to read uncompressed bytes from
-   */
+  public CompressionOutputStream createOutputStream(OutputStream out, 
+                                                    Compressor compressor) 
+  throws IOException {
+    return (compressor != null) ?
+               new CompressorStream(out, compressor,
+                                    conf.getInt("io.file.buffer.size", 
+                                                4*1024)) :
+               createOutputStream(out);                                               
+
+  }
+
+  public Compressor createCompressor() {
+    return (ZlibFactory.isNativeZlibLoaded()) ?
+               new ZlibCompressor(ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION,
+                                  ZlibCompressor.CompressionStrategy.DEFAULT_STRATEGY,
+                                  ZlibCompressor.CompressionHeader.GZIP_FORMAT,
+                                  64*1024) :
+               null;
+  }
+
+  public Class getCompressorType() {
+    return ZlibFactory.getZlibCompressorType();
+  }
+
   public CompressionInputStream createInputStream(InputStream in) 
-    throws IOException {
-    CompressionInputStream compInStream = null;
-    
-    if (ZlibFactory.isNativeZlibLoaded()) {
-      Decompressor decompressor =
-        new ZlibDecompressor(ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB,
-                             64*1024);
-
-      compInStream = new DecompressorStream(in, decompressor,
-                                            conf.getInt("io.file.buffer.size", 4*1024)); 
-    } else {
-      compInStream = new GzipInputStream(in);
-    }
-    
-    return compInStream;
+  throws IOException {
+  return (ZlibFactory.isNativeZlibLoaded()) ?
+             new DecompressorStream(in, createDecompressor(),
+                                    conf.getInt("io.file.buffer.size", 
+                                                4*1024)) :
+             new GzipInputStream(in);                                         
   }
-  
-  /**
-   * Get the default filename extension for this kind of compression.
-   * @return the extension including the '.'
-   */
+
+  public CompressionInputStream createInputStream(InputStream in, 
+                                                  Decompressor decompressor) 
+  throws IOException {
+    return (decompressor != null) ? 
+               new DecompressorStream(in, decompressor,
+                                      conf.getInt("io.file.buffer.size", 
+                                                  4*1024)) :
+               createInputStream(in); 
+  }
+
+  public Decompressor createDecompressor() {
+    return (ZlibFactory.isNativeZlibLoaded()) ?
+               new ZlibDecompressor(ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB,
+                                    64*1024) :
+               null;                               
+  }
+
+  public Class getDecompressorType() {
+    return ZlibFactory.getZlibDecompressorType();
+  }
+
   public String getDefaultExtension() {
     return ".gz";
   }

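Note the asymmetry in GzipCodec: createCompressor()/createDecompressor() return null when native zlib is unavailable, and the stream factories that take a Compressor/Decompressor fall back to the plain java.util.zip streams when handed null. Pooling callers therefore have to tolerate null, roughly as follows (pool, gzipCodec and rawOut are assumed names):

    Compressor compressor = pool.getCodec(gzipCodec.getCompressorType());
    if (compressor == null) {
      compressor = gzipCodec.createCompressor();   // may still be null without native zlib
    }
    // null is acceptable here: GzipCodec falls back to GzipOutputStream
    CompressionOutputStream out = gzipCodec.createOutputStream(rawOut, compressor);
    // ... write, close ...
    pool.returnCodec(compressor);                  // returnCodec() ignores a null argument
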
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/LzoCodec.java Mon Jun 18 14:59:36 2007
@@ -41,7 +41,7 @@
   private static final Log LOG = LogFactory.getLog(LzoCodec.class.getName());
 
   private Configuration conf;
-  
+
   public void setConf(Configuration conf) {
     this.conf = conf;
   }
@@ -81,7 +81,7 @@
     throws IOException {
     // Ensure native-lzo library is loaded & initialized
     if (!isNativeLzoLoaded()) {
-      throw new IOException("native-lzo library not available");
+      throw new RuntimeException("native-lzo library not available");
     }
     
     /**
@@ -119,12 +119,66 @@
     } else {
       compressionOverhead = (int)(((bufferSize - (128 + 3)) * 8.0) / 9.0);
     }
-     
+    
     return new BlockCompressorStream(out, 
                                      new LzoCompressor(strategy, bufferSize), 
                                      bufferSize, compressionOverhead);
   }
   
+  public CompressionOutputStream createOutputStream(OutputStream out, 
+                                                    Compressor compressor) 
+  throws IOException {
+    // Ensure native-lzo library is loaded & initialized
+    if (!isNativeLzoLoaded()) {
+      throw new RuntimeException("native-lzo library not available");
+    }
+    
+    LzoCompressor.CompressionStrategy strategy = 
+      LzoCompressor.CompressionStrategy.valueOf(
+                                                conf.get("io.compression.codec.lzo.compressor",
+                                                         LzoCompressor.CompressionStrategy.LZO1X_1.name()
+                                                         )
+                                                ); 
+    int bufferSize = conf.getInt("io.compression.codec.lzo.buffersize", 
+                                 64*1024);
+    int compressionOverhead = 0;
+    if (strategy.name().contains("LZO1")) {
+      compressionOverhead = (int)(((bufferSize - (64 + 3)) * 16.0) / 17.0);  
+    } else {
+      compressionOverhead = (int)(((bufferSize - (128 + 3)) * 8.0) / 9.0);
+    }
+    
+    return new BlockCompressorStream(out, compressor, bufferSize, 
+                                     compressionOverhead); 
+  }
+
+  public Class getCompressorType() {
+    // Ensure native-lzo library is loaded & initialized
+    if (!isNativeLzoLoaded()) {
+      throw new RuntimeException("native-lzo library not available");
+    }
+    
+    return LzoCompressor.class;
+  }
+
+  public Compressor createCompressor() {
+    // Ensure native-lzo library is loaded & initialized
+    if (!isNativeLzoLoaded()) {
+      throw new RuntimeException("native-lzo library not available");
+    }
+    
+    LzoCompressor.CompressionStrategy strategy = 
+      LzoCompressor.CompressionStrategy.valueOf(
+                                                conf.get("io.compression.codec.lzo.compressor",
+                                                         LzoCompressor.CompressionStrategy.LZO1X_1.name()
+                                                         )
+                                                ); 
+    int bufferSize = conf.getInt("io.compression.codec.lzo.buffersize", 
+                                 64*1024);
+    
+    return new LzoCompressor(strategy, bufferSize); 
+  }
+
   public CompressionInputStream createInputStream(InputStream in) 
     throws IOException {
     // Ensure native-lzo library is loaded & initialized
@@ -146,7 +200,47 @@
                                        new LzoDecompressor(strategy, bufferSize), 
                                        bufferSize);
   }
-  
+
+  public CompressionInputStream createInputStream(InputStream in, 
+                                                  Decompressor decompressor) 
+  throws IOException {
+    // Ensure native-lzo library is loaded & initialized
+    if (!isNativeLzoLoaded()) {
+      throw new RuntimeException("native-lzo library not available");
+    }
+    
+    return new BlockDecompressorStream(in, decompressor, 
+                                       conf.getInt("io.compression.codec.lzo.buffersize", 
+                                                   64*1024));
+  }
+
+  public Class getDecompressorType() {
+    // Ensure native-lzo library is loaded & initialized
+    if (!isNativeLzoLoaded()) {
+      throw new RuntimeException("native-lzo library not available");
+    }
+    
+    return LzoDecompressor.class;
+  }
+
+  public Decompressor createDecompressor() {
+    // Ensure native-lzo library is loaded & initialized
+    if (!isNativeLzoLoaded()) {
+      throw new RuntimeException("native-lzo library not available");
+    }
+    
+    LzoDecompressor.CompressionStrategy strategy = 
+      LzoDecompressor.CompressionStrategy.valueOf(
+                                                  conf.get("io.compression.codec.lzo.decompressor",
+                                                           LzoDecompressor.CompressionStrategy.LZO1X.name()
+                                                           )
+                                                  ); 
+    int bufferSize = conf.getInt("io.compression.codec.lzo.buffersize", 
+                                 64*1024);
+
+    return new LzoDecompressor(strategy, bufferSize); 
+  }
+
   /**
    * Get the default filename extension for this kind of compression.
    * @return the extension including the '.'

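LzoCodec is stricter: each of the new methods throws a RuntimeException if the native-lzo library is not loaded, so callers are expected to probe first. A hedged sketch, assuming the isNativeLzoLoaded() check used throughout the codec is statically accessible to the caller:

    CompressionCodec codec;
    if (LzoCodec.isNativeLzoLoaded()) {
      codec = new LzoCodec();                      // safe: native library is present
    } else {
      codec = new DefaultCodec();                  // pure-Java zlib fallback
    }
    ReflectionUtils.setConf(codec, conf);          // same idiom SequenceFile.Writer uses
    Compressor compressor = codec.createCompressor();
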
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java Mon Jun 18 14:59:36 2007
@@ -22,8 +22,6 @@
 import java.nio.Buffer;
 import java.nio.ByteBuffer;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.util.NativeCodeLoader;
 
@@ -35,7 +33,7 @@
  * @author Arun C Murthy
  */
 public class ZlibCompressor implements Compressor {
-  private static final int DEFAULT_DIRECT_BUFFER_SIZE = 1*1024;
+  private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
   
   private long stream;
   private CompressionLevel level;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java Mon Jun 18 14:59:36 2007
@@ -22,8 +22,6 @@
 import java.nio.Buffer;
 import java.nio.ByteBuffer;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.util.NativeCodeLoader;
 
@@ -35,7 +33,7 @@
  * @author Arun C Murthy
  */
 public class ZlibDecompressor implements Decompressor {
-  private static final int DEFAULT_DIRECT_BUFFER_SIZE = 1*1024;
+  private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
   
   private long stream;
   private CompressionHeader header;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java Mon Jun 18 14:59:36 2007
@@ -23,6 +23,7 @@
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * A collection of factories to create the right 
@@ -60,21 +61,53 @@
   }
   
   /**
+   * Return the appropriate type of the zlib compressor. 
+   * 
+   * @return the appropriate type of the zlib compressor.
+   */
+  public static Class getZlibCompressorType() {
+    return (nativeZlibLoaded) ? 
+            ZlibCompressor.class : BuiltInZlibDeflater.class;
+  }
+  
+  /**
    * Return the appropriate implementation of the zlib compressor. 
    * 
    * @return the appropriate implementation of the zlib compressor.
    */
   public static Compressor getZlibCompressor() {
+    LOG.info("Creating a new ZlibCompressor");
+    try {
+      throw new Exception();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
     return (nativeZlibLoaded) ? 
       new ZlibCompressor() : new BuiltInZlibDeflater(); 
   }
 
   /**
+   * Return the appropriate type of the zlib decompressor. 
+   * 
+   * @return the appropriate type of the zlib decompressor.
+   */
+  public static Class getZlibDecompressorType() {
+    return (nativeZlibLoaded) ? 
+            ZlibDecompressor.class : BuiltInZlibInflater.class;
+  }
+  
+  /**
    * Return the appropriate implementation of the zlib decompressor. 
    * 
    * @return the appropriate implementation of the zlib decompressor.
    */
   public static Decompressor getZlibDecompressor() {
+    LOG.info("Creating a new ZlibDecompressor");
+    try {
+      throw new Exception();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
     return (nativeZlibLoaded) ? 
       new ZlibDecompressor() : new BuiltInZlibInflater(); 
   }

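The new getZlibCompressorType()/getZlibDecompressorType() accessors let a pool be keyed by concrete class without instantiating anything: the key is ZlibCompressor.class when the native library is loaded and BuiltInZlibDeflater.class (respectively BuiltInZlibInflater.class) otherwise. Roughly, with compressorPool standing in for the SequenceFile pool:

    Class type = ZlibFactory.getZlibCompressorType();
    Compressor pooled = compressorPool.getCodec(type);   // cheap lookup, no construction
    if (pooled == null) {
      pooled = ZlibFactory.getZlibCompressor();          // construct a fresh (possibly native) one
    }
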
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon Jun 18 14:59:36 2007
@@ -42,6 +42,7 @@
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Sorter;
+import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
 import org.apache.hadoop.io.SequenceFile.Sorter.SegmentDescriptor;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -332,10 +333,10 @@
                                   job.getMapOutputValueClass(), compressionType, codec);
     }
     private void endPartition(int partNumber) throws IOException {
-      //Need to write syncs especially if block compression is in use
+      //Need to close the file, especially if block compression is in use
       //We also update the index file to contain the part offsets per 
       //spilled file
-      writer.sync();
+      writer.close();
       indexOut.writeLong(segmentStart);
       //we also store 0 length key/val segments to make the merge phase easier.
       indexOut.writeLong(out.getPos()-segmentStart);
@@ -529,11 +530,13 @@
         //create dummy files
         for (int i = 0; i < partitions; i++) {
           segmentStart = finalOut.getPos();
-          SequenceFile.createWriter(job, finalOut, 
-                                    job.getMapOutputKeyClass(), job.getMapOutputValueClass(), 
-                                    compressionType, codec);
+          Writer writer = SequenceFile.createWriter(job, finalOut, 
+                                                    job.getMapOutputKeyClass(), 
+                                                    job.getMapOutputValueClass(), 
+                                                    compressionType, codec);
           finalIndexOut.writeLong(segmentStart);
           finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
+          writer.close();
         }
         finalOut.close();
         finalIndexOut.close();
@@ -560,14 +563,14 @@
             segmentList.add(i, s);
           }
           segmentStart = finalOut.getPos();
+          RawKeyValueIterator kvIter = sorter.merge(segmentList, new Path(getTaskId())); 
           SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut, 
                                                                  job.getMapOutputKeyClass(), job.getMapOutputValueClass(), 
                                                                  compressionType, codec);
-          sorter.writeFile(sorter.merge(segmentList, new Path(getTaskId())), 
-                           writer);
-          //add a sync block - required esp. for block compression to ensure
+          sorter.writeFile(kvIter, writer);
+          //close the file - required esp. for block compression to ensure
           //partition data don't span partition boundaries
-          writer.sync();
+          writer.close();
           //when we write the offset/length to the final index file, we write
           //longs for both. This helps us to reliably seek directly to the
           //offset/length for a partition when we start serving the byte-ranges

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java Mon Jun 18 14:59:36 2007
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.io.compress;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.*;
@@ -39,14 +40,44 @@
       return conf;
     }
     
-    public CompressionOutputStream createOutputStream(OutputStream out) {
+    public CompressionOutputStream createOutputStream(OutputStream out) 
+    throws IOException {
       return null;
     }
     
-    public CompressionInputStream createInputStream(InputStream in) {
+    public Class getCompressorType() {
       return null;
     }
-    
+
+    public Compressor createCompressor() {
+      return null;
+    }
+
+    public CompressionInputStream createInputStream(InputStream in, 
+                                                    Decompressor decompressor) 
+    throws IOException {
+      return null;
+    }
+
+    public CompressionInputStream createInputStream(InputStream in) 
+    throws IOException {
+      return null;
+    }
+
+    public CompressionOutputStream createOutputStream(OutputStream out, 
+                                                      Compressor compressor) 
+    throws IOException {
+      return null;
+    }
+
+    public Class getDecompressorType() {
+      return null;
+    }
+
+    public Decompressor createDecompressor() {
+      return null;
+    }
+
     public String getDefaultExtension() {
       return ".base";
     }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/BigMapOutput.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/BigMapOutput.java?view=diff&rev=548505&r1=548504&r2=548505
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/BigMapOutput.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/BigMapOutput.java Mon Jun 18 14:59:36 2007
@@ -1,42 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.mapred;
 
 import java.io.*;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.mapred.lib.*;
+import org.apache.hadoop.util.ToolBase;
 import org.apache.hadoop.fs.*;
 
-public class BigMapOutput {
+public class BigMapOutput extends ToolBase {
+  public static final Log LOG =
+    LogFactory.getLog(BigMapOutput.class.getName());
+  private static Random random = new Random();
+  
+  private static void randomizeBytes(byte[] data, int offset, int length) {
+    for(int i=offset + length - 1; i >= offset; --i) {
+      data[i] = (byte) random.nextInt(256);
+    }
+  }
+
+  private static void createBigMapInputFile(Configuration conf, FileSystem fs, 
+                                            Path dir, long fileSizeInMB) 
+  throws IOException {
+    // Check if the input path exists and is non-empty
+    if (fs.exists(dir)) {
+      Path[] list = fs.listPaths(dir);
+      if (list != null && list.length > 0) {
+        throw new IOException("Input path: " + dir + " already exists... ");
+      }
+    }
+    
+    Path file = new Path(dir, "part-0");
+    SequenceFile.Writer writer = 
+      SequenceFile.createWriter(fs, conf, file, 
+                                BytesWritable.class, BytesWritable.class,
+                                CompressionType.NONE);
+    long numBytesToWrite = fileSizeInMB * 1024 * 1024;
+    int minKeySize = conf.getInt("test.bmo.min_key", 10);;
+    int keySizeRange = 
+      conf.getInt("test.bmo.max_key", 1000) - minKeySize;
+    int minValueSize = conf.getInt("test.bmo.min_value", 0);
+    int valueSizeRange = 
+      conf.getInt("test.bmo.max_value", 20000) - minValueSize;
+    BytesWritable randomKey = new BytesWritable();
+    BytesWritable randomValue = new BytesWritable();
+
+    LOG.info("Writing " + numBytesToWrite + " bytes to " + file + " with " +
+             "minKeySize: " + minKeySize + " keySizeRange: " + keySizeRange +
+             " minValueSize: " + minValueSize + " valueSizeRange: " + valueSizeRange);
+    long start = System.currentTimeMillis();
+    while (numBytesToWrite > 0) {
+      int keyLength = minKeySize + 
+        (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
+      randomKey.setSize(keyLength);
+      randomizeBytes(randomKey.get(), 0, randomKey.getSize());
+      int valueLength = minValueSize +
+        (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
+      randomValue.setSize(valueLength);
+      randomizeBytes(randomValue.get(), 0, randomValue.getSize());
+      writer.append(randomKey, randomValue);
+      numBytesToWrite -= keyLength + valueLength;
+    }
+    writer.close();
+    long end = System.currentTimeMillis();
 
-  public static void main(String[] args) throws IOException {
-    if (args.length != 4) { //input-dir should contain a huge file ( > 2GB)
-      System.err.println("BigMapOutput " +
-                       "-input <input-dir> -output <output-dir>");
-      System.exit(1);
+    LOG.info("Created " + file + " of size: " + fileSizeInMB + "MB in " + 
+             (end-start)/1000 + "secs");
+  }
+  
+  private static void usage() {
+    System.err.println("BigMapOutput -input <input-dir> -output <output-dir> " +
+                       "[-create <filesize in MB>]");
+    System.exit(1);
+  }
+  public int run(String[] args) throws Exception {    
+    if (args.length < 4) { //input-dir should contain a huge file ( > 2GB)
+      usage();
     } 
     Path bigMapInput = null;
     Path outputPath = null;
+    boolean createInput = false;
+    long fileSizeInMB = 3 * 1024;         // default of 3GB (>2GB)
     for(int i=0; i < args.length; ++i) {
       if ("-input".equals(args[i])){
         bigMapInput = new Path(args[++i]);
       } else if ("-output".equals(args[i])){
         outputPath = new Path(args[++i]);
+      } else if ("-create".equals(args[i])) {
+        createInput = true;
+        fileSizeInMB = Long.parseLong(args[++i]);
+      } else {
+        usage();
       }
     }
-    Configuration defaults = new Configuration();
-    FileSystem fs = FileSystem.get(defaults);
     
-    JobConf jobConf = new JobConf(defaults, BigMapOutput.class);
+    FileSystem fs = FileSystem.get(conf);
+    JobConf jobConf = new JobConf(conf, BigMapOutput.class);
 
     jobConf.setJobName("BigMapOutput");
     jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
@@ -50,12 +133,23 @@
     jobConf.setReducerClass(IdentityReducer.class);
     jobConf.setOutputKeyClass(BytesWritable.class);
     jobConf.setOutputValueClass(BytesWritable.class);
-
+    
+    if (createInput) {
+      createBigMapInputFile(jobConf, fs, bigMapInput, fileSizeInMB);
+    }
+    
     Date startTime = new Date();
     System.out.println("Job started: " + startTime);
     JobClient.runJob(jobConf);
     Date end_time = new Date();
     System.out.println("Job ended: " + end_time);
-      
+    
+    return 0;
   }
+
+  public static void main(String argv[]) throws Exception {
+    int res = new BigMapOutput().doMain(new Configuration(), argv);
+    System.exit(res);
+  }
+
 }