You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by dk...@apache.org on 2020/05/21 14:17:56 UTC

[avro] branch master updated: [AVRO-2589] Trevni codec and test fixes

This is an automated email from the ASF dual-hosted git repository.

dkulp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new efc7cbb  [AVRO-2589] Trevni codec and test fixes
efc7cbb is described below

commit efc7cbb8f6b586569903bda469a21b442e461108
Author: Jacob Tolar <ac...@sheckel.net>
AuthorDate: Wed Oct 9 17:03:15 2019 -0500

    [AVRO-2589] Trevni codec and test fixes
    
    * Unit test improvements
    * Fix handling of sliced ByteBuffers in codecs
    
    Same as changes for AVRO-2245
---
 .../main/java/org/apache/trevni/BZip2Codec.java    |  11 +-
 .../src/main/java/org/apache/trevni/Codec.java     |   6 ++
 .../main/java/org/apache/trevni/DeflateCodec.java  |  13 ++-
 .../main/java/org/apache/trevni/SnappyCodec.java   |   8 +-
 .../test/java/org/apache/trevni/TestAllCodecs.java | 119 +++++++++++++++++++++
 .../java/org/apache/trevni/TestBZip2Codec.java     |  67 ------------
 6 files changed, 141 insertions(+), 83 deletions(-)

diff --git a/lang/java/trevni/core/src/main/java/org/apache/trevni/BZip2Codec.java b/lang/java/trevni/core/src/main/java/org/apache/trevni/BZip2Codec.java
index ce6ab71..7012bcd 100644
--- a/lang/java/trevni/core/src/main/java/org/apache/trevni/BZip2Codec.java
+++ b/lang/java/trevni/core/src/main/java/org/apache/trevni/BZip2Codec.java
@@ -35,16 +35,16 @@ public class BZip2Codec extends Codec {
     ByteArrayOutputStream baos = getOutputBuffer(uncompressedData.remaining());
 
     try (BZip2CompressorOutputStream outputStream = new BZip2CompressorOutputStream(baos)) {
-      outputStream.write(uncompressedData.array());
+      outputStream.write(uncompressedData.array(), computeOffset(uncompressedData), uncompressedData.remaining());
     }
 
-    ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
-    return result;
+    return ByteBuffer.wrap(baos.toByteArray());
   }
 
   @Override
   ByteBuffer decompress(ByteBuffer compressedData) throws IOException {
-    ByteArrayInputStream bais = new ByteArrayInputStream(compressedData.array());
+    ByteArrayInputStream bais = new ByteArrayInputStream(compressedData.array(), computeOffset(compressedData),
+        compressedData.remaining());
     try (BZip2CompressorInputStream inputStream = new BZip2CompressorInputStream(bais)) {
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
@@ -56,8 +56,7 @@ public class BZip2Codec extends Codec {
         baos.write(buffer, 0, readCount);
       }
 
-      ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
-      return result;
+      return ByteBuffer.wrap(baos.toByteArray());
     }
   }
 
diff --git a/lang/java/trevni/core/src/main/java/org/apache/trevni/Codec.java b/lang/java/trevni/core/src/main/java/org/apache/trevni/Codec.java
index 0041f50..7de0acc 100644
--- a/lang/java/trevni/core/src/main/java/org/apache/trevni/Codec.java
+++ b/lang/java/trevni/core/src/main/java/org/apache/trevni/Codec.java
@@ -43,4 +43,10 @@ abstract class Codec {
   /** Decompress data */
   abstract ByteBuffer decompress(ByteBuffer compressedData) throws IOException;
 
+  // Codecs often reference the array inside a ByteBuffer. Compute the offset
+  // to the start of data correctly in the case that our ByteBuffer
+  // is a slice() of another.
+  protected static int computeOffset(ByteBuffer data) {
+    return data.arrayOffset() + data.position();
+  }
 }
