Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2010/07/08 01:22:28 UTC
svn commit: r961532 - in /hadoop/common/trunk: ./
src/java/org/apache/hadoop/io/compress/
src/java/org/apache/hadoop/io/compress/bzip2/
src/java/org/apache/hadoop/io/compress/zlib/
src/native/src/org/apache/hadoop/io/compress/zlib/ src/test/core/org/ap...
Author: cdouglas
Date: Wed Jul 7 23:22:28 2010
New Revision: 961532
URL: http://svn.apache.org/viewvc?rev=961532&view=rev
Log:
HADOOP-6835. Add support for concatenated gzip input. Contributed by Greg Roelofs
Added:
hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java
Modified:
hadoop/common/trunk/CHANGES.txt
hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/Decompressor.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/DecompressorStream.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java
hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
hadoop/common/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c
hadoop/common/trunk/src/test/core/org/apache/hadoop/io/compress/TestCodec.java
Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=961532&r1=961531&r2=961532&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Wed Jul 7 23:22:28 2010
@@ -57,6 +57,9 @@ Trunk (unreleased changes)
HADOOP-6756. Documentation for common configuration keys.
(Erik Steffl via shv)
+ HADOOP-6835. Add support for concatenated gzip input. (Greg Roelofs via
+ cdouglas)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java?rev=961532&r1=961531&r2=961532&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/BlockDecompressorStream.java Wed Jul 7 23:22:28 2010
@@ -43,19 +43,19 @@ public class BlockDecompressorStream ext
* @param in input stream
* @param decompressor decompressor to use
* @param bufferSize size of buffer
- * @throws IOException
+ * @throws IOException
*/
public BlockDecompressorStream(InputStream in, Decompressor decompressor,
int bufferSize) throws IOException {
super(in, decompressor, bufferSize);
}
-
+
/**
* Create a {@link BlockDecompressorStream}.
*
* @param in input stream
* @param decompressor decompressor to use
- * @throws IOException
+ * @throws IOException
*/
public BlockDecompressorStream(InputStream in, Decompressor decompressor) throws IOException {
super(in, decompressor);
@@ -76,7 +76,7 @@ public class BlockDecompressorStream ext
}
noUncompressedBytes = 0;
}
-
+
int n = 0;
while ((n = decompressor.decompress(b, off, len)) == 0) {
if (decompressor.finished() || decompressor.needsDictionary()) {
@@ -86,20 +86,22 @@ public class BlockDecompressorStream ext
}
}
if (decompressor.needsInput()) {
- getCompressedData();
+ int m = getCompressedData();
+ // Send the read data to the decompressor
+ decompressor.setInput(buffer, 0, m);
}
}
-
+
// Note the no. of decompressed bytes read from 'current' block
noUncompressedBytes += n;
return n;
}
- protected void getCompressedData() throws IOException {
+ protected int getCompressedData() throws IOException {
checkStream();
- // Get the size of the compressed chunk
+ // Get the size of the compressed chunk (always non-negative)
int len = rawReadInt();
// Read len bytes from underlying stream
@@ -110,13 +112,12 @@ public class BlockDecompressorStream ext
while (n < len) {
int count = in.read(buffer, off + n, len - n);
if (count < 0) {
- throw new EOFException();
+ throw new EOFException("Unexpected end of block in input stream");
}
n += count;
}
-
- // Send the read data to the decompressor
- decompressor.setInput(buffer, 0, len);
+
+ return len;
}
public void resetState() throws IOException {
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/Decompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/Decompressor.java?rev=961532&r1=961531&r2=961532&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/Decompressor.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/Decompressor.java Wed Jul 7 23:22:28 2010
@@ -34,8 +34,13 @@ import org.apache.hadoop.classification.
public interface Decompressor {
/**
* Sets input data for decompression.
- * This should be called whenever #needsInput() returns
+ * This should be called if and only if {@link #needsInput()} returns
* <code>true</code> indicating that more input data is required.
+ * (Both native and non-native versions of various Decompressors require
+ * that the data passed in via <code>b[]</code> remain unmodified until
+ * the caller is explicitly notified--via {@link #needsInput()}--that the
+ * buffer may be safely modified. With this requirement, an extra
+ * buffer-copy can be avoided.)
*
* @param b Input data
* @param off Start offset
@@ -45,10 +50,12 @@ public interface Decompressor {
/**
* Returns true if the input data buffer is empty and
- * #setInput() should be called to provide more input.
+ * {@link #setInput(byte[], int, int)} should be called to
+ * provide more input.
*
* @return <code>true</code> if the input data buffer is empty and
- * #setInput() should be called in order to provide more input.
+ * {@link #setInput(byte[], int, int)} should be called in
+ * order to provide more input.
*/
public boolean needsInput();
@@ -69,9 +76,9 @@ public interface Decompressor {
public boolean needsDictionary();
/**
- * Returns true if the end of the compressed
+ * Returns true if the end of the decompressed
* data output stream has been reached.
- * @return <code>true</code> if the end of the compressed
+ * @return <code>true</code> if the end of the decompressed
* data output stream has been reached.
*/
public boolean finished();
@@ -79,8 +86,8 @@ public interface Decompressor {
/**
* Fills specified buffer with uncompressed data. Returns actual number
* of bytes of uncompressed data. A return value of 0 indicates that
- * #needsInput() should be called in order to determine if more input
- * data is required.
+ * {@link #needsInput()} should be called in order to determine if more
+ * input data is required.
*
* @param b Buffer for the compressed data
* @param off Start offset of the data
@@ -89,12 +96,20 @@ public interface Decompressor {
* @throws IOException
*/
public int decompress(byte[] b, int off, int len) throws IOException;
-
+
/**
- * Resets decompressor so that a new set of input data can be processed.
+ * Returns the number of bytes remaining in the compressed-data buffer;
+ * typically called after the decompressor has finished decompressing
+ * the current gzip stream (a.k.a. "member").
+ */
+ public int getRemaining();
+
+ /**
+ * Resets decompressor and input and output buffers so that a new set of
+ * input data can be processed.
*/
public void reset();
-
+
/**
* Closes the decompressor and discards any unprocessed input.
*/
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/DecompressorStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/DecompressorStream.java?rev=961532&r1=961531&r2=961532&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/DecompressorStream.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/DecompressorStream.java Wed Jul 7 23:22:28 2010
@@ -33,8 +33,11 @@ public class DecompressorStream extends
protected byte[] buffer;
protected boolean eof = false;
protected boolean closed = false;
-
- public DecompressorStream(InputStream in, Decompressor decompressor, int bufferSize) throws IOException {
+ private int lastBytesSent = 0;
+
+ public DecompressorStream(InputStream in, Decompressor decompressor,
+ int bufferSize)
+ throws IOException {
super(in);
if (in == null || decompressor == null) {
@@ -47,7 +50,8 @@ public class DecompressorStream extends
buffer = new byte[bufferSize];
}
- public DecompressorStream(InputStream in, Decompressor decompressor) throws IOException {
+ public DecompressorStream(InputStream in, Decompressor decompressor)
+ throws IOException {
this(in, decompressor, 512);
}
@@ -55,7 +59,7 @@ public class DecompressorStream extends
* Allow derived classes to directly set the underlying stream.
*
* @param in Underlying input stream.
- * @throws IOException
+ * @throws IOException
*/
protected DecompressorStream(InputStream in) throws IOException {
super(in);
@@ -81,31 +85,78 @@ public class DecompressorStream extends
protected int decompress(byte[] b, int off, int len) throws IOException {
int n = 0;
-
+
while ((n = decompressor.decompress(b, off, len)) == 0) {
- if (decompressor.finished() || decompressor.needsDictionary()) {
+ if (decompressor.needsDictionary()) {
eof = true;
return -1;
}
- if (decompressor.needsInput()) {
- getCompressedData();
+
+ if (decompressor.finished()) {
+ // First see if there was any leftover buffered input from previous
+ // stream; if not, attempt to refill buffer. If refill -> EOF, we're
+ // all done; else reset, fix up input buffer, and get ready for next
+ // concatenated substream/"member".
+ int nRemaining = decompressor.getRemaining();
+ if (nRemaining == 0) {
+ int m = getCompressedData();
+ if (m == -1) {
+ // apparently the previous end-of-stream was also end-of-file:
+ // return success, as if we had never called getCompressedData()
+ eof = true;
+ return -1;
+ }
+ decompressor.reset();
+ decompressor.setInput(buffer, 0, m);
+ lastBytesSent = m;
+ } else {
+ // looks like it's a concatenated stream: reset low-level zlib (or
+ // other engine) and buffers, then "resend" remaining input data
+ decompressor.reset();
+ int leftoverOffset = lastBytesSent - nRemaining;
+ assert (leftoverOffset >= 0);
+ // this recopies userBuf -> direct buffer if using native libraries:
+ decompressor.setInput(buffer, leftoverOffset, nRemaining);
+ // NOTE: this is the one place we do NOT want to save the number
+ // of bytes sent (nRemaining here) into lastBytesSent: since we
+ // are resending what we've already sent before, offset is nonzero
+ // in general (only way it could be zero is if it already equals
+ // nRemaining), which would then screw up the offset calculation
+ // _next_ time around. IOW, getRemaining() is in terms of the
+ // original, zero-offset bufferload, so lastBytesSent must be as
+ // well. Cheesy ASCII art:
+ //
+ // <------------ m, lastBytesSent ----------->
+ // +===============================================+
+ // buffer: |1111111111|22222222222222222|333333333333| |
+ // +===============================================+
+ // #1: <-- off -->|<-------- nRemaining --------->
+ // #2: <----------- off ----------->|<-- nRem. -->
+ // #3: (final substream: nRemaining == 0; eof = true)
+ //
+ // If lastBytesSent is anything other than m, as shown, then "off"
+ // will be calculated incorrectly.
+ }
+ } else if (decompressor.needsInput()) {
+ int m = getCompressedData();
+ if (m == -1) {
+ throw new EOFException("Unexpected end of input stream");
+ }
+ decompressor.setInput(buffer, 0, m);
+ lastBytesSent = m;
}
}
-
+
return n;
}
-
- protected void getCompressedData() throws IOException {
+
+ protected int getCompressedData() throws IOException {
checkStream();
- int n = in.read(buffer, 0, buffer.length);
- if (n == -1) {
- throw new EOFException("Unexpected end of input stream");
- }
-
- decompressor.setInput(buffer, 0, n);
+ // note that the _caller_ is now required to call setInput() or throw
+ return in.read(buffer, 0, buffer.length);
}
-
+
protected void checkStream() throws IOException {
if (closed) {
throw new IOException("Stream closed");
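
To make the leftover-offset arithmetic in the ASCII art above concrete, a toy
calculation with assumed values:

    // Assumed values, for illustration only:
    int lastBytesSent = 4096;   // m returned by the last getCompressedData()
    int nRemaining = 300;       // decompressor.getRemaining() after finish
    int leftoverOffset = lastBytesSent - nRemaining;   // = 3796
    // resend bytes [3796, 4096) of the ORIGINAL bufferload; lastBytesSent
    // stays 4096, so any further leftover is still measured from offset 0
    decompressor.setInput(buffer, leftoverOffset, nRemaining);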
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java?rev=961532&r1=961531&r2=961532&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/GzipCodec.java Wed Jul 7 23:22:28 2010
@@ -91,58 +91,7 @@ public class GzipCodec extends DefaultCo
((ResetableGZIPOutputStream) out).resetState();
}
}
-
- @InterfaceStability.Evolving
- protected static class GzipInputStream extends DecompressorStream {
-
- private static class ResetableGZIPInputStream extends GZIPInputStream {
- public ResetableGZIPInputStream(InputStream in) throws IOException {
- super(in);
- }
-
- public void resetState() throws IOException {
- inf.reset();
- }
- }
-
- public GzipInputStream(InputStream in) throws IOException {
- super(new ResetableGZIPInputStream(in));
- }
-
- /**
- * Allow subclasses to directly set the inflater stream.
- * @throws IOException
- */
- protected GzipInputStream(DecompressorStream in) throws IOException {
- super(in);
- }
-
- public int available() throws IOException {
- return in.available();
- }
-
- public void close() throws IOException {
- in.close();
- }
-
- public int read() throws IOException {
- return in.read();
- }
-
- public int read(byte[] data, int offset, int len) throws IOException {
- return in.read(data, offset, len);
- }
-
- public long skip(long offset) throws IOException {
- return in.skip(offset);
- }
-
- public void resetState() throws IOException {
- ((ResetableGZIPInputStream) in).resetState();
- }
- }
-
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
return (ZlibFactory.isNativeZlibLoaded(conf)) ?
@@ -158,8 +107,7 @@ public class GzipCodec extends DefaultCo
new CompressorStream(out, compressor,
conf.getInt("io.file.buffer.size",
4*1024)) :
- createOutputStream(out);
-
+ createOutputStream(out);
}
public Compressor createCompressor() {
@@ -174,35 +122,31 @@ public class GzipCodec extends DefaultCo
: null;
}
- public CompressionInputStream createInputStream(InputStream in)
+ public CompressionInputStream createInputStream(InputStream in)
throws IOException {
- return (ZlibFactory.isNativeZlibLoaded(conf)) ?
- new DecompressorStream(in, createDecompressor(),
- conf.getInt("io.file.buffer.size",
- 4*1024)) :
- new GzipInputStream(in);
+ return createInputStream(in, null);
}
- public CompressionInputStream createInputStream(InputStream in,
- Decompressor decompressor)
+ public CompressionInputStream createInputStream(InputStream in,
+ Decompressor decompressor)
throws IOException {
- return (decompressor != null) ?
- new DecompressorStream(in, decompressor,
- conf.getInt("io.file.buffer.size",
- 4*1024)) :
- createInputStream(in);
+ if (decompressor == null) {
+ decompressor = createDecompressor(); // always succeeds (or throws)
+ }
+ return new DecompressorStream(in, decompressor,
+ conf.getInt("io.file.buffer.size", 4*1024));
}
public Decompressor createDecompressor() {
return (ZlibFactory.isNativeZlibLoaded(conf))
? new GzipZlibDecompressor()
- : null;
+ : new BuiltInGzipDecompressor();
}
public Class<? extends Decompressor> getDecompressorType() {
return ZlibFactory.isNativeZlibLoaded(conf)
? GzipZlibDecompressor.class
- : null;
+ : BuiltInGzipDecompressor.class;
}
public String getDefaultExtension() {
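
With this change GzipCodec always hands back a concrete Decompressor
(BuiltInGzipDecompressor when native zlib is not loaded), so callers no longer
need the old null-decompressor special case. A hedged usage sketch -- the input
stream 'in' is an assumption:

    import java.io.InputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CodecPool;
    import org.apache.hadoop.io.compress.CompressionInputStream;
    import org.apache.hadoop.io.compress.Decompressor;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.util.ReflectionUtils;

    Configuration conf = new Configuration();
    GzipCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
    Decompressor decom = CodecPool.getDecompressor(codec); // non-null either way
    CompressionInputStream gzin = codec.createInputStream(in, decom);
    try {
      // read uncompressed (possibly concatenated) gzip data from gzin
    } finally {
      gzin.close();
      CodecPool.returnDecompressor(decom);
    }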
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java?rev=961532&r1=961531&r2=961532&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java Wed Jul 7 23:22:28 2010
@@ -53,6 +53,11 @@ public class BZip2DummyDecompressor impl
}
@Override
+ public int getRemaining() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void reset() {
// do nothing
}
Added: hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java?rev=961532&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java Wed Jul 7 23:22:28 2010
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.zlib;
+
+import java.io.IOException;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * A {@link Decompressor} based on the popular gzip compressed file format.
+ * http://www.gzip.org/
+ *
+ */
+public class BuiltInGzipDecompressor implements Decompressor {
+ private static final int GZIP_MAGIC_ID = 0x8b1f; // if read as LE short int
+ private static final int GZIP_DEFLATE_METHOD = 8;
+ private static final int GZIP_FLAGBIT_HEADER_CRC = 0x02;
+ private static final int GZIP_FLAGBIT_EXTRA_FIELD = 0x04;
+ private static final int GZIP_FLAGBIT_FILENAME = 0x08;
+ private static final int GZIP_FLAGBIT_COMMENT = 0x10;
+ private static final int GZIP_FLAGBITS_RESERVED = 0xe0;
+
+ // 'true' (nowrap) => Inflater will handle raw deflate stream only
+ private Inflater inflater = new Inflater(true);
+
+ private byte[] userBuf = null;
+ private int userBufOff = 0;
+ private int userBufLen = 0;
+
+ private byte[] localBuf = new byte[256];
+ private int localBufOff = 0;
+
+ private int headerBytesRead = 0;
+ private int trailerBytesRead = 0;
+ private int numExtraFieldBytesRemaining = -1;
+ private PureJavaCrc32 crc = new PureJavaCrc32();
+ private boolean hasExtraField = false;
+ private boolean hasFilename = false;
+ private boolean hasComment = false;
+ private boolean hasHeaderCRC = false;
+
+ private GzipStateLabel state;
+
+ /**
+ * The current state of the gzip decoder, external to the Inflater context.
+ * (Technically, the private variables localBuf through hasHeaderCRC are
+ * also part of the state, so this enum is merely the label for it.)
+ */
+ private static enum GzipStateLabel {
+ /**
+ * Immediately prior to or (strictly) within the 10-byte basic gzip header.
+ */
+ HEADER_BASIC,
+ /**
+ * Immediately prior to or within the optional "extra field."
+ */
+ HEADER_EXTRA_FIELD,
+ /**
+ * Immediately prior to or within the optional filename field.
+ */
+ HEADER_FILENAME,
+ /**
+ * Immediately prior to or within the optional comment field.
+ */
+ HEADER_COMMENT,
+ /**
+ * Immediately prior to or within the optional 2-byte header CRC value.
+ */
+ HEADER_CRC,
+ /**
+ * Immediately prior to or within the main compressed (deflate) data stream.
+ */
+ DEFLATE_STREAM,
+ /**
+ * Immediately prior to or (strictly) within the 4-byte uncompressed CRC.
+ */
+ TRAILER_CRC,
+ /**
+ * Immediately prior to or (strictly) within the 4-byte uncompressed size.
+ */
+ TRAILER_SIZE,
+ /**
+ * Immediately after the trailer (and potentially prior to the next gzip
+ * member/substream header), without reset() having been called.
+ */
+ FINISHED;
+ }
+
+ /**
+ * Creates a new (pure Java) gzip decompressor.
+ */
+ public BuiltInGzipDecompressor() {
+ state = GzipStateLabel.HEADER_BASIC;
+ crc.reset();
+ // FIXME? Inflater docs say: 'it is also necessary to provide an extra
+ // "dummy" byte as input. This is required by the ZLIB native
+ // library in order to support certain optimizations.' However,
+ // this does not appear to be true, and in any case, it's not
+ // entirely clear where the byte should go or what its value
+ // should be. Perhaps it suffices to have some deflated bytes
+ // in the first buffer load? (But how else would one do it?)
+ }
+
+ /** {@inheritDoc} */
+ public synchronized boolean needsInput() {
+ if (state == GzipStateLabel.DEFLATE_STREAM) { // most common case
+ return inflater.needsInput();
+ }
+ // see userBufLen comment at top of decompress(); currently no need to
+ // verify userBufLen <= 0
+ return (state != GzipStateLabel.FINISHED);
+ }
+
+ /** {@inheritDoc} */
+ /*
+ * In our case, the input data includes both gzip header/trailer bytes (which
+ * we handle in executeState()) and deflate-stream bytes (which we hand off
+ * to Inflater).
+ *
+ * NOTE: This code assumes the data passed in via b[] remains unmodified
+ * until _we_ signal that it's safe to modify it (via needsInput()).
+ * The alternative would require an additional buffer-copy even for
+ * the bulk deflate stream, which is a performance hit we don't want
+ * to absorb. (Decompressor now documents this requirement.)
+ */
+ public synchronized void setInput(byte[] b, int off, int len) {
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if (off < 0 || len < 0 || off > b.length - len) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+
+ userBuf = b;
+ userBufOff = off;
+ userBufLen = len; // note: might be zero
+ }
+
+ /**
+ * Decompress the data (gzip header, deflate stream, gzip trailer) in the
+ * provided buffer.
+ *
+ * @return the number of decompressed bytes placed into b
+ */
+ /* From the caller's perspective, this is where the state machine lives.
+ * The code is written such that we never return from decompress() with
+ * data remaining in userBuf unless we're in FINISHED state and there was
+ * data beyond the current gzip member (e.g., we're within a concatenated
+ * gzip stream). If this ever changes, {@link #needsInput()} will also
+ * need to be modified (i.e., uncomment the userBufLen condition).
+ *
+ * The actual deflate-stream processing (decompression) is handled by
+ * Java's Inflater class. Unlike the gzip header/trailer code (execute*
+ * methods below), the deflate stream is never copied; Inflater operates
+ * directly on the user's buffer.
+ */
+ public synchronized int decompress(byte[] b, int off, int len)
+ throws IOException {
+ int numAvailBytes = 0;
+
+ if (state != GzipStateLabel.DEFLATE_STREAM) {
+ executeHeaderState();
+
+ if (userBufLen <= 0) {
+ return numAvailBytes;
+ }
+ }
+
+ // "executeDeflateStreamState()"
+ if (state == GzipStateLabel.DEFLATE_STREAM) {
+ // hand off user data (or what's left of it) to Inflater--but note that
+ // Inflater may not have consumed all of previous bufferload (e.g., if
+ // data highly compressed or output buffer very small), in which case
+ // userBufLen will be zero
+ if (userBufLen > 0) {
+ inflater.setInput(userBuf, userBufOff, userBufLen);
+ userBufOff += userBufLen;
+ userBufLen = 0;
+ }
+
+ // now decompress it into b[]
+ try {
+ numAvailBytes = inflater.inflate(b, off, len);
+ } catch (DataFormatException dfe) {
+ throw new IOException(dfe.getMessage());
+ }
+ crc.update(b, off, numAvailBytes); // CRC-32 is on _uncompressed_ data
+ if (inflater.finished()) {
+ state = GzipStateLabel.TRAILER_CRC;
+ int bytesRemaining = inflater.getRemaining();
+ assert (bytesRemaining >= 0) :
+ "logic error: Inflater finished; byte-count is inconsistent";
+ // could save a copy of userBufLen at call to inflater.setInput() and
+ // verify that bytesRemaining <= origUserBufLen, but would have to
+ // be a (class) member variable...seems excessive for a sanity check
+ userBufOff -= bytesRemaining;
+ userBufLen = bytesRemaining; // or "+=", but guaranteed 0 coming in
+ } else {
+ return numAvailBytes; // minor optimization
+ }
+ }
+
+ executeTrailerState();
+
+ return numAvailBytes;
+ }
+
+ /**
+ * Parse the gzip header (assuming we're in the appropriate state).
+ * In order to deal with degenerate cases (e.g., user buffer is one byte
+ * long), we copy (some) header bytes to another buffer. (Filename,
+ * comment, and extra-field bytes are simply skipped.)</p>
+ *
+ * See http://www.ietf.org/rfc/rfc1952.txt for the gzip spec. Note that
+ * no version of gzip to date (at least through 1.4.0, 2010-01-20) supports
+ * the FHCRC header-CRC16 flagbit; instead, the implementation treats it
+ * as a multi-file continuation flag (which it also doesn't support). :-(
+ * Sun's JDK v6 (1.6) supports the header CRC, however, and so do we.
+ */
+ private void executeHeaderState() throws IOException {
+
+ // this can happen because DecompressorStream's decompress() is written
+ // to call decompress() first, setInput() second:
+ if (userBufLen <= 0) {
+ return;
+ }
+
+ // "basic"/required header: somewhere in first 10 bytes
+ if (state == GzipStateLabel.HEADER_BASIC) {
+ int n = Math.min(userBufLen, 10-localBufOff); // (or 10-headerBytesRead)
+ checkAndCopyBytesToLocal(n); // modifies userBufLen, etc.
+ if (localBufOff >= 10) { // should be strictly ==
+ processBasicHeader(); // sig, compression method, flagbits
+ localBufOff = 0; // no further need for basic header
+ state = GzipStateLabel.HEADER_EXTRA_FIELD;
+ }
+ }
+
+ if (userBufLen <= 0) {
+ return;
+ }
+
+ // optional header stuff (extra field, filename, comment, header CRC)
+
+ if (state == GzipStateLabel.HEADER_EXTRA_FIELD) {
+ if (hasExtraField) {
+ // 2 substates: waiting for 2 bytes => get numExtraFieldBytesRemaining,
+ // or already have 2 bytes & waiting to finish skipping specified length
+ if (numExtraFieldBytesRemaining < 0) {
+ int n = Math.min(userBufLen, 2-localBufOff);
+ checkAndCopyBytesToLocal(n);
+ if (localBufOff >= 2) {
+ numExtraFieldBytesRemaining = readUShortLE(localBuf, 0);
+ localBufOff = 0;
+ }
+ }
+ if (numExtraFieldBytesRemaining > 0 && userBufLen > 0) {
+ int n = Math.min(userBufLen, numExtraFieldBytesRemaining);
+ checkAndSkipBytes(n); // modifies userBufLen, etc.
+ numExtraFieldBytesRemaining -= n;
+ }
+ if (numExtraFieldBytesRemaining == 0) {
+ state = GzipStateLabel.HEADER_FILENAME;
+ }
+ } else {
+ state = GzipStateLabel.HEADER_FILENAME;
+ }
+ }
+
+ if (userBufLen <= 0) {
+ return;
+ }
+
+ if (state == GzipStateLabel.HEADER_FILENAME) {
+ if (hasFilename) {
+ boolean doneWithFilename = checkAndSkipBytesUntilNull();
+ if (!doneWithFilename) {
+ return; // exit early: used up entire buffer without hitting NULL
+ }
+ }
+ state = GzipStateLabel.HEADER_COMMENT;
+ }
+
+ if (userBufLen <= 0) {
+ return;
+ }
+
+ if (state == GzipStateLabel.HEADER_COMMENT) {
+ if (hasComment) {
+ boolean doneWithComment = checkAndSkipBytesUntilNull();
+ if (!doneWithComment) {
+ return; // exit early: used up entire buffer
+ }
+ }
+ state = GzipStateLabel.HEADER_CRC;
+ }
+
+ if (userBufLen <= 0) {
+ return;
+ }
+
+ if (state == GzipStateLabel.HEADER_CRC) {
+ if (hasHeaderCRC) {
+ assert (localBufOff < 2);
+ int n = Math.min(userBufLen, 2-localBufOff);
+ copyBytesToLocal(n);
+ if (localBufOff >= 2) {
+ long headerCRC = readUShortLE(localBuf, 0);
+ if (headerCRC != (crc.getValue() & 0xffff)) {
+ throw new IOException("gzip header CRC failure");
+ }
+ localBufOff = 0;
+ crc.reset();
+ state = GzipStateLabel.DEFLATE_STREAM;
+ }
+ } else {
+ crc.reset(); // will reuse for CRC-32 of uncompressed data
+ state = GzipStateLabel.DEFLATE_STREAM; // switching to Inflater now
+ }
+ }
+ }
+
+ /**
+ * Parse the gzip trailer (assuming we're in the appropriate state).
+ * In order to deal with degenerate cases (e.g., user buffer is one byte
+ * long), we copy trailer bytes (all 8 of 'em) to a local buffer.</p>
+ *
+ * See http://www.ietf.org/rfc/rfc1952.txt for the gzip spec.
+ */
+ private void executeTrailerState() throws IOException {
+
+ if (userBufLen <= 0) {
+ return;
+ }
+
+ // verify that the CRC-32 of the decompressed stream matches the value
+ // stored in the gzip trailer
+ if (state == GzipStateLabel.TRAILER_CRC) {
+ // localBuf was empty before we handed off to Inflater, so we handle this
+ // exactly like header fields
+ assert (localBufOff < 4); // initially 0, but may need multiple calls
+ int n = Math.min(userBufLen, 4-localBufOff);
+ copyBytesToLocal(n);
+ if (localBufOff >= 4) {
+ long streamCRC = readUIntLE(localBuf, 0);
+ if (streamCRC != crc.getValue()) {
+ throw new IOException("gzip stream CRC failure");
+ }
+ localBufOff = 0;
+ crc.reset();
+ state = GzipStateLabel.TRAILER_SIZE;
+ }
+ }
+
+ if (userBufLen <= 0) {
+ return;
+ }
+
+ // verify that the mod-2^32 decompressed stream size matches the value
+ // stored in the gzip trailer
+ if (state == GzipStateLabel.TRAILER_SIZE) {
+ assert (localBufOff < 4); // initially 0, but may need multiple calls
+ int n = Math.min(userBufLen, 4-localBufOff);
+ copyBytesToLocal(n); // modifies userBufLen, etc.
+ if (localBufOff >= 4) { // should be strictly ==
+ long inputSize = readUIntLE(localBuf, 0);
+ if (inputSize != (inflater.getBytesWritten() & 0xffffffffL)) {
+ throw new IOException(
+ "stored gzip size doesn't match decompressed size");
+ }
+ localBufOff = 0;
+ state = GzipStateLabel.FINISHED;
+ }
+ }
+
+ if (state == GzipStateLabel.FINISHED) {
+ return;
+ }
+ }
+
+ /**
+ * Returns the total number of compressed bytes input so far, including
+ * gzip header/trailer bytes.</p>
+ *
+ * @return the total (non-negative) number of compressed bytes read so far
+ */
+ public synchronized long getBytesRead() {
+ return headerBytesRead + inflater.getBytesRead() + trailerBytesRead;
+ }
+
+ /**
+ * Returns the number of bytes remaining in the input buffer; normally
+ * called when finished() is true to determine amount of post-gzip-stream
+ * data. Note that, other than the finished state with concatenated data
+ * after the end of the current gzip stream, this will never return a
+ * non-zero value unless called after {@link #setInput(byte[] b, int off,
+ * int len)} and before {@link #decompress(byte[] b, int off, int len)}.
+ * (That is, after {@link #decompress(byte[] b, int off, int len)} it
+ * always returns zero, except in finished state with concatenated data.)</p>
+ *
+ * @return the total (non-negative) number of unprocessed bytes in input
+ */
+ public synchronized int getRemaining() {
+ return userBufLen;
+ }
+
+ /** {@inheritDoc} */
+ public synchronized boolean needsDictionary() {
+ return inflater.needsDictionary();
+ }
+
+ /** {@inheritDoc} */
+ public synchronized void setDictionary(byte[] b, int off, int len) {
+ inflater.setDictionary(b, off, len);
+ }
+
+ /**
+ * Returns true if the end of the gzip substream (single "member") has been
+ * reached.</p>
+ */
+ public synchronized boolean finished() {
+ return (state == GzipStateLabel.FINISHED);
+ }
+
+ /**
+ * Resets everything, including the input buffer, regardless of whether the
+ * current gzip substream is finished.</p>
+ */
+ public synchronized void reset() {
+ // could optionally emit INFO message if state != GzipStateLabel.FINISHED
+ inflater.reset();
+ state = GzipStateLabel.HEADER_BASIC;
+ crc.reset();
+ userBufOff = userBufLen = 0;
+ localBufOff = 0;
+ headerBytesRead = 0;
+ trailerBytesRead = 0;
+ numExtraFieldBytesRemaining = -1;
+ hasExtraField = false;
+ hasFilename = false;
+ hasComment = false;
+ hasHeaderCRC = false;
+ }
+
+ /** {@inheritDoc} */
+ public synchronized void end() {
+ inflater.end();
+ }
+
+ /**
+ * Check ID bytes (throw if necessary), compression method (throw if not 8),
+ * and flag bits (set hasExtraField, hasFilename, hasComment, hasHeaderCRC).
+ * Ignore MTIME, XFL, OS. Caller must ensure we have at least 10 bytes (at
+ * the start of localBuf).</p>
+ */
+ /*
+ * Flag bits (remainder are reserved and must be zero):
+ * bit 0 FTEXT
+ * bit 1 FHCRC (never implemented in gzip, at least through version
+ * 1.4.0; instead interpreted as "continuation of multi-
+ * part gzip file," which is unsupported through 1.4.0)
+ * bit 2 FEXTRA
+ * bit 3 FNAME
+ * bit 4 FCOMMENT
+ * [bit 5 encrypted]
+ */
+ private void processBasicHeader() throws IOException {
+ if (readUShortLE(localBuf, 0) != GZIP_MAGIC_ID) {
+ throw new IOException("not a gzip file");
+ }
+ if (readUByte(localBuf, 2) != GZIP_DEFLATE_METHOD) {
+ throw new IOException("gzip data not compressed with deflate method");
+ }
+ int flg = readUByte(localBuf, 3);
+ if ((flg & GZIP_FLAGBITS_RESERVED) != 0) {
+ throw new IOException("unknown gzip format (reserved flagbits set)");
+ }
+ hasExtraField = ((flg & GZIP_FLAGBIT_EXTRA_FIELD) != 0);
+ hasFilename = ((flg & GZIP_FLAGBIT_FILENAME) != 0);
+ hasComment = ((flg & GZIP_FLAGBIT_COMMENT) != 0);
+ hasHeaderCRC = ((flg & GZIP_FLAGBIT_HEADER_CRC) != 0);
+ }
+
+ private void checkAndCopyBytesToLocal(int len) {
+ System.arraycopy(userBuf, userBufOff, localBuf, localBufOff, len);
+ localBufOff += len;
+ // alternatively, could call checkAndSkipBytes(len) for rest...
+ crc.update(userBuf, userBufOff, len);
+ userBufOff += len;
+ userBufLen -= len;
+ headerBytesRead += len;
+ }
+
+ private void checkAndSkipBytes(int len) {
+ crc.update(userBuf, userBufOff, len);
+ userBufOff += len;
+ userBufLen -= len;
+ headerBytesRead += len;
+ }
+
+ // returns true if saw NULL, false if ran out of buffer first; called _only_
+ // during gzip-header processing (not trailer)
+ // (caller can check before/after state of userBufLen to compute num bytes)
+ private boolean checkAndSkipBytesUntilNull() {
+ boolean hitNull = false;
+ if (userBufLen > 0) {
+ do {
+ hitNull = (userBuf[userBufOff] == 0);
+ crc.update(userBuf[userBufOff]);
+ ++userBufOff;
+ --userBufLen;
+ ++headerBytesRead;
+ } while (userBufLen > 0 && !hitNull);
+ }
+ return hitNull;
+ }
+
+ // this one doesn't update the CRC and does support trailer processing but
+ // otherwise is same as its "checkAnd" sibling
+ private void copyBytesToLocal(int len) {
+ System.arraycopy(userBuf, userBufOff, localBuf, localBufOff, len);
+ localBufOff += len;
+ userBufOff += len;
+ userBufLen -= len;
+ if (state == GzipStateLabel.TRAILER_CRC ||
+ state == GzipStateLabel.TRAILER_SIZE) {
+ trailerBytesRead += len;
+ } else {
+ headerBytesRead += len;
+ }
+ }
+
+ private int readUByte(byte[] b, int off) {
+ return ((int)b[off] & 0xff);
+ }
+
+ // caller is responsible for not overrunning buffer
+ private int readUShortLE(byte[] b, int off) {
+ return ((((b[off+1] & 0xff) << 8) |
+ ((b[off] & 0xff) )) & 0xffff);
+ }
+
+ // caller is responsible for not overrunning buffer
+ private long readUIntLE(byte[] b, int off) {
+ return ((((long)(b[off+3] & 0xff) << 24) |
+ ((long)(b[off+2] & 0xff) << 16) |
+ ((long)(b[off+1] & 0xff) << 8) |
+ ((long)(b[off] & 0xff) )) & 0xffffffffL);
+ }
+
+}
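
A quick sanity check of the little-endian helpers and header constants above,
using a hand-built header and trailer (byte values assumed; layout per RFC 1952):

    // 10-byte basic header: ID1 ID2 CM FLG MTIME(4) XFL OS
    byte[] hdr = { 0x1f, (byte)0x8b, 8, 0, 0, 0, 0, 0, 0, 3 };
    assert readUShortLE(hdr, 0) == 0x8b1f;  // GZIP_MAGIC_ID
    assert readUByte(hdr, 2) == 8;          // GZIP_DEFLATE_METHOD
    assert (readUByte(hdr, 3) & 0xe0) == 0; // no reserved flagbits set

    // 8-byte trailer: CRC-32 then ISIZE, both little-endian
    byte[] trl = { 0x78, 0x56, 0x34, 0x12, 4, 0, 0, 0 };
    assert readUIntLE(trl, 0) == 0x12345678L;  // stored CRC of plaintext
    assert readUIntLE(trl, 4) == 4L;           // uncompressed size mod 2^32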
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java?rev=961532&r1=961531&r2=961532&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java Wed Jul 7 23:22:28 2010
@@ -166,7 +166,7 @@ public class ZlibDecompressor implements
}
public synchronized boolean needsInput() {
- // Consume remanining compressed data?
+ // Consume remaining compressed data?
if (uncompressedDirectBuf.remaining() > 0) {
return false;
}
@@ -189,7 +189,7 @@ public class ZlibDecompressor implements
}
public synchronized boolean finished() {
- // Check if 'zlib' says its 'finished' and
+ // Check if 'zlib' says it's 'finished' and
// all compressed data has been consumed
return (finished && uncompressedDirectBuf.remaining() == 0);
}
@@ -221,7 +221,7 @@ public class ZlibDecompressor implements
n = inflateBytesDirect();
uncompressedDirectBuf.limit(n);
- // Get atmost 'len' bytes
+ // Get at most 'len' bytes
n = Math.min(n, len);
((ByteBuffer)uncompressedDirectBuf).get(b, off, n);
@@ -229,9 +229,9 @@ public class ZlibDecompressor implements
}
/**
- * Returns the total number of compressed bytes output so far.
+ * Returns the total number of uncompressed bytes output so far.
*
- * @return the total (non-negative) number of compressed bytes output so far
+ * @return the total (non-negative) number of uncompressed bytes output so far
*/
public synchronized long getBytesWritten() {
checkStream();
@@ -239,15 +239,30 @@ public class ZlibDecompressor implements
}
/**
- * Returns the total number of uncompressed bytes input so far.</p>
+ * Returns the total number of compressed bytes input so far.</p>
*
- * @return the total (non-negative) number of uncompressed bytes input so far
+ * @return the total (non-negative) number of compressed bytes input so far
*/
public synchronized long getBytesRead() {
checkStream();
return getBytesRead(stream);
}
+ /**
+ * Returns the number of bytes remaining in the input buffers; normally
+ * called when finished() is true to determine amount of post-gzip-stream
+ * data.</p>
+ *
+ * @return the total (non-negative) number of unprocessed bytes in input
+ */
+ public synchronized int getRemaining() {
+ checkStream();
+ return userBufLen + getRemaining(stream); // userBuf + compressedDirectBuf
+ }
+
+ /**
+ * Resets everything including the input buffers (user and direct).</p>
+ */
public synchronized void reset() {
checkStream();
reset(stream);
@@ -282,6 +297,7 @@ public class ZlibDecompressor implements
private native int inflateBytesDirect();
private native static long getBytesRead(long strm);
private native static long getBytesWritten(long strm);
+ private native static int getRemaining(long strm);
private native static void reset(long strm);
private native static void end(long strm);
}
Modified: hadoop/common/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c?rev=961532&r1=961531&r2=961532&view=diff
==============================================================================
--- hadoop/common/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c (original)
+++ hadoop/common/trunk/src/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c Wed Jul 7 23:22:28 2010
@@ -291,6 +291,13 @@ Java_org_apache_hadoop_io_compress_zlib_
return (ZSTREAM(stream))->total_out;
}
+JNIEXPORT jint JNICALL
+Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_getRemaining(
+ JNIEnv *env, jclass cls, jlong stream
+ ) {
+ return (ZSTREAM(stream))->avail_in;
+}
+
JNIEXPORT void JNICALL
Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_reset(
JNIEnv *env, jclass cls, jlong stream
Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/io/compress/TestCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/io/compress/TestCodec.java?rev=961532&r1=961531&r2=961532&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/io/compress/TestCodec.java (original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/io/compress/TestCodec.java Wed Jul 7 23:22:28 2010
@@ -39,6 +39,7 @@ import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -53,6 +54,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.CompressorStream;
+import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;
import org.apache.hadoop.io.compress.zlib.BuiltInZlibDeflater;
import org.apache.hadoop.io.compress.zlib.BuiltInZlibInflater;
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
@@ -70,8 +72,7 @@ import static org.junit.Assert.*;
public class TestCodec {
- private static final Log LOG=
- LogFactory.getLog(TestCodec.class);
+ private static final Log LOG= LogFactory.getLog(TestCodec.class);
private Configuration conf = new Configuration();
private int count = 10000;
@@ -277,7 +278,7 @@ public class TestCodec {
@Test
public void testCodecPoolGzipReuse() throws Exception {
Configuration conf = new Configuration();
- conf.setBoolean("hadoop.native.lib", true);
+ conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
if (!ZlibFactory.isNativeZlibLoaded(conf)) {
LOG.warn("testCodecPoolGzipReuse skipped: native libs not loaded");
return;
@@ -362,7 +363,7 @@ public class TestCodec {
@Test
public void testCodecInitWithCompressionLevel() throws Exception {
Configuration conf = new Configuration();
- conf.setBoolean("io.native.lib.available", true);
+ conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
if (ZlibFactory.isNativeZlibLoaded(conf)) {
LOG.info("testCodecInitWithCompressionLevel with native");
codecTestWithNOCompression(conf,
@@ -374,7 +375,7 @@ public class TestCodec {
+ ": native libs not loaded");
}
conf = new Configuration();
- conf.setBoolean("io.native.lib.available", false);
+ conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, false);
codecTestWithNOCompression( conf,
"org.apache.hadoop.io.compress.DefaultCodec");
}
@@ -382,14 +383,14 @@ public class TestCodec {
@Test
public void testCodecPoolCompressorReinit() throws Exception {
Configuration conf = new Configuration();
- conf.setBoolean("hadoop.native.lib", true);
+ conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
if (ZlibFactory.isNativeZlibLoaded(conf)) {
GzipCodec gzc = ReflectionUtils.newInstance(GzipCodec.class, conf);
gzipReinitTest(conf, gzc);
} else {
LOG.warn("testCodecPoolCompressorReinit skipped: native libs not loaded");
}
- conf.setBoolean("hadoop.native.lib", false);
+ conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, false);
DefaultCodec dfc = ReflectionUtils.newInstance(DefaultCodec.class, conf);
gzipReinitTest(conf, dfc);
}
@@ -486,53 +487,93 @@ public class TestCodec {
System.err.println("Caught: " + e);
e.printStackTrace();
}
-
+
}
@Test
- public void testCodecPoolAndGzipDecompressor() {
- // BuiltInZlibInflater should not be used as the GzipCodec decompressor.
- // Assert that this is the case.
+ public void testGzipCompatibility() throws IOException {
+ Random r = new Random();
+ long seed = r.nextLong();
+ r.setSeed(seed);
+ LOG.info("seed: " + seed);
- // Don't use native libs for this test.
- Configuration conf = new Configuration();
- conf.setBoolean("hadoop.native.lib", false);
- assertFalse("ZlibFactory is using native libs against request",
- ZlibFactory.isNativeZlibLoaded(conf));
+ DataOutputBuffer dflbuf = new DataOutputBuffer();
+ GZIPOutputStream gzout = new GZIPOutputStream(dflbuf);
+ byte[] b = new byte[r.nextInt(128 * 1024 + 1)];
+ r.nextBytes(b);
+ gzout.write(b);
+ gzout.close();
- // This should give us a BuiltInZlibInflater.
- Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);
- assertNotNull("zlibDecompressor is null!", zlibDecompressor);
- assertTrue("ZlibFactory returned unexpected inflator",
- zlibDecompressor instanceof BuiltInZlibInflater);
+ DataInputBuffer gzbuf = new DataInputBuffer();
+ gzbuf.reset(dflbuf.getData(), dflbuf.getLength());
- // Asking for a decompressor directly from GzipCodec should return null;
- // its createOutputStream() just wraps the existing stream in a
- // java.util.zip.GZIPOutputStream.
- CompressionCodecFactory ccf = new CompressionCodecFactory(conf);
- CompressionCodec codec = ccf.getCodec(new Path("foo.gz"));
- assertTrue("Codec for .gz file is not GzipCodec", codec instanceof GzipCodec);
- Decompressor codecDecompressor = codec.createDecompressor();
- if (null != codecDecompressor) {
- fail("Got non-null codecDecompressor: " + codecDecompressor);
- }
-
- // Asking the CodecPool for a decompressor for GzipCodec
- // should return null as well.
- Decompressor poolDecompressor = CodecPool.getDecompressor(codec);
- if (null != poolDecompressor) {
- fail("Got non-null poolDecompressor: " + poolDecompressor);
- }
+ Configuration conf = new Configuration();
+ conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, false);
+ CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
+ Decompressor decom = codec.createDecompressor();
+ assertNotNull(decom);
+ assertEquals(BuiltInGzipDecompressor.class, decom.getClass());
+ InputStream gzin = codec.createInputStream(gzbuf, decom);
+
+ dflbuf.reset();
+ IOUtils.copyBytes(gzin, dflbuf, 4096);
+ final byte[] dflchk = Arrays.copyOf(dflbuf.getData(), dflbuf.getLength());
+ assertArrayEquals(b, dflchk);
+ }
+
+ void GzipConcatTest(Configuration conf,
+ Class<? extends Decompressor> decomClass) throws IOException {
+ Random r = new Random();
+ long seed = r.nextLong();
+ r.setSeed(seed);
+ LOG.info(decomClass + " seed: " + seed);
+
+ final int CONCAT = r.nextInt(4) + 3;
+ final int BUFLEN = 128 * 1024;
+ DataOutputBuffer dflbuf = new DataOutputBuffer();
+ DataOutputBuffer chkbuf = new DataOutputBuffer();
+ byte[] b = new byte[BUFLEN];
+ for (int i = 0; i < CONCAT; ++i) {
+ GZIPOutputStream gzout = new GZIPOutputStream(dflbuf);
+ r.nextBytes(b);
+ int len = r.nextInt(BUFLEN);
+ int off = r.nextInt(BUFLEN - len);
+ chkbuf.write(b, off, len);
+ gzout.write(b, off, len);
+ gzout.close();
+ }
+ final byte[] chk = Arrays.copyOf(chkbuf.getData(), chkbuf.getLength());
+
+ CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);
+ Decompressor decom = codec.createDecompressor();
+ assertNotNull(decom);
+ assertEquals(decomClass, decom.getClass());
+ DataInputBuffer gzbuf = new DataInputBuffer();
+ gzbuf.reset(dflbuf.getData(), dflbuf.getLength());
+ InputStream gzin = codec.createInputStream(gzbuf, decom);
+
+ dflbuf.reset();
+ IOUtils.copyBytes(gzin, dflbuf, 4096);
+ final byte[] dflchk = Arrays.copyOf(dflbuf.getData(), dflbuf.getLength());
+ assertArrayEquals(chk, dflchk);
+ }
- // If we then ensure that the pool is populated...
- CodecPool.returnDecompressor(zlibDecompressor);
+ @Test
+ public void testBuiltInGzipConcat() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, false);
+ GzipConcatTest(conf, BuiltInGzipDecompressor.class);
+ }
- // Asking the pool another time should still not bind this to GzipCodec.
- poolDecompressor = CodecPool.getDecompressor(codec);
- if (null != poolDecompressor) {
- fail("Second time, got non-null poolDecompressor: "
- + poolDecompressor);
+ @Test
+ public void testNativeGzipConcat() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
+ if (!ZlibFactory.isNativeZlibLoaded(conf)) {
+ LOG.warn("skipped: native libs not loaded");
+ return;
}
+ GzipConcatTest(conf, GzipCodec.GzipZlibDecompressor.class);
}
@Test
@@ -542,7 +583,7 @@ public class TestCodec {
// Don't use native libs for this test.
Configuration conf = new Configuration();
- conf.setBoolean("hadoop.native.lib", false);
+ conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, false);
assertFalse("ZlibFactory is using native libs against request",
ZlibFactory.isNativeZlibLoaded(conf));
@@ -595,7 +636,7 @@ public class TestCodec {
// Don't use native libs for this test.
Configuration conf = new Configuration();
- conf.setBoolean("hadoop.native.lib", false);
+ conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, false);
assertFalse("ZlibFactory is using native libs against request",
ZlibFactory.isNativeZlibLoaded(conf));
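
For reference, input of the kind GzipConcatTest exercises can be produced
outside the test harness as well -- the Java equivalent of
"cat a.gz b.gz > ab.gz". A sketch (file name and payloads assumed; runs inside
a method that throws IOException):

    import java.io.FileOutputStream;
    import java.io.OutputStream;
    import java.util.zip.GZIPOutputStream;

    OutputStream fileOut = new FileOutputStream("ab.gz");  // assumed path
    GZIPOutputStream gz = new GZIPOutputStream(fileOut);
    gz.write("first member".getBytes("UTF-8"));
    gz.finish();                     // ends the member, leaves fileOut open
    gz = new GZIPOutputStream(fileOut);
    gz.write("second member".getBytes("UTF-8"));
    gz.close();                      // ends the member and closes fileOut
    // With this patch, GzipCodec streams decode both members; a plain
    // java.util.zip.GZIPInputStream of this era stops after the first.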