Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:25:16 UTC

svn commit: r1077533 - in /hadoop/common/branches/branch-0.20-security-patches/src: core/org/apache/hadoop/io/ core/org/apache/hadoop/io/compress/ core/org/apache/hadoop/io/compress/bzip2/ core/org/apache/hadoop/io/compress/zlib/ native/src/org/apache/...

Author: omalley
Date: Fri Mar  4 04:25:16 2011
New Revision: 1077533

URL: http://svn.apache.org/viewvc?rev=1077533&view=rev
Log:
commit c577ce5f8212832949665bb8b694694f956d3a43
Author: Greg Roelofs <ro...@yahoo-inc.com>
Date:   Wed Jul 7 20:23:47 2010 -0700

    HADOOP-6835 from https://issues.apache.org/jira/secure/attachment/12448942/HADOOP-6835.v9.yahoo-0.20.2xx-branch.patch

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/IOUtils.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/BlockDecompressorStream.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/Decompressor.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/DecompressorStream.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/GzipCodec.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
    hadoop/common/branches/branch-0.20-security-patches/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/io/compress/TestCodec.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/IOUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/IOUtils.java?rev=1077533&r1=1077532&r2=1077533&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/IOUtils.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/IOUtils.java Fri Mar  4 04:25:16 2011
@@ -41,17 +41,8 @@ public class IOUtils {
   public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) 
     throws IOException {
 
-    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
-    byte buf[] = new byte[buffSize];
     try {
-      int bytesRead = in.read(buf);
-      while (bytesRead >= 0) {
-        out.write(buf, 0, bytesRead);
-        if ((ps != null) && ps.checkError()) {
-          throw new IOException("Unable to write to output stream.");
-        }
-        bytesRead = in.read(buf);
-      }
+      copyBytes(in, out, buffSize);
     } finally {
       if(close) {
         out.close();
@@ -61,6 +52,28 @@ public class IOUtils {
   }
   
   /**
+   * Copies from one stream to another.
+   * 
+   * @param in InputStrem to read from
+   * @param out OutputStream to write to
+   * @param buffSize the size of the buffer 
+   */
+  public static void copyBytes(InputStream in, OutputStream out, int buffSize) 
+    throws IOException {
+
+    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
+    byte buf[] = new byte[buffSize];
+    int bytesRead = in.read(buf);
+    while (bytesRead >= 0) {
+      out.write(buf, 0, bytesRead);
+      if ((ps != null) && ps.checkError()) {
+        throw new IOException("Unable to write to output stream.");
+      }
+      bytesRead = in.read(buf);
+    }
+  }
+
+  /**
    * Copies from one stream to another. <strong>closes the input and output streams 
    * at the end</strong>.
    * @param in InputStrem to read from

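For context: the hunk above splits the raw copy loop out of copyBytes(in, out, buffSize, close) into a new three-argument overload that never closes either stream. A minimal sketch of a caller using the new overload (the file names here are hypothetical, not part of the patch):

    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.IOUtils;

    public class CopyBytesExample {
      public static void main(String[] args) throws IOException {
        FileInputStream in = new FileInputStream("input.dat");      // hypothetical source
        FileOutputStream out = new FileOutputStream("output.dat");  // hypothetical sink
        try {
          // New overload: copies until EOF but leaves both streams open,
          // so the caller keeps control of their lifetimes.
          IOUtils.copyBytes(in, out, 4096);
        } finally {
          in.close();
          out.close();
        }
      }
    }
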
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/BlockDecompressorStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/BlockDecompressorStream.java?rev=1077533&r1=1077532&r2=1077533&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/BlockDecompressorStream.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/BlockDecompressorStream.java Fri Mar  4 04:25:16 2011
@@ -79,7 +79,9 @@ public class BlockDecompressorStream ext
         }
       }
       if (decompressor.needsInput()) {
-        getCompressedData();
+        int m = getCompressedData();
+        // Send the read data to the decompressor
+        decompressor.setInput(buffer, 0, m);
       }
     }
     
@@ -89,10 +91,10 @@ public class BlockDecompressorStream ext
     return n;
   }
 
-  protected void getCompressedData() throws IOException {
+  protected int getCompressedData() throws IOException {
     checkStream();
 
-    // Get the size of the compressed chunk
+    // Get the size of the compressed chunk (always non-negative)
     int len = rawReadInt();
 
     // Read len bytes from underlying stream 
@@ -103,13 +105,12 @@ public class BlockDecompressorStream ext
     while (n < len) {
       int count = in.read(buffer, off + n, len - n);
       if (count < 0) {
-        throw new EOFException();
+        throw new EOFException("Unexpected end of block in input stream");
       }
       n += count;
     }
     
-    // Send the read data to the decompressor
-    decompressor.setInput(buffer, 0, len);
+    return len;
   }
 
   public void resetState() throws IOException {

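The point of returning the chunk length here is that getCompressedData() no longer feeds the decompressor itself; the caller does, which is what lets DecompressorStream (below) reuse the same method for its gzip-concatenation handling. A condensed, hedged view of the new call pattern (field names follow the patch; this is an illustration, not the committed hunk):

    if (decompressor.needsInput()) {
      int m = getCompressedData();          // reads one length-prefixed chunk into 'buffer'
      decompressor.setInput(buffer, 0, m);  // forwarding the bytes is now the caller's job
    }
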
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/Decompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/Decompressor.java?rev=1077533&r1=1077532&r2=1077533&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/Decompressor.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/Decompressor.java Fri Mar  4 04:25:16 2011
@@ -29,8 +29,13 @@ import java.io.IOException;
 public interface Decompressor {
   /**
    * Sets input data for decompression. 
-   * This should be called whenever #needsInput() returns 
+   * This should be called if and only if {@link #needsInput()} returns 
    * <code>true</code> indicating that more input data is required.
+   * (Both native and non-native versions of various Decompressors require
+   * that the data passed in via <code>b[]</code> remain unmodified until
+   * the caller is explicitly notified--via {@link #needsInput()}--that the
+   * buffer may be safely modified.  With this requirement, an extra
+   * buffer-copy can be avoided.)
    * 
    * @param b Input data
    * @param off Start offset
@@ -40,10 +45,12 @@ public interface Decompressor {
   
   /**
    * Returns true if the input data buffer is empty and 
-   * #setInput() should be called to provide more input. 
+   * {@link #setInput(byte[], int, int)} should be called to
+   * provide more input. 
    * 
    * @return <code>true</code> if the input data buffer is empty and 
-   * #setInput() should be called in order to provide more input.
+   * {@link #setInput(byte[], int, int)} should be called in
+   * order to provide more input.
    */
   public boolean needsInput();
   
@@ -64,9 +71,9 @@ public interface Decompressor {
   public boolean needsDictionary();
 
   /**
-   * Returns true if the end of the compressed 
+   * Returns true if the end of the decompressed 
    * data output stream has been reached.
-   * @return <code>true</code> if the end of the compressed
+   * @return <code>true</code> if the end of the decompressed
    * data output stream has been reached.
    */
   public boolean finished();
@@ -74,8 +81,8 @@ public interface Decompressor {
   /**
    * Fills specified buffer with uncompressed data. Returns actual number
    * of bytes of uncompressed data. A return value of 0 indicates that
-   * #needsInput() should be called in order to determine if more input
-   * data is required.
+   * {@link #needsInput()} should be called in order to determine if more
+   * input data is required.
    * 
    * @param b Buffer for the compressed data
    * @param off Start offset of the data
@@ -84,12 +91,20 @@ public interface Decompressor {
    * @throws IOException
    */
   public int decompress(byte[] b, int off, int len) throws IOException;
-  
+
   /**
-   * Resets decompressor so that a new set of input data can be processed.
+   * Returns the number of bytes remaining in the compressed-data buffer;
+   * typically called after the decompressor has finished decompressing
+   * the current gzip stream (a.k.a. "member").
+   */
+  public int getRemaining();
+
+  /**
+   * Resets decompressor and input and output buffers so that a new set of
+   * input data can be processed.
    */
   public void reset();
-  
+
   /**
    * Closes the decompressor and discards any unprocessed input.
    */

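Read together, the revised javadoc spells out a pull-style contract: call decompress() until it produces output, supply input via setInput() only when needsInput() reports true, leave the supplied buffer untouched until then, and use the new getRemaining() after finished() to detect unconsumed input such as a following concatenated gzip member. A minimal, hedged sketch of driving an arbitrary Decompressor by hand (the buffer size and helper name are assumptions, not part of the interface):

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.io.compress.Decompressor;

    public class DecompressorDriver {
      /** Decompresses one compressed "member" from in into out; returns bytes produced. */
      static int drain(InputStream in, Decompressor d, byte[] out) throws IOException {
        byte[] compressed = new byte[4 * 1024];   // assumed input-buffer size
        int total = 0;
        while (!d.finished() && total < out.length) {
          int n = d.decompress(out, total, out.length - total);
          if (n > 0) {
            total += n;
          } else if (d.needsInput()) {
            int m = in.read(compressed);
            if (m < 0) {
              throw new java.io.EOFException("Unexpected end of input stream");
            }
            // Per the javadoc above, 'compressed' must not be modified again
            // until needsInput() returns true.
            d.setInput(compressed, 0, m);
          } else if (d.needsDictionary()) {
            break;  // a real caller would supply a preset dictionary here
          }
        }
        // After finished(), getRemaining() reports leftover input, e.g. the
        // header of the next member in a concatenated gzip stream.
        return total;
      }
    }
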
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/DecompressorStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/DecompressorStream.java?rev=1077533&r1=1077532&r2=1077533&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/DecompressorStream.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/DecompressorStream.java Fri Mar  4 04:25:16 2011
@@ -29,8 +29,10 @@ public class DecompressorStream extends 
   protected byte[] buffer;
   protected boolean eof = false;
   protected boolean closed = false;
-  
-  public DecompressorStream(InputStream in, Decompressor decompressor, int bufferSize) {
+  private int lastBytesSent = 0;
+
+  public DecompressorStream(InputStream in, Decompressor decompressor,
+                            int bufferSize) {
     super(in);
 
     if (in == null || decompressor == null) {
@@ -76,31 +78,78 @@ public class DecompressorStream extends 
 
   protected int decompress(byte[] b, int off, int len) throws IOException {
     int n = 0;
-    
+
     while ((n = decompressor.decompress(b, off, len)) == 0) {
-      if (decompressor.finished() || decompressor.needsDictionary()) {
+      if (decompressor.needsDictionary()) {
         eof = true;
         return -1;
       }
-      if (decompressor.needsInput()) {
-        getCompressedData();
+
+      if (decompressor.finished()) {
+        // First see if there was any leftover buffered input from previous
+        // stream; if not, attempt to refill buffer.  If refill -> EOF, we're
+        // all done; else reset, fix up input buffer, and get ready for next
+        // concatenated substream/"member".
+        int nRemaining = decompressor.getRemaining();
+        if (nRemaining == 0) {
+          int m = getCompressedData();
+          if (m == -1) {
+            // apparently the previous end-of-stream was also end-of-file:
+            // return success, as if we had never called getCompressedData()
+            eof = true;
+            return -1;
+          }
+          decompressor.reset();
+          decompressor.setInput(buffer, 0, m);
+          lastBytesSent = m;
+        } else {
+          // looks like it's a concatenated stream:  reset low-level zlib (or
+          // other engine) and buffers, then "resend" remaining input data
+          decompressor.reset();
+          int leftoverOffset = lastBytesSent - nRemaining;
+          assert (leftoverOffset >= 0);
+          // this recopies userBuf -> direct buffer if using native libraries:
+          decompressor.setInput(buffer, leftoverOffset, nRemaining);
+          // NOTE:  this is the one place we do NOT want to save the number
+          // of bytes sent (nRemaining here) into lastBytesSent:  since we
+          // are resending what we've already sent before, offset is nonzero
+          // in general (only way it could be zero is if it already equals
+          // nRemaining), which would then screw up the offset calculation
+          // _next_ time around.  IOW, getRemaining() is in terms of the
+          // original, zero-offset bufferload, so lastBytesSent must be as
+          // well.  Cheesy ASCII art:
+          //
+          //          <------------ m, lastBytesSent ----------->
+          //          +===============================================+
+          // buffer:  |1111111111|22222222222222222|333333333333|     |
+          //          +===============================================+
+          //     #1:  <-- off -->|<-------- nRemaining --------->
+          //     #2:  <----------- off ----------->|<-- nRem. -->
+          //     #3:  (final substream:  nRemaining == 0; eof = true)
+          //
+          // If lastBytesSent is anything other than m, as shown, then "off"
+          // will be calculated incorrectly.
+        }
+      } else if (decompressor.needsInput()) {
+        int m = getCompressedData();
+        if (m == -1) {
+          throw new EOFException("Unexpected end of input stream");
+        }
+        decompressor.setInput(buffer, 0, m);
+        lastBytesSent = m;
       }
     }
-    
+
     return n;
   }
-  
-  protected void getCompressedData() throws IOException {
+
+  protected int getCompressedData() throws IOException {
     checkStream();
   
-    int n = in.read(buffer, 0, buffer.length);
-    if (n == -1) {
-      throw new EOFException("Unexpected end of input stream");
-    }
-
-    decompressor.setInput(buffer, 0, n);
+    // note that the _caller_ is now required to call setInput() or throw
+    return in.read(buffer, 0, buffer.length);
   }
-  
+
   protected void checkStream() throws IOException {
     if (closed) {
       throw new IOException("Stream closed");

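The leftoverOffset arithmetic above is easier to follow with concrete numbers; a hedged illustration (all values made up):

    // Suppose the last getCompressedData() returned m = 1000 bytes, sent via
    // setInput(buffer, 0, 1000), so lastBytesSent == 1000.  If the first gzip
    // member ends 700 bytes into that load, the decompressor finishes with
    // getRemaining() == 300, and the next member starts at
    //   leftoverOffset = lastBytesSent - nRemaining = 1000 - 300 = 700
    // so the stream resends the tail without touching 'in' again:
    decompressor.reset();
    decompressor.setInput(buffer, 700, 300);
    // lastBytesSent stays 1000, so any leftover within these 300 bytes is
    // still measured against the original, zero-offset bufferload.
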
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/GzipCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/GzipCodec.java?rev=1077533&r1=1077532&r2=1077533&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/GzipCodec.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/GzipCodec.java Fri Mar  4 04:25:16 2011
@@ -86,56 +86,7 @@ public class GzipCodec extends DefaultCo
       ((ResetableGZIPOutputStream) out).resetState();
     }
   }
-  
-  protected static class GzipInputStream extends DecompressorStream {
-    
-    private static class ResetableGZIPInputStream extends GZIPInputStream {
 
-      public ResetableGZIPInputStream(InputStream in) throws IOException {
-        super(in);
-      }
-      
-      public void resetState() throws IOException {
-        inf.reset();
-      }
-    }
-    
-    public GzipInputStream(InputStream in) throws IOException {
-      super(new ResetableGZIPInputStream(in));
-    }
-    
-    /**
-     * Allow subclasses to directly set the inflater stream.
-     */
-    protected GzipInputStream(DecompressorStream in) {
-      super(in);
-    }
-
-    public int available() throws IOException {
-      return in.available(); 
-    }
-
-    public void close() throws IOException {
-      in.close();
-    }
-
-    public int read() throws IOException {
-      return in.read();
-    }
-    
-    public int read(byte[] data, int offset, int len) throws IOException {
-      return in.read(data, offset, len);
-    }
-    
-    public long skip(long offset) throws IOException {
-      return in.skip(offset);
-    }
-    
-    public void resetState() throws IOException {
-      ((ResetableGZIPInputStream) in).resetState();
-    }
-  }  
-  
   public CompressionOutputStream createOutputStream(OutputStream out) 
     throws IOException {
     return (ZlibFactory.isNativeZlibLoaded(conf)) ?
@@ -151,8 +102,7 @@ public class GzipCodec extends DefaultCo
                new CompressorStream(out, compressor,
                                     conf.getInt("io.file.buffer.size", 
                                                 4*1024)) :
-               createOutputStream(out);                                               
-
+               createOutputStream(out);
   }
 
   public Compressor createCompressor() {
@@ -164,38 +114,34 @@ public class GzipCodec extends DefaultCo
   public Class<? extends Compressor> getCompressorType() {
     return ZlibFactory.isNativeZlibLoaded(conf)
       ? GzipZlibCompressor.class
-      : BuiltInZlibDeflater.class;
+      : null;
   }
 
-  public CompressionInputStream createInputStream(InputStream in) 
+  public CompressionInputStream createInputStream(InputStream in)
   throws IOException {
-  return (ZlibFactory.isNativeZlibLoaded(conf)) ?
-             new DecompressorStream(in, createDecompressor(),
-                                    conf.getInt("io.file.buffer.size", 
-                                                4*1024)) :
-             new GzipInputStream(in);                                         
+    return createInputStream(in, null);
   }
 
-  public CompressionInputStream createInputStream(InputStream in, 
-                                                  Decompressor decompressor) 
+  public CompressionInputStream createInputStream(InputStream in,
+                                                  Decompressor decompressor)
   throws IOException {
-    return (decompressor != null) ? 
-               new DecompressorStream(in, decompressor,
-                                      conf.getInt("io.file.buffer.size", 
-                                                  4*1024)) :
-               createInputStream(in); 
+    if (decompressor == null) {
+      decompressor = createDecompressor();  // always succeeds (or throws)
+    }
+    return new DecompressorStream(in, decompressor,
+                                  conf.getInt("io.file.buffer.size", 4*1024));
   }
 
   public Decompressor createDecompressor() {
     return (ZlibFactory.isNativeZlibLoaded(conf))
       ? new GzipZlibDecompressor()
-      : null;
+      : new BuiltInGzipDecompressor();
   }
 
   public Class<? extends Decompressor> getDecompressorType() {
     return ZlibFactory.isNativeZlibLoaded(conf)
       ? GzipZlibDecompressor.class
-      : BuiltInZlibInflater.class;
+      : BuiltInGzipDecompressor.class;
   }
 
   public String getDefaultExtension() {

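With this change GzipCodec always hands out a DecompressorStream, backed by the native GzipZlibDecompressor when zlib is loaded and by the new pure-Java BuiltInGzipDecompressor otherwise. A hedged sketch of reading a .gz file through the codec (the file path is hypothetical; the config keys match those used in the patch):

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.util.ReflectionUtils;

    public class GzipReadExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Force the pure-Java path; with native zlib loaded the codec would
        // back the stream with GzipZlibDecompressor instead.
        conf.setBoolean("hadoop.native.lib", false);
        CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
        FileSystem fs = FileSystem.getLocal(conf);
        InputStream in = codec.createInputStream(fs.open(new Path("part-00000.gz")));  // hypothetical path
        IOUtils.copyBytes(in, System.out, 4096, false);
        in.close();
      }
    }
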
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java?rev=1077533&r1=1077532&r2=1077533&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java Fri Mar  4 04:25:16 2011
@@ -1,3 +1,21 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
 package org.apache.hadoop.io.compress.bzip2;
 
 import java.io.IOException;
@@ -35,6 +53,11 @@ public class BZip2DummyDecompressor impl
   }
 
   @Override
+  public int getRemaining() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public void reset() {
     // do nothing
   }

Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java?rev=1077533&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java Fri Mar  4 04:25:16 2011
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.zlib;
+
+import java.io.IOException;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+import java.util.zip.CRC32;
+
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * A {@link Decompressor} based on the popular gzip compressed file format.
+ * http://www.gzip.org/
+ *
+ */
+public class BuiltInGzipDecompressor implements Decompressor {
+  private static final int GZIP_MAGIC_ID = 0x8b1f;  // if read as LE short int
+  private static final int GZIP_DEFLATE_METHOD = 8;
+  private static final int GZIP_FLAGBIT_HEADER_CRC  = 0x02;
+  private static final int GZIP_FLAGBIT_EXTRA_FIELD = 0x04;
+  private static final int GZIP_FLAGBIT_FILENAME    = 0x08;
+  private static final int GZIP_FLAGBIT_COMMENT     = 0x10;
+  private static final int GZIP_FLAGBITS_RESERVED   = 0xe0;
+
+  // 'true' (nowrap) => Inflater will handle raw deflate stream only
+  private Inflater inflater = new Inflater(true);
+
+  private byte[] userBuf = null;
+  private int userBufOff = 0;
+  private int userBufLen = 0;
+
+  private byte[] localBuf = new byte[256];
+  private int localBufOff = 0;
+
+  private int headerBytesRead = 0;
+  private int trailerBytesRead = 0;
+  private int numExtraFieldBytesRemaining = -1;
+  private CRC32 crc = new CRC32();
+  private boolean hasExtraField = false;
+  private boolean hasFilename = false;
+  private boolean hasComment = false;
+  private boolean hasHeaderCRC = false;
+
+  private GzipStateLabel state;
+
+  /**
+   * The current state of the gzip decoder, external to the Inflater context.
+   * (Technically, the private variables localBuf through hasHeaderCRC are
+   * also part of the state, so this enum is merely the label for it.)
+   */
+  private static enum GzipStateLabel {
+    /**
+     * Immediately prior to or (strictly) within the 10-byte basic gzip header.
+     */
+    HEADER_BASIC,
+    /**
+     * Immediately prior to or within the optional "extra field."
+     */
+    HEADER_EXTRA_FIELD,
+    /**
+     * Immediately prior to or within the optional filename field.
+     */
+    HEADER_FILENAME,
+    /**
+     * Immediately prior to or within the optional comment field.
+     */
+    HEADER_COMMENT,
+    /**
+     * Immediately prior to or within the optional 2-byte header CRC value.
+     */
+    HEADER_CRC,
+    /**
+     * Immediately prior to or within the main compressed (deflate) data stream.
+     */
+    DEFLATE_STREAM,
+    /**
+     * Immediately prior to or (strictly) within the 4-byte uncompressed CRC.
+     */
+    TRAILER_CRC,
+    /**
+     * Immediately prior to or (strictly) within the 4-byte uncompressed size.
+     */
+    TRAILER_SIZE,
+    /**
+     * Immediately after the trailer (and potentially prior to the next gzip
+     * member/substream header), without reset() having been called.
+     */
+    FINISHED;
+  }
+
+  /**
+   * Creates a new (pure Java) gzip decompressor.
+   */
+  public BuiltInGzipDecompressor() {
+    state = GzipStateLabel.HEADER_BASIC;
+    crc.reset();
+    // FIXME? Inflater docs say:  'it is also necessary to provide an extra
+    //        "dummy" byte as input. This is required by the ZLIB native
+    //        library in order to support certain optimizations.'  However,
+    //        this does not appear to be true, and in any case, it's not
+    //        entirely clear where the byte should go or what its value
+    //        should be.  Perhaps it suffices to have some deflated bytes
+    //        in the first buffer load?  (But how else would one do it?)
+  }
+
+  /** {@inheritDoc} */
+  public synchronized boolean needsInput() {
+    if (state == GzipStateLabel.DEFLATE_STREAM) {  // most common case
+      return inflater.needsInput();
+    }
+    // see userBufLen comment at top of decompress(); currently no need to
+    // verify userBufLen <= 0
+    return (state != GzipStateLabel.FINISHED);
+  }
+
+  /** {@inheritDoc} */
+  /*
+   * In our case, the input data includes both gzip header/trailer bytes (which
+   * we handle in executeState()) and deflate-stream bytes (which we hand off
+   * to Inflater).
+   *
+   * NOTE:  This code assumes the data passed in via b[] remains unmodified
+   *        until _we_ signal that it's safe to modify it (via needsInput()).
+   *        The alternative would require an additional buffer-copy even for
+   *        the bulk deflate stream, which is a performance hit we don't want
+   *        to absorb.  (Decompressor now documents this requirement.)
+   */
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    userBuf = b;
+    userBufOff = off;
+    userBufLen = len;  // note:  might be zero
+  }
+
+  /**
+   * Decompress the data (gzip header, deflate stream, gzip trailer) in the
+   * provided buffer.
+   *
+   * @return the number of decompressed bytes placed into b
+   */
+  /* From the caller's perspective, this is where the state machine lives.
+   * The code is written such that we never return from decompress() with
+   * data remaining in userBuf unless we're in FINISHED state and there was
+   * data beyond the current gzip member (e.g., we're within a concatenated
+   * gzip stream).  If this ever changes, {@link #needsInput()} will also
+   * need to be modified (i.e., uncomment the userBufLen condition).
+   *
+   * The actual deflate-stream processing (decompression) is handled by
+   * Java's Inflater class.  Unlike the gzip header/trailer code (execute*
+   * methods below), the deflate stream is never copied; Inflater operates
+   * directly on the user's buffer.
+   */
+  public synchronized int decompress(byte[] b, int off, int len)
+  throws IOException {
+    int numAvailBytes = 0;
+
+    if (state != GzipStateLabel.DEFLATE_STREAM) {
+      executeHeaderState();
+
+      if (userBufLen <= 0) {
+        return numAvailBytes;
+      }
+    }
+
+    // "executeDeflateStreamState()"
+    if (state == GzipStateLabel.DEFLATE_STREAM) {
+      // hand off user data (or what's left of it) to Inflater--but note that
+      // Inflater may not have consumed all of previous bufferload (e.g., if
+      // data highly compressed or output buffer very small), in which case
+      // userBufLen will be zero
+      if (userBufLen > 0) {
+        inflater.setInput(userBuf, userBufOff, userBufLen);
+        userBufOff += userBufLen;
+        userBufLen = 0;
+      }
+
+      // now decompress it into b[]
+      try {
+        numAvailBytes = inflater.inflate(b, off, len);
+      } catch (DataFormatException dfe) {
+        throw new IOException(dfe.getMessage());
+      }
+      crc.update(b, off, numAvailBytes);  // CRC-32 is on _uncompressed_ data
+      if (inflater.finished()) {
+        state = GzipStateLabel.TRAILER_CRC;
+        int bytesRemaining = inflater.getRemaining();
+        assert (bytesRemaining >= 0) :
+          "logic error: Inflater finished; byte-count is inconsistent";
+          // could save a copy of userBufLen at call to inflater.setInput() and
+          // verify that bytesRemaining <= origUserBufLen, but would have to
+          // be a (class) member variable...seems excessive for a sanity check
+        userBufOff -= bytesRemaining;
+        userBufLen = bytesRemaining;   // or "+=", but guaranteed 0 coming in
+      } else {
+        return numAvailBytes;  // minor optimization
+      }
+    }
+
+    executeTrailerState();
+
+    return numAvailBytes;
+  }
+
+  /**
+   * Parse the gzip header (assuming we're in the appropriate state).
+   * In order to deal with degenerate cases (e.g., user buffer is one byte
+   * long), we copy (some) header bytes to another buffer.  (Filename,
+   * comment, and extra-field bytes are simply skipped.)</p>
+   *
+   * See http://www.ietf.org/rfc/rfc1952.txt for the gzip spec.  Note that
+   * no version of gzip to date (at least through 1.4.0, 2010-01-20) supports
+   * the FHCRC header-CRC16 flagbit; instead, the implementation treats it
+   * as a multi-file continuation flag (which it also doesn't support). :-(
+   * Sun's JDK v6 (1.6) supports the header CRC, however, and so do we.
+   */
+  private void executeHeaderState() throws IOException {
+
+    // this can happen because DecompressorStream's decompress() is written
+    // to call decompress() first, setInput() second:
+    if (userBufLen <= 0) {
+      return;
+    }
+
+    // "basic"/required header:  somewhere in first 10 bytes
+    if (state == GzipStateLabel.HEADER_BASIC) {
+      int n = Math.min(userBufLen, 10-localBufOff);  // (or 10-headerBytesRead)
+      checkAndCopyBytesToLocal(n);  // modifies userBufLen, etc.
+      if (localBufOff >= 10) {      // should be strictly ==
+        processBasicHeader();       // sig, compression method, flagbits
+        localBufOff = 0;            // no further need for basic header
+        state = GzipStateLabel.HEADER_EXTRA_FIELD;
+      }
+    }
+
+    if (userBufLen <= 0) {
+      return;
+    }
+
+    // optional header stuff (extra field, filename, comment, header CRC)
+
+    if (state == GzipStateLabel.HEADER_EXTRA_FIELD) {
+      if (hasExtraField) {
+        // 2 substates:  waiting for 2 bytes => get numExtraFieldBytesRemaining,
+        // or already have 2 bytes & waiting to finish skipping specified length
+        if (numExtraFieldBytesRemaining < 0) {
+          int n = Math.min(userBufLen, 2-localBufOff);
+          checkAndCopyBytesToLocal(n);
+          if (localBufOff >= 2) {
+            numExtraFieldBytesRemaining = readUShortLE(localBuf, 0);
+            localBufOff = 0;
+          }
+        }
+        if (numExtraFieldBytesRemaining > 0 && userBufLen > 0) {
+          int n = Math.min(userBufLen, numExtraFieldBytesRemaining);
+          checkAndSkipBytes(n);     // modifies userBufLen, etc.
+          numExtraFieldBytesRemaining -= n;
+        }
+        if (numExtraFieldBytesRemaining == 0) {
+          state = GzipStateLabel.HEADER_FILENAME;
+        }
+      } else {
+        state = GzipStateLabel.HEADER_FILENAME;
+      }
+    }
+
+    if (userBufLen <= 0) {
+      return;
+    }
+
+    if (state == GzipStateLabel.HEADER_FILENAME) {
+      if (hasFilename) {
+        boolean doneWithFilename = checkAndSkipBytesUntilNull();
+        if (!doneWithFilename) {
+          return;  // exit early:  used up entire buffer without hitting NULL
+        }
+      }
+      state = GzipStateLabel.HEADER_COMMENT;
+    }
+
+    if (userBufLen <= 0) {
+      return;
+    }
+
+    if (state == GzipStateLabel.HEADER_COMMENT) {
+      if (hasComment) {
+        boolean doneWithComment = checkAndSkipBytesUntilNull();
+        if (!doneWithComment) {
+          return;  // exit early:  used up entire buffer
+        }
+      }
+      state = GzipStateLabel.HEADER_CRC;
+    }
+
+    if (userBufLen <= 0) {
+      return;
+    }
+
+    if (state == GzipStateLabel.HEADER_CRC) {
+      if (hasHeaderCRC) {
+        assert (localBufOff < 2);
+        int n = Math.min(userBufLen, 2-localBufOff);
+        copyBytesToLocal(n);
+        if (localBufOff >= 2) {
+          long headerCRC = readUShortLE(localBuf, 0);
+          if (headerCRC != (crc.getValue() & 0xffff)) {
+            throw new IOException("gzip header CRC failure");
+          }
+          localBufOff = 0;
+          crc.reset();
+          state = GzipStateLabel.DEFLATE_STREAM;
+        }
+      } else {
+        crc.reset();   // will reuse for CRC-32 of uncompressed data
+        state = GzipStateLabel.DEFLATE_STREAM;  // switching to Inflater now
+      }
+    }
+  }
+
+  /**
+   * Parse the gzip trailer (assuming we're in the appropriate state).
+   * In order to deal with degenerate cases (e.g., user buffer is one byte
+   * long), we copy trailer bytes (all 8 of 'em) to a local buffer.</p>
+   *
+   * See http://www.ietf.org/rfc/rfc1952.txt for the gzip spec.
+   */
+  private void executeTrailerState() throws IOException {
+
+    if (userBufLen <= 0) {
+      return;
+    }
+
+    // verify that the CRC-32 of the decompressed stream matches the value
+    // stored in the gzip trailer
+    if (state == GzipStateLabel.TRAILER_CRC) {
+      // localBuf was empty before we handed off to Inflater, so we handle this
+      // exactly like header fields
+      assert (localBufOff < 4);  // initially 0, but may need multiple calls
+      int n = Math.min(userBufLen, 4-localBufOff);
+      copyBytesToLocal(n);
+      if (localBufOff >= 4) {
+        long streamCRC = readUIntLE(localBuf, 0);
+        if (streamCRC != crc.getValue()) {
+          throw new IOException("gzip stream CRC failure");
+        }
+        localBufOff = 0;
+        crc.reset();
+        state = GzipStateLabel.TRAILER_SIZE;
+      }
+    }
+
+    if (userBufLen <= 0) {
+      return;
+    }
+
+    // verify that the mod-2^32 decompressed stream size matches the value
+    // stored in the gzip trailer
+    if (state == GzipStateLabel.TRAILER_SIZE) {
+      assert (localBufOff < 4);  // initially 0, but may need multiple calls
+      int n = Math.min(userBufLen, 4-localBufOff);
+      copyBytesToLocal(n);       // modifies userBufLen, etc.
+      if (localBufOff >= 4) {    // should be strictly ==
+        long inputSize = readUIntLE(localBuf, 0);
+        if (inputSize != (inflater.getBytesWritten() & 0xffffffff)) {
+          throw new IOException(
+            "stored gzip size doesn't match decompressed size");
+        }
+        localBufOff = 0;
+        state = GzipStateLabel.FINISHED;
+      }
+    }
+
+    if (state == GzipStateLabel.FINISHED) {
+      return;
+    }
+  }
+
+  /**
+   * Returns the total number of compressed bytes input so far, including
+   * gzip header/trailer bytes.</p>
+   *
+   * @return the total (non-negative) number of compressed bytes read so far
+   */
+  public synchronized long getBytesRead() {
+    return headerBytesRead + inflater.getBytesRead() + trailerBytesRead;
+  }
+
+  /**
+   * Returns the number of bytes remaining in the input buffer; normally
+   * called when finished() is true to determine amount of post-gzip-stream
+   * data.  Note that, other than the finished state with concatenated data
+   * after the end of the current gzip stream, this will never return a
+   * non-zero value unless called after {@link #setInput(byte[] b, int off,
+   * int len)} and before {@link #decompress(byte[] b, int off, int len)}.
+   * (That is, after {@link #decompress(byte[] b, int off, int len)} it
+   * always returns zero, except in finished state with concatenated data.)</p>
+   *
+   * @return the total (non-negative) number of unprocessed bytes in input
+   */
+  public synchronized int getRemaining() {
+    return userBufLen;
+  }
+
+  /** {@inheritDoc} */
+  public synchronized boolean needsDictionary() {
+    return inflater.needsDictionary();
+  }
+
+  /** {@inheritDoc} */
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    inflater.setDictionary(b, off, len);
+  }
+
+  /**
+   * Returns true if the end of the gzip substream (single "member") has been
+   * reached.</p>
+   */
+  public synchronized boolean finished() {
+    return (state == GzipStateLabel.FINISHED);
+  }
+
+  /**
+   * Resets everything, including the input buffer, regardless of whether the
+   * current gzip substream is finished.</p>
+   */
+  public synchronized void reset() {
+    // could optionally emit INFO message if state != GzipStateLabel.FINISHED
+    inflater.reset();
+    state = GzipStateLabel.HEADER_BASIC;
+    crc.reset();
+    userBufOff = userBufLen = 0;
+    localBufOff = 0;
+    headerBytesRead = 0;
+    trailerBytesRead = 0;
+    numExtraFieldBytesRemaining = -1;
+    hasExtraField = false;
+    hasFilename = false;
+    hasComment = false;
+    hasHeaderCRC = false;
+  }
+
+  /** {@inheritDoc} */
+  public synchronized void end() {
+    inflater.end();
+  }
+
+  /**
+   * Check ID bytes (throw if necessary), compression method (throw if not 8),
+   * and flag bits (set hasExtraField, hasFilename, hasComment, hasHeaderCRC).
+   * Ignore MTIME, XFL, OS.  Caller must ensure we have at least 10 bytes (at
+   * the start of localBuf).</p>
+   */
+  /*
+   * Flag bits (remainder are reserved and must be zero):
+   *   bit 0   FTEXT
+   *   bit 1   FHCRC   (never implemented in gzip, at least through version
+   *                   1.4.0; instead interpreted as "continuation of multi-
+   *                   part gzip file," which is unsupported through 1.4.0)
+   *   bit 2   FEXTRA
+   *   bit 3   FNAME
+   *   bit 4   FCOMMENT
+   *  [bit 5   encrypted]
+   */
+  private void processBasicHeader() throws IOException {
+    if (readUShortLE(localBuf, 0) != GZIP_MAGIC_ID) {
+      throw new IOException("not a gzip file");
+    }
+    if (readUByte(localBuf, 2) != GZIP_DEFLATE_METHOD) {
+      throw new IOException("gzip data not compressed with deflate method");
+    }
+    int flg = readUByte(localBuf, 3);
+    if ((flg & GZIP_FLAGBITS_RESERVED) != 0) {
+      throw new IOException("unknown gzip format (reserved flagbits set)");
+    }
+    hasExtraField = ((flg & GZIP_FLAGBIT_EXTRA_FIELD) != 0);
+    hasFilename   = ((flg & GZIP_FLAGBIT_FILENAME)    != 0);
+    hasComment    = ((flg & GZIP_FLAGBIT_COMMENT)     != 0);
+    hasHeaderCRC  = ((flg & GZIP_FLAGBIT_HEADER_CRC)  != 0);
+  }
+
+  private void checkAndCopyBytesToLocal(int len) {
+    System.arraycopy(userBuf, userBufOff, localBuf, localBufOff, len);
+    localBufOff += len;
+    // alternatively, could call checkAndSkipBytes(len) for rest...
+    crc.update(userBuf, userBufOff, len);
+    userBufOff += len;
+    userBufLen -= len;
+    headerBytesRead += len;
+  }
+
+  private void checkAndSkipBytes(int len) {
+    crc.update(userBuf, userBufOff, len);
+    userBufOff += len;
+    userBufLen -= len;
+    headerBytesRead += len;
+  }
+
+  // returns true if saw NULL, false if ran out of buffer first; called _only_
+  // during gzip-header processing (not trailer)
+  // (caller can check before/after state of userBufLen to compute num bytes)
+  private boolean checkAndSkipBytesUntilNull() {
+    boolean hitNull = false;
+    if (userBufLen > 0) {
+      do {
+        hitNull = (userBuf[userBufOff] == 0);
+        crc.update(userBuf[userBufOff]);
+        ++userBufOff;
+        --userBufLen;
+        ++headerBytesRead;
+      } while (userBufLen > 0 && !hitNull);
+    }
+    return hitNull;
+  }
+
+  // this one doesn't update the CRC and does support trailer processing but
+  // otherwise is same as its "checkAnd" sibling
+  private void copyBytesToLocal(int len) {
+    System.arraycopy(userBuf, userBufOff, localBuf, localBufOff, len);
+    localBufOff += len;
+    userBufOff += len;
+    userBufLen -= len;
+    if (state == GzipStateLabel.TRAILER_CRC ||
+        state == GzipStateLabel.TRAILER_SIZE) {
+      trailerBytesRead += len;
+    } else {
+      headerBytesRead += len;
+    }
+  }
+
+  private int readUByte(byte[] b, int off) {
+    return ((int)b[off] & 0xff);
+  }
+
+  // caller is responsible for not overrunning buffer
+  private int readUShortLE(byte[] b, int off) {
+    return ((((b[off+1] & 0xff) << 8) |
+             ((b[off]   & 0xff)     )) & 0xffff);
+  }
+
+  // caller is responsible for not overrunning buffer
+  private long readUIntLE(byte[] b, int off) {
+    return ((((long)(b[off+3] & 0xff) << 24) |
+             ((long)(b[off+2] & 0xff) << 16) |
+             ((long)(b[off+1] & 0xff) <<  8) |
+             ((long)(b[off]   & 0xff)      )) & 0xffffffff);
+  }
+
+}

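A minimal, hedged sketch of exercising the new decompressor directly on an in-memory gzip member (the payload and buffer sizes are arbitrary; in normal use DecompressorStream drives this loop):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.zip.GZIPOutputStream;
    import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;

    public class BuiltInGzipExample {
      public static void main(String[] args) throws IOException {
        // Build a small gzip member to decode (arbitrary test payload).
        ByteArrayOutputStream gz = new ByteArrayOutputStream();
        GZIPOutputStream gzout = new GZIPOutputStream(gz);
        gzout.write("hello, gzip".getBytes("UTF-8"));
        gzout.close();
        byte[] compressed = gz.toByteArray();

        BuiltInGzipDecompressor d = new BuiltInGzipDecompressor();
        d.setInput(compressed, 0, compressed.length);
        byte[] out = new byte[64];
        ByteArrayOutputStream plain = new ByteArrayOutputStream();
        while (!d.finished()) {
          int n = d.decompress(out, 0, out.length);  // parses header, inflates, checks trailer
          plain.write(out, 0, n);
        }
        System.out.println(plain.toString("UTF-8"));  // "hello, gzip"
        System.out.println(d.getRemaining());         // 0: no concatenated member follows
      }
    }
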
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java?rev=1077533&r1=1077532&r2=1077533&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java Fri Mar  4 04:25:16 2011
@@ -166,7 +166,7 @@ public class ZlibDecompressor implements
   }
 
   public synchronized boolean needsInput() {
-    // Consume remanining compressed data?
+    // Consume remaining compressed data?
     if (uncompressedDirectBuf.remaining() > 0) {
       return false;
     }
@@ -189,7 +189,7 @@ public class ZlibDecompressor implements
   }
 
   public synchronized boolean finished() {
-    // Check if 'zlib' says its 'finished' and
+    // Check if 'zlib' says it's 'finished' and
     // all compressed data has been consumed
     return (finished && uncompressedDirectBuf.remaining() == 0);
   }
@@ -221,7 +221,7 @@ public class ZlibDecompressor implements
     n = inflateBytesDirect();
     uncompressedDirectBuf.limit(n);
 
-    // Get atmost 'len' bytes
+    // Get at most 'len' bytes
     n = Math.min(n, len);
     ((ByteBuffer)uncompressedDirectBuf).get(b, off, n);
 
@@ -229,9 +229,9 @@ public class ZlibDecompressor implements
   }
   
   /**
-   * Returns the total number of compressed bytes output so far.
+   * Returns the total number of uncompressed bytes output so far.
    *
-   * @return the total (non-negative) number of compressed bytes output so far
+   * @return the total (non-negative) number of uncompressed bytes output so far
    */
   public synchronized long getBytesWritten() {
     checkStream();
@@ -239,15 +239,30 @@ public class ZlibDecompressor implements
   }
 
   /**
-   * Returns the total number of uncompressed bytes input so far.</p>
+   * Returns the total number of compressed bytes input so far.</p>
    *
-   * @return the total (non-negative) number of uncompressed bytes input so far
+   * @return the total (non-negative) number of compressed bytes input so far
    */
   public synchronized long getBytesRead() {
     checkStream();
     return getBytesRead(stream);
   }
 
+  /**
+   * Returns the number of bytes remaining in the input buffers; normally
+   * called when finished() is true to determine amount of post-gzip-stream
+   * data.</p>
+   *
+   * @return the total (non-negative) number of unprocessed bytes in input
+   */
+  public synchronized int getRemaining() {
+    checkStream();
+    return userBufLen + getRemaining(stream);  // userBuf + compressedDirectBuf
+  }
+
+  /**
+   * Resets everything including the input buffers (user and direct).</p>
+   */
   public synchronized void reset() {
     checkStream();
     reset(stream);
@@ -282,6 +297,7 @@ public class ZlibDecompressor implements
   private native int inflateBytesDirect();
   private native static long getBytesRead(long strm);
   private native static long getBytesWritten(long strm);
+  private native static int getRemaining(long strm);
   private native static void reset(long strm);
   private native static void end(long strm);
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c?rev=1077533&r1=1077532&r2=1077533&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c Fri Mar  4 04:25:16 2011
@@ -291,6 +291,13 @@ Java_org_apache_hadoop_io_compress_zlib_
     return (ZSTREAM(stream))->total_out;
 }
 
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_getRemaining(
+	JNIEnv *env, jclass cls, jlong stream
+	) {
+    return (ZSTREAM(stream))->avail_in;
+}
+
 JNIEXPORT void JNICALL
 Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_reset(
 	JNIEnv *env, jclass cls, jlong stream

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/io/compress/TestCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/io/compress/TestCodec.java?rev=1077533&r1=1077532&r2=1077533&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/io/compress/TestCodec.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/io/compress/TestCodec.java Fri Mar  4 04:25:16 2011
@@ -23,8 +23,10 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Random;
+import java.util.zip.GZIPOutputStream;
 
 import junit.framework.TestCase;
 
@@ -36,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.RandomDatum;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -43,14 +46,14 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
 
 public class TestCodec extends TestCase {
 
-  private static final Log LOG= 
-    LogFactory.getLog(TestCodec.class);
+  private static final Log LOG= LogFactory.getLog(TestCodec.class);
 
   private Configuration conf = new Configuration();
   private int count = 10000;
@@ -298,9 +301,92 @@ public class TestCodec extends TestCase 
       System.err.println("Caught: " + e);
       e.printStackTrace();
     }
-    
+
   }
 
+  public void testGzipCompatibility() throws IOException {
+    Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+    LOG.info("seed: " + seed);
+
+    DataOutputBuffer dflbuf = new DataOutputBuffer();
+    GZIPOutputStream gzout = new GZIPOutputStream(dflbuf);
+    byte[] b = new byte[r.nextInt(128 * 1024 + 1)];
+    r.nextBytes(b);
+    gzout.write(b);
+    gzout.close();
+
+    DataInputBuffer gzbuf = new DataInputBuffer();
+    gzbuf.reset(dflbuf.getData(), dflbuf.getLength());
+
+    Configuration conf = new Configuration();
+    conf.setBoolean("hadoop.native.lib", false);
+    CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
+    Decompressor decom = codec.createDecompressor();
+    assertNotNull(decom);
+    assertEquals(BuiltInGzipDecompressor.class, decom.getClass());
+    InputStream gzin = codec.createInputStream(gzbuf, decom);
+
+    dflbuf.reset();
+    IOUtils.copyBytes(gzin, dflbuf, 4096);
+    final byte[] dflchk = Arrays.copyOf(dflbuf.getData(), dflbuf.getLength());
+    assertTrue(java.util.Arrays.equals(b, dflchk));
+  }
+
+  void GzipConcatTest(Configuration conf,
+      Class<? extends Decompressor> decomClass) throws IOException {
+    Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+    LOG.info(decomClass + " seed: " + seed);
+
+    final int CONCAT = r.nextInt(4) + 3;
+    final int BUFLEN = 128 * 1024;
+    DataOutputBuffer dflbuf = new DataOutputBuffer();
+    DataOutputBuffer chkbuf = new DataOutputBuffer();
+    byte[] b = new byte[BUFLEN];
+    for (int i = 0; i < CONCAT; ++i) {
+      GZIPOutputStream gzout = new GZIPOutputStream(dflbuf);
+      r.nextBytes(b);
+      int len = r.nextInt(BUFLEN);
+      int off = r.nextInt(BUFLEN - len);
+      chkbuf.write(b, off, len);
+      gzout.write(b, off, len);
+      gzout.close();
+    }
+    final byte[] chk = Arrays.copyOf(chkbuf.getData(), chkbuf.getLength());
+
+    CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
+    Decompressor decom = codec.createDecompressor();
+    assertNotNull(decom);
+    assertEquals(decomClass, decom.getClass());
+    DataInputBuffer gzbuf = new DataInputBuffer();
+    gzbuf.reset(dflbuf.getData(), dflbuf.getLength());
+    InputStream gzin = codec.createInputStream(gzbuf, decom);
+
+    dflbuf.reset();
+    IOUtils.copyBytes(gzin, dflbuf, 4096);
+    final byte[] dflchk = Arrays.copyOf(dflbuf.getData(), dflbuf.getLength());
+    assertTrue(java.util.Arrays.equals(chk, dflchk));
+  }
+
+  public void testBuiltInGzipConcat() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean("hadoop.native.lib", false);
+    GzipConcatTest(conf, BuiltInGzipDecompressor.class);
+  }
+
+  public void testNativeGzipConcat() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean("hadoop.native.lib", true);
+    if (!ZlibFactory.isNativeZlibLoaded(conf)) {
+      LOG.warn("skipped: native libs not loaded");
+      return;
+    }
+    GzipConcatTest(conf, GzipCodec.GzipZlibDecompressor.class);
+    }
+
   public TestCodec(String name) {
     super(name);
   }