You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ca...@apache.org on 2018/12/10 03:43:59 UTC
[1/6] curator git commit: Make GzipCompressionProvider to recycle
Deflaters and Inflaters in pools
Repository: curator
Updated Branches:
refs/heads/master b1e69a6af -> 5b15f5fab
Make GzipCompressionProvider to recycle Deflaters and Inflaters in pools
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/6457f267
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/6457f267
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/6457f267
Branch: refs/heads/master
Commit: 6457f267ad713854fb89d32a56f266202cf82cd5
Parents: eebff92
Author: Roman Leventov <le...@gmail.com>
Authored: Fri Nov 30 18:49:10 2018 +0100
Committer: Roman Leventov <le...@gmail.com>
Committed: Fri Nov 30 18:55:36 2018 +0100
----------------------------------------------------------------------
.../framework/imps/GzipCompressionProvider.java | 307 +++++++++++++++++--
.../imps/GzipCompressionProviderTest.java | 88 ++++++
2 files changed, 373 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/6457f267/curator-framework/src/main/java/org/apache/curator/framework/imps/GzipCompressionProvider.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GzipCompressionProvider.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GzipCompressionProvider.java
index 7b35c37..74d0347 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GzipCompressionProvider.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GzipCompressionProvider.java
@@ -18,47 +18,310 @@
*/
package org.apache.curator.framework.imps;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.curator.framework.api.CompressionProvider;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.zip.*;
public class GzipCompressionProvider implements CompressionProvider
{
+ // This class re-implements java.util.zip.GZIPInputStream and GZIPOutputStream functionality to avoid
+ // creating many finalizable Deflater and Inflater objects on the heap (see
+ // https://issues.apache.org/jira/browse/CURATOR-487). Even when Curator's minimum supported Java version becomes
+ // no less than Java 12, where finalize() methods are removed in Deflater and Inflater classes and instead they
+ // are phantom-referenced via Cleaner, it still makes sense to avoid GZIPInputStream and GZIPOutputStream because
+ // phantom references are also not entirely free for GC algorithms, and also to allocate less garbage and make
+ // less unnecessary data copies.
+
+ private static final int MAX_SAFE_JAVA_BYTE_ARRAY_SIZE = Integer.MAX_VALUE - 128;
+
+ /** GZIP header magic number. */
+ private static final int GZIP_MAGIC = 0x8b1f;
+
+ /** See {@code java.util.zip.GZIPOutputStream.writeHeader()} */
+ private static final byte[] GZIP_HEADER = new byte[] {
+ (byte) GZIP_MAGIC, // Magic number (byte 0)
+ (byte) (GZIP_MAGIC >> 8), // Magic number (byte 1)
+ Deflater.DEFLATED, // Compression method (CM)
+ 0, // Flags (FLG)
+ 0, // Modification time MTIME (byte 0)
+ 0, // Modification time MTIME (byte 1)
+ 0, // Modification time MTIME (byte 2)
+ 0, // Modification time MTIME (byte 3)
+ 0, // Extra flags (XFLG)
+ 0 // Operating system (OS)
+ };
+
+ /** GZip flags, {@link #GZIP_HEADER}'s 4th byte */
+ private static final int FHCRC = 1 << 1;
+ private static final int FEXTRA = 1 << 2;
+ private static final int FNAME = 1 << 3;
+ private static final int FCOMMENT = 1 << 4;
+
+ private static final int GZIP_HEADER_SIZE = GZIP_HEADER.length;
+
+ /** 32-bit CRC and uncompressed data size */
+ private static final int GZIP_TRAILER_SIZE = Integer.BYTES + Integer.BYTES;
+
+ /**
+ * Since Deflaters and Inflaters are acquired and returned to the pools in try-finally blocks that are free of
+ * blocking calls themselves, it's not expected that the number of objects in the pools could exceed the number of
+ * hardware threads on the machine much. Therefore it's accepted to have simple pools of strongly-referenced
+ * objects.
+ */
+ private static final ConcurrentLinkedQueue<Deflater> DEFLATER_POOL = new ConcurrentLinkedQueue<>();
+ private static final ConcurrentLinkedQueue<Inflater> INFLATER_POOL = new ConcurrentLinkedQueue<>();
+
+ /** The value verified in GzipCompressionProviderTest.testEmpty() */
+ private static final byte[] COMPRESSED_EMPTY_BYTES = new byte[] {
+ 31, -117, 8, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0
+ };
+
+ private static Deflater acquireDeflater()
+ {
+ Deflater deflater = DEFLATER_POOL.poll();
+ if ( deflater == null )
+ {
+ // Using the same settings as in GZIPOutputStream constructor
+ deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
+ }
+ return deflater;
+ }
+
+ private static Inflater acquireInflater()
+ {
+ Inflater inflater = INFLATER_POOL.poll();
+ if ( inflater == null )
+ {
+ // Using the same nowrap setting as GZIPInputStream constructor
+ inflater = new Inflater(true);
+ }
+ return inflater;
+ }
+
@Override
- public byte[] compress(String path, byte[] data) throws Exception
+ public byte[] compress(String path, byte[] data)
{
- ByteArrayOutputStream bytes = new ByteArrayOutputStream();
- GZIPOutputStream out = new GZIPOutputStream(bytes);
+ if ( data.length == 0 )
+ {
+ // clone() because clients could update the array
+ return COMPRESSED_EMPTY_BYTES.clone();
+ }
+ return doCompress(data);
+ }
+
+ @VisibleForTesting
+ static byte[] doCompress(byte[] data)
+ {
+ byte[] result = Arrays.copyOf(GZIP_HEADER, conservativeGZippedSizeEstimate(data.length));
+ Deflater deflater = acquireDeflater();
try {
- out.write(data);
- out.finish();
+ deflater.setInput(data);
+ deflater.finish();
+ int offset = GZIP_HEADER_SIZE;
+ while ( true )
+ {
+ int available = result.length - GZIP_TRAILER_SIZE - offset;
+ int numCompressedBytes = deflater.deflate(result, offset, available);
+ offset += numCompressedBytes;
+ if ( deflater.finished() )
+ {
+ break;
+ }
+ // `+ 1` to ensure some growth on the sizes of 0 or 1
+ int newResultLength = result.length + (result.length / 2) + 1;
+ result = Arrays.copyOf(result, newResultLength);
+ }
+ // Write GZip trailer
+ CRC32 crc = new CRC32();
+ crc.update(data, 0, data.length);
+ writeLittleEndianInt(result, offset, (int) crc.getValue());
+ writeLittleEndianInt(result, offset + 4, data.length);
+ int endOffset = offset + GZIP_TRAILER_SIZE;
+ if ( result.length != endOffset )
+ {
+ result = Arrays.copyOf(result, endOffset);
+ }
+ return result;
} finally {
- out.close();
+ deflater.reset();
+ DEFLATER_POOL.add(deflater);
}
- return bytes.toByteArray();
}
- @Override
- public byte[] decompress(String path, byte[] compressedData) throws Exception
+ private static int conservativeGZippedSizeEstimate(int dataSize)
{
- ByteArrayOutputStream bytes = new ByteArrayOutputStream(compressedData.length);
- GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressedData));
+ int conservativeCompressedDataSizeEstimate;
+ if ( dataSize < 512 )
+ {
+ // Assuming DEFLATE doesn't compress small data well
+ conservativeCompressedDataSizeEstimate = dataSize;
+ }
+ else
+ {
+ // Assuming pretty bad 2:1 compression ratio
+ conservativeCompressedDataSizeEstimate = Math.max(512, dataSize / 2);
+ }
+ return GZIP_HEADER_SIZE + conservativeCompressedDataSizeEstimate + GZIP_TRAILER_SIZE;
+ }
+
+ private static void writeLittleEndianInt(byte[] b, int offset, int v)
+ {
+ b[offset] = (byte) v;
+ b[offset + 1] = (byte) (v >> 8);
+ b[offset + 2] = (byte) (v >> 16);
+ b[offset + 3] = (byte) (v >> 24);
+ }
+
+ @Override
+ public byte[] decompress(String path, byte[] gzippedDataBytes) throws IOException {
+ if ( Arrays.equals(gzippedDataBytes, COMPRESSED_EMPTY_BYTES) )
+ {
+ // Allocating a new array instead of creating a static constant because clients may somehow depend on the
+ // identity of the returned arrays
+ return new byte[0];
+ }
+ ByteBuffer gzippedData = ByteBuffer.wrap(gzippedDataBytes);
+ gzippedData.order(ByteOrder.LITTLE_ENDIAN);
+ int headerSize = readGzipHeader(gzippedData);
+ if ( gzippedDataBytes.length < headerSize + GZIP_TRAILER_SIZE )
+ {
+ throw new EOFException("Too short GZipped data");
+ }
+ int compressedDataSize = gzippedDataBytes.length - headerSize - GZIP_TRAILER_SIZE;
+ // Assuming 3:1 compression ratio. Intentionally a more generous estimation than in
+ // conservativeGZippedSizeEstimate() to reduce the probability of result array reallocation.
+ int initialResultLength = (int) Math.min(compressedDataSize * 3L, MAX_SAFE_JAVA_BYTE_ARRAY_SIZE);
+ byte[] result = new byte[initialResultLength];
+ Inflater inflater = acquireInflater();
try {
- byte[] buffer = new byte[compressedData.length];
- for(;;)
+ inflater.setInput(gzippedDataBytes, headerSize, compressedDataSize);
+ CRC32 crc = new CRC32();
+ int offset = 0;
+ while (true)
{
- int bytesRead = in.read(buffer, 0, buffer.length);
- if ( bytesRead < 0 )
+ int numDecompressedBytes;
+ try {
+ numDecompressedBytes = inflater.inflate(result, offset, result.length - offset);
+ } catch (DataFormatException e) {
+ String s = e.getMessage();
+ throw new ZipException(s != null ? s : "Invalid ZLIB data format");
+ }
+ crc.update(result, offset, numDecompressedBytes);
+ offset += numDecompressedBytes;
+ if ( inflater.finished() || inflater.needsDictionary() )
{
break;
}
- bytes.write(buffer, 0, bytesRead);
+ else if ( inflater.needsInput() )
+ {
+ throw new ZipException("Corrupt GZipped data");
+ }
+ // Inflater's contract doesn't say whether it's able to be finished() without returning 0 from inflate()
+ // call, so the additional `numDecompressedBytes == 0` condition ensures that we did another cycle and
+ // definitely need to inflate some more bytes.
+ if ( result.length == MAX_SAFE_JAVA_BYTE_ARRAY_SIZE && numDecompressedBytes == 0 )
+ {
+ throw new OutOfMemoryError("Unable to uncompress that much data into a single byte[] array");
+ }
+ // `+ 1` to ensure some growth on the sizes of 0 or 1
+ int newResultLength =
+ (int) Math.min((long) result.length + (result.length / 2) + 1, MAX_SAFE_JAVA_BYTE_ARRAY_SIZE);
+ if ( result.length != newResultLength )
+ {
+ result = Arrays.copyOf(result, newResultLength);
+ }
+ }
+ if ( inflater.getRemaining() != 0 )
+ {
+ throw new ZipException("Expected just one GZip block, without garbage in the end");
}
+ int checksum = gzippedData.getInt(gzippedDataBytes.length - GZIP_TRAILER_SIZE);
+ int numUncompressedBytes = gzippedData.getInt(gzippedDataBytes.length - Integer.BYTES);
+ if ( checksum != (int) crc.getValue() || numUncompressedBytes != offset )
+ {
+ throw new ZipException("Corrupt GZIP trailer");
+ }
+ if ( result.length != offset )
+ {
+ result = Arrays.copyOf(result, offset);
+ }
+ return result;
} finally {
- in.close();
+ inflater.reset();
+ INFLATER_POOL.add(inflater);
+ }
+ }
+
+ /**
+ * Returns the header size
+ */
+ private static int readGzipHeader(ByteBuffer gzippedData) throws IOException
+ {
+ try {
+ return doReadHeader(gzippedData);
+ } catch (BufferUnderflowException e) {
+ throw new EOFException();
+ }
+ }
+
+ private static int doReadHeader(ByteBuffer gzippedData) throws IOException {
+ if ( gzippedData.getChar() != GZIP_MAGIC )
+ {
+ throw new ZipException("Not in GZip format");
+ }
+ if ( gzippedData.get() != Deflater.DEFLATED )
+ {
+ throw new ZipException("Unsupported compression method");
+ }
+ int flags = gzippedData.get();
+ // Skip MTIME, XFL, and OS fields
+ skip(gzippedData, Integer.BYTES + Byte.BYTES + Byte.BYTES);
+ if ( (flags & FEXTRA) != 0 )
+ {
+ int extraBytes = gzippedData.getChar();
+ skip(gzippedData, extraBytes);
+ }
+ if ( (flags & FNAME) != 0 )
+ {
+ skipZeroTerminatedString(gzippedData);
+ }
+ if ( (flags & FCOMMENT) != 0 )
+ {
+ skipZeroTerminatedString(gzippedData);
+ }
+ if ( (flags & FHCRC) != 0 )
+ {
+ CRC32 crc = new CRC32();
+ crc.update(gzippedData.array(), 0, gzippedData.position());
+ if ( gzippedData.getChar() != (char) crc.getValue() )
+ {
+ throw new ZipException("Corrupt GZIP header");
+ }
+ }
+ return gzippedData.position();
+ }
+
+ private static void skip(ByteBuffer gzippedData, int skipBytes) throws IOException
+ {
+ try {
+ gzippedData.position(gzippedData.position() + skipBytes);
+ } catch (IllegalArgumentException e) {
+ throw new EOFException();
+ }
+ }
+
+ private static void skipZeroTerminatedString(ByteBuffer gzippedData)
+ {
+ while (gzippedData.get() != 0) {
+ // loop
}
- return bytes.toByteArray();
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/6457f267/curator-framework/src/test/java/org/apache/curator/framework/imps/GzipCompressionProviderTest.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/GzipCompressionProviderTest.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/GzipCompressionProviderTest.java
new file mode 100644
index 0000000..11b8c63
--- /dev/null
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/GzipCompressionProviderTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public class GzipCompressionProviderTest
+{
+ @Test
+ public void testSimple() throws IOException
+ {
+ GzipCompressionProvider provider = new GzipCompressionProvider();
+ byte[] data = "Hello, world!".getBytes();
+ byte[] compressedData = provider.compress(null, data);
+ byte[] decompressedData = provider.decompress(null, compressedData);
+ Assert.assertTrue(Arrays.equals(decompressedData, data));
+ }
+
+ @Test
+ public void testEmpty() throws IOException
+ {
+ GzipCompressionProvider provider = new GzipCompressionProvider();
+ byte[] compressedData = provider.compress(null, new byte[0]);
+ byte[] compressedData2 = GzipCompressionProvider.doCompress(new byte[0]);
+ // Ensures GzipCompressionProvider.COMPRESSED_EMPTY_BYTES value is valid
+ Assert.assertTrue(Arrays.equals(compressedData, compressedData2));
+ byte[] decompressedData = provider.decompress(null, compressedData);
+ Assert.assertEquals(0, decompressedData.length);
+ }
+
+ /**
+ * This test ensures that in the face of corrupt data, specifically IOException is thrown, rather than some other kind
+ * of runtime exception. Users of {@link GzipCompressionProvider#decompress(String, byte[])} may depend on this.
+ */
+ @Test
+ public void testDecompressCorrupt()
+ {
+ GzipCompressionProvider provider = new GzipCompressionProvider();
+ try {
+ provider.decompress(null, new byte[100]);
+ Assert.fail("Expected IOException");
+ } catch (IOException ignore) {
+ // expected
+ }
+ byte[] compressedData = provider.compress(null, new byte[0]);
+ for (int i = 0; i < compressedData.length; i++)
+ {
+ try {
+ provider.decompress(null, Arrays.copyOf(compressedData, i));
+ } catch (IOException ignore) {
+ // expected
+ }
+ for (int change = 1; change < 256; change++)
+ {
+ byte b = compressedData[i];
+ compressedData[i] = (byte) (b + change);
+ try {
+ provider.decompress(null, compressedData);
+ // No exception is OK
+ } catch (IOException ignore) {
+ // expected
+ }
+ // reset value back
+ compressedData[i] = b;
+ }
+ }
+ }
+}
[4/6] curator git commit: CURATOR-487 - Rename unit test
Posted by ca...@apache.org.
CURATOR-487 - Rename unit test
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ec702086
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ec702086
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ec702086
Branch: refs/heads/master
Commit: ec7020861a724446e663156a7939555623fd70dd
Parents: 388787c
Author: Cam McKenzie <ca...@apache.org>
Authored: Mon Dec 10 09:43:34 2018 +1100
Committer: Cam McKenzie <ca...@apache.org>
Committed: Mon Dec 10 09:43:34 2018 +1100
----------------------------------------------------------------------
.../imps/GzipCompressionProviderTest.java | 125 -------------------
.../imps/TestGzipCompressionProvider.java | 125 +++++++++++++++++++
2 files changed, 125 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/ec702086/curator-framework/src/test/java/org/apache/curator/framework/imps/GzipCompressionProviderTest.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/GzipCompressionProviderTest.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/GzipCompressionProviderTest.java
deleted file mode 100644
index 51edaba..0000000
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/GzipCompressionProviderTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.imps;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.zip.GZIPOutputStream;
-
-public class GzipCompressionProviderTest
-{
- @Test
- public void testSimple() throws IOException
- {
- GzipCompressionProvider provider = new GzipCompressionProvider();
- byte[] data = "Hello, world!".getBytes();
- byte[] compressedData = provider.compress(null, data);
- byte[] jdkCompressedData = jdkCompress(data);
- Assert.assertTrue(Arrays.equals(compressedData, jdkCompressedData));
- byte[] decompressedData = provider.decompress(null, compressedData);
- Assert.assertTrue(Arrays.equals(decompressedData, data));
- }
-
- @Test
- public void testEmpty() throws IOException
- {
- GzipCompressionProvider provider = new GzipCompressionProvider();
- byte[] compressedData = provider.compress(null, new byte[0]);
- byte[] compressedData2 = GzipCompressionProvider.doCompress(new byte[0]);
- byte[] jdkCompress = jdkCompress(new byte[0]);
- // Ensures GzipCompressionProvider.COMPRESSED_EMPTY_BYTES value is valid
- Assert.assertTrue(Arrays.equals(compressedData, compressedData2));
- Assert.assertTrue(Arrays.equals(compressedData, jdkCompress));
- byte[] decompressedData = provider.decompress(null, compressedData);
- Assert.assertEquals(0, decompressedData.length);
- }
-
- /**
- * This test ensures that in the face of corrupt data, specifically IOException is thrown, rather than some other kind
- * of runtime exception. Users of {@link GzipCompressionProvider#decompress(String, byte[])} may depend on this.
- */
- @Test
- public void testDecompressCorrupt()
- {
- GzipCompressionProvider provider = new GzipCompressionProvider();
- try {
- provider.decompress(null, new byte[100]);
- Assert.fail("Expected IOException");
- } catch (IOException ignore) {
- // expected
- }
- byte[] compressedData = provider.compress(null, new byte[0]);
- for (int i = 0; i < compressedData.length; i++)
- {
- try {
- provider.decompress(null, Arrays.copyOf(compressedData, i));
- } catch (IOException ignore) {
- // expected
- }
- for (int change = 1; change < 256; change++)
- {
- byte b = compressedData[i];
- compressedData[i] = (byte) (b + change);
- try {
- provider.decompress(null, compressedData);
- // No exception is OK
- } catch (IOException ignore) {
- // expected
- }
- // reset value back
- compressedData[i] = b;
- }
- }
- }
-
- @Test
- public void smokeTestRandomDataWithJdk() throws IOException
- {
- GzipCompressionProvider provider = new GzipCompressionProvider();
- ThreadLocalRandom random = ThreadLocalRandom.current();
- for (int len = 1; len < 100; len++)
- {
- byte[] data = new byte[len];
- for (int i = 0; i < 100; i++) {
- byte[] compressedData = provider.compress(null, data);
- byte[] jdkCompressedData = jdkCompress(data);
- Assert.assertTrue(Arrays.equals(compressedData, jdkCompressedData));
- byte[] decompressedData = provider.decompress(null, compressedData);
- Assert.assertTrue(Arrays.equals(decompressedData, data));
- // in the end of the iteration to test empty array first
- random.nextBytes(data);
- }
- }
- }
-
- private static byte[] jdkCompress(byte[] data) throws IOException
- {
- ByteArrayOutputStream bytes = new ByteArrayOutputStream();
- try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
- out.write(data);
- out.finish();
- }
- return bytes.toByteArray();
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/ec702086/curator-framework/src/test/java/org/apache/curator/framework/imps/TestGzipCompressionProvider.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestGzipCompressionProvider.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestGzipCompressionProvider.java
new file mode 100644
index 0000000..2856b4d
--- /dev/null
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestGzipCompressionProvider.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.zip.GZIPOutputStream;
+
+public class TestGzipCompressionProvider
+{
+ @Test
+ public void testSimple() throws IOException
+ {
+ GzipCompressionProvider provider = new GzipCompressionProvider();
+ byte[] data = "Hello, world!".getBytes();
+ byte[] compressedData = provider.compress(null, data);
+ byte[] jdkCompressedData = jdkCompress(data);
+ Assert.assertTrue(Arrays.equals(compressedData, jdkCompressedData));
+ byte[] decompressedData = provider.decompress(null, compressedData);
+ Assert.assertTrue(Arrays.equals(decompressedData, data));
+ }
+
+ @Test
+ public void testEmpty() throws IOException
+ {
+ GzipCompressionProvider provider = new GzipCompressionProvider();
+ byte[] compressedData = provider.compress(null, new byte[0]);
+ byte[] compressedData2 = GzipCompressionProvider.doCompress(new byte[0]);
+ byte[] jdkCompress = jdkCompress(new byte[0]);
+ // Ensures GzipCompressionProvider.COMPRESSED_EMPTY_BYTES value is valid
+ Assert.assertTrue(Arrays.equals(compressedData, compressedData2));
+ Assert.assertTrue(Arrays.equals(compressedData, jdkCompress));
+ byte[] decompressedData = provider.decompress(null, compressedData);
+ Assert.assertEquals(0, decompressedData.length);
+ }
+
+ /**
+ * This test ensures that in the face of corrupt data, specifically IOException is thrown, rather than some other kind
+ * of runtime exception. Users of {@link GzipCompressionProvider#decompress(String, byte[])} may depend on this.
+ */
+ @Test
+ public void testDecompressCorrupt()
+ {
+ GzipCompressionProvider provider = new GzipCompressionProvider();
+ try {
+ provider.decompress(null, new byte[100]);
+ Assert.fail("Expected IOException");
+ } catch (IOException ignore) {
+ // expected
+ }
+ byte[] compressedData = provider.compress(null, new byte[0]);
+ for (int i = 0; i < compressedData.length; i++)
+ {
+ try {
+ provider.decompress(null, Arrays.copyOf(compressedData, i));
+ } catch (IOException ignore) {
+ // expected
+ }
+ for (int change = 1; change < 256; change++)
+ {
+ byte b = compressedData[i];
+ compressedData[i] = (byte) (b + change);
+ try {
+ provider.decompress(null, compressedData);
+ // No exception is OK
+ } catch (IOException ignore) {
+ // expected
+ }
+ // reset value back
+ compressedData[i] = b;
+ }
+ }
+ }
+
+ @Test
+ public void smokeTestRandomDataWithJdk() throws IOException
+ {
+ GzipCompressionProvider provider = new GzipCompressionProvider();
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ for (int len = 1; len < 100; len++)
+ {
+ byte[] data = new byte[len];
+ for (int i = 0; i < 100; i++) {
+ byte[] compressedData = provider.compress(null, data);
+ byte[] jdkCompressedData = jdkCompress(data);
+ Assert.assertTrue(Arrays.equals(compressedData, jdkCompressedData));
+ byte[] decompressedData = provider.decompress(null, compressedData);
+ Assert.assertTrue(Arrays.equals(decompressedData, data));
+ // in the end of the iteration to test empty array first
+ random.nextBytes(data);
+ }
+ }
+ }
+
+ private static byte[] jdkCompress(byte[] data) throws IOException
+ {
+ ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+ try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
+ out.write(data);
+ out.finish();
+ }
+ return bytes.toByteArray();
+ }
+}
[6/6] curator git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/curator
Posted by ca...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/curator
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5b15f5fa
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5b15f5fa
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5b15f5fa
Branch: refs/heads/master
Commit: 5b15f5fab8f444c1a66ef2d5550254ffaa1cfaff
Parents: f6868f4 b1e69a6
Author: Cam McKenzie <ca...@apache.org>
Authored: Mon Dec 10 10:55:54 2018 +1100
Committer: Cam McKenzie <ca...@apache.org>
Committed: Mon Dec 10 10:55:54 2018 +1100
----------------------------------------------------------------------
pom.xml | 4 ----
1 file changed, 4 deletions(-)
----------------------------------------------------------------------
[2/6] curator git commit: Add tests
Posted by ca...@apache.org.
Add tests
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/7467e596
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/7467e596
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/7467e596
Branch: refs/heads/master
Commit: 7467e5966fea5cabfa47d189688a6b0bd88a7a6d
Parents: 6457f26
Author: Roman Leventov <le...@gmail.com>
Authored: Fri Nov 30 20:22:51 2018 +0100
Committer: Roman Leventov <le...@gmail.com>
Committed: Fri Nov 30 20:22:51 2018 +0100
----------------------------------------------------------------------
.../framework/imps/GzipCompressionProvider.java | 22 +++++++-----
.../imps/GzipCompressionProviderTest.java | 37 ++++++++++++++++++++
2 files changed, 50 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/7467e596/curator-framework/src/main/java/org/apache/curator/framework/imps/GzipCompressionProvider.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GzipCompressionProvider.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GzipCompressionProvider.java
index 74d0347..fa357d2 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GzipCompressionProvider.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GzipCompressionProvider.java
@@ -70,11 +70,14 @@ public class GzipCompressionProvider implements CompressionProvider
/** 32-bit CRC and uncompressed data size */
private static final int GZIP_TRAILER_SIZE = Integer.BYTES + Integer.BYTES;
+ /** DEFLATE doesn't produce shorter compressed data */
+ private static final int MIN_COMPRESSED_DATA_SIZE = 2;
+
/**
* Since Deflaters and Inflaters are acquired and returned to the pools in try-finally blocks that are free of
* blocking calls themselves, it's not expected that the number of objects in the pools could exceed the number of
- * hardware threads on the machine much. Therefore it's accepted to have simple pools of strongly-referenced
- * objects.
+ * hardware threads on the machine much. Therefore it's accepted to have simple "ever-growing" (in fact, no) pools
+ * of strongly-referenced objects.
*/
private static final ConcurrentLinkedQueue<Deflater> DEFLATER_POOL = new ConcurrentLinkedQueue<>();
private static final ConcurrentLinkedQueue<Inflater> INFLATER_POOL = new ConcurrentLinkedQueue<>();
@@ -135,8 +138,7 @@ public class GzipCompressionProvider implements CompressionProvider
{
break;
}
- // `+ 1` to ensure some growth on the sizes of 0 or 1
- int newResultLength = result.length + (result.length / 2) + 1;
+ int newResultLength = result.length + (result.length / 2);
result = Arrays.copyOf(result, newResultLength);
}
// Write GZip trailer
@@ -162,7 +164,7 @@ public class GzipCompressionProvider implements CompressionProvider
if ( dataSize < 512 )
{
// Assuming DEFLATE doesn't compress small data well
- conservativeCompressedDataSizeEstimate = dataSize;
+ conservativeCompressedDataSizeEstimate = Math.max(dataSize, MIN_COMPRESSED_DATA_SIZE);
}
else
{
@@ -191,7 +193,7 @@ public class GzipCompressionProvider implements CompressionProvider
ByteBuffer gzippedData = ByteBuffer.wrap(gzippedDataBytes);
gzippedData.order(ByteOrder.LITTLE_ENDIAN);
int headerSize = readGzipHeader(gzippedData);
- if ( gzippedDataBytes.length < headerSize + GZIP_TRAILER_SIZE )
+ if ( gzippedDataBytes.length < headerSize + MIN_COMPRESSED_DATA_SIZE + GZIP_TRAILER_SIZE )
{
throw new EOFException("Too short GZipped data");
}
@@ -220,7 +222,10 @@ public class GzipCompressionProvider implements CompressionProvider
{
break;
}
- else if ( inflater.needsInput() )
+ // Just calling inflater.needsInput() doesn't work as expected: apparently it doesn't uphold its own
+ // contract and can return true when numDecompressedBytes != 0, in which case it just means that
+ // there is not enough space in the result array
+ else if ( numDecompressedBytes == 0 && inflater.needsInput() )
{
throw new ZipException("Corrupt GZipped data");
}
@@ -231,9 +236,8 @@ public class GzipCompressionProvider implements CompressionProvider
{
throw new OutOfMemoryError("Unable to uncompress that much data into a single byte[] array");
}
- // `+ 1` to ensure some growth on the sizes of 0 or 1
int newResultLength =
- (int) Math.min((long) result.length + (result.length / 2) + 1, MAX_SAFE_JAVA_BYTE_ARRAY_SIZE);
+ (int) Math.min((long) result.length + (result.length / 2), MAX_SAFE_JAVA_BYTE_ARRAY_SIZE);
if ( result.length != newResultLength )
{
result = Arrays.copyOf(result, newResultLength);
http://git-wip-us.apache.org/repos/asf/curator/blob/7467e596/curator-framework/src/test/java/org/apache/curator/framework/imps/GzipCompressionProviderTest.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/GzipCompressionProviderTest.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/GzipCompressionProviderTest.java
index 11b8c63..51edaba 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/GzipCompressionProviderTest.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/GzipCompressionProviderTest.java
@@ -21,8 +21,11 @@ package org.apache.curator.framework.imps;
import org.testng.Assert;
import org.testng.annotations.Test;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.zip.GZIPOutputStream;
public class GzipCompressionProviderTest
{
@@ -32,6 +35,8 @@ public class GzipCompressionProviderTest
GzipCompressionProvider provider = new GzipCompressionProvider();
byte[] data = "Hello, world!".getBytes();
byte[] compressedData = provider.compress(null, data);
+ byte[] jdkCompressedData = jdkCompress(data);
+ Assert.assertTrue(Arrays.equals(compressedData, jdkCompressedData));
byte[] decompressedData = provider.decompress(null, compressedData);
Assert.assertTrue(Arrays.equals(decompressedData, data));
}
@@ -42,8 +47,10 @@ public class GzipCompressionProviderTest
GzipCompressionProvider provider = new GzipCompressionProvider();
byte[] compressedData = provider.compress(null, new byte[0]);
byte[] compressedData2 = GzipCompressionProvider.doCompress(new byte[0]);
+ byte[] jdkCompress = jdkCompress(new byte[0]);
// Ensures GzipCompressionProvider.COMPRESSED_EMPTY_BYTES value is valid
Assert.assertTrue(Arrays.equals(compressedData, compressedData2));
+ Assert.assertTrue(Arrays.equals(compressedData, jdkCompress));
byte[] decompressedData = provider.decompress(null, compressedData);
Assert.assertEquals(0, decompressedData.length);
}
@@ -85,4 +92,34 @@ public class GzipCompressionProviderTest
}
}
}
+
+ @Test
+ public void smokeTestRandomDataWithJdk() throws IOException
+ {
+ GzipCompressionProvider provider = new GzipCompressionProvider();
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ for (int len = 1; len < 100; len++)
+ {
+ byte[] data = new byte[len];
+ for (int i = 0; i < 100; i++) {
+ byte[] compressedData = provider.compress(null, data);
+ byte[] jdkCompressedData = jdkCompress(data);
+ Assert.assertTrue(Arrays.equals(compressedData, jdkCompressedData));
+ byte[] decompressedData = provider.decompress(null, compressedData);
+ Assert.assertTrue(Arrays.equals(decompressedData, data));
+ // at the end of the iteration, so that the initial zero-filled array is tested first
+ random.nextBytes(data);
+ }
+ }
+ }
+
+ private static byte[] jdkCompress(byte[] data) throws IOException
+ {
+ ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+ try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
+ out.write(data);
+ out.finish();
+ }
+ return bytes.toByteArray();
+ }
}
[5/6] curator git commit: Merge branch 'CURATOR-487'
Posted by ca...@apache.org.
Merge branch 'CURATOR-487'
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f6868f4d
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f6868f4d
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f6868f4d
Branch: refs/heads/master
Commit: f6868f4dd76d3ba556268a96a295dc61e161b235
Parents: f49a6d1 ec70208
Author: Cam McKenzie <ca...@apache.org>
Authored: Mon Dec 10 10:55:44 2018 +1100
Committer: Cam McKenzie <ca...@apache.org>
Committed: Mon Dec 10 10:55:44 2018 +1100
----------------------------------------------------------------------
.../framework/imps/GzipCompressionProvider.java | 311 +++++++++++++++++--
.../imps/TestGzipCompressionProvider.java | 125 ++++++++
2 files changed, 414 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
[3/6] curator git commit: Merge branch
'GzipCompressionProvider-references' of https://github.com/leventov/curator
into CURATOR-487
Posted by ca...@apache.org.
Merge branch 'GzipCompressionProvider-references' of https://github.com/leventov/curator into CURATOR-487
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/388787c8
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/388787c8
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/388787c8
Branch: refs/heads/master
Commit: 388787c838cb2e6e0b755b576eb958257e2cef22
Parents: 05f6a56 7467e59
Author: Cam McKenzie <ca...@apache.org>
Authored: Mon Dec 10 08:23:30 2018 +1100
Committer: Cam McKenzie <ca...@apache.org>
Committed: Mon Dec 10 08:23:30 2018 +1100
----------------------------------------------------------------------
.../framework/imps/GzipCompressionProvider.java | 311 +++++++++++++++++--
.../imps/GzipCompressionProviderTest.java | 125 ++++++++
2 files changed, 414 insertions(+), 22 deletions(-)
----------------------------------------------------------------------