You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2010/07/08 01:22:28 UTC

svn commit: r961532 - in /hadoop/common/trunk: ./ src/java/org/apache/hadoop/io/compress/ src/java/org/apache/hadoop/io/compress/bzip2/ src/java/org/apache/hadoop/io/compress/zlib/ src/native/src/org/apache/hadoop/io/compress/zlib/ src/test/core/org/ap...

Author: cdouglas
Date: Wed Jul  7 23:22:28 2010
New Revision: 961532

URL: http://svn.apache.org/viewvc?rev=961532&view=rev
Log:
HADOOP-6835. Add support for concatenated gzip input. Contributed by Greg Roelofs

Added:
    hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java
Modified:
    hadoop/common/trunk/CHANGES.txt
    hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java
    hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/Decompressor.java
    hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/DecompressorStream.java
    hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java
    hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java
    hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
    hadoop/common/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c
    hadoop/common/trunk/src/test/core/org/apache/hadoop/io/compress/TestCodec.java

Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=961532&r1=961531&r2=961532&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Wed Jul  7 23:22:28 2010
@@ -57,6 +57,9 @@ Trunk (unreleased changes)
     HADOOP-6756. Documentation for common configuration keys.
     (Erik Steffl via shv)
 
+    HADOOP-6835. Add support for concatenated gzip input. (Greg Roelofs via
+    cdouglas)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java?rev=961532&r1=961531&r2=961532&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java Wed Jul  7 23:22:28 2010
@@ -43,19 +43,19 @@ public class BlockDecompressorStream ext
    * @param in input stream
    * @param decompressor decompressor to use
    * @param bufferSize size of buffer
- * @throws IOException
+   * @throws IOException
    */
   public BlockDecompressorStream(InputStream in, Decompressor decompressor, 
                                  int bufferSize) throws IOException {
     super(in, decompressor, bufferSize);
   }
-  
+
   /**
    * Create a {@link BlockDecompressorStream}.
    * 
    * @param in input stream
    * @param decompressor decompressor to use
- * @throws IOException
+   * @throws IOException
    */
   public BlockDecompressorStream(InputStream in, Decompressor decompressor) throws IOException {
     super(in, decompressor);
@@ -76,7 +76,7 @@ public class BlockDecompressorStream ext
       }
       noUncompressedBytes = 0;
     }
-    
+
     int n = 0;
     while ((n = decompressor.decompress(b, off, len)) == 0) {
       if (decompressor.finished() || decompressor.needsDictionary()) {
@@ -86,20 +86,22 @@ public class BlockDecompressorStream ext
         }
       }
       if (decompressor.needsInput()) {
-        getCompressedData();
+        int m = getCompressedData();
+        // Send the read data to the decompressor
+        decompressor.setInput(buffer, 0, m);
       }
     }
-    
+
     // Note the no. of decompressed bytes read from 'current' block
     noUncompressedBytes += n;
 
     return n;
   }
 
