You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:25:16 UTC
svn commit: r1077533 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
core/org/apache/hadoop/io/ core/org/apache/hadoop/io/compress/
core/org/apache/hadoop/io/compress/bzip2/
core/org/apache/hadoop/io/compress/zlib/ native/src/org/apache/...
Author: omalley
Date: Fri Mar 4 04:25:16 2011
New Revision: 1077533
URL: http://svn.apache.org/viewvc?rev=1077533&view=rev
Log:
commit c577ce5f8212832949665bb8b694694f956d3a43
Author: Greg Roelofs <ro...@yahoo-inc.com>
Date: Wed Jul 7 20:23:47 2010 -0700
HADOOP-6835 from https://issues.apache.org/jira/secure/attachment/12448942/HADOOP-6835.v9.yahoo-0.20.2xx-branch.patch
Added:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/IOUtils.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/BlockDecompressorStream.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/Decompressor.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/DecompressorStream.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/GzipCodec.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
hadoop/common/branches/branch-0.20-security-patches/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/io/compress/TestCodec.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/IOUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/IOUtils.java?rev=1077533&r1=1077532&r2=1077533&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/IOUtils.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/IOUtils.java Fri Mar 4 04:25:16 2011
@@ -41,17 +41,8 @@ public class IOUtils {
public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close)
throws IOException {
- PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
- byte buf[] = new byte[buffSize];
try {
- int bytesRead = in.read(buf);
- while (bytesRead >= 0) {
- out.write(buf, 0, bytesRead);
- if ((ps != null) && ps.checkError()) {
- throw new IOException("Unable to write to output stream.");
- }
- bytesRead = in.read(buf);
- }
+ copyBytes(in, out, buffSize);
} finally {
if(close) {
out.close();
@@ -61,6 +52,28 @@ public class IOUtils {
}
/**
+ * Copies from one stream to another.
+ *
+ * @param in InputStrem to read from
+ * @param out OutputStream to write to
+ * @param buffSize the size of the buffer
+ */
+ public static void copyBytes(InputStream in, OutputStream out, int buffSize)
+ throws IOException {
+
+ PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
+ byte buf[] = new byte[buffSize];
+ int bytesRead = in.read(buf);
+ while (bytesRead >= 0) {
+ out.write(buf, 0, bytesRead);
+ if ((ps != null) && ps.checkError()) {
+ throw new IOException("Unable to write to output stream.");
+ }
+ bytesRead = in.read(buf);
+ }
+ }
+
+ /**
* Copies from one stream to another. <strong>closes the input and output streams
* at the end</strong>.
* @param in InputStrem to read from
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/BlockDecompressorStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/BlockDecompressorStream.java?rev=1077533&r1=1077532&r2=1077533&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/BlockDecompressorStream.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/BlockDecompressorStream.java Fri Mar 4 04:25:16 2011
@@ -79,7 +79,9 @@ public class BlockDecompressorStream ext
}
}
if (decompressor.needsInput()) {
- getCompressedData();
+ int m = getCompressedData();
+ // Send the read data to the decompressor
+ decompressor.setInput(buffer, 0, m);
}
}
@@ -89,10 +91,10 @@ public class BlockDecompressorStream ext
return n;
}
- protected void getCompressedData() throws IOException {
+ protected int getCompressedData() throws IOException {
checkStream();
- // Get the size of the compressed chunk
+ // Get the size of the compressed chunk (always non-negative)
int len = rawReadInt();
// Read len bytes from underlying stream
@@ -103,13 +105,12 @@ public class BlockDecompressorStream ext
while (n < len) {
int count = in.read(buffer, off + n, len - n);
if (count < 0) {
- throw new EOFException();
+ throw new EOFException("Unexpected end of block in input stream");
}
n += count;
}
- // Send the read data to the decompressor
- decompressor.setInput(buffer, 0, len);
+ return len;
}
public void resetState() throws IOException {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/Decompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/Decompressor.java?rev=1077533&r1=1077532&r2=1077533&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/Decompressor.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/Decompressor.java Fri Mar 4 04:25:16 2011
@@ -29,8 +29,13 @@ import java.io.IOException;
public interface Decompressor {
/**
* Sets input data for decompression.
- * This should be called whenever #needsInput() returns
+ * This should be called if and only if {@link #needsInput()} returns
* <code>true</code> indicating that more input data is required.
+ * (Both native and non-native versions of various Decompressors require
+ * that the data passed in via <code>b[]</code> remain unmodified until
+ * the caller is explicitly notified--via {@link #needsInput()}--that the
+ * buffer may be safely modified. With this requirement, an extra
+ * buffer-copy can be avoided.)
*
* @param b Input data
* @param off Start offset
@@ -40,10 +45,12 @@ public interface Decompressor {
/**
* Returns true if the input data buffer is empty and
- * #setInput() should be called to provide more input.
+ * {@link #setInput(byte[], int, int)} should be called to
+ * provide more input.
*
* @return <code>true</code> if the input data buffer is empty and
- * #setInput() should be called in order to provide more input.
+ * {@link #setInput(byte[], int, int)} should be called in
+ * order to provide more input.
*/
public boolean needsInput();
@@ -64,9 +71,9 @@ public interface Decompressor {
public boolean needsDictionary();
/**
- * Returns true if the end of the compressed
+ * Returns true if the end of the decompressed
* data output stream has been reached.
- * @return <code>true</code> if the end of the compressed
+ * @return <code>true</code> if the end of the decompressed
* data output stream has been reached.
*/
public boolean finished();
@@ -74,8 +81,8 @@ public interface Decompressor {
/**
* Fills specified buffer with uncompressed data. Returns actual number
* of bytes of uncompressed data. A return value of 0 indicates that
- * #needsInput() should be called in order to determine if more input
- * data is required.
+ * {@link #needsInput()} should be called in order to determine if more
+ * input data is required.
*
* @param b Buffer for the compressed data
* @param off Start offset of the data
@@ -84,12 +91,20 @@ public interface Decompressor {
* @throws IOException
*/
public int decompress(byte[] b, int off, int len) throws IOException;
-
+
/**
- * Resets decompressor so that a new set of input data can be processed.
+ * Returns the number of bytes remaining in the compressed-data buffer;
+ * typically called after the decompressor has finished decompressing
+ * the current gzip stream (a.k.a. "member").
+ */
+ public int getRemaining();
+
+ /**
+ * Resets decompressor and input and output buffers so that a new set of
+ * input data can be processed.
*/
public void reset();
-
+
/**
* Closes the decompressor and discards any unprocessed input.
*/
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/DecompressorStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/DecompressorStream.java?rev=1077533&r1=1077532&r2=1077533&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/DecompressorStream.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/DecompressorStream.java Fri Mar 4 04:25:16 2011
@@ -29,8 +29,10 @@ public class DecompressorStream extends
protected byte[] buffer;
protected boolean eof = false;
protected boolean closed = false;
-
- public DecompressorStream(InputStream in, Decompressor decompressor, int bufferSize) {
+ private int lastBytesSent = 0;
+
+ public DecompressorStream(InputStream in, Decompressor decompressor,
+ int bufferSize) {
super(in);
if (in == null || decompressor == null) {
@@ -76,31 +78,78 @@ public class DecompressorStream extends
protected int decompress(byte[] b, int off, int len) throws IOException {
int n = 0;
-
+
while ((n = decompressor.decompress(b, off, len)) == 0) {
- if (decompressor.finished() || decompressor.needsDictionary()) {
+ if (decompressor.needsDictionary()) {
eof = true;
return -1;
}
- if (decompressor.needsInput()) {
- getCompressedData();
+
+ if (decompressor.finished()) {
+ // First see if there was any leftover buffered input from previous
+ // stream; if not, attempt to refill buffer. If refill -> EOF, we're
+ // all done; else reset, fix up input buffer, and get ready for next
+ // concatenated substream/"member".
+ int nRemaining = decompressor.getRemaining();
+ if (nRemaining == 0) {
+ int m = getCompressedData();
+ if (m == -1) {
+ // apparently the previous end-of-stream was also end-of-file:
+ // return success, as if we had never called getCompressedData()
+ eof = true;
+ return -1;
+ }
+ decompressor.reset();
+ decompressor.setInput(buffer, 0, m);
+ lastBytesSent = m;
+ } else {
+ // looks like it's a concatenated stream: reset low-level zlib (or
+ // other engine) and buffers, then "resend" remaining input data
+ decompressor.reset();
+ int leftoverOffset = lastBytesSent - nRemaining;
+ assert (leftoverOffset >= 0);
+ // this recopies userBuf -> direct buffer if using native libraries:
+ decompressor.setInput(buffer, leftoverOffset, nRemaining);
+ // NOTE: this is the one place we do NOT want to save the number
+ // of bytes sent (nRemaining here) into lastBytesSent: since we
+ // are resending what we've already sent before, offset is nonzero
+ // in general (only way it could be zero is if it already equals
+ // nRemaining), which would then screw up the offset calculation
+ // _next_ time around. IOW, getRemaining() is in terms of the
+ // original, zero-offset bufferload, so lastBytesSent must be as
+ // well. Cheesy ASCII art:
+ //
+ // <------------ m, lastBytesSent ----------->
+ // +===============================================+
+ // buffer: |1111111111|22222222222222222|333333333333| |
+ // +===============================================+
+ // #1: <-- off -->|<-------- nRemaining --------->
+ // #2: <----------- off ----------->|<-- nRem. -->
+ // #3: (final substream: nRemaining == 0; eof = true)
+ //
+ // If lastBytesSent is anything other than m, as shown, then "off"
+ // will be calculated incorrectly.
+ }
+ } else if (decompressor.needsInput()) {
+ int m = getCompressedData();
+ if (m == -1) {
+ throw new EOFException("Unexpected end of input stream");
+ }
+ decompressor.setInput(buffer, 0, m);
+ lastBytesSent = m;
}
}
-
+
return n;
}
-
- protected void getCompressedData() throws IOException {
+
+ protected int getCompressedData() throws IOException {
checkStream();
- int n = in.read(buffer, 0, buffer.length);
- if (n == -1) {
- throw new EOFException("Unexpected end of input stream");
- }
-
- decompressor.setInput(buffer, 0, n);
+ // note that the _caller_ is now required to call setInput() or throw
+ return in.read(buffer, 0, buffer.length);
}
-
+
protected void checkStream() throws IOException {
if (closed) {
throw new IOException("Stream closed");
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/GzipCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/GzipCodec.java?rev=1077533&r1=1077532&r2=1077533&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/GzipCodec.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/GzipCodec.java Fri Mar 4 04:25:16 2011
@@ -86,56 +86,7 @@ public class GzipCodec extends DefaultCo
((ResetableGZIPOutputStream) out).resetState();
}
}
-
- protected static class GzipInputStream extends DecompressorStream {
-
- private static class ResetableGZIPInputStream extends GZIPInputStream {
- public ResetableGZIPInputStream(InputStream in) throws IOException {
- super(in);
- }
-
- public void resetState() throws IOException {
- inf.reset();
- }
- }
-
- public GzipInputStream(InputStream in) throws IOException {
- super(new ResetableGZIPInputStream(in));
- }
-
- /**
- * Allow subclasses to directly set the inflater stream.
- */
- protected GzipInputStream(DecompressorStream in) {
- super(in);
- }
-
- public int available() throws IOException {
- return in.available();
- }
-
- public void close() throws IOException {
- in.close();
- }
-
- public int read() throws IOException {
- return in.read();
- }
-
- public int read(byte[] data, int offset, int len) throws IOException {
- return in.read(data, offset, len);
- }
-
- public long skip(long offset) throws IOException {
- return in.skip(offset);
- }
-
- public void resetState() throws IOException {
- ((ResetableGZIPInputStream) in).resetState();
- }
- }
-
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
return (ZlibFactory.isNativeZlibLoaded(conf)) ?
@@ -151,8 +102,7 @@ public class GzipCodec extends DefaultCo
new CompressorStream(out, compressor,
conf.getInt("io.file.buffer.size",
4*1024)) :
- createOutputStream(out);
-
+ createOutputStream(out);
}
public Compressor createCompressor() {
@@ -164,38 +114,34 @@ public class GzipCodec extends DefaultCo
public Class<? extends Compressor> getCompressorType() {
return ZlibFactory.isNativeZlibLoaded(conf)
? GzipZlibCompressor.class
- : BuiltInZlibDeflater.class;
+ : null;
}
- public CompressionInputStream createInputStream(InputStream in)
+ public CompressionInputStream createInputStream(InputStream in)
throws IOException {
- return (ZlibFactory.isNativeZlibLoaded(conf)) ?
- new DecompressorStream(in, createDecompressor(),
- conf.getInt("io.file.buffer.size",
- 4*1024)) :
- new GzipInputStream(in);
+ return createInputStream(in, null);
}
- public CompressionInputStream createInputStream(InputStream in,
- Decompressor decompressor)
+ public CompressionInputStream createInputStream(InputStream in,
+ Decompressor decompressor)
throws IOException {
- return (decompressor != null) ?
- new DecompressorStream(in, decompressor,
- conf.getInt("io.file.buffer.size",
- 4*1024)) :
- createInputStream(in);
+ if (decompressor == null) {
+ decompressor = createDecompressor(); // always succeeds (or throws)
+ }
+ return new DecompressorStream(in, decompressor,
+ conf.getInt("io.file.buffer.size", 4*1024));
}
public Decompressor createDecompressor() {
return (ZlibFactory.isNativeZlibLoaded(conf))
? new GzipZlibDecompressor()
- : null;
+ : new BuiltInGzipDecompressor();
}
public Class<? extends Decompressor> getDecompressorType() {
return ZlibFactory.isNativeZlibLoaded(conf)
? GzipZlibDecompressor.class
- : BuiltInZlibInflater.class;
+ : BuiltInGzipDecompressor.class;
}
public String getDefaultExtension() {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java?rev=1077533&r1=1077532&r2=1077533&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java Fri Mar 4 04:25:16 2011
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
package org.apache.hadoop.io.compress.bzip2;
import java.io.IOException;
@@ -35,6 +53,11 @@ public class BZip2DummyDecompressor impl
}
@Override
+  public int getRemaining() {
+    // This dummy implementation does not support querying the amount of
+    // leftover compressed input; callers must not rely on it.
+    throw new UnsupportedOperationException();
+  }
+
+ @Override
public void reset() {
// do nothing
}
Added: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java?rev=1077533&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java Fri Mar 4 04:25:16 2011
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.zlib;
+
+import java.io.IOException;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+import java.util.zip.CRC32;
+
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * A {@link Decompressor} based on the popular gzip compressed file format.
+ * http://www.gzip.org/
+ *
+ */
+public class BuiltInGzipDecompressor implements Decompressor {
+ // gzip header constants; see RFC 1952, section 2.3.1 (ID1/ID2, CM, FLG).
+ // The magic is the byte pair 0x1f 0x8b (ID1, ID2).
+ private static final int GZIP_MAGIC_ID = 0x8b1f; // if read as LE short int
+ private static final int GZIP_DEFLATE_METHOD = 8;
+ private static final int GZIP_FLAGBIT_HEADER_CRC = 0x02;
+ private static final int GZIP_FLAGBIT_EXTRA_FIELD = 0x04;
+ private static final int GZIP_FLAGBIT_FILENAME = 0x08;
+ private static final int GZIP_FLAGBIT_COMMENT = 0x10;
+ private static final int GZIP_FLAGBITS_RESERVED = 0xe0;
+
+ // 'true' (nowrap) => Inflater will handle raw deflate stream only
+ private Inflater inflater = new Inflater(true);
+
+ // Caller's buffer from setInput(). It is referenced, not copied, so the
+ // caller must leave it unmodified until needsInput() returns true.
+ private byte[] userBuf = null;
+ private int userBufOff = 0;
+ private int userBufLen = 0;
+
+ // Scratch space for header/trailer fields that may be split across several
+ // setInput() buffers; localBufOff counts the bytes collected so far.
+ // NOTE(review): the code visible here uses at most the first 10 bytes.
+ private byte[] localBuf = new byte[256];
+ private int localBufOff = 0;
+
+ // Summed by getBytesRead(). NOTE(review): no increments are visible in
+ // this chunk; presumably they happen in the copy/skip helper methods.
+ private int headerBytesRead = 0;
+ private int trailerBytesRead = 0;
+ // -1 until the 2-byte XLEN of the optional extra field has been read
+ private int numExtraFieldBytesRemaining = -1;
+ // Used for the optional header CRC16 check, then reset and reused for the
+ // CRC-32 of the uncompressed data.
+ private CRC32 crc = new CRC32();
+ // Optional-field flags from the FLG byte; presumably set by
+ // processBasicHeader() (not shown here).
+ private boolean hasExtraField = false;
+ private boolean hasFilename = false;
+ private boolean hasComment = false;
+ private boolean hasHeaderCRC = false;
+
+ // current position in the gzip decoding state machine
+ private GzipStateLabel state;
+
+ /**
+ * The current state of the gzip decoder, external to the Inflater context.
+ * (Technically, the private variables localBuf through hasHeaderCRC are
+ * also part of the state, so this enum is merely the label for it.)
+ *
+ * The constants are declared in the order the decoder visits them:
+ * executeHeaderState() walks HEADER_BASIC through HEADER_CRC,
+ * decompress() handles DEFLATE_STREAM, and executeTrailerState() handles
+ * TRAILER_CRC and TRAILER_SIZE before entering FINISHED.
+ */
+ private static enum GzipStateLabel {
+ /**
+ * Immediately prior to or (strictly) within the 10-byte basic gzip header.
+ */
+ HEADER_BASIC,
+ /**
+ * Immediately prior to or within the optional "extra field."
+ */
+ HEADER_EXTRA_FIELD,
+ /**
+ * Immediately prior to or within the optional filename field.
+ */
+ HEADER_FILENAME,
+ /**
+ * Immediately prior to or within the optional comment field.
+ */
+ HEADER_COMMENT,
+ /**
+ * Immediately prior to or within the optional 2-byte header CRC value.
+ */
+ HEADER_CRC,
+ /**
+ * Immediately prior to or within the main compressed (deflate) data stream.
+ */
+ DEFLATE_STREAM,
+ /**
+ * Immediately prior to or (strictly) within the 4-byte uncompressed CRC.
+ */
+ TRAILER_CRC,
+ /**
+ * Immediately prior to or (strictly) within the 4-byte uncompressed size.
+ */
+ TRAILER_SIZE,
+ /**
+ * Immediately after the trailer (and potentially prior to the next gzip
+ * member/substream header), without reset() having been called.
+ */
+ FINISHED;
+ }
+
+  /**
+   * Creates a new (pure Java) gzip decompressor positioned before the
+   * 10-byte basic gzip header, with an empty CRC accumulator.
+   */
+  public BuiltInGzipDecompressor() {
+    crc.reset();                          // start from a zero CRC
+    state = GzipStateLabel.HEADER_BASIC;  // a gzip header comes first
+    // FIXME? The Inflater docs say that with 'nowrap' an extra "dummy"
+    // input byte is required by native zlib for certain optimizations.
+    // In practice this does not appear to be necessary, and it is unclear
+    // where such a byte would go or what its value should be. Perhaps it
+    // suffices to have some deflated bytes in the first buffer load.
+  }
+
+  /** {@inheritDoc} */
+  public synchronized boolean needsInput() {
+    if (state == GzipStateLabel.FINISHED) {
+      return false;  // current member done; caller checks getRemaining()
+    }
+    if (state == GzipStateLabel.DEFLATE_STREAM) {
+      return inflater.needsInput();  // most common case: Inflater decides
+    }
+    // Header/trailer parsing never returns with unconsumed userBuf data
+    // outside FINISHED (see the comment at the top of decompress()), so no
+    // userBufLen check is needed here.
+    return true;
+  }
+
+  /** {@inheritDoc} */
+  /*
+   * The input holds both gzip header/trailer bytes (parsed by the
+   * execute*State() methods) and deflate-stream bytes (handed to Inflater).
+   *
+   * NOTE: b[] is referenced, not copied, so the caller must leave it
+   * untouched until needsInput() signals that it may be reused. Copying
+   * would add an extra pass over the bulk deflate data. (The Decompressor
+   * interface documents this requirement.)
+   */
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    final boolean outOfRange = off < 0 || len < 0 || off > b.length - len;
+    if (outOfRange) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+    userBuf = b;
+    userBufOff = off;
+    userBufLen = len;  // zero-length input is permitted
+  }
+
+  /**
+   * Decompress the data (gzip header, deflate stream, gzip trailer) in the
+   * provided buffer.
+   *
+   * @param b buffer receiving the decompressed bytes
+   * @param off start offset in <code>b</code>
+   * @param len maximum number of bytes to write into <code>b</code>
+   * @return the number of decompressed bytes placed into b
+   * @throws IOException if the deflate data is corrupt, or if the gzip
+   *         header or trailer fails validation
+   */
+  /* From the caller's perspective, this is where the state machine lives.
+   * The code is written such that we never return from decompress() with
+   * data remaining in userBuf unless we're in FINISHED state and there was
+   * data beyond the current gzip member (e.g., we're within a concatenated
+   * gzip stream). If this ever changes, {@link #needsInput()} will also
+   * need to be modified (i.e., uncomment the userBufLen condition).
+   *
+   * The actual deflate-stream processing (decompression) is handled by
+   * Java's Inflater class. Unlike the gzip header/trailer code (execute*
+   * methods below), the deflate stream is never copied; Inflater operates
+   * directly on the user's buffer.
+   */
+  public synchronized int decompress(byte[] b, int off, int len)
+    throws IOException {
+    int numAvailBytes = 0;
+
+    if (state != GzipStateLabel.DEFLATE_STREAM) {
+      executeHeaderState();
+
+      if (userBufLen <= 0) {
+        return numAvailBytes;
+      }
+    }
+
+    // "executeDeflateStreamState()"
+    if (state == GzipStateLabel.DEFLATE_STREAM) {
+      // hand off user data (or what's left of it) to Inflater--but note that
+      // Inflater may not have consumed all of previous bufferload (e.g., if
+      // data highly compressed or output buffer very small), in which case
+      // userBufLen will be zero
+      if (userBufLen > 0) {
+        inflater.setInput(userBuf, userBufOff, userBufLen);
+        userBufOff += userBufLen;
+        userBufLen = 0;
+      }
+
+      // now decompress it into b[]
+      try {
+        numAvailBytes = inflater.inflate(b, off, len);
+      } catch (DataFormatException dfe) {
+        // keep the original exception as the cause so the location of the
+        // corrupt-data failure is not lost
+        throw new IOException(dfe.getMessage(), dfe);
+      }
+      crc.update(b, off, numAvailBytes);  // CRC-32 is on _uncompressed_ data
+      if (inflater.finished()) {
+        state = GzipStateLabel.TRAILER_CRC;
+        int bytesRemaining = inflater.getRemaining();
+        assert (bytesRemaining >= 0) :
+          "logic error: Inflater finished; byte-count is inconsistent";
+        // Rewind userBuf over whatever Inflater did not consume: those bytes
+        // begin the gzip trailer (and possibly a following gzip member).
+        // (Could also verify bytesRemaining <= the length last passed to
+        // inflater.setInput(), but that would need another member variable.)
+        userBufOff -= bytesRemaining;
+        userBufLen = bytesRemaining;  // or "+=", but guaranteed 0 coming in
+      } else {
+        return numAvailBytes;  // minor optimization
+      }
+    }
+
+    executeTrailerState();
+
+    return numAvailBytes;
+  }
+
+ /**
+ * Parse the gzip header (assuming we're in the appropriate state).
+ * In order to deal with degenerate cases (e.g., user buffer is one byte
+ * long), we copy (some) header bytes to another buffer. (Filename,
+ * comment, and extra-field bytes are simply skipped.)
+ *
+ * Within a single call the method falls through HEADER_BASIC,
+ * HEADER_EXTRA_FIELD, HEADER_FILENAME, HEADER_COMMENT and HEADER_CRC while
+ * input remains. It returns early whenever userBufLen reaches zero (or a
+ * zero-terminated field is still unfinished), leaving state, localBuf and
+ * localBufOff positioned so that the next call resumes where this one
+ * stopped. When the header is complete, state is DEFLATE_STREAM and the
+ * CRC accumulator has been reset for the uncompressed data.
+ *
+ * See http://www.ietf.org/rfc/rfc1952.txt for the gzip spec. Note that
+ * no version of gzip to date (at least through 1.4.0, 2010-01-20) supports
+ * the FHCRC header-CRC16 flagbit; instead, the implementation treats it
+ * as a multi-file continuation flag (which it also doesn't support). :-(
+ * Sun's JDK v6 (1.6) supports the header CRC, however, and so do we.
+ *
+ * @throws IOException if the header CRC check fails. NOTE(review):
+ *         presumably processBasicHeader() also throws on a bad magic,
+ *         method or reserved flag bits; it is not shown here.
+ */
+ private void executeHeaderState() throws IOException {
+
+ // this can happen because DecompressorStream's decompress() is written
+ // to call decompress() first, setInput() second:
+ if (userBufLen <= 0) {
+ return;
+ }
+
+ // "basic"/required header: somewhere in first 10 bytes
+ if (state == GzipStateLabel.HEADER_BASIC) {
+ int n = Math.min(userBufLen, 10-localBufOff); // (or 10-headerBytesRead)
+ checkAndCopyBytesToLocal(n); // modifies userBufLen, etc.
+ if (localBufOff >= 10) { // should be strictly ==
+ processBasicHeader(); // sig, compression method, flagbits
+ localBufOff = 0; // no further need for basic header
+ state = GzipStateLabel.HEADER_EXTRA_FIELD;
+ }
+ }
+
+ if (userBufLen <= 0) {
+ return;
+ }
+
+ // optional header stuff (extra field, filename, comment, header CRC)
+
+ // FEXTRA (RFC 1952 2.3.1.1): a 2-byte little-endian XLEN followed by
+ // XLEN bytes of payload, which we skip.
+ if (state == GzipStateLabel.HEADER_EXTRA_FIELD) {
+ if (hasExtraField) {
+ // 2 substates: waiting for 2 bytes => get numExtraFieldBytesRemaining,
+ // or already have 2 bytes & waiting to finish skipping specified length
+ if (numExtraFieldBytesRemaining < 0) {
+ int n = Math.min(userBufLen, 2-localBufOff);
+ checkAndCopyBytesToLocal(n);
+ if (localBufOff >= 2) {
+ numExtraFieldBytesRemaining = readUShortLE(localBuf, 0);
+ localBufOff = 0;
+ }
+ }
+ if (numExtraFieldBytesRemaining > 0 && userBufLen > 0) {
+ int n = Math.min(userBufLen, numExtraFieldBytesRemaining);
+ checkAndSkipBytes(n); // modifies userBufLen, etc.
+ numExtraFieldBytesRemaining -= n;
+ }
+ if (numExtraFieldBytesRemaining == 0) {
+ state = GzipStateLabel.HEADER_FILENAME;
+ }
+ } else {
+ state = GzipStateLabel.HEADER_FILENAME;
+ }
+ }
+
+ if (userBufLen <= 0) {
+ return;
+ }
+
+ // FNAME and FCOMMENT are zero-terminated strings; they are skipped rather
+ // than stored, possibly across several calls.
+ if (state == GzipStateLabel.HEADER_FILENAME) {
+ if (hasFilename) {
+ boolean doneWithFilename = checkAndSkipBytesUntilNull();
+ if (!doneWithFilename) {
+ return; // exit early: used up entire buffer without hitting NULL
+ }
+ }
+ state = GzipStateLabel.HEADER_COMMENT;
+ }
+
+ if (userBufLen <= 0) {
+ return;
+ }
+
+ if (state == GzipStateLabel.HEADER_COMMENT) {
+ if (hasComment) {
+ boolean doneWithComment = checkAndSkipBytesUntilNull();
+ if (!doneWithComment) {
+ return; // exit early: used up entire buffer
+ }
+ }
+ state = GzipStateLabel.HEADER_CRC;
+ }
+
+ if (userBufLen <= 0) {
+ return;
+ }
+
+ // FHCRC: compared against the low 16 bits of crc, which presumably the
+ // check* helpers above accumulated over the header bytes. The CRC16 bytes
+ // themselves are read with plain copyBytesToLocal(), presumably so they
+ // are not folded into their own checksum. NOTE(review): confirm against
+ // the helper implementations, which are not shown here.
+ if (state == GzipStateLabel.HEADER_CRC) {
+ if (hasHeaderCRC) {
+ assert (localBufOff < 2);
+ int n = Math.min(userBufLen, 2-localBufOff);
+ copyBytesToLocal(n);
+ if (localBufOff >= 2) {
+ long headerCRC = readUShortLE(localBuf, 0);
+ if (headerCRC != (crc.getValue() & 0xffff)) {
+ throw new IOException("gzip header CRC failure");
+ }
+ localBufOff = 0;
+ crc.reset();
+ state = GzipStateLabel.DEFLATE_STREAM;
+ }
+ } else {
+ crc.reset(); // will reuse for CRC-32 of uncompressed data
+ state = GzipStateLabel.DEFLATE_STREAM; // switching to Inflater now
+ }
+ }
+ }
+
+  /**
+   * Parse the gzip trailer (assuming we're in the appropriate state).
+   * In order to deal with degenerate cases (e.g., user buffer is one byte
+   * long), we copy trailer bytes (all 8 of 'em) to a local buffer.
+   *
+   * See http://www.ietf.org/rfc/rfc1952.txt for the gzip spec: the trailer
+   * is the CRC-32 of the uncompressed data followed by ISIZE (the
+   * uncompressed size modulo 2^32), both little-endian.
+   *
+   * @throws IOException if the stored CRC-32 or size does not match what
+   *         was actually decompressed
+   */
+  private void executeTrailerState() throws IOException {
+
+    if (userBufLen <= 0) {
+      return;
+    }
+
+    // verify that the CRC-32 of the decompressed stream matches the value
+    // stored in the gzip trailer
+    if (state == GzipStateLabel.TRAILER_CRC) {
+      // localBuf was empty before we handed off to Inflater, so we handle
+      // this exactly like header fields
+      assert (localBufOff < 4);  // initially 0, but may need multiple calls
+      int n = Math.min(userBufLen, 4-localBufOff);
+      copyBytesToLocal(n);  // modifies userBufLen, etc.
+      if (localBufOff >= 4) {
+        long streamCRC = readUIntLE(localBuf, 0);
+        if (streamCRC != crc.getValue()) {
+          throw new IOException("gzip stream CRC failure");
+        }
+        localBufOff = 0;
+        crc.reset();
+        state = GzipStateLabel.TRAILER_SIZE;
+      }
+    }
+
+    if (userBufLen <= 0) {
+      return;
+    }
+
+    // verify that the mod-2^32 decompressed stream size matches the value
+    // stored in the gzip trailer
+    if (state == GzipStateLabel.TRAILER_SIZE) {
+      assert (localBufOff < 4);  // initially 0, but may need multiple calls
+      int n = Math.min(userBufLen, 4-localBufOff);
+      copyBytesToLocal(n);  // modifies userBufLen, etc.
+      if (localBufOff >= 4) {  // should be strictly ==
+        long inputSize = readUIntLE(localBuf, 0);
+        // The mask must be a long literal. The int literal 0xffffffff is -1,
+        // which widens to -1L and masks nothing. With it, any member of
+        // >= 4 GiB uncompressed would fail this check (readUIntLE() yields
+        // the unsigned 32-bit value, as the CRC comparison above relies on).
+        if (inputSize != (inflater.getBytesWritten() & 0xffffffffL)) {
+          throw new IOException(
+            "stored gzip size doesn't match decompressed size");
+        }
+        localBufOff = 0;
+        state = GzipStateLabel.FINISHED;
+      }
+    }
+    // FINISHED (or still mid-field): nothing more to do in this call
+  }
+
+ /**
+ * Returns the total number of compressed bytes input so far, including
+ * gzip header/trailer bytes.</p>
+ *
+ * @return the total (non-negative) number of compressed bytes read so far
+ */
+ public synchronized long getBytesRead() {
+ return headerBytesRead + inflater.getBytesRead() + trailerBytesRead;
+ }
+
+  /**
+   * Returns how many bytes of the most recent {@link #setInput(byte[] b,
+   * int off, int len)} buffer have not been processed yet. This is normally
+   * queried once finished() is true, to learn how much data follows the end
+   * of the current gzip stream.
+   *
+   * <p>Apart from that finished-with-concatenated-data case, the value is
+   * non-zero only between a call to setInput and the next call to
+   * {@link #decompress(byte[] b, int off, int len)}. A decompress call
+   * drains the buffer to zero unless it stops at the end of a gzip member.</p>
+   *
+   * @return the total (non-negative) number of unprocessed bytes in input
+   */
+  public synchronized int getRemaining() {
+    return this.userBufLen;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Answered by the wrapped Inflater.
+   */
+  public synchronized boolean needsDictionary() {
+    final boolean wanted = this.inflater.needsDictionary();
+    return wanted;
+  }
+
+  /**
+   * {@inheritDoc}
+   * The dictionary is passed straight through to the wrapped Inflater.
+   */
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    this.inflater.setDictionary(b, off, len);
+  }
+
+  /**
+   * Reports whether the current gzip substream (single "member") is done.
+   * That is the case once its trailer CRC and size have both been verified.
+   *
+   * @return true if the end of the current gzip member has been reached
+   */
+  public synchronized boolean finished() {
+    return GzipStateLabel.FINISHED == state;
+  }
+
+  /**
+   * Returns the decompressor to its initial state, discarding the input
+   * buffer, all counters and any parsed header flags. This happens whether
+   * or not the current gzip substream has finished.
+   */
+  public synchronized void reset() {
+    // could optionally emit INFO message if state != GzipStateLabel.FINISHED
+    inflater.reset();
+
+    // forget the optional-field flags parsed from the previous header
+    hasExtraField = false;
+    hasFilename = false;
+    hasComment = false;
+    hasHeaderCRC = false;
+    numExtraFieldBytesRemaining = -1;
+
+    // drop buffered input and the byte accounting
+    userBufOff = 0;
+    userBufLen = 0;
+    localBufOff = 0;
+    headerBytesRead = 0;
+    trailerBytesRead = 0;
+
+    crc.reset();
+    state = GzipStateLabel.HEADER_BASIC;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Delegates to the wrapped Inflater's end().
+   */
+  public synchronized void end() {
+    this.inflater.end();
+  }
+
+ /**
+ * Check ID bytes (throw if necessary), compression method (throw if not 8),
+ * and flag bits (set hasExtraField, hasFilename, hasComment, hasHeaderCRC).
+ * Ignore MTIME, XFL, OS. Caller must ensure we have at least 10 bytes (at
+ * the start of localBuf).</p>
+ */
+ /*
+ * Flag bits (remainder are reserved and must be zero):
+ * bit 0 FTEXT
+ * bit 1 FHCRC (never implemented in gzip, at least through version
+ * 1.4.0; instead interpreted as "continuation of multi-
+ * part gzip file," which is unsupported through 1.4.0)
+ * bit 2 FEXTRA
+ * bit 3 FNAME
+ * bit 4 FCOMMENT
+ * [bit 5 encrypted]
+ */
+ private void processBasicHeader() throws IOException {
+ if (readUShortLE(localBuf, 0) != GZIP_MAGIC_ID) {
+ throw new IOException("not a gzip file");
+ }
+ if (readUByte(localBuf, 2) != GZIP_DEFLATE_METHOD) {
+ throw new IOException("gzip data not compressed with deflate method");
+ }
+ int flg = readUByte(localBuf, 3);
+ if ((flg & GZIP_FLAGBITS_RESERVED) != 0) {
+ throw new IOException("unknown gzip format (reserved flagbits set)");
+ }
+ hasExtraField = ((flg & GZIP_FLAGBIT_EXTRA_FIELD) != 0);
+ hasFilename = ((flg & GZIP_FLAGBIT_FILENAME) != 0);
+ hasComment = ((flg & GZIP_FLAGBIT_COMMENT) != 0);
+ hasHeaderCRC = ((flg & GZIP_FLAGBIT_HEADER_CRC) != 0);
+ }
+
+  // Copies len header bytes from the user buffer into localBuf, then lets
+  // checkAndSkipBytes fold them into the header CRC and advance past them.
+  private void checkAndCopyBytesToLocal(int len) {
+    System.arraycopy(userBuf, userBufOff, localBuf, localBufOff, len);
+    localBufOff += len;
+    checkAndSkipBytes(len);
+  }
+
+  // Consumes len header bytes from the user buffer without keeping them.
+  // They still contribute to the header CRC and to headerBytesRead.
+  private void checkAndSkipBytes(int len) {
+    crc.update(userBuf, userBufOff, len);
+    headerBytesRead += len;
+    userBufLen -= len;
+    userBufOff += len;
+  }
+
+ // returns true if saw NULL, false if ran out of buffer first; called _only_
+ // during gzip-header processing (not trailer)
+ // (caller can check before/after state of userBufLen to compute num bytes)
+ private boolean checkAndSkipBytesUntilNull() {
+ boolean hitNull = false;
+ if (userBufLen > 0) {
+ do {
+ hitNull = (userBuf[userBufOff] == 0);
+ crc.update(userBuf[userBufOff]);
+ ++userBufOff;
+ --userBufLen;
+ ++headerBytesRead;
+ } while (userBufLen > 0 && !hitNull);
+ }
+ return hitNull;
+ }
+
+ // this one doesn't update the CRC and does support trailer processing but
+ // otherwise is same as its "checkAnd" sibling
+ private void copyBytesToLocal(int len) {
+ System.arraycopy(userBuf, userBufOff, localBuf, localBufOff, len);
+ localBufOff += len;
+ userBufOff += len;
+ userBufLen -= len;
+ if (state == GzipStateLabel.TRAILER_CRC ||
+ state == GzipStateLabel.TRAILER_SIZE) {
+ trailerBytesRead += len;
+ } else {
+ headerBytesRead += len;
+ }
+ }
+
+  // Returns b[off] as an unsigned value in [0, 255].
+  private int readUByte(byte[] b, int off) {
+    return b[off] & 0xff;
+  }
+
+ // caller is responsible for not overrunning buffer
+ private int readUShortLE(byte[] b, int off) {
+ return ((((b[off+1] & 0xff) << 8) |
+ ((b[off] & 0xff) )) & 0xffff);
+ }
+
+  // Returns b[off..off+3] as an unsigned little-endian 32-bit value, always
+  // in [0, 2^32). The mask is written as a long literal: the int literal
+  // 0xffffffff is -1, which widens to an all-ones long and masks nothing.
+  // caller is responsible for not overrunning buffer
+  private long readUIntLE(byte[] b, int off) {
+    return (((long)(b[off+3] & 0xff) << 24) |
+            ((long)(b[off+2] & 0xff) << 16) |
+            ((long)(b[off+1] & 0xff) <<  8) |
+            ((long)(b[off]   & 0xff)      )) & 0xffffffffL;
+  }
+
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java?rev=1077533&r1=1077532&r2=1077533&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java Fri Mar 4 04:25:16 2011
@@ -166,7 +166,7 @@ public class ZlibDecompressor implements
}
public synchronized boolean needsInput() {
- // Consume remanining compressed data?
+ // Consume remaining compressed data?
if (uncompressedDirectBuf.remaining() > 0) {
return false;
}
@@ -189,7 +189,7 @@ public class ZlibDecompressor implements
}
public synchronized boolean finished() {
- // Check if 'zlib' says its 'finished' and
+ // Check if 'zlib' says it's 'finished' and
// all compressed data has been consumed
return (finished && uncompressedDirectBuf.remaining() == 0);
}
@@ -221,7 +221,7 @@ public class ZlibDecompressor implements
n = inflateBytesDirect();
uncompressedDirectBuf.limit(n);
- // Get atmost 'len' bytes
+ // Get at most 'len' bytes
n = Math.min(n, len);
((ByteBuffer)uncompressedDirectBuf).get(b, off, n);
@@ -229,9 +229,9 @@ public class ZlibDecompressor implements
}
/**
- * Returns the total number of compressed bytes output so far.
+ * Returns the total number of uncompressed bytes output so far.
*
- * @return the total (non-negative) number of compressed bytes output so far
+ * @return the total (non-negative) number of uncompressed bytes output so far
*/
public synchronized long getBytesWritten() {
checkStream();
@@ -239,15 +239,30 @@ public class ZlibDecompressor implements
}
/**
- * Returns the total number of uncompressed bytes input so far.</p>
+ * Returns the total number of compressed bytes input so far.</p>
*
- * @return the total (non-negative) number of uncompressed bytes input so far
+ * @return the total (non-negative) number of compressed bytes input so far
*/
public synchronized long getBytesRead() {
checkStream();
return getBytesRead(stream);
}
+  /**
+   * Returns the number of compressed bytes not yet consumed (user buffer
+   * plus native-stream input); normally called when finished() is true.
+   *
+   * @return the total (non-negative) number of unprocessed bytes in input
+   */
+  public synchronized int getRemaining() {
+    checkStream();
+    final int pendingInZlib = getRemaining(stream);  // compressedDirectBuf
+    return userBufLen + pendingInZlib;  // userBuf + compressedDirectBuf
+  }
+
+ /**
+ * Resets everything including the input buffers (user and direct).</p>
+ */
public synchronized void reset() {
checkStream();
reset(stream);
@@ -282,6 +297,7 @@ public class ZlibDecompressor implements
private native int inflateBytesDirect();
private native static long getBytesRead(long strm);
private native static long getBytesWritten(long strm);
+ private native static int getRemaining(long strm);
private native static void reset(long strm);
private native static void end(long strm);
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c?rev=1077533&r1=1077532&r2=1077533&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c Fri Mar 4 04:25:16 2011
@@ -291,6 +291,13 @@ Java_org_apache_hadoop_io_compress_zlib_
return (ZSTREAM(stream))->total_out;
}
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_getRemaining(
+  JNIEnv *env, jclass cls, jlong stream
+  ) {
+  return (jint) (ZSTREAM(stream))->avail_in;  /* input zlib hasn't consumed */
+}
+
JNIEXPORT void JNICALL
Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_reset(
JNIEnv *env, jclass cls, jlong stream
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/io/compress/TestCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/io/compress/TestCodec.java?rev=1077533&r1=1077532&r2=1077533&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/io/compress/TestCodec.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/io/compress/TestCodec.java Fri Mar 4 04:25:16 2011
@@ -23,8 +23,10 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Arrays;
import java.util.Random;
+import java.util.zip.GZIPOutputStream;
import junit.framework.TestCase;
@@ -36,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.RandomDatum;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
@@ -43,14 +46,14 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
public class TestCodec extends TestCase {
- private static final Log LOG=
- LogFactory.getLog(TestCodec.class);
+ private static final Log LOG= LogFactory.getLog(TestCodec.class);
private Configuration conf = new Configuration();
private int count = 10000;
@@ -298,9 +301,92 @@ public class TestCodec extends TestCase
System.err.println("Caught: " + e);
e.printStackTrace();
}
-
+
}
+  /**
+   * Checks that data written by java.util.zip.GZIPOutputStream round-trips
+   * through GzipCodec when the pure-Java BuiltInGzipDecompressor is in use.
+   */
+  public void testGzipCompatibility() throws IOException {
+    Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+    LOG.info("seed: " + seed);
+
+    // compress a random payload (possibly empty) with the JDK gzip writer
+    DataOutputBuffer dflbuf = new DataOutputBuffer();
+    GZIPOutputStream gzout = new GZIPOutputStream(dflbuf);
+    byte[] b = new byte[r.nextInt(128 * 1024 + 1)];
+    r.nextBytes(b);
+    gzout.write(b);
+    gzout.close();
+
+    DataInputBuffer gzbuf = new DataInputBuffer();
+    gzbuf.reset(dflbuf.getData(), dflbuf.getLength());
+
+    // disable native zlib so the built-in decompressor is selected
+    Configuration conf = new Configuration();
+    conf.setBoolean("hadoop.native.lib", false);
+    CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
+    Decompressor decom = codec.createDecompressor();
+    assertNotNull(decom);
+    assertEquals(BuiltInGzipDecompressor.class, decom.getClass());
+    InputStream gzin = codec.createInputStream(gzbuf, decom);
+
+    // Decompress into a separate buffer. gzbuf reads directly from dflbuf's
+    // backing array, so resetting and refilling dflbuf could overwrite
+    // compressed input that has not been consumed yet.
+    DataOutputBuffer outbuf = new DataOutputBuffer();
+    try {
+      IOUtils.copyBytes(gzin, outbuf, 4096);
+    } finally {
+      gzin.close();
+    }
+    final byte[] dflchk = Arrays.copyOf(outbuf.getData(), outbuf.getLength());
+    assertTrue("decompressed data does not match input (seed " + seed + ")",
+        Arrays.equals(b, dflchk));
+  }
+
+  /**
+   * Writes 3-6 independent gzip members back to back. It then checks that
+   * GzipCodec, using a decompressor of class decomClass, decodes them as
+   * one continuous stream.
+   *
+   * @param conf configuration used to instantiate GzipCodec
+   * @param decomClass decompressor class the codec is expected to create
+   */
+  void GzipConcatTest(Configuration conf,
+      Class<? extends Decompressor> decomClass) throws IOException {
+    Random r = new Random();
+    long seed = r.nextLong();
+    r.setSeed(seed);
+    LOG.info(decomClass + " seed: " + seed);
+
+    // dflbuf collects the concatenated gzip members; chkbuf the plaintext
+    final int CONCAT = r.nextInt(4) + 3;
+    final int BUFLEN = 128 * 1024;
+    DataOutputBuffer dflbuf = new DataOutputBuffer();
+    DataOutputBuffer chkbuf = new DataOutputBuffer();
+    byte[] b = new byte[BUFLEN];
+    for (int i = 0; i < CONCAT; ++i) {
+      GZIPOutputStream gzout = new GZIPOutputStream(dflbuf);
+      r.nextBytes(b);
+      int len = r.nextInt(BUFLEN);
+      int off = r.nextInt(BUFLEN - len);  // BUFLEN - len >= 1 here
+      chkbuf.write(b, off, len);
+      gzout.write(b, off, len);
+      gzout.close();
+    }
+    final byte[] chk = Arrays.copyOf(chkbuf.getData(), chkbuf.getLength());
+
+    CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
+    Decompressor decom = codec.createDecompressor();
+    assertNotNull(decom);
+    assertEquals(decomClass, decom.getClass());
+    DataInputBuffer gzbuf = new DataInputBuffer();
+    gzbuf.reset(dflbuf.getData(), dflbuf.getLength());
+    InputStream gzin = codec.createInputStream(gzbuf, decom);
+
+    // Decompress into a separate buffer. gzbuf reads directly from dflbuf's
+    // backing array, so resetting and refilling dflbuf could overwrite
+    // compressed input that has not been consumed yet.
+    DataOutputBuffer outbuf = new DataOutputBuffer();
+    try {
+      IOUtils.copyBytes(gzin, outbuf, 4096);
+    } finally {
+      gzin.close();
+    }
+    final byte[] dflchk = Arrays.copyOf(outbuf.getData(), outbuf.getLength());
+    assertTrue(decomClass + ": concatenated gzip output mismatch (seed "
+        + seed + ")", Arrays.equals(chk, dflchk));
+  }
+
+  /** Concatenated-gzip decoding with native zlib disabled (pure Java). */
+  public void testBuiltInGzipConcat() throws IOException {
+    Configuration builtInConf = new Configuration();
+    builtInConf.setBoolean("hadoop.native.lib", false);
+    GzipConcatTest(builtInConf, BuiltInGzipDecompressor.class);
+  }
+
+  /**
+   * Concatenated-gzip decoding through native zlib. The test is skipped with
+   * a warning when the native library is not loaded.
+   */
+  public void testNativeGzipConcat() throws IOException {
+    Configuration nativeConf = new Configuration();
+    nativeConf.setBoolean("hadoop.native.lib", true);
+    if (ZlibFactory.isNativeZlibLoaded(nativeConf)) {
+      GzipConcatTest(nativeConf, GzipCodec.GzipZlibDecompressor.class);
+    } else {
+      LOG.warn("skipped: native libs not loaded");
+    }
+  }
+
public TestCodec(String name) {
super(name);
}