diff --git a/lang/java/trevni/core/src/main/java/org/apache/trevni/DeflateCodec.java b/lang/java/trevni/core/src/main/java/org/apache/trevni/DeflateCodec.java
index ddae3c8..18bd91c 100644
--- a/lang/java/trevni/core/src/main/java/org/apache/trevni/DeflateCodec.java
+++ b/lang/java/trevni/core/src/main/java/org/apache/trevni/DeflateCodec.java
@@ -35,22 +35,21 @@ class DeflateCodec extends Codec {
   @Override
   ByteBuffer compress(ByteBuffer data) throws IOException {
     ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
-    writeAndClose(data, new DeflaterOutputStream(baos, getDeflater()));
+    try (OutputStream outputStream = new DeflaterOutputStream(baos, getDeflater())) {
+      outputStream.write(data.array(), computeOffset(data), data.remaining());
+    }
     return ByteBuffer.wrap(baos.toByteArray());
   }
 
   @Override
   ByteBuffer decompress(ByteBuffer data) throws IOException {
     ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
-    writeAndClose(data, new InflaterOutputStream(baos, getInflater()));
+    try (OutputStream outputStream = new InflaterOutputStream(baos, getInflater())) {
+      outputStream.write(data.array(), computeOffset(data), data.remaining());
+    }
     return ByteBuffer.wrap(baos.toByteArray());
   }
 
-  private void writeAndClose(ByteBuffer data, OutputStream out) throws IOException {
-    out.write(data.array(), data.position(), data.remaining());
-    out.close();
-  }
-
   private Inflater getInflater() {
     if (null == inflater)
       inflater = new Inflater(true);
diff --git a/lang/java/trevni/core/src/main/java/org/apache/trevni/SnappyCodec.java b/lang/java/trevni/core/src/main/java/org/apache/trevni/SnappyCodec.java
index 203e074..0b7aa4a 100644
--- a/lang/java/trevni/core/src/main/java/org/apache/trevni/SnappyCodec.java
+++ b/lang/java/trevni/core/src/main/java/org/apache/trevni/SnappyCodec.java
@@ -26,16 +26,18 @@ final class SnappyCodec extends Codec {
 
   @Override
   ByteBuffer compress(ByteBuffer in) throws IOException {
+    int offset = computeOffset(in);
     ByteBuffer out = ByteBuffer.allocate(Snappy.maxCompressedLength(in.remaining()));
-    int size = Snappy.compress(in.array(), in.position(), in.remaining(), out.array(), 0);
+    int size = Snappy.compress(in.array(), offset, in.remaining(), out.array(), 0);
     out.limit(size);
     return out;
   }
 
   @Override
   ByteBuffer decompress(ByteBuffer in) throws IOException {
-    ByteBuffer out = ByteBuffer.allocate(Snappy.uncompressedLength(in.array(), in.position(), in.remaining()));
-    int size = Snappy.uncompress(in.array(), in.position(), in.remaining(), out.array(), 0);
+    int offset = computeOffset(in);
+    ByteBuffer out = ByteBuffer.allocate(Snappy.uncompressedLength(in.array(), offset, in.remaining()));
+    int size = Snappy.uncompress(in.array(), offset, in.remaining(), out.array(), 0);
     out.limit(size);
     return out;
   }
diff --git a/lang/java/trevni/core/src/test/java/org/apache/trevni/TestAllCodecs.java b/lang/java/trevni/core/src/test/java/org/apache/trevni/TestAllCodecs.java
new file mode 100644
index 0000000..ae535b9
--- /dev/null
+++ b/lang/java/trevni/core/src/test/java/org/apache/trevni/TestAllCodecs.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class TestAllCodecs {
+  @Parameterized.Parameters(name = "{index}: codec={0}")
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] { { "bzip2" }, { "null" }, { "snappy" }, { "deflate" }, });
+  }
+
+  @Parameterized.Parameter(0)
+  public String codec;
+
+  public static Codec getCodec(String name) {
+    MetaData m = new MetaData();
+    m.put(MetaData.CODEC_KEY, name.getBytes());
+    return Codec.get(m);
+  }
+
+  @Test
+  public void testCodec() throws IOException {
+    int inputSize = 500_000;
+
+    byte[] input = generateTestData(inputSize);
+
+    Codec codecInstance = getCodec(codec);
+    ByteBuffer inputByteBuffer = ByteBuffer.wrap(input);
+    ByteBuffer compressedBuffer = codecInstance.compress(inputByteBuffer);
+
+    int compressedSize = compressedBuffer.remaining();
+
+    // Make sure something returned
+    assertTrue(compressedSize > 0);
+
+    // While the compressed size could in many real cases
+    // *increase* compared to the input size, our input data
+    // is extremely easy to compress and all Avro's compression algorithms
+    // should have a compression ratio greater than 1 (except 'null').
+    assertTrue(compressedSize < inputSize || codec.equals("null"));
+
+    // Decompress the data
+    ByteBuffer decompressedBuffer = codecInstance.decompress(compressedBuffer);
+
+    // Validate that the input and output are equal.
+    inputByteBuffer.rewind();
+    assertEquals(decompressedBuffer, inputByteBuffer);
+  }
+
+  @Test
+  public void testCodecSlice() throws IOException {
+    int inputSize = 500_000;
+    byte[] input = generateTestData(inputSize);
+
+    Codec codecInstance = getCodec(codec);
+    ByteBuffer partialBuffer = ByteBuffer.wrap(input);
+    partialBuffer.position(17);
+
+    ByteBuffer inputByteBuffer = partialBuffer.slice();
+    ByteBuffer compressedBuffer = codecInstance.compress(inputByteBuffer);
+
+    int compressedSize = compressedBuffer.remaining();
+
+    // Make sure something returned
+    assertTrue(compressedSize > 0);
+
+    // Create a slice from the compressed buffer
+    ByteBuffer sliceBuffer = ByteBuffer.allocate(compressedSize + 100);
+    sliceBuffer.position(50);
+    sliceBuffer.put(compressedBuffer);
+    sliceBuffer.limit(compressedSize + 50);
+    sliceBuffer.position(50);
+
+    // Decompress the data
+    ByteBuffer decompressedBuffer = codecInstance.decompress(sliceBuffer.slice());
+
+    // Validate that the input and output are equal.
+    inputByteBuffer.rewind();
+    assertEquals(decompressedBuffer, inputByteBuffer);
+  }
+
+  // Generate some test data that will compress easily
+  public static byte[] generateTestData(int inputSize) {
+    byte[] arr = new byte[inputSize];
+    for (int i = 0; i < arr.length; i++) {
+      arr[i] = (byte) (65 + i % 10);
+    }
+
+    return arr;
+  }
+}
diff --git a/lang/java/trevni/core/src/test/java/org/apache/trevni/TestBZip2Codec.java b/lang/java/trevni/core/src/test/java/org/apache/trevni/TestBZip2Codec.java
deleted file mode 100644
index b5c33a7..0000000
--- a/lang/java/trevni/core/src/test/java/org/apache/trevni/TestBZip2Codec.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.trevni;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.junit.Test;
-
-public class TestBZip2Codec {
-
-  @Test
-  public void testBZip2CompressionAndDecompression() throws IOException {
-
-    MetaData meta = new MetaData();
-    meta.setCodec("bzip2");
-    Codec codec = Codec.get(meta);
-
-    // Confirm that the right codec Came back
-    assertTrue(codec instanceof BZip2Codec);
-
-    // This is 3 times the byte buffer on the BZip2 decompress plus some extra
-    final int inputByteSize = BZip2Codec.DEFAULT_BUFFER_SIZE * 3 + 42;
-
-    byte[] inputByteArray = new byte[inputByteSize];
-
-    // Generate something that will compress well
-    for (int i = 0; i < inputByteSize; i++) {
-      inputByteArray[i] = (byte) (65 + i % 10);
-    }
-
-    ByteBuffer inputByteBuffer = ByteBuffer.wrap(inputByteArray);
-
-    ByteBuffer compressedBuffer = codec.compress(inputByteBuffer);
-
-    // Make sure something returned
-    assertTrue(compressedBuffer.array().length > 0);
-    // Make sure the compressed output is smaller then the original
-    assertTrue(compressedBuffer.array().length < inputByteArray.length);
-
-    ByteBuffer decompressedBuffer = codec.decompress(compressedBuffer);
-
-    // The original array should be the same length as the decompressed array
-    assertTrue(decompressedBuffer.array().length == inputByteArray.length);
-
-    // Every byte in the outputByteArray should equal every byte in the input array
-    byte[] outputByteArray = decompressedBuffer.array();
-    System.arraycopy(outputByteArray, 0, inputByteArray, 0, inputByteSize);
-  }
-}