-  protected void getCompressedData() throws IOException {
+  protected int getCompressedData() throws IOException {
     checkStream();
 
-    // Get the size of the compressed chunk
+    // Get the size of the compressed chunk (always non-negative)
     int len = rawReadInt();
 
     // Read len bytes from underlying stream 
@@ -110,13 +112,12 @@ public class BlockDecompressorStream ext
     while (n < len) {
       int count = in.read(buffer, off + n, len - n);
       if (count < 0) {
-        throw new EOFException();
+        throw new EOFException("Unexpected end of block in input stream");
       }
       n += count;
     }
-    
-    // Send the read data to the decompressor
-    decompressor.setInput(buffer, 0, len);
+
+    return len;
   }
 
   public void resetState() throws IOException {

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/Decompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/Decompressor.java?rev=961532&r1=961531&r2=961532&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/Decompressor.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/Decompressor.java Wed Jul  7 23:22:28 2010
@@ -34,8 +34,13 @@ import org.apache.hadoop.classification.
 public interface Decompressor {
   /**
    * Sets input data for decompression. 
-   * This should be called whenever #needsInput() returns 
+   * This should be called if and only if {@link #needsInput()} returns 
    * <code>true</code> indicating that more input data is required.
+   * (Both native and non-native versions of various Decompressors require
+   * that the data passed in via <code>b[]</code> remain unmodified until
+   * the caller is explicitly notified--via {@link #needsInput()}--that the
+   * buffer may be safely modified.  With this requirement, an extra
+   * buffer-copy can be avoided.)
    * 
    * @param b Input data
    * @param off Start offset
@@ -45,10 +50,12 @@ public interface Decompressor {
   
   /**
    * Returns true if the input data buffer is empty and 
-   * #setInput() should be called to provide more input. 
+   * {@link #setInput(byte[], int, int)} should be called to
+   * provide more input. 
    * 
    * @return <code>true</code> if the input data buffer is empty and 
-   * #setInput() should be called in order to provide more input.
+   * {@link #setInput(byte[], int, int)} should be called in
+   * order to provide more input.
    */
   public boolean needsInput();
   
@@ -69,9 +76,9 @@ public interface Decompressor {
   public boolean needsDictionary();
 
   /**
-   * Returns true if the end of the compressed 
+   * Returns true if the end of the decompressed 
    * data output stream has been reached.
-   * @return <code>true</code> if the end of the compressed
+   * @return <code>true</code> if the end of the decompressed
    * data output stream has been reached.
    */
   public boolean finished();
@@ -79,8 +86,8 @@ public interface Decompressor {
   /**
    * Fills specified buffer with uncompressed data. Returns actual number
    * of bytes of uncompressed data. A return value of 0 indicates that
-   * #needsInput() should be called in order to determine if more input
-   * data is required.
+   * {@link #needsInput()} should be called in order to determine if more
+   * input data is required.
    * 
    * @param b Buffer for the compressed data
    * @param off Start offset of the data
@@ -89,12 +96,20 @@ public interface Decompressor {
    * @throws IOException
    */
   public int decompress(byte[] b, int off, int len) throws IOException;
-  
+
   /**
-   * Resets decompressor so that a new set of input data can be processed.
+   * Returns the number of bytes remaining in the compressed-data buffer;
+   * typically called after the decompressor has finished decompressing
+   * the current gzip stream (a.k.a. "member").
+   */
+  public int getRemaining();
+
+  /**
+   * Resets decompressor and input and output buffers so that a new set of
+   * input data can be processed.
    */
   public void reset();
-  
+
   /**
    * Closes the decompressor and discards any unprocessed input.
    */

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/DecompressorStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/DecompressorStream.java?rev=961532&r1=961531&r2=961532&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/DecompressorStream.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/DecompressorStream.java Wed Jul  7 23:22:28 2010
@@ -33,8 +33,11 @@ public class DecompressorStream extends 
   protected byte[] buffer;
   protected boolean eof = false;
   protected boolean closed = false;
-  
-  public DecompressorStream(InputStream in, Decompressor decompressor, int bufferSize) throws IOException {
+  private int lastBytesSent = 0;
+
+  public DecompressorStream(InputStream in, Decompressor decompressor,
+                            int bufferSize)
+  throws IOException {
     super(in);
 
     if (in == null || decompressor == null) {
@@ -47,7 +50,8 @@ public class DecompressorStream extends 
     buffer = new byte[bufferSize];
   }
 
-  public DecompressorStream(InputStream in, Decompressor decompressor) throws IOException {
+  public DecompressorStream(InputStream in, Decompressor decompressor)
+  throws IOException {
     this(in, decompressor, 512);
   }
 
@@ -55,7 +59,7 @@ public class DecompressorStream extends 
    * Allow derived classes to directly set the underlying stream.
    * 
    * @param in Underlying input stream.
- * @throws IOException
+   * @throws IOException
    */
   protected DecompressorStream(InputStream in) throws IOException {
     super(in);
@@ -81,31 +85,78 @@ public class DecompressorStream extends 
 
   protected int decompress(byte[] b, int off, int len) throws IOException {
     int n = 0;
-    
+
     while ((n = decompressor.decompress(b, off, len)) == 0) {
-      if (decompressor.finished() || decompressor.needsDictionary()) {
+      if (decompressor.needsDictionary()) {
         eof = true;
         return -1;
       }
-      if (decompressor.needsInput()) {
-        getCompressedData();
+
+      if (decompressor.finished()) {
+        // First see if there was any leftover buffered input from previous
+        // stream; if not, attempt to refill buffer.  If refill -> EOF, we're
+        // all done; else reset, fix up input buffer, and get ready for next
+        // concatenated substream/"member".
+        int nRemaining = decompressor.getRemaining();
+        if (nRemaining == 0) {
+          int m = getCompressedData();
+          if (m == -1) {
+            // apparently the previous end-of-stream was also end-of-file:
+            // return success, as if we had never called getCompressedData()
+            eof = true;
+            return -1;
+          }
+          decompressor.reset();
+          decompressor.setInput(buffer, 0, m);
+          lastBytesSent = m;
+        } else {
+          // looks like it's a concatenated stream:  reset low-level zlib (or
+          // other engine) and buffers, then "resend" remaining input data
+          decompressor.reset();
+          int leftoverOffset = lastBytesSent - nRemaining;
+          assert (leftoverOffset >= 0);
+          // this recopies userBuf -> direct buffer if using native libraries:
+          decompressor.setInput(buffer, leftoverOffset, nRemaining);
+          // NOTE:  this is the one place we do NOT want to save the number
+          // of bytes sent (nRemaining here) into lastBytesSent:  since we
+          // are resending what we've already sent before, offset is nonzero
+          // in general (only way it could be zero is if it already equals
+          // nRemaining), which would then screw up the offset calculation
+          // _next_ time around.  IOW, getRemaining() is in terms of the
+          // original, zero-offset bufferload, so lastBytesSent must be as
+          // well.  Cheesy ASCII art:
+          //
+          //          <------------ m, lastBytesSent ----------->
+          //          +===============================================+
+          // buffer:  |1111111111|22222222222222222|333333333333|     |
+          //          +===============================================+
+          //     #1:  <-- off -->|<-------- nRemaining --------->
+          //     #2:  <----------- off ----------->|<-- nRem. -->
+          //     #3:  (final substream:  nRemaining == 0; eof = true)
+          //
+          // If lastBytesSent is anything other than m, as shown, then "off"
+          // will be calculated incorrectly.
+        }
+      } else if (decompressor.needsInput()) {
+        int m = getCompressedData();
+        if (m == -1) {
+          throw new EOFException("Unexpected end of input stream");
+        }
+        decompressor.setInput(buffer, 0, m);
+        lastBytesSent = m;
       }
     }
-    
+
     return n;
   }
-  
-  protected void getCompressedData() throws IOException {
+
+  protected int getCompressedData() throws IOException {
     checkStream();
   
-    int n = in.read(buffer, 0, buffer.length);
-    if (n == -1) {
-      throw new EOFException("Unexpected end of input stream");
-    }
-
-    decompressor.setInput(buffer, 0, n);
+    // note that the _caller_ is now required to call setInput() or throw
+    return in.read(buffer, 0, buffer.length);
   }
-  
+
   protected void checkStream() throws IOException {
     if (closed) {
       throw new IOException("Stream closed");

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java?rev=961532&r1=961531&r2=961532&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java Wed Jul  7 23:22:28 2010
@@ -91,58 +91,7 @@ public class GzipCodec extends DefaultCo
       ((ResetableGZIPOutputStream) out).resetState();
     }
   }
-  
-  @InterfaceStability.Evolving
-  protected static class GzipInputStream extends DecompressorStream {
-    
-    private static class ResetableGZIPInputStream extends GZIPInputStream {
 
-      public ResetableGZIPInputStream(InputStream in) throws IOException {
-        super(in);
-      }
-      
-      public void resetState() throws IOException {
-        inf.reset();
-      }
-    }
-    
-    public GzipInputStream(InputStream in) throws IOException {
-      super(new ResetableGZIPInputStream(in));
-    }
-    
-    /**
-     * Allow subclasses to directly set the inflater stream.
-     * @throws IOException
-     */
-    protected GzipInputStream(DecompressorStream in) throws IOException {
-      super(in);
-    }
-
-    public int available() throws IOException {
-      return in.available(); 
-    }
-
-    public void close() throws IOException {
-      in.close();
-    }
-
-    public int read() throws IOException {
-      return in.read();
-    }
-    
-    public int read(byte[] data, int offset, int len) throws IOException {
-      return in.read(data, offset, len);
-    }
-    
-    public long skip(long offset) throws IOException {
-      return in.skip(offset);
-    }
-    
-    public void resetState() throws IOException {
-      ((ResetableGZIPInputStream) in).resetState();
-    }
-  }  
-  
   public CompressionOutputStream createOutputStream(OutputStream out) 
     throws IOException {
     return (ZlibFactory.isNativeZlibLoaded(conf)) ?
@@ -158,8 +107,7 @@ public class GzipCodec extends DefaultCo
                new CompressorStream(out, compressor,
                                     conf.getInt("io.file.buffer.size", 
                                                 4*1024)) :
-               createOutputStream(out);                                               
-
+               createOutputStream(out);
   }
 
   public Compressor createCompressor() {
@@ -174,35 +122,31 @@ public class GzipCodec extends DefaultCo
       : null;
   }
 
-  public CompressionInputStream createInputStream(InputStream in) 
+  public CompressionInputStream createInputStream(InputStream in)
   throws IOException {
-  return (ZlibFactory.isNativeZlibLoaded(conf)) ?
-             new DecompressorStream(in, createDecompressor(),
-                                    conf.getInt("io.file.buffer.size", 
-                                                4*1024)) :
-             new GzipInputStream(in);                                         
+    return createInputStream(in, null);
   }
 
-  public CompressionInputStream createInputStream(InputStream in, 
-                                                  Decompressor decompressor) 
+  public CompressionInputStream createInputStream(InputStream in,
+                                                  Decompressor decompressor)
   throws IOException {
-    return (decompressor != null) ? 
-               new DecompressorStream(in, decompressor,
-                                      conf.getInt("io.file.buffer.size", 
-                                                  4*1024)) :
-               createInputStream(in); 
+    if (decompressor == null) {
+      decompressor = createDecompressor();  // always succeeds (or throws)
+    }
+    return new DecompressorStream(in, decompressor,
+                                  conf.getInt("io.file.buffer.size", 4*1024));
   }
 
   public Decompressor createDecompressor() {
     return (ZlibFactory.isNativeZlibLoaded(conf))
       ? new GzipZlibDecompressor()
-      : null;
+      : new BuiltInGzipDecompressor();
   }
 
   public Class<? extends Decompressor> getDecompressorType() {
     return ZlibFactory.isNativeZlibLoaded(conf)
       ? GzipZlibDecompressor.class
-      : null;
+      : BuiltInGzipDecompressor.class;
   }
 
   public String getDefaultExtension() {

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java?rev=961532&r1=961531&r2=961532&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java Wed Jul  7 23:22:28 2010
@@ -53,6 +53,11 @@ public class BZip2DummyDecompressor impl
   }
 
   @Override
+  public int getRemaining() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public void reset() {
     // do nothing
   }

Added: hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java?rev=961532&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java Wed Jul  7 23:22:28 2010
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.zlib;
+
+import java.io.IOException;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * A {@link Decompressor} based on the popular gzip compressed file format.
+ * http://www.gzip.org/
+ *
+ */
+public class BuiltInGzipDecompressor implements Decompressor {
+  private static final int GZIP_MAGIC_ID = 0x8b1f;  // if read as LE short int
+  private static final int GZIP_DEFLATE_METHOD = 8;
+  private static final int GZIP_FLAGBIT_HEADER_CRC  = 0x02;
+  private static final int GZIP_FLAGBIT_EXTRA_FIELD = 0x04;
+  private static final int GZIP_FLAGBIT_FILENAME    = 0x08;
+  private static final int GZIP_FLAGBIT_COMMENT     = 0x10;
+  private static final int GZIP_FLAGBITS_RESERVED   = 0xe0;
+
+  // 'true' (nowrap) => Inflater will handle raw deflate stream only
+  private Inflater inflater = new Inflater(true);
+
+  private byte[] userBuf = null;
+  private int userBufOff = 0;
+  private int userBufLen = 0;
+
+  private byte[] localBuf = new byte[256];
+  private int localBufOff = 0;
+
+  private int headerBytesRead = 0;
+  private int trailerBytesRead = 0;
+  private int numExtraFieldBytesRemaining = -1;
+  private PureJavaCrc32 crc = new PureJavaCrc32();
+  private boolean hasExtraField = false;
+  private boolean hasFilename = false;
+  private boolean hasComment = false;
+  private boolean hasHeaderCRC = false;
+
+  private GzipStateLabel state;
+
+  /**
+   * Label for the current position of the gzip decoder within a gzip member,
+   * external to the Inflater context.  (The private variables localBuf through
+   * hasHeaderCRC are also part of the decoder state; this is only its label.)
+   */
+  private static enum GzipStateLabel {
+    /**
+     * Immediately prior to or (strictly) within the 10-byte basic gzip header.
+     */
+    HEADER_BASIC,
+    /**
+     * Immediately prior to or within the optional "extra field."
+     */
+    HEADER_EXTRA_FIELD,
+    /**
+     * Immediately prior to or within the optional filename field.
+     */
+    HEADER_FILENAME,
+    /**
+     * Immediately prior to or within the optional comment field.
+     */
+    HEADER_COMMENT,
+    /**
+     * Immediately prior to or within the optional 2-byte header CRC value.
+     */
+    HEADER_CRC,
+    /**
+     * Immediately prior to or within the main compressed (deflate) data stream.
+     */
+    DEFLATE_STREAM,
+    /**
+     * Immediately prior to or (strictly) within the 4-byte uncompressed CRC.
+     */
+    TRAILER_CRC,
+    /**
+     * Immediately prior to or (strictly) within the 4-byte uncompressed size.
+     */
+    TRAILER_SIZE,
+    /**
+     * Immediately after the trailer (and potentially prior to the next gzip
+     * member/substream header), without reset() having been called.
+     */
+    FINISHED;
+  }
+
+  /**
+   * Builds a pure-Java gzip decompressor that expects a basic gzip header.
+   */
+  public BuiltInGzipDecompressor() {
+    crc.reset();
+    this.state = GzipStateLabel.HEADER_BASIC;
+    // FIXME? Inflater docs say:  'it is also necessary to provide an extra
+    //        "dummy" byte as input. This is required by the ZLIB native
+    //        library in order to support certain optimizations.'  However,
+    //        this does not appear to be true, and in any case, it's not
+    //        entirely clear where the byte should go or what its value
+    //        should be.  Perhaps it suffices to have some deflated bytes
+    //        in the first buffer load?  (But how else would one do it?)
+  }
+
+  /** {@inheritDoc} */
+  public synchronized boolean needsInput() {
+    if (state != GzipStateLabel.DEFLATE_STREAM) {
+      // header/trailer: want input unless FINISHED (per the userBufLen note
+      // atop decompress(), no need to verify userBufLen <= 0 for now)
+      return state != GzipStateLabel.FINISHED;
+    }
+    return inflater.needsInput();  // most common case:  defer to Inflater
+  }
+
+  /** {@inheritDoc} */
+  /*
+   * The bytes supplied here are a mix of gzip header/trailer bytes (parsed
+   * by executeHeaderState() and executeTrailerState()) and deflate-stream
+   * bytes (handed off to Inflater).
+   *
+   * NOTE:  The caller must leave the contents of b[] untouched until _we_
+   *        signal, via needsInput(), that the buffer may be modified.
+   *        Otherwise we would need one more buffer-copy even for the bulk
+   *        deflate stream, a performance hit we don't want to absorb.
+   *        (Decompressor now documents this requirement.)
+   */
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (!(off >= 0 && len >= 0 && off <= b.length - len)) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+    // only the span is recorded here; no bytes are copied or parsed
+    userBufLen = len;  // note:  might be zero
+    userBufOff = off;
+    userBuf = b;
+  }
+
+  /**
+   * Decompress the data (gzip header, deflate stream, gzip trailer) in the
+   * provided buffer, storing at most len bytes into b starting at off.
+   * @return the number of decompressed bytes placed into b
+   * @throws IOException on bad input; a DataFormatException becomes its cause
+   */
+  /* From the caller's perspective, this is where the state machine lives.
+   * The code is written such that we never return from decompress() with
+   * data remaining in userBuf unless we're in FINISHED state and there was
+   * data beyond the current gzip member (e.g., we're within a concatenated
+   * gzip stream).  If this ever changes, needsInput() will also need to be
+   * modified (i.e., it would have to check userBufLen as well).
+   *
+   * The actual deflate-stream processing (decompression) is handled by
+   * Java's Inflater class.  Unlike the gzip header/trailer code (execute*
+   * methods below), the deflate stream is never copied; Inflater operates
+   * directly on the user's buffer.
+   */
+  public synchronized int decompress(byte[] b, int off, int len)
+  throws IOException {
+    int numAvailBytes = 0;
+
+    if (state != GzipStateLabel.DEFLATE_STREAM) {
+      executeHeaderState();
+
+      if (userBufLen <= 0) {
+        return numAvailBytes;
+      }
+    }
+
+    // "executeDeflateStreamState()"
+    if (state == GzipStateLabel.DEFLATE_STREAM) {
+      // hand off user data (or what's left of it) to Inflater--but note that
+      // Inflater may not have consumed all of previous bufferload (e.g., if
+      // data highly compressed or output buffer very small), in which case
+      // userBufLen will be zero
+      if (userBufLen > 0) {
+        inflater.setInput(userBuf, userBufOff, userBufLen);
+        userBufOff += userBufLen;
+        userBufLen = 0;
+      }
+
+      // now decompress it into b[], keeping the original failure as the cause
+      try {
+        numAvailBytes = inflater.inflate(b, off, len);
+      } catch (DataFormatException dfe) {
+        throw new IOException(dfe.getMessage(), dfe);
+      }
+      crc.update(b, off, numAvailBytes);  // CRC-32 is on _uncompressed_ data
+      if (inflater.finished()) {
+        state = GzipStateLabel.TRAILER_CRC;
+        int bytesRemaining = inflater.getRemaining();
+        assert (bytesRemaining >= 0) :
+          "logic error: Inflater finished; byte-count is inconsistent";
+          // could save a copy of userBufLen at call to inflater.setInput() and
+          // verify that bytesRemaining <= origUserBufLen, but would have to
+          // be a (class) member variable...seems excessive for a sanity check
+        userBufOff -= bytesRemaining;
+        userBufLen = bytesRemaining;   // or "+=", but guaranteed 0 coming in
+      } else {
+        return numAvailBytes;  // minor optimization
+      }
+    }
+
+    executeTrailerState();
+
+    return numAvailBytes;
+  }
+
+  /**
+   * Parse the gzip header (assuming we're in the appropriate state).
+   * In order to deal with degenerate cases (e.g., user buffer is one byte
+   * long), we copy (some) header bytes to localBuf.  (Filename, comment,
+   * and extra-field bytes are simply skipped.)
+   * <p>
+   * See http://www.ietf.org/rfc/rfc1952.txt for the gzip spec.  Note that
+   * no version of gzip to date (at least through 1.4.0, 2010-01-20) supports
+   * the FHCRC header-CRC16 flagbit; instead, the implementation treats it
+   * as a multi-file continuation flag (which it also doesn't support). :-(
+   * Sun's JDK v6 (1.6) supports the header CRC, however, and so do we.
+   */
+  private void executeHeaderState() throws IOException {
+
+    // this can happen because DecompressorStream's decompress() is written
+    // to call decompress() first, setInput() second:
+    if (userBufLen <= 0) {
+      return;
+    }
+
+    // "basic"/required header:  somewhere in first 10 bytes
+    if (state == GzipStateLabel.HEADER_BASIC) {
+      int n = Math.min(userBufLen, 10-localBufOff);  // (or 10-headerBytesRead)
+      checkAndCopyBytesToLocal(n);  // modifies userBufLen, etc.
+      if (localBufOff >= 10) {      // should be strictly ==
+        processBasicHeader();       // sig, compression method, flagbits
+        localBufOff = 0;            // no further need for basic header
+        state = GzipStateLabel.HEADER_EXTRA_FIELD;
+      }
+    }
+
+    if (userBufLen <= 0) {
+      return;
+    }
+
+    // optional header stuff (extra field, filename, comment, header CRC)
+
+    if (state == GzipStateLabel.HEADER_EXTRA_FIELD) {
+      if (hasExtraField) {
+        // 2 substates:  waiting for 2 bytes => get numExtraFieldBytesRemaining,
+        // or already have 2 bytes & waiting to finish skipping specified length
+        if (numExtraFieldBytesRemaining < 0) {
+          int n = Math.min(userBufLen, 2-localBufOff);
+          checkAndCopyBytesToLocal(n);
+          if (localBufOff >= 2) {
+            numExtraFieldBytesRemaining = readUShortLE(localBuf, 0);
+            localBufOff = 0;
+          }
+        }
+        if (numExtraFieldBytesRemaining > 0 && userBufLen > 0) {
+          int n = Math.min(userBufLen, numExtraFieldBytesRemaining);
+          checkAndSkipBytes(n);     // modifies userBufLen, etc.
+          numExtraFieldBytesRemaining -= n;
+        }
+        if (numExtraFieldBytesRemaining == 0) {  // nothing (more) to skip
+          state = GzipStateLabel.HEADER_FILENAME;
+        }
+      } else {
+        state = GzipStateLabel.HEADER_FILENAME;  // no extra field present
+      }
+    }
+
+    if (userBufLen <= 0) {
+      return;
+    }
+
+    if (state == GzipStateLabel.HEADER_FILENAME) {
+      if (hasFilename) {
+        boolean doneWithFilename = checkAndSkipBytesUntilNull();
+        if (!doneWithFilename) {
+          return;  // exit early:  used up entire buffer without hitting NULL
+        }
+      }
+      state = GzipStateLabel.HEADER_COMMENT;  // no filename, or skipped it
+    }
+
+    if (userBufLen <= 0) {
+      return;
+    }
+
+    if (state == GzipStateLabel.HEADER_COMMENT) {
+      if (hasComment) {
+        boolean doneWithComment = checkAndSkipBytesUntilNull();
+        if (!doneWithComment) {
+          return;  // exit early:  used up entire buffer
+        }
+      }
+      state = GzipStateLabel.HEADER_CRC;  // no comment, or skipped it
+    }
+
+    if (userBufLen <= 0) {
+      return;
+    }
+
+    if (state == GzipStateLabel.HEADER_CRC) {
+      if (hasHeaderCRC) {
+        assert (localBufOff < 2);  // initially 0, but may need multiple calls
+        int n = Math.min(userBufLen, 2-localBufOff);
+        copyBytesToLocal(n);       // modifies userBufLen, etc.
+        if (localBufOff >= 2) {
+          long headerCRC = readUShortLE(localBuf, 0);
+          if (headerCRC != (crc.getValue() & 0xffff)) {  // low 16 bits only
+            throw new IOException("gzip header CRC failure");
+          }
+          localBufOff = 0;
+          crc.reset();
+          state = GzipStateLabel.DEFLATE_STREAM;
+        }
+      } else {
+        crc.reset();   // will reuse for CRC-32 of uncompressed data
+        state = GzipStateLabel.DEFLATE_STREAM;  // switching to Inflater now
+      }
+    }
+  }
+
+  /**
+   * Parse the gzip trailer (assuming we're in the appropriate state).
+   * In order to deal with degenerate cases (e.g., user buffer is one byte
+   * long), we copy trailer bytes (all 8 of 'em) to a local buffer.
+   * See http://www.ietf.org/rfc/rfc1952.txt for the gzip spec.
+   *
+   * @throws IOException if the stored CRC-32 or size disagrees with the data
+   */
+  private void executeTrailerState() throws IOException {
+    if (userBufLen <= 0) {
+      return;
+    }
+
+    // verify that the CRC-32 of the decompressed stream matches the value
+    // stored in the gzip trailer
+    if (state == GzipStateLabel.TRAILER_CRC) {
+      // localBuf was empty before we handed off to Inflater, so we handle this
+      // exactly like header fields
+      assert (localBufOff < 4);  // initially 0, but may need multiple calls
+      int n = Math.min(userBufLen, 4-localBufOff);
+      copyBytesToLocal(n);
+      if (localBufOff >= 4) {
+        long streamCRC = readUIntLE(localBuf, 0);
+        if (streamCRC != crc.getValue()) {
+          throw new IOException("gzip stream CRC failure");
+        }
+        localBufOff = 0;
+        crc.reset();
+        state = GzipStateLabel.TRAILER_SIZE;
+      }
+    }
+
+    if (userBufLen <= 0) {
+      return;
+    }
+
+    // verify that the mod-2^32 decompressed size matches the trailer's value;
+    // the mask must be a long literal (int 0xffffffff sign-extends to all 1s)
+    if (state == GzipStateLabel.TRAILER_SIZE) {
+      assert (localBufOff < 4);  // initially 0, but may need multiple calls
+      int n = Math.min(userBufLen, 4-localBufOff);
+      copyBytesToLocal(n);       // modifies userBufLen, etc.
+      if (localBufOff >= 4) {    // should be strictly ==
+        long inputSize = readUIntLE(localBuf, 0);
+        if (inputSize != (inflater.getBytesWritten() & 0xffffffffL)) {
+          throw new IOException(
+            "stored gzip size doesn't match decompressed size");
+        }
+        localBufOff = 0;
+        state = GzipStateLabel.FINISHED;
+      }
+    }
+
+    if (state == GzipStateLabel.FINISHED) {
+      return;
+    }
+  }
+
+  /**
+   * Returns the total number of compressed bytes input so far: the gzip
+   * header and trailer bytes plus whatever the inflater reports as read.
+   *
+   * @return the total (non-negative) number of compressed bytes read so far
+   */
+  public synchronized long getBytesRead() {
+    return headerBytesRead + inflater.getBytesRead() + trailerBytesRead;
+  }
+
+  /**
+   * Returns the number of bytes remaining in the input buffer; normally
+   * called when finished() is true to determine amount of post-gzip-stream
+   * data.  Note that, other than the finished state with concatenated data
+   * after the end of the current gzip stream, this will never return a
+   * non-zero value unless called after {@link #setInput(byte[] b, int off,
+   * int len)} and before {@link #decompress(byte[] b, int off, int len)}.
+   * (That is, after {@link #decompress(byte[] b, int off, int len)} it
+   * always returns zero, except in finished state with concatenated data.)
+   *
+   * @return the total (non-negative) number of unprocessed bytes in input
+   */
+  public synchronized int getRemaining() {
+    return userBufLen;
+  }
+
+  /** {@inheritDoc}  Simply forwards the question to the inflater. */
+  public synchronized boolean needsDictionary() {
+    return inflater.needsDictionary();
+  }
+
+  /** {@inheritDoc}  The dictionary is handed straight to the inflater. */
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    inflater.setDictionary(b, off, len);
+  }
+
+  /**
+   * Returns true if the end of the gzip substream (single "member") has been
+   * reached, i.e. once its CRC and size trailer fields have been verified.
+   */
+  public synchronized boolean finished() {
+    return (state == GzipStateLabel.FINISHED);
+  }
+
+  /**
+   * Resets everything, including the input buffer, regardless of whether the
+   * current gzip substream is finished.
+   */
+  public synchronized void reset() {
+    // could optionally emit INFO message if state != GzipStateLabel.FINISHED
+    inflater.reset();
+    state = GzipStateLabel.HEADER_BASIC;
+    crc.reset();
+    userBufOff = userBufLen = 0;
+    localBufOff = 0;
+    headerBytesRead = 0;
+    trailerBytesRead = 0;
+    numExtraFieldBytesRemaining = -1;  // negative: extra-field length not read yet
+    hasExtraField = false;
+    hasFilename = false;
+    hasComment = false;
+    hasHeaderCRC = false;
+  }
+
+  /** {@inheritDoc}  Only the inflater is ended; no other state is touched. */
+  public synchronized void end() {
+    inflater.end();
+  }
+
+  /**
+   * Validates the 10-byte basic header at the start of localBuf: throws on
+   * bad ID bytes, on a compression method other than 8, or on reserved flag
+   * bits; otherwise sets hasExtraField, hasFilename, hasComment, hasHeaderCRC.
+   * MTIME, XFL and OS are ignored.  Caller must ensure 10 bytes are present.
+   */
+  /* Flag bits (remainder are reserved and must be zero):
+   *   bit 0   FTEXT
+   *   bit 1   FHCRC   (never implemented in gzip, at least through version
+   *                   1.4.0; instead interpreted as "continuation of multi-
+   *                   part gzip file," which is unsupported through 1.4.0)
+   *   bit 2   FEXTRA
+   *   bit 3   FNAME
+   *   bit 4   FCOMMENT
+   *  [bit 5   encrypted]
+   */
+  private void processBasicHeader() throws IOException {
+    final int magic = readUShortLE(localBuf, 0);
+    if (magic != GZIP_MAGIC_ID) {
+      throw new IOException("not a gzip file");
+    }
+    if (readUByte(localBuf, 2) != GZIP_DEFLATE_METHOD) {
+      throw new IOException("gzip data not compressed with deflate method");
+    }
+    final int flags = readUByte(localBuf, 3);
+    if ((flags & GZIP_FLAGBITS_RESERVED) != 0) {
+      throw new IOException("unknown gzip format (reserved flagbits set)");
+    }
+    hasHeaderCRC  = (0 != (flags & GZIP_FLAGBIT_HEADER_CRC));
+    hasComment    = (0 != (flags & GZIP_FLAGBIT_COMMENT));
+    hasFilename   = (0 != (flags & GZIP_FLAGBIT_FILENAME));
+    hasExtraField = (0 != (flags & GZIP_FLAGBIT_EXTRA_FIELD));
+  }
+
+  // Copies len header bytes from the user buffer into localBuf, then lets
+  // checkAndSkipBytes() do the CRC update and the user-buffer and
+  // headerBytesRead bookkeeping, which are exactly the same whether the
+  // bytes are kept or merely skipped.
+  private void checkAndCopyBytesToLocal(int len) {
+    System.arraycopy(userBuf, userBufOff, localBuf, localBufOff, len);
+    localBufOff += len;
+    checkAndSkipBytes(len);
+  }
+
+  private void checkAndSkipBytes(int len) {   // header bytes only (CRC'd)
+    crc.update(userBuf, userBufOff, len);
+    headerBytesRead += len;
+    userBufLen -= len;
+    userBufOff += len;
+  }
+
+  // Consumes header bytes (folding each into the CRC) up to and including the
+  // first NULL.  Returns true if the NULL was seen, false if the user buffer
+  // ran dry first; called _only_ during gzip-header processing (not trailer).
+  // (caller can compare userBufLen before/after to get the byte count)
+  private boolean checkAndSkipBytesUntilNull() {
+    boolean sawNull = false;
+    while (userBufLen > 0 && !sawNull) {
+      final byte cur = userBuf[userBufOff];
+      sawNull = (cur == 0);
+      crc.update(cur);
+      ++userBufOff;
+      --userBufLen;
+      ++headerBytesRead;
+    }
+    return sawNull;
+  }
+
+  // like its "checkAnd" sibling but leaves the CRC alone; also used for trailer
+  private void copyBytesToLocal(int len) {
+    final boolean inTrailer = (state == GzipStateLabel.TRAILER_CRC ||
+                               state == GzipStateLabel.TRAILER_SIZE);
+    System.arraycopy(userBuf, userBufOff, localBuf, localBufOff, len);
+    userBufLen -= len;
+    userBufOff += len;
+    localBufOff += len;
+    if (inTrailer) {
+      trailerBytesRead += len;
+    } else {
+      headerBytesRead += len;
+    }
+  }
+
+  private int readUByte(byte[] b, int off) {
+    return b[off] & 0xff;   // widen to int, dropping the sign extension
+  }
+
+  // little-endian unsigned 16-bit read; caller must not overrun the buffer
+  private int readUShortLE(byte[] b, int off) {
+    final int hi = b[off+1] & 0xff, lo = b[off] & 0xff;
+    return (hi << 8) | lo;
+  }
+
+  // caller must not overrun buffer; result is an unsigned 32-bit value in a long
+  private long readUIntLE(byte[] b, int off) {
+    return ((((long)(b[off+3] & 0xff) << 24) |
+             ((long)(b[off+2] & 0xff) << 16) |
+             ((long)(b[off+1] & 0xff) <<  8) |
+             ((long)(b[off]   & 0xff)      )) & 0xffffffffL);
+  }
+
+}

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java?rev=961532&r1=961531&r2=961532&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java Wed Jul  7 23:22:28 2010
@@ -166,7 +166,7 @@ public class ZlibDecompressor implements
   }
 
   public synchronized boolean needsInput() {
-    // Consume remanining compressed data?
+    // Consume remaining compressed data?
     if (uncompressedDirectBuf.remaining() > 0) {
       return false;
     }
@@ -189,7 +189,7 @@ public class ZlibDecompressor implements
   }
 
   public synchronized boolean finished() {
-    // Check if 'zlib' says its 'finished' and
+    // Check if 'zlib' says it's 'finished' and
     // all compressed data has been consumed
     return (finished && uncompressedDirectBuf.remaining() == 0);
   }
@@ -221,7 +221,7 @@ public class ZlibDecompressor implements
     n = inflateBytesDirect();
     uncompressedDirectBuf.limit(n);
 
-    // Get atmost 'len' bytes
+    // Get at most 'len' bytes
     n = Math.min(n, len);
     ((ByteBuffer)uncompressedDirectBuf).get(b, off, n);
 
@@ -229,9 +229,9 @@ public class ZlibDecompressor implements
   }
   
   /**
-   * Returns the total number of compressed bytes output so far.
+   * Returns the total number of uncompressed bytes output so far.
    *
-   * @return the total (non-negative) number of compressed bytes output so far
+   * @return the total (non-negative) number of uncompressed bytes output so far
    */
   public synchronized long getBytesWritten() {
     checkStream();
@@ -239,15 +239,30 @@ public class ZlibDecompressor implements
   }
 
   /**
-   * Returns the total number of uncompressed bytes input so far.</p>
+   * Returns the total number of compressed bytes input so far.</p>
    *
-   * @return the total (non-negative) number of uncompressed bytes input so far
+   * @return the total (non-negative) number of compressed bytes input so far
    */
   public synchronized long getBytesRead() {
     checkStream();
     return getBytesRead(stream);
   }
 
+  /**
+   * Returns the number of bytes remaining in the input buffers; normally
+   * called when finished() is true to determine amount of post-gzip-stream
+   * data.
+   *
+   * @return the total (non-negative) number of unprocessed bytes in input
+   */
+  public synchronized int getRemaining() {
+    checkStream();
+    return userBufLen + getRemaining(stream);  // userBuf + compressedDirectBuf
+  }
+
+  /**
+   * Resets everything including the input buffers (user and direct).</p>
+   */
   public synchronized void reset() {
     checkStream();
     reset(stream);
@@ -282,6 +297,7 @@ public class ZlibDecompressor implements
   private native int inflateBytesDirect();
   private native static long getBytesRead(long strm);
   private native static long getBytesWritten(long strm);
+  private native static int getRemaining(long strm);
   private native static void reset(long strm);
   private native static void end(long strm);
 }

Modified: hadoop/common/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c?rev=961532&r1=961531&r2=961532&view=diff
==============================================================================
--- hadoop/common/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c (original)
+++ hadoop/common/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c Wed Jul  7 23:22:28 2010
@@ -291,6 +291,13 @@ Java_org_apache_hadoop_io_compress_zlib_
     return (ZSTREAM(stream))->total_out;
 }
 
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_getRemaining(
+	JNIEnv *env, jclass cls, jlong stream
+	) {
+    return (ZSTREAM(stream))->avail_in;  /* input zlib has not consumed yet */
+}
+
 JNIEXPORT void JNICALL
 Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_reset(
 	JNIEnv *env, jclass cls, jlong stream

Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/io/compress/TestCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/io/compress/TestCodec.java?rev=961532&r1=961531&r2=961532&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/io/compress/TestCodec.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/io/compress/TestCodec.java Wed Jul  7 23:22:28 2010
@@ -39,6 +39,7 @@ import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -53,6 +54,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.CompressorStream;
+import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;
 import org.apache.hadoop.io.compress.zlib.BuiltInZlibDeflater;
 import org.apache.hadoop.io.compress.zlib.BuiltInZlibInflater;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
@@ -70,8 +72,7 @@ import static org.junit.Assert.*;
 
 public class TestCodec {
 
-  private static final Log LOG= 
-    LogFactory.getLog(TestCodec.class);
+  private static final Log LOG= LogFactory.getLog(TestCodec.class);
 
   private Configuration conf = new Configuration();
   private int count = 10000;
@@ -277,7 +278,7 @@ public class TestCodec {
   @Test
   public void testCodecPoolGzipReuse() throws Exception {
     Configuration conf = new Configuration();
-    conf.setBoolean("hadoop.native.lib", true);
+    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
     if (!ZlibFactory.isNativeZlibLoaded(conf)) {
       LOG.warn("testCodecPoolGzipReuse skipped: native libs not loaded");
       return;
@@ -362,7 +363,7 @@ public class TestCodec {
   @Test
   public void testCodecInitWithCompressionLevel() throws Exception {
     Configuration conf = new Configuration();
-    conf.setBoolean("io.native.lib.available", true);
+    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
     if (ZlibFactory.isNativeZlibLoaded(conf)) {
       LOG.info("testCodecInitWithCompressionLevel with native");
       codecTestWithNOCompression(conf,
@@ -374,7 +375,7 @@ public class TestCodec {
                + ": native libs not loaded");
     }
     conf = new Configuration();
-    conf.setBoolean("io.native.lib.available", false);
+    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, false);
     codecTestWithNOCompression( conf,
                          "org.apache.hadoop.io.compress.DefaultCodec");
   }
@@ -382,14 +383,14 @@ public class TestCodec {
   @Test
   public void testCodecPoolCompressorReinit() throws Exception {
     Configuration conf = new Configuration();
-    conf.setBoolean("hadoop.native.lib", true);
+    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
     if (ZlibFactory.isNativeZlibLoaded(conf)) {
       GzipCodec gzc = ReflectionUtils.newInstance(GzipCodec.class, conf);
       gzipReinitTest(conf, gzc);
     } else {
       LOG.warn("testCodecPoolCompressorReinit skipped: native libs not loaded");
     }
-    conf.setBoolean("hadoop.native.lib", false);
+    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, false);
     DefaultCodec dfc = ReflectionUtils.newInstance(DefaultCodec.class, conf);
     gzipReinitTest(conf, dfc);
   }
@@ -486,53 +487,93 @@ public class TestCodec {
       System.err.println("Caught: " + e);
       e.printStackTrace();
     }
-    
+
   }
 
   @Test
-  public void testCodecPoolAndGzipDecompressor() {
-    // BuiltInZlibInflater should not be used as the GzipCodec decompressor.
-    // Assert that this is the case.
+  public void testGzipCompatibility() throws IOException {
+    Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+    LOG.info("seed: " + seed);
 
-    // Don't use native libs for this test.
-    Configuration conf = new Configuration();
-    conf.setBoolean("hadoop.native.lib", false);
-    assertFalse("ZlibFactory is using native libs against request",
-        ZlibFactory.isNativeZlibLoaded(conf));
+    DataOutputBuffer dflbuf = new DataOutputBuffer();
+    GZIPOutputStream gzout = new GZIPOutputStream(dflbuf);
+    byte[] b = new byte[r.nextInt(128 * 1024 + 1)];
+    r.nextBytes(b);
+    gzout.write(b);
+    gzout.close();
 
-    // This should give us a BuiltInZlibInflater.
-    Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);
-    assertNotNull("zlibDecompressor is null!", zlibDecompressor);
-    assertTrue("ZlibFactory returned unexpected inflator",
-        zlibDecompressor instanceof BuiltInZlibInflater);
+    DataInputBuffer gzbuf = new DataInputBuffer();
+    gzbuf.reset(dflbuf.getData(), dflbuf.getLength());
 
-    // Asking for a decompressor directly from GzipCodec should return null;
-    // its createOutputStream() just wraps the existing stream in a
-    // java.util.zip.GZIPOutputStream.
-    CompressionCodecFactory ccf = new CompressionCodecFactory(conf);
-    CompressionCodec codec = ccf.getCodec(new Path("foo.gz"));
-    assertTrue("Codec for .gz file is not GzipCodec", codec instanceof GzipCodec);
-    Decompressor codecDecompressor = codec.createDecompressor();
-    if (null != codecDecompressor) {
-      fail("Got non-null codecDecompressor: " + codecDecompressor);
-    }
-
-    // Asking the CodecPool for a decompressor for GzipCodec
-    // should return null as well.
-    Decompressor poolDecompressor = CodecPool.getDecompressor(codec);
-    if (null != poolDecompressor) {
-      fail("Got non-null poolDecompressor: " + poolDecompressor);
-    }
+    Configuration conf = new Configuration();
+    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, false);
+    CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
+    Decompressor decom = codec.createDecompressor();
+    assertNotNull(decom);
+    assertEquals(BuiltInGzipDecompressor.class, decom.getClass());
+    InputStream gzin = codec.createInputStream(gzbuf, decom);
+    // gzbuf aliases dflbuf's backing array, so decode into a fresh buffer
+    DataOutputBuffer outbuf = new DataOutputBuffer();
+    IOUtils.copyBytes(gzin, outbuf, 4096);
+    final byte[] dflchk = Arrays.copyOf(outbuf.getData(), outbuf.getLength());
+    assertArrayEquals(b, dflchk);
+  }
+
+  void GzipConcatTest(Configuration conf,
+      Class<? extends Decompressor> decomClass) throws IOException {
+    Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+    LOG.info(decomClass + " seed: " + seed);
+    // Writes 3-6 gzip members back to back and checks the codec decodes them all
+    final int CONCAT = r.nextInt(4) + 3;
+    final int BUFLEN = 128 * 1024;
+    DataOutputBuffer dflbuf = new DataOutputBuffer();
+    DataOutputBuffer chkbuf = new DataOutputBuffer();
+    byte[] b = new byte[BUFLEN];
+    for (int i = 0; i < CONCAT; ++i) {
+      GZIPOutputStream gzout = new GZIPOutputStream(dflbuf);
+      r.nextBytes(b);
+      int len = r.nextInt(BUFLEN);
+      int off = r.nextInt(BUFLEN - len);
+      chkbuf.write(b, off, len);
+      gzout.write(b, off, len);
+      gzout.close();
+    }
+    final byte[] chk = Arrays.copyOf(chkbuf.getData(), chkbuf.getLength());
+    // the codec must hand out the decompressor class the caller expects
+    CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
+    Decompressor decom = codec.createDecompressor();
+    assertNotNull(decom);
+    assertEquals(decomClass, decom.getClass());
+    DataInputBuffer gzbuf = new DataInputBuffer();
+    gzbuf.reset(dflbuf.getData(), dflbuf.getLength());
+    InputStream gzin = codec.createInputStream(gzbuf, decom);
+    // gzbuf aliases dflbuf's backing array, so decode into a fresh buffer
+    DataOutputBuffer outbuf = new DataOutputBuffer();
+    IOUtils.copyBytes(gzin, outbuf, 4096);
+    final byte[] dflchk = Arrays.copyOf(outbuf.getData(), outbuf.getLength());
+    assertArrayEquals(chk, dflchk);
+  }
 
-    // If we then ensure that the pool is populated...
-    CodecPool.returnDecompressor(zlibDecompressor);
+  @Test
+  public void testBuiltInGzipConcat() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, false);
+    GzipConcatTest(conf, BuiltInGzipDecompressor.class);  // native libs disabled
+  }
 
-    // Asking the pool another time should still not bind this to GzipCodec.
-    poolDecompressor = CodecPool.getDecompressor(codec);
-    if (null != poolDecompressor) {
-      fail("Second time, got non-null poolDecompressor: "
-          + poolDecompressor);
+  @Test
+  public void testNativeGzipConcat() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
+    if (!ZlibFactory.isNativeZlibLoaded(conf)) {
+      LOG.warn("skipped: native libs not loaded");
+      return;
     }
+    GzipConcatTest(conf, GzipCodec.GzipZlibDecompressor.class);  // native zlib path
   }
 
   @Test
@@ -542,7 +583,7 @@ public class TestCodec {
 
     // Don't use native libs for this test.
     Configuration conf = new Configuration();
-    conf.setBoolean("hadoop.native.lib", false);
+    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, false);
     assertFalse("ZlibFactory is using native libs against request",
         ZlibFactory.isNativeZlibLoaded(conf));
 
@@ -595,7 +636,7 @@ public class TestCodec {
 
     // Don't use native libs for this test.
     Configuration conf = new Configuration();
-    conf.setBoolean("hadoop.native.lib", false);
+    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, false);
     assertFalse("ZlibFactory is using native libs against request",
         ZlibFactory.isNativeZlibLoaded(conf));