Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/12/17 02:57:14 UTC

[GitHub] [arrow] liyafan82 opened a new pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

liyafan82 opened a new pull request #8949:
URL: https://github.com/apache/arrow/pull/8949


   Support compressing and decompressing RecordBatch IPC buffers with LZ4.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-790239103


   > Congratulations @liyafan82 ! Do you have an idea how hard it will be to add zstd support?
   
   @pitrou Support for zstd should be much easier. As you can see, most of the effort went into the framework changes and library selection, and that effort will not need to be repeated for zstd.



[GitHub] [arrow] garyelephant commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
garyelephant commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r791429995



##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+      }
+
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
+    PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (InputStream in = new ByteArrayInputStream(inBytes);
+         OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = baos.toByteArray();
+
+    ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    compressedBuffer.setLong(0, uncompressedLength);
+
+    PlatformDependent.copyMemory(
+        outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length);
+    compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,
+        "Not enough data to decompress.");
+
+    long decompressedLength = compressedBuffer.getLong(0);
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      decompressedLength = Long.reverseBytes(decompressedLength);
+    }
+
+    if (decompressedLength == 0L) {
+      // shortcut for empty buffer
+      compressedBuffer.close();
+      return allocator.getEmpty();
+    }
+
+    if (decompressedLength == CompressionUtil.NO_COMPRESSION_LENGTH) {
+      // no compression
+      return CompressionUtil.decompressRawBuffer(compressedBuffer);
+    }
+
+    try {
+      ArrowBuf decompressedBuffer = doDecompress(allocator, compressedBuffer);
+      compressedBuffer.close();

Review comment:
       I also encountered this `RefCnt has gone negative` error in Arrow 6.0.1. Has it not been fixed yet?





[GitHub] [arrow] emkornfield commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
emkornfield commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-794124675


   @HedgehogCode thank you for the benchmark numbers. I think that, to avoid churn in this PR, we should keep the existing working commons code path. The data you provided raises the urgency of finding a more performant solution. The issue with the previous library is that it does not appear to support [dependent blocks](http://mail-archives.apache.org/mod_mbox/arrow-dev/202101.mbox/%3CCAJPUwMAPSTrdbu4vw=GJiLy9ciJU0FvH_hdON5uAiwe2TK-h2Q@mail.gmail.com%3E), which are emitted by C++. So there are three options:
   1.  Change the specification to require dependent blocks be disabled.
   2.  Find a performant library that supports dependent blocks.
   3.  Ask/provide a patch to the lz4-java library to support dependent blocks.
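To make "dependent blocks" concrete: in the LZ4 frame format, block dependence is declared in the frame header, in bit 5 (B.Indep) of the FLG byte that follows the 4-byte magic number 0x184D2204. A dependent block may reference data from earlier blocks in the same frame, so it cannot be decoded in isolation. The sketch below reads that flag from a frame header; it assumes the frame starts at offset 0, and the class name `Lz4FrameFlags` is hypothetical, not part of Arrow, Commons Compress, or lz4-java.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/**
 * Hypothetical helper: inspects an LZ4 frame header to report whether the
 * frame declares independent blocks (B.Indep = 1) or dependent blocks (B.Indep = 0).
 */
final class Lz4FrameFlags {
  // Every LZ4 frame starts with this magic number, stored little-endian.
  static final int LZ4_FRAME_MAGIC = 0x184D2204;
  // Bit 5 of the FLG byte is the block-independence flag.
  static final int BLOCK_INDEPENDENCE_BIT = 1 << 5;
  // Magic (4) + FLG (1) + BD (1) + header checksum (1) is the smallest frame header.
  static final int MIN_HEADER_SIZE = 7;

  private Lz4FrameFlags() {}

  static boolean hasIndependentBlocks(byte[] frame) {
    if (frame.length < MIN_HEADER_SIZE) {
      throw new IllegalArgumentException("Too short to hold an LZ4 frame header");
    }
    ByteBuffer buf = ByteBuffer.wrap(frame).order(ByteOrder.LITTLE_ENDIAN);
    if (buf.getInt(0) != LZ4_FRAME_MAGIC) {
      throw new IllegalArgumentException("Not an LZ4 frame (bad magic number)");
    }
    int flg = frame[4] & 0xFF;
    // Bits 7-6 hold the format version, which must be 01.
    if ((flg >>> 6) != 0b01) {
      throw new IllegalArgumentException("Unsupported LZ4 frame version");
    }
    return (flg & BLOCK_INDEPENDENCE_BIT) != 0;
  }
}
```

Under option 1 or 2 above, a reader could use a check like this to reject (or route elsewhere) frames whose blocks are dependent, instead of failing later during decompression.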
   
   I think that, especially after ARROW-11899 is done, it should hopefully be fairly easy to try out different implementations (or use your own if interoperability with other languages isn't a requirement).
   
   If you are interested in contributing in this area, maybe coordinate with @liyafan82 to help out once the PR is merged?



[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-801600759


   > If you've already started [ARROW-11899](https://issues.apache.org/jira/browse/ARROW-11899), then I'll let you finish it up; hopefully it isn't too much work. We are discussing the path forward for LZ4 in general on the ML. Once that is cleared up, we can figure out how to do the work, including whether @HedgehogCode is interested in contributing.
   
   Sounds good. Hopefully I will have a PR ready in a few days. 



[GitHub] [arrow] emkornfield commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r551603469



##########
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.compression;
+
+import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  private static final long SIZE_OF_MESSAGE_LENGTH = 8L;
+
+  private final LZ4Factory factory;
+
+  private LZ4Compressor compressor;
+
+  private LZ4FastDecompressor decompressor;
+
+  public Lz4CompressionCodec() {
+    factory = LZ4Factory.fastestInstance();
+  }
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) {
+    Preconditions.checkArgument(unCompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    // create compressor lazily

Review comment:
       why?





[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-789649806


   > I've restarted the integration CI job, it seemed stuck downloading the docker image.
   
   @pitrou Thanks for your help. 



[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-755044988


   > Is it possible to add a test to confirm that this can be read/written from the C++ implementation?
   
   @emkornfield I think it is a good idea to provide end-to-end cross-language integration tests. 
   However, I am not sure we are ready for them yet. 
   
   In particular, we need to change the way buffers are released after compression. 
   Previously, we released the buffers by directly closing the related vectors. This no longer works, because the vectors' buffers are now released by the codec, and the compressed buffers also need to be released properly.
   
   A solution to this problem may affect other parts of the code base, so we may need a separate issue to discuss it (if we do not address it in this PR).
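The ownership problem described above can be illustrated with a toy reference-counted buffer. This is a minimal sketch under stated assumptions: `CountedBuffer` and `compress` are hypothetical stand-ins, not Arrow's `ArrowBuf` or reference manager. It shows that once the codec releases its input, closing the vector in the old way releases the same buffer a second time.

```java
/** Toy model of buffer ownership; not Arrow's actual memory classes. */
final class OwnershipSketch {

  /** Hypothetical reference-counted buffer that starts with one reference. */
  static final class CountedBuffer implements AutoCloseable {
    private final String name;
    private int refCount = 1;

    CountedBuffer(String name) {
      this.name = name;
    }

    int refCount() {
      return refCount;
    }

    @Override
    public void close() {
      refCount--;
      if (refCount < 0) {
        throw new IllegalStateException("reference count went negative for " + name);
      }
    }
  }

  /** A codec that takes ownership of its input: it releases it and returns a new buffer. */
  static CountedBuffer compress(CountedBuffer input) {
    CountedBuffer output = new CountedBuffer("compressed");
    input.close(); // the codec, not the caller, releases the input
    return output;
  }

  public static void main(String[] args) {
    CountedBuffer vectorBuffer = new CountedBuffer("vector");
    // The caller now owns only the compressed buffer and must release it.
    try (CountedBuffer compressed = compress(vectorBuffer)) {
      // ... write `compressed` to the IPC stream ...
    }
    // Closing the vector as before releases vectorBuffer a second time.
    try {
      vectorBuffer.close();
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

In this model the fix is a clear ownership rule: whoever receives the compressed buffer releases it, and nobody releases the input again after handing it to the codec.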



[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-760767374


   > The [comment in the BodyCompression flatbuffer definition](https://github.com/apache/arrow/blob/master/format/Message.fbs#L59-L65) states:
   > 
   > > Each constituent buffer is first compressed with the indicated compressor, and then written with the uncompressed length in the first 8 bytes as a 64-bit little-endian signed integer followed by the compressed buffer bytes (and then padding as required by the protocol). **The uncompressed length may be set to -1 to indicate that the data that follows is not compressed, which can be useful for cases where compression does not yield appreciable savings.**
   > 
   > Should the check for a length of -1 be made outside of `CompressionCodec` implementation? I think it would be useful to do it in the `#decompress(BufferAllocator, ArrowBuf)` method.
   > 
   > Should be pretty easy if I don't miss something:
   > 
   > ```java
   >     if (decompressedLength == -1L) {
   >       // handle uncompressed buffers
   >       return compressedBuffer.slice(SIZE_OF_MESSAGE_LENGTH,
   >           compressedBuffer.writerIndex() - SIZE_OF_MESSAGE_LENGTH);
   >     }
   > ```
   
   @HedgehogCode Thanks for the good suggestion. I have revised the code so that when the compressed buffer is larger than the raw buffer, we send the raw buffer directly with length -1. I have also updated the test case to make sure this code path is covered. 
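The framing from the quoted Message.fbs comment, together with the raw-buffer fallback described in the reply, can be sketched with plain `java.nio.ByteBuffer`. This is an illustration, not the PR's code: it assumes byte arrays in place of `ArrowBuf`, the class and method names are hypothetical, and the compressed bytes are passed in by the caller rather than produced by a real LZ4 codec.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

/**
 * Hypothetical sketch of body-buffer framing: an 8-byte little-endian signed
 * uncompressed length, then the payload. A length of -1 marks raw (uncompressed) data.
 */
final class BodyBufferFraming {
  static final int SIZE_OF_UNCOMPRESSED_LENGTH = 8;
  static final long NO_COMPRESSION_LENGTH = -1L;

  private BodyBufferFraming() {}

  /** Frames one buffer, falling back to the raw bytes when compression made it larger. */
  static byte[] frame(byte[] raw, byte[] compressed) {
    boolean keepRaw = compressed.length > raw.length;
    byte[] payload = keepRaw ? raw : compressed;
    // The prefix is little-endian by specification, regardless of the host byte order.
    ByteBuffer out = ByteBuffer.allocate(SIZE_OF_UNCOMPRESSED_LENGTH + payload.length)
        .order(ByteOrder.LITTLE_ENDIAN);
    out.putLong(keepRaw ? NO_COMPRESSION_LENGTH : raw.length);
    out.put(payload);
    return out.array();
  }

  /** Reads the 8-byte length prefix of a framed buffer. */
  static long uncompressedLength(byte[] framed) {
    if (framed.length < SIZE_OF_UNCOMPRESSED_LENGTH) {
      throw new IllegalArgumentException("Not enough data to decompress.");
    }
    return ByteBuffer.wrap(framed).order(ByteOrder.LITTLE_ENDIAN).getLong(0);
  }

  /** For a buffer stored with length -1, returns the bytes that follow the prefix. */
  static byte[] extractUncompressed(byte[] framed) {
    if (uncompressedLength(framed) != NO_COMPRESSION_LENGTH) {
      throw new IllegalArgumentException("Buffer is compressed; decode it with the codec instead");
    }
    return Arrays.copyOfRange(framed, SIZE_OF_UNCOMPRESSED_LENGTH, framed.length);
  }
}
```

Setting the buffer's byte order explicitly is one way to avoid the manual `Long.reverseBytes` swaps on big-endian hosts; with `ArrowBuf`, the equivalent would be a slice rather than a copy.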



[GitHub] [arrow] emkornfield commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
emkornfield commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-777201181


   @liyafan82 could you enable the Java integration test to confirm that reading files generated by C++ works before we merge? Once we verify it is working, I can take a final look.



[GitHub] [arrow] emkornfield commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
emkornfield commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-801332194


   +1, thank you. @liyafan82, do you have plans to work on the follow-up items or ZSTD? Otherwise I can take them up.
   
   @HedgehogCode any thoughts on how to proceed for LZ4? Maybe we can discuss it further on the performance JIRA?



[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r592197707



##########
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java
##########
@@ -47,14 +57,21 @@ public static ArrowBodyCompression createBodyCompression(CompressionCodec codec)
   }
 
   /**
-   * Creates the {@link CompressionCodec} given the compression type.
+   * Process compression by compressing the buffer as is.
    */
-  public static CompressionCodec createCodec(byte compressionType) {
-    switch (compressionType) {
-      case NoCompressionCodec.COMPRESSION_TYPE:
-        return NoCompressionCodec.INSTANCE;
-      default:
-        throw new IllegalArgumentException("Compression type not supported: " + compressionType);
-    }
+  public static ArrowBuf compressRawBuffer(BufferAllocator allocator, ArrowBuf inputBuffer) {
+    ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex());
+    compressedBuffer.setLong(0, NO_COMPRESSION_LENGTH);
+    compressedBuffer.setBytes(SIZE_OF_UNCOMPRESSED_LENGTH, inputBuffer, 0, inputBuffer.writerIndex());
+    compressedBuffer.writerIndex(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex());
+    return compressedBuffer;
+  }
+
+  /**
+   * Process decompression by decompressing the buffer as is.

Review comment:
       I chose `extractUncompressedBuffer`. Thanks for the good suggestion. 





[GitHub] [arrow] emkornfield commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
emkornfield commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-794817984


   @liyafan82 let me know when you think this is ready for re-review. As I said, I think getting a working baseline in place first, so that we can do the follow-up work, makes the most sense here.



[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-800898224


   > Please update the docs to match, something like:
   
   > "Slice the buffer to contain the uncompressed bytes"
   
   Updated. Thank you.



[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-800897864


   > With the new enum, maybe we can make this an accessor that returns an enum instead? Then the byte can be extracted from it where necessary.
   
   Sounds good. I have revised the code accordingly. 
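The suggested design, an accessor that returns an enum from which the metadata byte is extracted only where needed, might look like the sketch below. It is hypothetical: the enum name and methods are illustrative, the -1 value for "no compression" is an assumption, and `LZ4_FRAME = 0` / `ZSTD = 1` follow the order of the `CompressionType` enum in Message.fbs.

```java
/**
 * Hypothetical codec-type enum that carries the flatbuffer byte, so code can pass
 * the enum around and extract the byte only when reading or writing metadata.
 */
enum CodecType {
  // Assumed marker for "no compression" (illustrative only).
  NO_COMPRESSION((byte) -1),
  // These two values follow the CompressionType enum order in Message.fbs.
  LZ4_FRAME((byte) 0),
  ZSTD((byte) 1);

  private final byte type;

  CodecType(byte type) {
    this.type = type;
  }

  /** The byte written to the IPC metadata. */
  byte getType() {
    return type;
  }

  /** Maps a metadata byte back to the enum, rejecting unknown values. */
  static CodecType fromCompressionType(byte type) {
    for (CodecType codecType : values()) {
      if (codecType.type == type) {
        return codecType;
      }
    }
    throw new IllegalArgumentException("Compression type not supported: " + type);
  }
}
```

Keeping the byte inside the enum means a `switch` over `CodecType` can be checked for exhaustiveness, while the raw byte appears only at the serialization boundary.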



[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r552335378



##########
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.compression;
+
+import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  private static final long SIZE_OF_MESSAGE_LENGTH = 8L;
+
+  private final LZ4Factory factory;
+
+  private LZ4Compressor compressor;
+
+  private LZ4FastDecompressor decompressor;
+
+  public Lz4CompressionCodec() {
+    factory = LZ4Factory.fastestInstance();
+  }
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) {

Review comment:
       Nice catch. Thank you!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-747171848


   https://issues.apache.org/jira/browse/ARROW-10880


----------------------------------------------------------------



[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r592192366



##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+      }
+
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
+    PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (InputStream in = new ByteArrayInputStream(inBytes);
+         OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = baos.toByteArray();
+
+    ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.

Review comment:
       Revised. Thanks for your kind reminder. 




----------------------------------------------------------------



[GitHub] [arrow] HedgehogCode commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
HedgehogCode commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-754727763


   When I apply these changes and try to compress and decompress an empty buffer (using a variable-sized vector containing only missing values), I get a SIGSEGV:
   ```
   #
   # A fatal error has been detected by the Java Runtime Environment:
   #
   #  SIGSEGV (0xb) at pc=0x00007f1209448fd0, pid=10504, tid=0x00007f1208bc7700
   #
   # JRE version: OpenJDK Runtime Environment (8.0_275-b01) (build 1.8.0_275-b01)
   # Java VM: OpenJDK 64-Bit Server VM (25.275-b01 mixed mode linux-amd64 compressed oops)
   # Problematic frame:
   # V  [libjvm.so+0x6fdfd0]  jni_ThrowNew+0xc0
   #
   # Core dump written. Default location: /workspaces/arrow/java/vector/core or core.10504
   ```
   
   This can be reproduced by adding the following test to `TestCompressionCodec.java`:
   ```java
     @Test
     public void testEmptyBuffer() throws Exception {
       final int vecLength = 10;
       final VarBinaryVector origVec = new VarBinaryVector("vec", allocator);
   
       origVec.allocateNew(vecLength);
   
       // Do not set any values (all missing)
       origVec.setValueCount(vecLength);
   
       final List<ArrowBuf> origBuffers = origVec.getFieldBuffers();
       final List<ArrowBuf> compressedBuffers = compressBuffers(origBuffers);
       final List<ArrowBuf> decompressedBuffers = deCompressBuffers(compressedBuffers);
   
       // TODO assert that the decompressed buffers are correct
       AutoCloseables.close(decompressedBuffers);
     }
   ```
   This looks like an error in the lz4-java library, but I am not sure, so I thought I should mention it here first.
   (Note that I am using OpenJDK 8; I haven't tried OpenJDK 11 yet.)


----------------------------------------------------------------



[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r585195259



##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+      }
+
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
+    PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (InputStream in = new ByteArrayInputStream(inBytes);
+         OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = baos.toByteArray();
+
+    ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    compressedBuffer.setLong(0, uncompressedLength);
+
+    PlatformDependent.copyMemory(
+        outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length);
+    compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,

Review comment:
       According to the design, this should not happen: even if the buffer is empty, a header containing the buffer length is still attached, so the compressed buffer always has at least `SIZE_OF_UNCOMPRESSED_LENGTH` bytes. We have a test case covering this.
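A minimal sketch of the framing described in this reply, assuming only what the quoted code shows: the first 8 bytes hold the uncompressed length as a little-endian 64-bit integer, and an empty input is encoded as that header alone with the value 0. The class `LengthPrefixFraming` and its methods are hypothetical and use plain `java.nio.ByteBuffer` instead of `ArrowBuf`; the length check in the decoder mirrors the `Preconditions` check in `decompress` rather than reproducing it.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical helper illustrating the 8-byte uncompressed-length header. */
final class LengthPrefixFraming {
  static final int SIZE_OF_UNCOMPRESSED_LENGTH = 8;

  /** Encodes an empty input: the result is only the header, holding 0. */
  static byte[] frameEmpty() {
    return ByteBuffer.allocate(SIZE_OF_UNCOMPRESSED_LENGTH)
        .order(ByteOrder.LITTLE_ENDIAN) // header is little-endian on every platform
        .putLong(0L)
        .array();
  }

  /** Reads the uncompressed length, rejecting input shorter than the header. */
  static long readUncompressedLength(byte[] framed) {
    if (framed.length < SIZE_OF_UNCOMPRESSED_LENGTH) {
      throw new IllegalArgumentException("Not enough data to decompress.");
    }
    return ByteBuffer.wrap(framed).order(ByteOrder.LITTLE_ENDIAN).getLong(0);
  }

  public static void main(String[] args) {
    byte[] framed = frameEmpty();
    System.out.println(framed.length);                   // 8
    System.out.println(readUncompressedLength(framed));  // 0
  }
}
```

Setting `ByteOrder.LITTLE_ENDIAN` on the buffer plays the same role as the `Long.reverseBytes` call guarded by `MemoryUtil.LITTLE_ENDIAN` in the quoted code.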




----------------------------------------------------------------



[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r592190402



##########
File path: java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.NoCompressionCodec;
+
+/**
+ * A factory implementation based on Apache Commons library.
+ */
+public class CommonsCompressionFactory implements CompressionCodec.Factory {
+
+  public static final CommonsCompressionFactory INSTANCE = new CommonsCompressionFactory();
+
+  @Override
+  public CompressionCodec createCodec(byte codecType) {

Review comment:
       Sounds reasonable. I have extracted an enum `CompressionUtil#CodecType` for this purpose. It also makes some other operations easier. 
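A hypothetical sketch of such an enum, assuming the byte values of the `CompressionType` enum in the flatbuffers schema (`LZ4_FRAME` = 0, `ZSTD` = 1) and using -1 to mean "no compression" (an assumption of this sketch). The name `CodecTypeSketch` and its methods are illustrative and are not claimed to match the actual `CompressionUtil#CodecType`.

```java
/** Hypothetical mapping between raw codec bytes and named codec types. */
enum CodecTypeSketch {
  NO_COMPRESSION((byte) -1), // assumed sentinel for "not compressed"
  LZ4_FRAME((byte) 0),       // CompressionType.LZ4_FRAME in Message.fbs
  ZSTD((byte) 1);            // CompressionType.ZSTD in Message.fbs

  private final byte type;

  CodecTypeSketch(byte type) {
    this.type = type;
  }

  byte getType() {
    return type;
  }

  /** Maps a raw codec byte (e.g. read from a message header) to a constant. */
  static CodecTypeSketch fromCompressionType(byte type) {
    for (CodecTypeSketch codec : values()) {
      if (codec.type == type) {
        return codec;
      }
    }
    throw new IllegalArgumentException("Unknown codec type: " + type);
  }
}
```

A factory such as `createCodec` can then switch on the enum, which gives one place to reject unknown codec bytes instead of comparing raw bytes at each call site.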




----------------------------------------------------------------



[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r575091316



##########
File path: java/vector/pom.xml
##########
@@ -74,6 +74,11 @@
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
+    <dependency>

Review comment:
       @emkornfield Sounds reasonable. I will try to revise the PR accordingly. Thanks for your good suggestion. 




----------------------------------------------------------------



[GitHub] [arrow] emkornfield commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r551603225



##########
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.compression;
+
+import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  private static final long SIZE_OF_MESSAGE_LENGTH = 8L;
+
+  private final LZ4Factory factory;
+
+  private LZ4Compressor compressor;
+
+  private LZ4FastDecompressor decompressor;
+
+  public Lz4CompressionCodec() {
+    factory = LZ4Factory.fastestInstance();
+  }
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) {

Review comment:
       ```suggestion
     public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
   ```




----------------------------------------------------------------



[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-770514765


   > @liyafan82 per the recent discussion on the mailing list, I looked into it, and the lz4 page mentioned https://commons.apache.org/proper/commons-compress/javadocs/api-release/org/apache/commons/compress/compressors/lz4/package-summary.html as a port, so that might offer better compatibility as a library.
   
   @emkornfield Sounds reasonable. I will update the PR accordingly. Thanks for your good suggestion. 


----------------------------------------------------------------



[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-773186162


   Switched to the commons-compress library, following @emkornfield's suggestion.


----------------------------------------------------------------



[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-794829156


   > @liyafan82 let me know when you think this is ready for re-review. As I said, I think getting a baseline working so we can do the follow-up work makes the most sense here.
   
   @emkornfield Sorry for the delay; I have been a little busy these days. I will try my best to have it ready in one or two days.


----------------------------------------------------------------



[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r558083064



##########
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.compression;
+
+import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  private static final long SIZE_OF_MESSAGE_LENGTH = 8L;
+
+  private final LZ4Factory factory;
+
+  private LZ4Compressor compressor;
+
+  private LZ4FastDecompressor decompressor;
+
+  public Lz4CompressionCodec() {
+    factory = LZ4Factory.fastestInstance();
+  }
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_MESSAGE_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(SIZE_OF_MESSAGE_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    // create compressor lazily
+    if (compressor == null) {
+      compressor = factory.fastCompressor();
+    }
+
+    int maxCompressedLength = compressor.maxCompressedLength((int) uncompressedBuffer.writerIndex());
+
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    ArrowBuf compressedBuffer = allocator.buffer(maxCompressedLength + SIZE_OF_MESSAGE_LENGTH);
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    compressedBuffer.setLong(0, uncompressedLength);
+
+    ByteBuffer uncompressed =
+        MemoryUtil.directBuffer(uncompressedBuffer.memoryAddress(), (int) uncompressedBuffer.writerIndex());
+    ByteBuffer compressed =
+        MemoryUtil.directBuffer(compressedBuffer.memoryAddress() + SIZE_OF_MESSAGE_LENGTH, maxCompressedLength);
+
+    int compressedLength = compressor.compress(
+        uncompressed, 0, (int) uncompressedBuffer.writerIndex(), compressed, 0, maxCompressedLength);
+    compressedBuffer.writerIndex(compressedLength + SIZE_OF_MESSAGE_LENGTH);
+
+    uncompressedBuffer.close();
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() >= SIZE_OF_MESSAGE_LENGTH,
+        "Not enough data to decompress.");
+
+    long decompressedLength = compressedBuffer.getLong(0);
+    if (!LITTLE_ENDIAN) {
+      decompressedLength = Long.reverseBytes(decompressedLength);
+    }
+
+    if (decompressedLength == 0L) {
+      // shortcut for empty buffer
+      compressedBuffer.close();
+      return allocator.getEmpty();
+    }
+
+    // create decompressor lazily
+    if (decompressor == null) {
+      decompressor = factory.fastDecompressor();
+    }
+
+    ByteBuffer compressed = MemoryUtil.directBuffer(
+        compressedBuffer.memoryAddress() + SIZE_OF_MESSAGE_LENGTH, (int) compressedBuffer.writerIndex());

Review comment:
       Nice catch. Thank you @stczwd
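The quoted `decompress` line appears to build its `ByteBuffer` view starting `SIZE_OF_MESSAGE_LENGTH` bytes into the buffer while giving it the full `writerIndex()` as its length, so the view would extend 8 bytes past the written data; that is presumably what the catch refers to. A minimal sketch of sizing such a view correctly, using plain `java.nio.ByteBuffer` rather than `MemoryUtil.directBuffer` (the `PayloadView` class and `payload` method are hypothetical):

```java
import java.nio.ByteBuffer;

/** Hypothetical helper: a view of the payload that follows an 8-byte header. */
final class PayloadView {
  static final int HEADER = 8;

  /** Returns the bytes in [HEADER, writerIndex); its length excludes the header. */
  static ByteBuffer payload(ByteBuffer framed, int writerIndex) {
    ByteBuffer dup = framed.duplicate(); // leave the caller's position/limit untouched
    dup.limit(writerIndex);              // stop at the end of the written data
    dup.position(HEADER);                // throws if writerIndex < HEADER
    return dup.slice();                  // remaining() == writerIndex - HEADER
  }

  public static void main(String[] args) {
    ByteBuffer framed = ByteBuffer.allocateDirect(20);
    System.out.println(payload(framed, 20).remaining()); // 12
  }
}
```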




----------------------------------------------------------------



[GitHub] [arrow] HedgehogCode commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
HedgehogCode commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-758805393


   The [comment on the BodyCompression table in the flatbuffers schema](https://github.com/apache/arrow/blob/master/format/Message.fbs#L59-L65) states:
   > Each constituent buffer is first compressed with the indicated compressor, and then written with the uncompressed length in the first 8 bytes as a 64-bit little-endian signed integer followed by the compressed buffer bytes (and then padding as required by the protocol). __The uncompressed length may be set to -1 to indicate that the data that follows is not compressed, which can be useful for cases where compression does not yield appreciable savings.__
   
   Should the check for a length of -1 be made outside of the `CompressionCodec` implementation? I think it would be useful to do it in the `#decompress(BufferAllocator, ArrowBuf)` method.
   
   It should be pretty easy, unless I am missing something:
   ```java
       if (decompressedLength == -1L) {
         // handle uncompressed buffers
         return compressedBuffer.slice(SIZE_OF_MESSAGE_LENGTH,
             compressedBuffer.writerIndex() - SIZE_OF_MESSAGE_LENGTH);
       }
   ```
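The sentinel handling can also be sketched independently of any particular codec, which is one way to read the suggestion of checking for -1 outside the `CompressionCodec` implementation. In this hypothetical `SentinelFraming` class the codec is modeled as plain `byte[]` functions (an assumption made to keep the example self-contained): the encoder stores the raw bytes under a `-1` header when the compressed output is larger than the input (the same `>` comparison as the quoted `compress` method), and the decoder returns the body directly for `-1` without calling the decompressor.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.function.UnaryOperator;

/** Hypothetical codec-independent handling of the -1 "not compressed" sentinel. */
final class SentinelFraming {
  static final int HEADER = 8;
  static final long NO_COMPRESSION_LENGTH = -1L;

  /** Compresses, falling back to the raw bytes under a -1 header if that is smaller. */
  static byte[] encode(byte[] raw, UnaryOperator<byte[]> compress) {
    byte[] compressed = compress.apply(raw);
    boolean useRaw = compressed.length > raw.length;
    byte[] body = useRaw ? raw : compressed;
    ByteBuffer out = ByteBuffer.allocate(HEADER + body.length).order(ByteOrder.LITTLE_ENDIAN);
    out.putLong(useRaw ? NO_COMPRESSION_LENGTH : raw.length);
    out.put(body);
    return out.array();
  }

  /** Skips the decompressor entirely when the header holds the -1 sentinel. */
  static byte[] decode(byte[] framed, UnaryOperator<byte[]> decompress) {
    long length = ByteBuffer.wrap(framed).order(ByteOrder.LITTLE_ENDIAN).getLong(0);
    byte[] body = Arrays.copyOfRange(framed, HEADER, framed.length);
    if (length == NO_COMPRESSION_LENGTH) {
      return body; // stored uncompressed
    }
    return decompress.apply(body);
  }

  public static void main(String[] args) {
    byte[] raw = {1, 2, 3};
    // Stand-in "compressor" that grows the data, forcing the raw fallback.
    byte[] framed = encode(raw, b -> Arrays.copyOf(b, b.length + 1));
    System.out.println(ByteBuffer.wrap(framed).order(ByteOrder.LITTLE_ENDIAN).getLong(0)); // -1
    byte[] back = decode(framed, b -> { throw new IllegalStateException("not compressed"); });
    System.out.println(Arrays.toString(back)); // [1, 2, 3]
  }
}
```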


----------------------------------------------------------------



[GitHub] [arrow] emkornfield commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r551603003



##########
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.compression;
+
+import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+
+import net.jpountz.lz4.LZ4Compressor;

Review comment:
       How was this library chosen?  It looks like it might not have been released in a while?




----------------------------------------------------------------



[GitHub] [arrow] pitrou commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-804055943


   @liyafan82 @emkornfield Can one of you update https://github.com/apache/arrow/blob/master/docs/source/status.rst#ipc-format once this is all finished?





[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r595805905



##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;

Review comment:
       I guess not for now. Maybe we can add one in the future, so that we can remove the dependency on Netty (and the other Netty dependencies as well).







[GitHub] [arrow] emkornfield commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r588044636



##########
File path: java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.NoCompressionCodec;
+
+/**
+ * A factory implementation based on Apache Commons library.
+ */
+public class CommonsCompressionFactory implements CompressionCodec.Factory {
+
+  public static final CommonsCompressionFactory INSTANCE = new CommonsCompressionFactory();
+
+  @Override
+  public CompressionCodec createCodec(byte codecType) {

Review comment:
       It seems like byte isn't the right interface here.  Can we decouple from the flatbuf enum for the factory?
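A minimal sketch of what decoupling the factory from the flatbuffer byte could look like, assuming a hypothetical `CodecType` enum that converts the wire value once at the IPC boundary. The ids follow the `CompressionType` enum in the Arrow flatbuffer schema (`LZ4_FRAME` = 0, `ZSTD` = 1); using -1 for "no compression" is an assumption meant to mirror `NoCompressionCodec.COMPRESSION_TYPE`.

```java
import java.util.Arrays;

/**
 * Hypothetical enum so a factory can expose createCodec(CodecType) instead of createCodec(byte).
 * Ids mirror the flatbuffer CompressionType; -1 for NO_COMPRESSION is an assumption.
 */
enum CodecType {
  NO_COMPRESSION((byte) -1),
  LZ4_FRAME((byte) 0),
  ZSTD((byte) 1);

  private final byte flatbufId;

  CodecType(byte flatbufId) {
    this.flatbufId = flatbufId;
  }

  /** The wire-level value, needed only when writing IPC metadata. */
  byte flatbufId() {
    return flatbufId;
  }

  /** Converts the wire-level value exactly once, when reading IPC metadata. */
  static CodecType fromFlatbufId(byte id) {
    return Arrays.stream(values())
        .filter(t -> t.flatbufId == id)
        .findFirst()
        .orElseThrow(() -> new IllegalArgumentException("Unknown compression type: " + id));
  }
}
```

With something like this, only the IPC reader/writer would touch the raw byte, and factories would switch over enum constants.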

##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+      }
+
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
+    PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (InputStream in = new ByteArrayInputStream(inBytes);
+         OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = baos.toByteArray();
+
+    ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.

Review comment:
       Should this refer to the specification rather than the C++ implementation?  
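For reference, the Arrow IPC format (`BodyCompressionMethod.BUFFER` in the flatbuffer schema) describes each compressed body buffer as an 8-byte little-endian signed int64 holding the uncompressed length, followed by the compressed bytes, with -1 meaning the bytes that follow are not compressed. The sketch below shows only that framing with plain `java.nio` (class and method names are hypothetical); fixing the byte order explicitly makes the manual `Long.reverseBytes` check unnecessary.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical helper illustrating the per-buffer framing; not Arrow's API. */
final class BufferFraming {
  static final int PREFIX_LENGTH = 8;
  static final long NO_COMPRESSION_LENGTH = -1L;

  /** Writes the length prefix in little-endian order regardless of the platform's native order. */
  static byte[] frame(long uncompressedLength, byte[] body) {
    ByteBuffer out = ByteBuffer.allocate(PREFIX_LENGTH + body.length).order(ByteOrder.LITTLE_ENDIAN);
    out.putLong(uncompressedLength);
    out.put(body);
    return out.array();
  }

  /** Reads the prefix back, again forcing little-endian order. */
  static long readUncompressedLength(byte[] framed) {
    if (framed.length < PREFIX_LENGTH) {
      throw new IllegalArgumentException("Not enough data to read the length prefix");
    }
    return ByteBuffer.wrap(framed, 0, PREFIX_LENGTH).order(ByteOrder.LITTLE_ENDIAN).getLong();
  }
}
```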

##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {

Review comment:
       If you don't want to do it in this PR, that's OK, but I think we should separate the functionality here into two parts: Arrow-specific and core compression/decompression. The Arrow-specific parts (parsing/writing lengths into buffers) belong in Vector. That way it should be easier to substitute different implementations.
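One possible shape for that split, as a sketch under stated assumptions: a hypothetical `RawCompressor` interface does byte-in/byte-out compression with no Arrow knowledge, and a hypothetical `ArrowFramingCodec` owns only the 8-byte little-endian length prefix. `java.util.zip` Deflate stands in for LZ4 purely so the example runs with the JDK alone; plain `byte[]` stands in for `ArrowBuf`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Hypothetical "core" layer: plain byte-in/byte-out compression with no Arrow knowledge. */
interface RawCompressor {
  byte[] compress(byte[] input);

  byte[] decompress(byte[] input, int uncompressedLength);
}

/** Stand-in core implementation using java.util.zip Deflate (not LZ4), for illustration only. */
final class DeflateRawCompressor implements RawCompressor {
  @Override
  public byte[] compress(byte[] input) {
    Deflater deflater = new Deflater();
    try {
      deflater.setInput(input);
      deflater.finish();
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] chunk = new byte[4096];
      while (!deflater.finished()) {
        out.write(chunk, 0, deflater.deflate(chunk));
      }
      return out.toByteArray();
    } finally {
      deflater.end();
    }
  }

  @Override
  public byte[] decompress(byte[] input, int uncompressedLength) {
    Inflater inflater = new Inflater();
    try {
      inflater.setInput(input);
      byte[] out = new byte[uncompressedLength];
      int written = 0;
      while (written < uncompressedLength) {
        int n = inflater.inflate(out, written, uncompressedLength - written);
        if (n == 0 && (inflater.finished() || inflater.needsInput() || inflater.needsDictionary())) {
          throw new IllegalStateException("Compressed data ended before the expected length");
        }
        written += n;
      }
      return out;
    } catch (DataFormatException e) {
      throw new IllegalStateException(e);
    } finally {
      inflater.end();
    }
  }
}

/** Hypothetical Arrow-specific layer: owns only the 8-byte little-endian length prefix. */
final class ArrowFramingCodec {
  private final RawCompressor core;

  ArrowFramingCodec(RawCompressor core) {
    this.core = core;
  }

  byte[] compress(byte[] uncompressed) {
    byte[] body = core.compress(uncompressed);
    return ByteBuffer.allocate(8 + body.length)
        .order(ByteOrder.LITTLE_ENDIAN)
        .putLong(uncompressed.length)
        .put(body)
        .array();
  }

  byte[] decompress(byte[] framed) {
    ByteBuffer in = ByteBuffer.wrap(framed).order(ByteOrder.LITTLE_ENDIAN);
    int length = Math.toIntExact(in.getLong());
    byte[] body = new byte[in.remaining()];
    in.get(body);
    return core.decompress(body, length);
  }
}
```

The -1 "stored raw" case and the empty-buffer shortcut would also belong in the framing layer; they are left out to keep the sketch short. Swapping in a different library would then only mean providing another `RawCompressor`.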

##########
File path: java/compression/pom.xml
##########
@@ -0,0 +1,56 @@
+<?xml version="1.0"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  You under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.arrow</groupId>
+    <artifactId>arrow-java-root</artifactId>
+    <version>4.0.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>arrow-compression</artifactId>
+  <name>Arrow Compression</name>
+  <description>(Experimental/Contrib) A library for working with the compression/decompression of Arrow data.</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-format</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-vector</artifactId>
+      <version>${project.version}</version>
+      <classifier>${arrow.vector.classifier}</classifier>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-unsafe</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-compress</artifactId>
+      <version>1.20</version>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>

Review comment:
       why is netty required?
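In the quoted codec, Netty appears to be used only for `PlatformDependent.copyMemory` between native memory and `byte[]`. As a hedged illustration that such copies do not inherently need Netty, the sketch below does the same two directions with direct `ByteBuffer`s from the JDK (the class name is hypothetical). `ArrowBuf` also appears to offer `byte[]` overloads of `getBytes`/`setBytes`, which may remove the need for Netty here; that is worth confirming against the Arrow version in use.

```java
import java.nio.ByteBuffer;

/** Hypothetical Netty-free heap/off-heap copy helpers built on java.nio only. */
final class HeapOffHeapCopy {
  /** Copies `length` bytes starting at `offset` of a (possibly direct) buffer into a new array. */
  static byte[] toHeap(ByteBuffer source, int offset, int length) {
    byte[] dst = new byte[length];
    ByteBuffer view = source.duplicate(); // independent position, same memory
    view.position(offset);
    view.get(dst, 0, length);
    return dst;
  }

  /** Copies an array into a (possibly direct) buffer at `offset`, leaving the caller's position alone. */
  static void fromHeap(byte[] src, ByteBuffer target, int offset) {
    ByteBuffer view = target.duplicate();
    view.position(offset);
    view.put(src);
  }
}
```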

##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);

Review comment:
       `packageRawBuffer` might be a better name?
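To make the naming question concrete, here is a sketch of the fallback the diff implements: compress, and if the output is larger than the input, store the bytes unchanged behind the -1 marker. `packageRawBuffer` is the reviewer's suggested name, used hypothetically; Deflate stands in for LZ4 and `byte[]` for `ArrowBuf`, so only the size comparison and framing are meant to carry over.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.zip.Deflater;

final class RawFallback {
  static final long NO_COMPRESSION_LENGTH = -1L;

  /** Deflate stands in for LZ4 here; only the resulting size matters for this sketch. */
  private static byte[] deflate(byte[] input) {
    Deflater deflater = new Deflater();
    try {
      deflater.setInput(input);
      deflater.finish();
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] chunk = new byte[4096];
      while (!deflater.finished()) {
        out.write(chunk, 0, deflater.deflate(chunk));
      }
      return out.toByteArray();
    } finally {
      deflater.end();
    }
  }

  /** Hypothetical packageRawBuffer: -1 prefix, then the bytes exactly as given. */
  static byte[] packageRawBuffer(byte[] input) {
    return ByteBuffer.allocate(8 + input.length).order(ByteOrder.LITTLE_ENDIAN)
        .putLong(NO_COMPRESSION_LENGTH).put(input).array();
  }

  /** Mirrors the diff: fall back to the raw bytes when compression makes them larger. */
  static byte[] compressOrPackageRaw(byte[] input) {
    byte[] compressed = deflate(input);
    if (compressed.length > input.length) {
      return packageRawBuffer(input);
    }
    return ByteBuffer.allocate(8 + compressed.length).order(ByteOrder.LITTLE_ENDIAN)
        .putLong(input.length).put(compressed).array();
  }
}
```

Since nothing is compressed on the fallback path, a name like `packageRawBuffer` describes what happens more accurately than `compressRawBuffer`.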

##########
File path: java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
##########
@@ -79,7 +105,12 @@ private void loadBuffers(
     List<ArrowBuf> ownBuffers = new ArrayList<>(bufferLayoutCount);
     for (int j = 0; j < bufferLayoutCount; j++) {
       ArrowBuf nextBuf = buffers.next();
-      ownBuffers.add(codec.decompress(vector.getAllocator(), nextBuf));
+      // for vectors without nulls, the buffer is empty, so there is no need to decompress it.
+      ArrowBuf bufferToAdd = nextBuf.writerIndex() > 0 ? codec.decompress(vector.getAllocator(), nextBuf) : nextBuf;
+      ownBuffers.add(bufferToAdd);
+      if (decompressionPerformed) {
+        decompressedBuffers.add(bufferToAdd);
+      }
     }
     try {
       vector.loadFieldBuffers(fieldNode, ownBuffers);

Review comment:
       could we close decompressed buffers after loading them here?
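A sketch of the ownership pattern behind this suggestion, assuming (as the `decompressedBuffers` bookkeeping in the diff suggests, but not confirmed here) that `loadFieldBuffers` takes its own reference to each buffer. `RefCountedBuffer` and `LoaderSketch` are hypothetical stand-ins for `ArrowBuf` and `VectorLoader`; the point is that the loader can drop its temporary references right after loading, in a `finally` block.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a reference-counted buffer such as ArrowBuf. */
final class RefCountedBuffer implements AutoCloseable {
  private int refCount = 1;

  void retain() {
    refCount++;
  }

  int refCount() {
    return refCount;
  }

  @Override
  public void close() {
    if (refCount <= 0) {
      throw new IllegalStateException("Buffer already released");
    }
    refCount--;
  }
}

final class LoaderSketch {
  /** Hypothetical vector that retains every buffer it loads. */
  static final class Vector {
    final List<RefCountedBuffer> owned = new ArrayList<>();

    void loadFieldBuffers(List<RefCountedBuffer> buffers) {
      for (RefCountedBuffer b : buffers) {
        b.retain();
        owned.add(b);
      }
    }
  }

  /** Loads the buffers, then releases the loader's own references to the decompressed ones. */
  static void load(Vector vector, List<RefCountedBuffer> decompressed) {
    try {
      vector.loadFieldBuffers(decompressed);
    } finally {
      // The vector now holds its own reference, so the temporary one can be dropped here.
      for (RefCountedBuffer b : decompressed) {
        b.close();
      }
    }
  }
}
```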

##########
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java
##########
@@ -47,14 +57,21 @@ public static ArrowBodyCompression createBodyCompression(CompressionCodec codec)
   }
 
   /**
-   * Creates the {@link CompressionCodec} given the compression type.
+   * Process compression by compressing the buffer as is.
    */
-  public static CompressionCodec createCodec(byte compressionType) {
-    switch (compressionType) {
-      case NoCompressionCodec.COMPRESSION_TYPE:
-        return NoCompressionCodec.INSTANCE;
-      default:
-        throw new IllegalArgumentException("Compression type not supported: " + compressionType);
-    }
+  public static ArrowBuf compressRawBuffer(BufferAllocator allocator, ArrowBuf inputBuffer) {
+    ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex());

Review comment:
       Is it possible to rework the APIs to avoid this type of copy? It seems that when writing out, we could simply write the 8 bytes for NO_COMPRESSION_LENGTH followed by the original buffer.
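A hedged sketch of the copy-free alternative at the I/O level: write the 8-byte -1 prefix and then the original bytes directly to the output channel, instead of allocating a new buffer that holds both. `RawBufferWriter` is hypothetical; it assumes the writer can emit the prefix separately from the body, and it omits the 8-byte alignment padding the IPC format also requires.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;

final class RawBufferWriter {
  static final long NO_COMPRESSION_LENGTH = -1L;

  /** Writes the -1 marker, then the body as-is; returns the number of bytes written. */
  static long writeUncompressed(WritableByteChannel channel, ByteBuffer body) throws IOException {
    ByteBuffer prefix = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
    prefix.putLong(NO_COMPRESSION_LENGTH);
    prefix.flip();
    ByteBuffer view = body.duplicate(); // the caller's position and limit stay untouched
    long written = 0;
    while (prefix.hasRemaining()) {
      written += channel.write(prefix);
    }
    while (view.hasRemaining()) {
      written += channel.write(view);
    }
    return written;
  }
}
```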

##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+      }
+
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
+    PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (InputStream in = new ByteArrayInputStream(inBytes);
+         OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = baos.toByteArray();
+
+    ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    compressedBuffer.setLong(0, uncompressedLength);
+
+    PlatformDependent.copyMemory(
+        outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length);
+    compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,
+        "Not enough data to decompress.");
+
+    long decompressedLength = compressedBuffer.getLong(0);
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      decompressedLength = Long.reverseBytes(decompressedLength);
+    }
+
+    if (decompressedLength == 0L) {
+      // shortcut for empty buffer
+      compressedBuffer.close();
+      return allocator.getEmpty();
+    }
+
+    if (decompressedLength == CompressionUtil.NO_COMPRESSION_LENGTH) {
+      // no compression
+      return CompressionUtil.decompressRawBuffer(compressedBuffer);
+    }
+
+    try {
+      ArrowBuf decompressedBuffer = doDecompress(allocator, compressedBuffer);
+      compressedBuffer.close();
+      return decompressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) throws IOException {
+    long decompressedLength = compressedBuffer.getLong(0);
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      decompressedLength = Long.reverseBytes(decompressedLength);
+    }
+
+    byte[] inBytes = new byte[(int) (compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH)];
+    PlatformDependent.copyMemory(
+        compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, inBytes, 0, inBytes.length);
+    ByteArrayOutputStream out = new ByteArrayOutputStream((int) decompressedLength);
+    try (InputStream in = new FramedLZ4CompressorInputStream(new ByteArrayInputStream(inBytes))) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = out.toByteArray();
+    ArrowBuf decompressedBuffer = allocator.buffer(outBytes.length);
+    PlatformDependent.copyMemory(outBytes, 0, decompressedBuffer.memoryAddress(), outBytes.length);
+    decompressedBuffer.writerIndex(decompressedLength);
+    return decompressedBuffer;
+  }
+
+  @Override
+  public String getCodecName() {
+    return CompressionType.name(CompressionType.LZ4_FRAME);

Review comment:
       I forget the usage here, but should this be something like CommonsLz4FrameCodec?

##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+      }
+
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];

Review comment:
       we should open up a follow-up issue to see if we can find a good library that will work with native memory.  
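As an illustration of what "works with native memory" could mean, the JDK's own Deflate classes accept `ByteBuffer` input and output since Java 11, so data can be compressed from and into direct (off-heap) buffers without staging it in `byte[]` copies. This is a sketch using Deflate, not LZ4, and it does not assume any particular LZ4 library offers the same API.

```java
import java.nio.ByteBuffer;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Illustration only (Deflate, Java 11+): compress between direct buffers without byte[] staging. */
final class DirectBufferDeflate {
  static ByteBuffer compress(ByteBuffer input) {
    Deflater deflater = new Deflater();
    try {
      deflater.setInput(input.duplicate()); // duplicate so the caller's position is not consumed
      deflater.finish();
      ByteBuffer out = ByteBuffer.allocateDirect(input.remaining() + 64);
      while (!deflater.finished()) {
        if (!out.hasRemaining()) {
          // Grow the off-heap output buffer and keep what was written so far.
          ByteBuffer bigger = ByteBuffer.allocateDirect(out.capacity() * 2);
          out.flip();
          bigger.put(out);
          out = bigger;
        }
        deflater.deflate(out);
      }
      out.flip();
      return out;
    } finally {
      deflater.end();
    }
  }

  static ByteBuffer decompress(ByteBuffer compressed, int uncompressedLength) throws DataFormatException {
    Inflater inflater = new Inflater();
    try {
      inflater.setInput(compressed.duplicate());
      ByteBuffer out = ByteBuffer.allocateDirect(uncompressedLength);
      while (out.hasRemaining() && !inflater.finished()) {
        int n = inflater.inflate(out);
        if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
          throw new DataFormatException("Truncated compressed input");
        }
      }
      out.flip();
      return out;
    } finally {
      inflater.end();
    }
  }
}
```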

##########
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java
##########
@@ -47,14 +57,21 @@ public static ArrowBodyCompression createBodyCompression(CompressionCodec codec)
   }
 
   /**
-   * Creates the {@link CompressionCodec} given the compression type.
+   * Process compression by compressing the buffer as is.
    */
-  public static CompressionCodec createCodec(byte compressionType) {
-    switch (compressionType) {
-      case NoCompressionCodec.COMPRESSION_TYPE:
-        return NoCompressionCodec.INSTANCE;
-      default:
-        throw new IllegalArgumentException("Compression type not supported: " + compressionType);
-    }
+  public static ArrowBuf compressRawBuffer(BufferAllocator allocator, ArrowBuf inputBuffer) {
+    ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex());
+    compressedBuffer.setLong(0, NO_COMPRESSION_LENGTH);
+    compressedBuffer.setBytes(SIZE_OF_UNCOMPRESSED_LENGTH, inputBuffer, 0, inputBuffer.writerIndex());
+    compressedBuffer.writerIndex(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex());
+    return compressedBuffer;
+  }
+
+  /**
+   * Process decompression by decompressing the buffer as is.

Review comment:
       something like `transferUncompressedComponent` or `extractUncompressedBuffer`
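Either name fits because, for a -1-prefixed buffer, the operation can simply expose the bytes after the 8-byte prefix as a zero-copy view. A sketch with `java.nio` (the method name is the reviewer's suggestion, used hypothetically); on the Arrow side, slicing an `ArrowBuf` would play the same role, with its ownership semantics to be checked separately.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class UncompressedExtractor {
  static final int PREFIX_LENGTH = 8;
  static final long NO_COMPRESSION_LENGTH = -1L;

  /** Hypothetical extractUncompressedBuffer: a view over the payload of a -1-prefixed buffer. */
  static ByteBuffer extractUncompressedBuffer(ByteBuffer framed) {
    long prefix = framed.duplicate().order(ByteOrder.LITTLE_ENDIAN).getLong(framed.position());
    if (prefix != NO_COMPRESSION_LENGTH) {
      throw new IllegalArgumentException("Buffer is compressed, prefix = " + prefix);
    }
    ByteBuffer view = framed.duplicate();
    view.position(framed.position() + PREFIX_LENGTH);
    return view.slice(); // shares memory with `framed`; no bytes are copied
  }
}
```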

##########
File path: java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.NoCompressionCodec;
+
+/**
+ * A factory implementation based on Apache Commons library.
+ */
+public class CommonsCompressionFactory implements CompressionCodec.Factory {
+
+  public static final CommonsCompressionFactory INSTANCE = new CommonsCompressionFactory();
+
+  @Override
+  public CompressionCodec createCodec(byte codecType) {
+    switch (codecType) {
+      case NoCompressionCodec.COMPRESSION_TYPE:

Review comment:
       It seems like the `NoCompressionCodec` decision should be made outside of the factory, since it should be common to all factories. Code that uses a factory would then be something like:
   
   ```
   CompressionCodec codec = NoCompressionCodec.INSTANCE;
   if (codecType != NoCompressionCodec.COMPRESSION_TYPE) {
     codec = factory.createCodec(codecType);
   }
   ```
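   A self-contained sketch of that shape follows. The hypothetical `Codec` and `CodecFactory` interfaces stand in for `CompressionCodec` and `CompressionCodec.Factory`, and the no-compression type value is an assumption made for illustration.

   ```java
   import java.util.Objects;

   /** Hypothetical, simplified stand-in for CompressionCodec. */
   interface Codec {
     byte type();
   }

   /** Hypothetical, simplified stand-in for CompressionCodec.Factory. */
   interface CodecFactory {
     Codec createCodec(byte codecType);
   }

   final class CodecSelection {
     // Assumed marker value for "no compression"; the PR uses NoCompressionCodec.COMPRESSION_TYPE.
     static final byte NO_COMPRESSION = -1;

     /** Shared pass-through codec, analogous to NoCompressionCodec.INSTANCE. */
     static final Codec NO_COMPRESSION_CODEC = () -> NO_COMPRESSION;

     /** Handles the no-compression case once, so individual factories never need a case for it. */
     static Codec select(byte codecType, CodecFactory factory) {
       if (codecType == NO_COMPRESSION) {
         return NO_COMPRESSION_CODEC;
       }
       return Objects.requireNonNull(factory.createCodec(codecType), "factory returned null");
     }
   }
   ```

   With this split, each factory only has to know about real codecs, and it can reject any type it does not support.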







[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r585195875



##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+      }
+
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
+    PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (InputStream in = new ByteArrayInputStream(inBytes);
+         OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = baos.toByteArray();
+
+    ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    compressedBuffer.setLong(0, uncompressedLength);
+
+    PlatformDependent.copyMemory(
+        outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length);
+    compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,
+        "Not enough data to decompress.");
+
+    long decompressedLength = compressedBuffer.getLong(0);

Review comment:
       I think this memory layout is consistent with the C++ native implementation. 
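   Below is a minimal sketch of reading that 8-byte prefix portably. It uses `java.nio.ByteBuffer` in place of `ArrowBuf`, and its names are hypothetical. It treats 0 as an empty buffer, matching the shortcut in the diff, and assumes -1 marks an uncompressed body, as the Arrow IPC format specifies.

   ```java
   import java.nio.ByteBuffer;
   import java.nio.ByteOrder;

   /** Hypothetical helper: interprets the 8-byte length prefix that precedes each IPC body buffer. */
   final class LengthPrefix {
     enum Kind { EMPTY, UNCOMPRESSED, COMPRESSED }

     /**
      * Reads the prefix as a little-endian signed 64-bit integer. Declaring the byte order on the
      * ByteBuffer gives the same result on any platform; the diff gets there with Long.reverseBytes
      * on big-endian machines instead.
      */
     static long read(ByteBuffer body) {
       return body.duplicate().order(ByteOrder.LITTLE_ENDIAN).getLong(0);
     }

     static Kind classify(long prefix) {
       if (prefix == 0L) {
         return Kind.EMPTY;          // nothing to decompress
       }
       if (prefix == -1L) {
         return Kind.UNCOMPRESSED;   // assumed marker: payload stored as-is
       }
       if (prefix < -1L) {
         throw new IllegalArgumentException("Invalid length prefix: " + prefix);
       }
       return Kind.COMPRESSED;       // prefix is the decompressed size in bytes
     }
   }
   ```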







[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r552332372



##########
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.compression;
+
+import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+
+import net.jpountz.lz4.LZ4Compressor;

Review comment:
       @kiszk You are right. I chose this library because our C++ implementation also depends on this repo (https://github.com/lz4/lz4). 







[GitHub] [arrow] kiszk commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
kiszk commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r551997558



##########
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.compression;
+
+import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+
+import net.jpountz.lz4.LZ4Compressor;

Review comment:
       My guess is that this import refers to [this](https://github.com/lz4/lz4-java/tree/master/src/java/net/jpountz/lz4).







[GitHub] [arrow] emkornfield commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r574250394



##########
File path: java/vector/pom.xml
##########
@@ -74,6 +74,11 @@
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
+    <dependency>

Review comment:
       I'm a little hesitant to take a direct dependency on any LZ4 library. Is there a way to make this optional (similar to how the Netty dependency for memory has been isolated)?
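   One possible way to make it optional is to compile the core module against an interface and discover implementations at runtime. The sketch below uses the standard `java.util.ServiceLoader`. This technique is swapped in for illustration and is not what this PR does: later revisions in this thread move the LZ4 code into a separate `java/compression` module with a `CompressionCodec.Factory`. `CompressionProvider` and `OptionalCodecs` are hypothetical.

   ```java
   import java.util.Iterator;
   import java.util.Optional;
   import java.util.ServiceLoader;

   /** Hypothetical SPI implemented by an optional module that bundles an LZ4 library. */
   interface CompressionProvider {
     String name();
   }

   final class OptionalCodecs {
     /**
      * Looks up a provider on the classpath at runtime. The core module compiles only against the
      * interface above, so it carries no direct dependency on any LZ4 library.
      */
     static Optional<CompressionProvider> find() {
       Iterator<CompressionProvider> it = ServiceLoader.load(CompressionProvider.class).iterator();
       if (it.hasNext()) {
         return Optional.of(it.next());
       }
       return Optional.empty();
     }
   }
   ```

   A provider module would ship a `META-INF/services` entry naming its implementation. Without one on the classpath, `find()` returns an empty `Optional`, and the caller can fall back to no compression.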







[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r592197440



##########
File path: java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
##########
@@ -79,7 +105,12 @@ private void loadBuffers(
     List<ArrowBuf> ownBuffers = new ArrayList<>(bufferLayoutCount);
     for (int j = 0; j < bufferLayoutCount; j++) {
       ArrowBuf nextBuf = buffers.next();
-      ownBuffers.add(codec.decompress(vector.getAllocator(), nextBuf));
+      // for vectors without nulls, the buffer is empty, so there is no need to decompress it.
+      ArrowBuf bufferToAdd = nextBuf.writerIndex() > 0 ? codec.decompress(vector.getAllocator(), nextBuf) : nextBuf;
+      ownBuffers.add(bufferToAdd);
+      if (decompressionPerformed) {
+        decompressedBuffers.add(bufferToAdd);
+      }
     }
     try {
       vector.loadFieldBuffers(fieldNode, ownBuffers);

Review comment:
       Good suggestion. This way, we do not need to change the public interface of `VectorLoader`. 
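   Keeping the logic inside `VectorLoader` means the loader must remember which buffers it allocated so it can release them later. A simplified sketch of that bookkeeping follows, with `byte[]` standing in for `ArrowBuf` and a `UnaryOperator` standing in for the codec (all names are hypothetical). Unlike the diff, which checks a `decompressionPerformed` flag, it detects a newly produced buffer by reference identity.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.UnaryOperator;

   /** Hypothetical sketch of the loadBuffers loop in the diff, using byte[] in place of ArrowBuf. */
   final class BufferLoading {
     /**
      * Decompresses each non-empty buffer and passes empty ones (e.g. an omitted validity bitmap)
      * through untouched. Buffers produced by the codec are appended to {@code decompressed} so the
      * caller can release them later.
      */
     static List<byte[]> load(List<byte[]> buffers, UnaryOperator<byte[]> codec, List<byte[]> decompressed) {
       List<byte[]> own = new ArrayList<>(buffers.size());
       for (byte[] next : buffers) {
         byte[] toAdd = next.length > 0 ? codec.apply(next) : next;
         own.add(toAdd);
         if (toAdd != next) {
           decompressed.add(toAdd);  // newly allocated by the codec, so the loader owns it
         }
       }
       return own;
     }
   }
   ```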







[GitHub] [arrow] pitrou commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-789645531


   I've restarted the integration CI job; it seemed to be stuck downloading the Docker image.





[GitHub] [arrow] emkornfield commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
emkornfield commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-754259372


   Is it possible to add a test confirming that data written by this implementation can be read by the C++ implementation, and vice versa?





[GitHub] [arrow] stczwd commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
stczwd commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r555746080



##########
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.compression;
+
+import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  private static final long SIZE_OF_MESSAGE_LENGTH = 8L;
+
+  private final LZ4Factory factory;
+
+  private LZ4Compressor compressor;
+
+  private LZ4FastDecompressor decompressor;
+
+  public Lz4CompressionCodec() {
+    factory = LZ4Factory.fastestInstance();
+  }
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_MESSAGE_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(SIZE_OF_MESSAGE_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    // create compressor lazily
+    if (compressor == null) {
+      compressor = factory.fastCompressor();
+    }
+
+    int maxCompressedLength = compressor.maxCompressedLength((int) uncompressedBuffer.writerIndex());
+
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    ArrowBuf compressedBuffer = allocator.buffer(maxCompressedLength + SIZE_OF_MESSAGE_LENGTH);
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    compressedBuffer.setLong(0, uncompressedLength);
+
+    ByteBuffer uncompressed =
+        MemoryUtil.directBuffer(uncompressedBuffer.memoryAddress(), (int) uncompressedBuffer.writerIndex());
+    ByteBuffer compressed =
+        MemoryUtil.directBuffer(compressedBuffer.memoryAddress() + SIZE_OF_MESSAGE_LENGTH, maxCompressedLength);
+
+    int compressedLength = compressor.compress(
+        uncompressed, 0, (int) uncompressedBuffer.writerIndex(), compressed, 0, maxCompressedLength);
+    compressedBuffer.writerIndex(compressedLength + SIZE_OF_MESSAGE_LENGTH);
+
+    uncompressedBuffer.close();
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() >= SIZE_OF_MESSAGE_LENGTH,
+        "Not enough data to decompress.");
+
+    long decompressedLength = compressedBuffer.getLong(0);
+    if (!LITTLE_ENDIAN) {
+      decompressedLength = Long.reverseBytes(decompressedLength);
+    }
+
+    if (decompressedLength == 0L) {
+      // shortcut for empty buffer
+      compressedBuffer.close();
+      return allocator.getEmpty();
+    }
+
+    // create decompressor lazily
+    if (decompressor == null) {
+      decompressor = factory.fastDecompressor();
+    }
+
+    ByteBuffer compressed = MemoryUtil.directBuffer(
+        compressedBuffer.memoryAddress() + SIZE_OF_MESSAGE_LENGTH, (int) compressedBuffer.writerIndex());

Review comment:
       nit: shouldn't the capacity be `(int) (compressedBuffer.writerIndex() - SIZE_OF_MESSAGE_LENGTH)`, since the view starts after the 8-byte length prefix?
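   The view created here starts 8 bytes into `compressedBuffer`, so a capacity of `writerIndex()` would reach 8 bytes past the written data. Below is a sketch of the tighter bounds, using `ByteBuffer` in place of `MemoryUtil.directBuffer`. The class and method names are hypothetical.

   ```java
   import java.nio.ByteBuffer;

   /** Hypothetical sketch of the capacity fix, with ByteBuffer standing in for ArrowBuf. */
   final class PayloadView {
     static final int SIZE_OF_MESSAGE_LENGTH = 8;

     /** Returns a view of only the compressed bytes: everything after the 8-byte length prefix. */
     static ByteBuffer compressedPayload(ByteBuffer body, int writerIndex) {
       if (writerIndex < SIZE_OF_MESSAGE_LENGTH) {
         throw new IllegalArgumentException("Not enough data to decompress.");
       }
       ByteBuffer view = body.duplicate();
       view.limit(writerIndex);               // stop at the end of the written data
       view.position(SIZE_OF_MESSAGE_LENGTH); // skip the length prefix
       return view.slice();                   // capacity = writerIndex - SIZE_OF_MESSAGE_LENGTH
     }
   }
   ```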







[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-788520085


   > @liyafan82 Thanks for your great work! I have cherry-picked your code into my project to enable LZ4 compression, and I encountered the following two bugs. Looking forward to your response. Thanks.
   
   @JkSelf Thanks a lot for your effort and feedback. This PR is still under development, and some problems are not resolved yet (as you may have noticed, the integration tests are still failing).
   
   I will try to resolve the problems in a few days. 





[GitHub] [arrow] kiszk commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
kiszk commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-753952360


   Looks good to me





[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r589169372



##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {

Review comment:
       Good suggestion. I have opened ARROW-11899 to track it. 







[GitHub] [arrow] emkornfield commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
emkornfield commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-778562980


   > > before we merge (once we verify it is working I can take a final look)
   > 
   > Sure. I will do some tests for that.
   
   To run the tests, it should be sufficient to unskip the Java implementation in archery.





[GitHub] [arrow] pitrou commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r586333980



##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+      }
+
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
+    PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (InputStream in = new ByteArrayInputStream(inBytes);
+         OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = baos.toByteArray();
+
+    ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    compressedBuffer.setLong(0, uncompressedLength);
+
+    PlatformDependent.copyMemory(
+        outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length);
+    compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,
+        "Not enough data to decompress.");
+
+    long decompressedLength = compressedBuffer.getLong(0);

Review comment:
       Note this is allowed by the [format spec](https://arrow.apache.org/docs/format/Columnar.html#validity-bitmaps):
   > Arrays having a 0 null count may choose to not allocate the validity bitmap.
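   In practice, a writer may emit a zero-length validity buffer, so the decompression path can receive a buffer shorter than the 8-byte prefix. On the read side, a hypothetical helper that honors the quoted rule might look like this, with `byte[]` for the bitmap:

   ```java
   /** Hypothetical reader helper illustrating the spec rule quoted above. */
   final class Validity {
     /**
      * Returns whether slot {@code index} is non-null. An empty bitmap is only legal when the
      * null count is 0, in which case every slot is valid.
      */
     static boolean isValid(byte[] bitmap, long nullCount, int index) {
       if (bitmap.length == 0) {
         if (nullCount != 0) {
           throw new IllegalStateException("validity bitmap missing but null count is " + nullCount);
         }
         return true;
       }
       // Arrow bitmaps are least-significant-bit first: slot i is bit (i % 8) of byte (i / 8).
       return ((bitmap[index >> 3] >> (index & 7)) & 1) != 0;
     }
   }
   ```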







[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r589171170



##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+      }
+
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
+    PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (InputStream in = new ByteArrayInputStream(inBytes);
+         OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = baos.toByteArray();
+
+    ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    compressedBuffer.setLong(0, uncompressedLength);
+
+    PlatformDependent.copyMemory(
+        outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length);
+    compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,
+        "Not enough data to decompress.");
+
+    long decompressedLength = compressedBuffer.getLong(0);
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      decompressedLength = Long.reverseBytes(decompressedLength);
+    }
+
+    if (decompressedLength == 0L) {
+      // shortcut for empty buffer
+      compressedBuffer.close();
+      return allocator.getEmpty();
+    }
+
+    if (decompressedLength == CompressionUtil.NO_COMPRESSION_LENGTH) {
+      // no compression
+      return CompressionUtil.decompressRawBuffer(compressedBuffer);
+    }
+
+    try {
+      ArrowBuf decompressedBuffer = doDecompress(allocator, compressedBuffer);
+      compressedBuffer.close();
+      return decompressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) throws IOException {
+    long decompressedLength = compressedBuffer.getLong(0);
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      decompressedLength = Long.reverseBytes(decompressedLength);
+    }
+
+    byte[] inBytes = new byte[(int) (compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH)];
+    PlatformDependent.copyMemory(
+        compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, inBytes, 0, inBytes.length);
+    ByteArrayOutputStream out = new ByteArrayOutputStream((int) decompressedLength);
+    try (InputStream in = new FramedLZ4CompressorInputStream(new ByteArrayInputStream(inBytes))) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = out.toByteArray();
+    ArrowBuf decompressedBuffer = allocator.buffer(outBytes.length);
+    PlatformDependent.copyMemory(outBytes, 0, decompressedBuffer.memoryAddress(), outBytes.length);
+    decompressedBuffer.writerIndex(decompressedLength);
+    return decompressedBuffer;
+  }
+
+  @Override
+  public String getCodecName() {
+    return CompressionType.name(CompressionType.LZ4_FRAME);

Review comment:
       I am afraid we cannot return an arbitrary name here: the `VectorUnloader` class needs to get the codec type (a byte) from the codec object, and it currently does so through the codec name.
   
   Maybe we need to reconsider the design here?
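One possible redesign is to let each codec report its type byte directly, so that `VectorUnloader` never has to parse a name. The sketch below is hypothetical and is not the Arrow API. Its byte values assume the `CompressionType` flatbuffer enum (`LZ4_FRAME` = 0, `ZSTD` = 1), plus -1 for no compression:

```java
/** Hypothetical enum pairing each codec with the type byte written into the IPC metadata. */
enum CodecKind {
  NO_COMPRESSION((byte) -1), // assumed value for "no compression"
  LZ4_FRAME((byte) 0),
  ZSTD((byte) 1);

  private final byte typeByte;

  CodecKind(byte typeByte) {
    this.typeByte = typeByte;
  }

  /** The value an unloader-style caller would write, with no string parsing. */
  byte typeByte() {
    return typeByte;
  }

  /** Reverse lookup used on the read side; unknown bytes are rejected. */
  static CodecKind fromTypeByte(byte b) {
    for (CodecKind kind : values()) {
      if (kind.typeByte == b) {
        return kind;
      }
    }
    throw new IllegalArgumentException("Compression type not supported: " + b);
  }
}

/** Hypothetical codec contract that reports its kind instead of a free-form name. */
interface KindAwareCodec {
  CodecKind kind();
}
```

With this shape, the name becomes purely informational, and the byte written to the metadata cannot drift from the codec that produced the buffers.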
   







[GitHub] [arrow] kiszk commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
kiszk commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r546331739



##########
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.compression;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  private static final long SIZE_OF_MESSAGE_LENGTH = 8L;
+
+  private final LZ4Factory factory;
+
+  private LZ4Compressor compressor;
+
+  private LZ4FastDecompressor decompressor;
+
+  public Lz4CompressionCodec() {
+    factory = LZ4Factory.fastestInstance();
+  }
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) {
+    Preconditions.checkArgument(unCompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    // create compressor lazily
+    if (compressor == null) {
+      compressor = factory.fastCompressor();
+    }
+
+    int maxCompressedLength = compressor.maxCompressedLength((int) unCompressedBuffer.writerIndex());
+
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    ArrowBuf compressedBuffer = allocator.buffer(maxCompressedLength + SIZE_OF_MESSAGE_LENGTH);
+    compressedBuffer.setLong(0, unCompressedBuffer.writerIndex());

Review comment:
       Do we need to ensure this is written in little-endian order? (c.f. https://github.com/apache/arrow/blob/master/cpp/src/arrow/ipc/reader.cc#L385).
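Below is a sketch of writing the length prefix so that the result does not depend on the host byte order. It assumes the C++ reader expects little-endian, as the linked `reader.cc` suggests. `LengthPrefix` is hypothetical. Its second method mirrors the `Long.reverseBytes` approach that the later revision of the patch takes before a native-order `setLong`:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical helper: writes the uncompressed-length prefix in little-endian order on any platform. */
final class LengthPrefix {
  static final int SIZE = 8;

  private LengthPrefix() {}

  /** Explicit byte order: the ByteBuffer performs any swap needed on big-endian hosts. */
  static void writeLittleEndian(ByteBuffer target, long uncompressedLength) {
    // duplicate() shares the content but resets the order to BIG_ENDIAN, so set it explicitly.
    target.duplicate().order(ByteOrder.LITTLE_ENDIAN).putLong(0, uncompressedLength);
  }

  /** Value to pass to a native-order write, swapped only when the host is big-endian. */
  static long toNativeWriteValue(long uncompressedLength) {
    return ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN
        ? uncompressedLength
        : Long.reverseBytes(uncompressedLength);
  }
}
```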







[GitHub] [arrow] kiszk commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
kiszk commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r546331755



##########
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.compression;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  private static final long SIZE_OF_MESSAGE_LENGTH = 8L;
+
+  private final LZ4Factory factory;
+
+  private LZ4Compressor compressor;
+
+  private LZ4FastDecompressor decompressor;
+
+  public Lz4CompressionCodec() {
+    factory = LZ4Factory.fastestInstance();
+  }
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) {
+    Preconditions.checkArgument(unCompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    // create compressor lazily
+    if (compressor == null) {
+      compressor = factory.fastCompressor();
+    }
+
+    int maxCompressedLength = compressor.maxCompressedLength((int) unCompressedBuffer.writerIndex());
+
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    ArrowBuf compressedBuffer = allocator.buffer(maxCompressedLength + SIZE_OF_MESSAGE_LENGTH);
+    compressedBuffer.setLong(0, unCompressedBuffer.writerIndex());
+
+    ByteBuffer uncompressed =
+        MemoryUtil.directBuffer(unCompressedBuffer.memoryAddress(), (int) unCompressedBuffer.writerIndex());
+    ByteBuffer compressed =
+        MemoryUtil.directBuffer(compressedBuffer.memoryAddress() + SIZE_OF_MESSAGE_LENGTH, maxCompressedLength);
+
+    int compressedLength = compressor.compress(
+        uncompressed, 0, (int) unCompressedBuffer.writerIndex(), compressed, 0, maxCompressedLength);
+    compressedBuffer.writerIndex(compressedLength + SIZE_OF_MESSAGE_LENGTH);
+
+    unCompressedBuffer.close();
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() > SIZE_OF_MESSAGE_LENGTH,
+        "Not enough data to decompress.");
+
+    // create decompressor lazily
+    if (decompressor == null) {
+      decompressor = factory.fastDecompressor();
+    }
+
+    long decompressedLength = compressedBuffer.getLong(0);

Review comment:
       ditto
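The read side needs the same treatment. Below is a hypothetical sketch that reads the prefix as little-endian and then classifies the body. The -1 sentinel for an uncompressed body is assumed to match `CompressionUtil.NO_COMPRESSION_LENGTH` in the patch. The upper bound mirrors the patch's `Integer.MAX_VALUE` checks:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical classification of a framed body by its little-endian length prefix. */
final class PrefixReader {
  static final int PREFIX_SIZE = 8;
  static final long NO_COMPRESSION_LENGTH = -1L; // assumed sentinel for "body stored raw"

  enum Kind { EMPTY, RAW, COMPRESSED }

  private PrefixReader() {}

  /** Reads the prefix in little-endian order regardless of the host byte order. */
  static long readLength(ByteBuffer framed) {
    if (framed.remaining() < PREFIX_SIZE) {
      throw new IllegalArgumentException("Not enough data to decompress.");
    }
    return framed.duplicate().order(ByteOrder.LITTLE_ENDIAN).getLong(framed.position());
  }

  /** Decides whether the body is empty, stored raw, or must go through the decompressor. */
  static Kind classify(ByteBuffer framed) {
    long length = readLength(framed);
    if (length == 0L) {
      return Kind.EMPTY;
    }
    if (length == NO_COMPRESSION_LENGTH) {
      return Kind.RAW;
    }
    if (length < 0L || length > Integer.MAX_VALUE) {
      throw new IllegalArgumentException("Invalid uncompressed length: " + length);
    }
    return Kind.COMPRESSED;
  }
}
```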







[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-794791234


   Thanks for the discussions.
   
   @HedgehogCode Thanks a lot for the performance data. Do you have any idea what causes the performance difference? Is it a fault in our implementation?
   If you are interested and want to provide a PR, could you please assign ARROW-11901 to yourself?
   
   @emkornfield Another (maybe ugly) solution that comes to my mind is to use different libraries for buffers with dependent and independent blocks?





[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r552335719



##########
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.compression;
+
+import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  private static final long SIZE_OF_MESSAGE_LENGTH = 8L;
+
+  private final LZ4Factory factory;
+
+  private LZ4Compressor compressor;
+
+  private LZ4FastDecompressor decompressor;
+
+  public Lz4CompressionCodec() {
+    factory = LZ4Factory.fastestInstance();
+  }
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) {
+    Preconditions.checkArgument(unCompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    // create compressor lazily

Review comment:
       For some scenarios (e.g. flight sender), we only need the compressor, while for others (e.g. flight receiver), we only need the decompressor. So there is no need to create both eagerly. 
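The lazy-initialization pattern described above can be sketched as follows. `java.util.zip.Deflater` and `Inflater` stand in for the lz4-java compressor and decompressor, so the example runs on its own. `LazyCodec` is hypothetical. Like the patch's fields, it is not thread-safe:

```java
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Hypothetical codec skeleton: only the half that is actually used ever gets created. */
final class LazyCodec implements AutoCloseable {
  private Deflater compressor;   // created on the first compress-side call only (sender)
  private Inflater decompressor; // created on the first decompress-side call only (receiver)

  Deflater compressor() {
    if (compressor == null) {
      compressor = new Deflater();
    }
    return compressor;
  }

  Inflater decompressor() {
    if (decompressor == null) {
      decompressor = new Inflater();
    }
    return decompressor;
  }

  boolean hasCompressor() {
    return compressor != null;
  }

  boolean hasDecompressor() {
    return decompressor != null;
  }

  /** Releases native zlib state for whichever halves were created. */
  @Override
  public void close() {
    if (compressor != null) {
      compressor.end();
    }
    if (decompressor != null) {
      decompressor.end();
    }
  }
}
```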







[GitHub] [arrow] pitrou commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-794328778


   Ok, looking at the lz4-java source, I understand: the `LZ4FrameInputStream` actually calls into `LZ4FrameOutputStream`, which is where the error comes from.





[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r592196563



##########
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java
##########
@@ -47,14 +57,21 @@ public static ArrowBodyCompression createBodyCompression(CompressionCodec codec)
   }
 
   /**
-   * Creates the {@link CompressionCodec} given the compression type.
+   * Process compression by compressing the buffer as is.
    */
-  public static CompressionCodec createCodec(byte compressionType) {
-    switch (compressionType) {
-      case NoCompressionCodec.COMPRESSION_TYPE:
-        return NoCompressionCodec.INSTANCE;
-      default:
-        throw new IllegalArgumentException("Compression type not supported: " + compressionType);
-    }
+  public static ArrowBuf compressRawBuffer(BufferAllocator allocator, ArrowBuf inputBuffer) {
+    ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex());

Review comment:
       This is a good point. It could be useful in many scenarios.
   
   However, so far I do not have a way to solve it without significantly changing our APIs.
   
   The fundamental reason is that the compression feature is only used in `VectorLoader`, which requires a single compressed buffer for each input buffer. In addition, we do not have an efficient way to combine two `ArrowBuf` objects. 
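Because each input buffer must map to exactly one output buffer, the patch falls back to framing the raw bytes behind a sentinel prefix whenever compression does not shrink the data. Below is a minimal sketch of that decision. `java.util.zip.Deflater` is swapped in for LZ4 so the sketch runs without extra dependencies. `RawFallback` is hypothetical, and the -1 sentinel is assumed to match `CompressionUtil.NO_COMPRESSION_LENGTH`:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.zip.Deflater;

/** Hypothetical one-buffer-in, one-buffer-out framing with a raw fallback. */
final class RawFallback {
  static final int PREFIX_SIZE = 8;
  static final long NO_COMPRESSION_LENGTH = -1L; // assumed sentinel for "body stored raw"

  private RawFallback() {}

  /** Stand-in compressor (zlib, not LZ4). */
  static byte[] deflate(byte[] input) {
    Deflater deflater = new Deflater();
    try {
      deflater.setInput(input);
      deflater.finish();
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] chunk = new byte[4096];
      while (!deflater.finished()) {
        int n = deflater.deflate(chunk);
        out.write(chunk, 0, n);
      }
      return out.toByteArray();
    } finally {
      deflater.end();
    }
  }

  /** Returns the compressed body if it is not larger, otherwise the raw bytes behind a -1 prefix. */
  static byte[] frame(byte[] input) {
    byte[] compressed = deflate(input);
    boolean useRaw = compressed.length > input.length;
    byte[] body = useRaw ? input : compressed;
    long prefix = useRaw ? NO_COMPRESSION_LENGTH : input.length;
    ByteBuffer framed = ByteBuffer.allocate(PREFIX_SIZE + body.length).order(ByteOrder.LITTLE_ENDIAN);
    framed.putLong(prefix).put(body);
    return framed.array();
  }
}
```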







[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r592190683



##########
File path: java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.NoCompressionCodec;
+
+/**
+ * A factory implementation based on Apache Commons library.
+ */
+public class CommonsCompressionFactory implements CompressionCodec.Factory {
+
+  public static final CommonsCompressionFactory INSTANCE = new CommonsCompressionFactory();
+
+  @Override
+  public CompressionCodec createCodec(byte codecType) {
+    switch (codecType) {
+      case NoCompressionCodec.COMPRESSION_TYPE:

Review comment:
       Sounds reasonable. I have revised the code accordingly. 
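For reference, below is a hypothetical sketch of the factory shape under discussion: a switch over the IPC type byte that either returns a codec or rejects the type as unsupported. The type-byte values are assumptions based on the `CompressionType` flatbuffer enum. `SketchCodec` and `SketchCodecFactory` are not Arrow classes:

```java
/** Hypothetical minimal codec contract for this sketch (not the Arrow interface). */
interface SketchCodec {
  String name();
}

/** Hypothetical factory: maps the IPC type byte to a codec and rejects anything unknown. */
final class SketchCodecFactory {
  static final byte NO_COMPRESSION = -1; // assumed value, mirroring NoCompressionCodec.COMPRESSION_TYPE
  static final byte LZ4_FRAME = 0;      // CompressionType.LZ4_FRAME in the flatbuffer schema
  static final byte ZSTD = 1;           // CompressionType.ZSTD

  SketchCodec createCodec(byte codecType) {
    switch (codecType) {
      case NO_COMPRESSION:
        return () -> "NO_COMPRESSION";
      case LZ4_FRAME:
        return () -> "LZ4_FRAME";
      default:
        // ZSTD is defined by the format but not implemented in this sketch.
        throw new IllegalArgumentException("Compression type not supported: " + codecType);
    }
  }
}
```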







[GitHub] [arrow] emkornfield commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r551603359



##########
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.compression;
+
+import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  private static final long SIZE_OF_MESSAGE_LENGTH = 8L;
+
+  private final LZ4Factory factory;
+
+  private LZ4Compressor compressor;
+
+  private LZ4FastDecompressor decompressor;
+
+  public Lz4CompressionCodec() {
+    factory = LZ4Factory.fastestInstance();
+  }
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) {

Review comment:
       ? or is this consistent with the existing API?







[GitHub] [arrow] emkornfield commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
emkornfield commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-801585216


   If you've already started ARROW-11899, then I'll let you finish it up; hopefully it isn't too much work. We are discussing the path forward for LZ4 in general on the ML. Once that is cleared up, we can figure out how to do the work, including whether @HedgehogCode is interested in contributing.





[GitHub] [arrow] emkornfield closed pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
emkornfield closed pull request #8949:
URL: https://github.com/apache/arrow/pull/8949


   





[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-801583143


   > +1 thank you. @liyafan82 did you have plans to work on the follow-up items or ZSTD? Otherwise I can take them up.
   > 
   > @HedgehogCode any thoughts on how to proceed for LZ4? We can maybe discuss more on the performance JIRA?
   
   @emkornfield Thanks a lot for your effort.
   
   I started working on ARROW-11899 yesterday.
   If you are interested in any of the items (including ARROW-11899), please feel free to assign them to yourself. I'd like to help with the review/discussions :-)





[GitHub] [arrow] emkornfield commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
emkornfield commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-794831399


   No rush, I just wanted to make sure I knew when it was ready for another pass.





[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-796674674


   @emkornfield I have replied to each of the previous comments, so it may be ready for a new round of review. Thanks.





[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-756003757


   > When I use the changes and try to compress and decompress an empty buffer (by using a variable sized vector with only missing values) I get a SIGSEGV ([hs_err_pid10504.log](https://github.com/apache/arrow/files/5771248/hs_err_pid10504.log)):
   > 
   > ```
   > #
   > # A fatal error has been detected by the Java Runtime Environment:
   > #
   > #  SIGSEGV (0xb) at pc=0x00007f1209448fd0, pid=10504, tid=0x00007f1208bc7700
   > #
   > # JRE version: OpenJDK Runtime Environment (8.0_275-b01) (build 1.8.0_275-b01)
   > # Java VM: OpenJDK 64-Bit Server VM (25.275-b01 mixed mode linux-amd64 compressed oops)
   > # Problematic frame:
   > # V  [libjvm.so+0x6fdfd0]  jni_ThrowNew+0xc0
   > #
   > # Core dump written. Default location: /workspaces/arrow/java/vector/core or core.10504
   > ```
   > 
   > This can be reproduced by adding the following test to `TestCompressionCodec.java`:
   > 
   > ```java
   >   @Test
   >   public void testEmptyBuffer() throws Exception {
   >     final int vecLength = 10;
   >     final VarBinaryVector origVec = new VarBinaryVector("vec", allocator);
   > 
   >     origVec.allocateNew(vecLength);
   > 
   >     // Do not set any values (all missing)
   >     origVec.setValueCount(vecLength);
   > 
   >     final List<ArrowBuf> origBuffers = origVec.getFieldBuffers();
   >     final List<ArrowBuf> compressedBuffers = compressBuffers(origBuffers);
   >     final List<ArrowBuf> decompressedBuffers = deCompressBuffers(compressedBuffers);
   > 
   >     // TODO assert that the decompressed buffers are correct
   >     AutoCloseables.close(decompressedBuffers);
   >   }
   > ```
   > 
   > This looks like an error in the lz4-java library but I am not sure. I thought I should mention it here first.
   > (Note that I am using OpenJDK 8 and I haven't tried OpenJDK 11 yet)
   
   @HedgehogCode The problem happened when lz4-java tried to decompress an empty buffer. I have fixed it by special-casing empty buffers. Thanks again for your kind reminder. 
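   The fix described above can be sketched as follows. This is a hypothetical byte[]-based illustration, not the code in this PR. It assumes the 8-byte little-endian uncompressed-length prefix used by the codec, with 0 meaning an empty buffer and -1 meaning an uncompressed body (as in the Arrow IPC format). The LZ4 step is a placeholder `UnaryOperator<byte[]>` rather than a direct lz4-java call.

   ```java
   import java.nio.ByteBuffer;
   import java.nio.ByteOrder;
   import java.util.Arrays;
   import java.util.function.UnaryOperator;

   /** Hypothetical sketch of the length-prefix framing with an empty-buffer shortcut; not Arrow's API. */
   final class EmptyBufferFraming {
     // 8-byte little-endian uncompressed length, like SIZE_OF_UNCOMPRESSED_LENGTH in the codec.
     static final int PREFIX = 8;
     // Per the Arrow IPC format, -1 marks a body that was stored uncompressed.
     static final long NO_COMPRESSION_LENGTH = -1L;

     static byte[] compress(byte[] raw, UnaryOperator<byte[]> lz4) {
       if (raw.length == 0) {
         // Shortcut: write only a zero prefix and never hand an empty input to LZ4.
         return ByteBuffer.allocate(PREFIX).order(ByteOrder.LITTLE_ENDIAN).putLong(0L).array();
       }
       byte[] body = lz4.apply(raw);
       return ByteBuffer.allocate(PREFIX + body.length)
           .order(ByteOrder.LITTLE_ENDIAN)
           .putLong(raw.length)
           .put(body)
           .array();
     }

     static byte[] decompress(byte[] framed, UnaryOperator<byte[]> lz4Inverse) {
       if (framed.length < PREFIX) {
         throw new IllegalArgumentException("Not enough data to decompress.");
       }
       long uncompressedLength = ByteBuffer.wrap(framed).order(ByteOrder.LITTLE_ENDIAN).getLong(0);
       if (uncompressedLength == 0L) {
         // Shortcut mirroring compress(): the decompressor is never called on empty input.
         return new byte[0];
       }
       byte[] body = Arrays.copyOfRange(framed, PREFIX, framed.length);
       if (uncompressedLength == NO_COMPRESSION_LENGTH) {
         return body; // stored raw, nothing to undo
       }
       return lz4Inverse.apply(body);
     }
   }
   ```

   Because both sides short-circuit on a zero prefix, the LZ4 library is never handed an empty input, which is the case that crashed in the report above.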





[GitHub] [arrow] emkornfield commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
emkornfield commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-794817274


   > @emkornfield Another (maybe ugly) solution that comes to my mind is to use different libraries for buffers with dependent and independent blocks?
   
   No, I think we should figure out a way to have one implementation.





[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r589202945



##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+      }
+
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];

Review comment:
       Sure. I have opened ARROW-11901 to track it.
   
   I also want to briefly share some thoughts: to use native memory directly, we need a way to wrap `ByteBuffers` as input/output streams (in Java, the only "standard" way to access off-heap memory is through `DirectByteBuffer`). 
   
   We would need a third-party library to achieve that, and we would also need to evaluate the performance afterwards: because the Commons Compress library also works extensively with on-heap data, copies between on-heap and off-heap memory may be difficult to avoid. 
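   As a rough illustration of the wrapping mentioned above, a `ByteBuffer` (including a direct one) can be exposed as an `InputStream` with a small adapter. The class name `ByteBufferBackedInputStream` is hypothetical and is not part of Arrow or the JDK. Note that `read(byte[], int, int)` still copies into an on-heap array, which is exactly the copy that is hard to avoid when the compression library consumes `byte[]` through streams.

   ```java
   import java.io.InputStream;
   import java.nio.ByteBuffer;

   /** Hypothetical adapter exposing a (possibly direct) ByteBuffer as an InputStream. */
   final class ByteBufferBackedInputStream extends InputStream {
     private final ByteBuffer buf;

     ByteBufferBackedInputStream(ByteBuffer buf) {
       // duplicate() so that reading does not move the caller's position/limit
       this.buf = buf.duplicate();
     }

     @Override
     public int read() {
       // Mask to 0..255 as the InputStream contract requires; -1 signals end of stream.
       return buf.hasRemaining() ? (buf.get() & 0xFF) : -1;
     }

     @Override
     public int read(byte[] dst, int off, int len) {
       if (len == 0) {
         return 0;
       }
       if (!buf.hasRemaining()) {
         return -1;
       }
       int n = Math.min(len, buf.remaining());
       buf.get(dst, off, n); // off-heap -> on-heap copy happens here
       return n;
     }

     @Override
     public int available() {
       return buf.remaining();
     }
   }
   ```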
   
   







[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-789684969


   The integration tests have passed. Dear reviewers, please take another look when you have time (reviewing just the last three commits may be enough).
   Thanks a lot.





[GitHub] [arrow] HedgehogCode commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
HedgehogCode commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-793893081


   I know there is now a ticket to investigate compression performance, but I am concerned that performance with the Apache Commons Compress library is unusably bad.
   
   I created a simple benchmark that compresses 5 buffers of 100,000 integers. See [HedgehogCode/arrow#benchmark-compression-commons-compress#TestBenchmarkCompressionCodec.java](https://github.com/HedgehogCode/arrow/blob/benchmark-compression-commons-compress/java/compression/src/test/java/org/apache/arrow/compression/TestBenchmarkCompressionCodec.java).
   
   With the current state of this PR, this takes around __2600ms__ on my system.
   If I change the class `Lz4CompressionCodec` to use `java-lz4`, as in some older commits of this PR, the benchmark takes only around __3ms__.
   
   See the branches https://github.com/HedgehogCode/arrow/tree/benchmark-compression-commons-compress and https://github.com/Hedgehogcode/arrow/tree/benchmark-compression-java-lz4 in my fork.
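   For anyone who wants to reproduce the comparison, a minimal timing harness for the setup described above (5 buffers of 100,000 integers) might look like this. It is a hypothetical sketch, not the linked `TestBenchmarkCompressionCodec`: the codec is passed in as a `UnaryOperator<byte[]>`, so a Commons Compress based function and an lz4-java based one can be timed on the same inputs.

   ```java
   import java.nio.ByteBuffer;
   import java.nio.ByteOrder;
   import java.util.function.UnaryOperator;

   /** Hypothetical timing harness; the codec under test is supplied by the caller. */
   final class CompressionTiming {
     static final int BUFFER_COUNT = 5;
     static final int INTS_PER_BUFFER = 100_000;

     /** Builds BUFFER_COUNT buffers, each holding INTS_PER_BUFFER little-endian ints. */
     static byte[][] makeInputs() {
       byte[][] inputs = new byte[BUFFER_COUNT][];
       for (int b = 0; b < BUFFER_COUNT; b++) {
         ByteBuffer bb = ByteBuffer.allocate(INTS_PER_BUFFER * Integer.BYTES).order(ByteOrder.LITTLE_ENDIAN);
         for (int i = 0; i < INTS_PER_BUFFER; i++) {
           bb.putInt(b * INTS_PER_BUFFER + i);
         }
         inputs[b] = bb.array();
       }
       return inputs;
     }

     /** Compresses every input once; returns {elapsedNanos, totalCompressedBytes}. */
     static long[] run(UnaryOperator<byte[]> codec, byte[][] inputs) {
       long totalBytes = 0;
       long start = System.nanoTime();
       for (byte[] in : inputs) {
         totalBytes += codec.apply(in).length;
       }
       return new long[] {System.nanoTime() - start, totalBytes};
     }
   }
   ```

   A single pass like this is sensitive to JIT warm-up and class loading, so it is safer to run it several times and discard the first iterations (or use a harness such as JMH) before comparing numbers.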





[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-804525790


   > @liyafan82 @emkornfield Can one of you update https://github.com/apache/arrow/blob/master/docs/source/status.rst#ipc-format once this is all finished?
   
   @pitrou I will keep this in mind. Thanks for your kind reminder. 





[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-800883260


   > Thanks @liyafan82, a few more minor comments. I'd like to see this merged sooner rather than later so we can do the follow-up work. If you don't have bandwidth, please let me know; if it's OK, could I fix up my comments and push to this PR?
   
   @emkornfield Thanks a lot for the further comments. I think I can fix them up today. 





[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-791193697


   > @liyafan82 nice work. Left a few comments about API and structure; let me know what you think.
   
   @emkornfield Thanks a lot for your comments. I will resolve them one by one ASAP. 





[GitHub] [arrow] HedgehogCode edited a comment on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
HedgehogCode edited a comment on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-754727763


   When I use the changes and try to compress and decompress an empty buffer (by using a variable sized vector with only missing values) I get a SIGSEGV ([hs_err_pid10504.log](https://github.com/apache/arrow/files/5771248/hs_err_pid10504.log)):
   ```
   #
   # A fatal error has been detected by the Java Runtime Environment:
   #
   #  SIGSEGV (0xb) at pc=0x00007f1209448fd0, pid=10504, tid=0x00007f1208bc7700
   #
   # JRE version: OpenJDK Runtime Environment (8.0_275-b01) (build 1.8.0_275-b01)
   # Java VM: OpenJDK 64-Bit Server VM (25.275-b01 mixed mode linux-amd64 compressed oops)
   # Problematic frame:
   # V  [libjvm.so+0x6fdfd0]  jni_ThrowNew+0xc0
   #
   # Core dump written. Default location: /workspaces/arrow/java/vector/core or core.10504
   ```
   
   This can be reproduced by adding the following test to `TestCompressionCodec.java`:
   ```java
     @Test
     public void testEmptyBuffer() throws Exception {
       final int vecLength = 10;
       final VarBinaryVector origVec = new VarBinaryVector("vec", allocator);
   
       origVec.allocateNew(vecLength);
   
       // Do not set any values (all missing)
       origVec.setValueCount(vecLength);
   
       final List<ArrowBuf> origBuffers = origVec.getFieldBuffers();
       final List<ArrowBuf> compressedBuffers = compressBuffers(origBuffers);
       final List<ArrowBuf> decompressedBuffers = deCompressBuffers(compressedBuffers);
   
       // TODO assert that the decompressed buffers are correct
       AutoCloseables.close(decompressedBuffers);
     }
   ```
   This looks like an error in the lz4-java library but I am not sure. I thought I should mention it here first.
   (Note that I am using OpenJDK 8 and I haven't tried OpenJDK 11 yet)





[GitHub] [arrow] HedgehogCode commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
HedgehogCode commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-794148781


   I see. Thank you for clarifying this. I missed the discussion on the mailing list.
   I am very much interested in helping out in this area.





[GitHub] [arrow] JkSelf commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
JkSelf commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r584729112



##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+      }
+
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
+    PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (InputStream in = new ByteArrayInputStream(inBytes);
+         OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = baos.toByteArray();
+
+    ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    compressedBuffer.setLong(0, uncompressedLength);
+
+    PlatformDependent.copyMemory(
+        outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length);
+    compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,
+        "Not enough data to decompress.");
+
+    long decompressedLength = compressedBuffer.getLong(0);

Review comment:
       If we call compress on the native side and decompress the buffer on the Java side, how can we ensure that `getLong(0)` reads the length correctly?
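   One way to see why the read can work: the Arrow IPC format stores this prefix as a little-endian 64-bit integer regardless of platform, so the Java side only has to read those 8 bytes in little-endian order. The sketch below (hypothetical class `LengthPrefix`, using `java.nio.ByteBuffer` instead of `ArrowBuf`) shows both an explicit little-endian read and the read-native-then-reverse approach that the `Long.reverseBytes` branch in the codec appears to rely on, assuming `getLong` reads in native byte order.

   ```java
   import java.nio.ByteBuffer;
   import java.nio.ByteOrder;

   /** Hypothetical helpers for the 8-byte uncompressed-length prefix. */
   final class LengthPrefix {
     /** Writes the prefix little-endian, as the codec's comment says the C++ side does. */
     static byte[] write(long uncompressedLength) {
       return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(uncompressedLength).array();
     }

     /** Reads with an explicit byte order, so the result does not depend on the host CPU. */
     static long read(byte[] prefix) {
       return ByteBuffer.wrap(prefix, 0, 8).order(ByteOrder.LITTLE_ENDIAN).getLong();
     }

     /** Mirrors the codec: read in native order, then reverse the bytes on big-endian hosts. */
     static long readNativeThenFix(byte[] prefix) {
       long v = ByteBuffer.wrap(prefix, 0, 8).order(ByteOrder.nativeOrder()).getLong();
       return ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN ? v : Long.reverseBytes(v);
     }
   }
   ```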

##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+      }
+
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
+    PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (InputStream in = new ByteArrayInputStream(inBytes);
+         OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = baos.toByteArray();
+
+    ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    compressedBuffer.setLong(0, uncompressedLength);
+
+    PlatformDependent.copyMemory(
+        outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length);
+    compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,

Review comment:
       When the compressedBuffer is empty, the task fails at this check. Can we remove the check and return the compressedBuffer directly when it is empty?
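   The suggested behaviour could look roughly like this. It is a hypothetical byte[] version (the class `DecompressGuards` is not Arrow code): a zero-byte input is treated as an empty result, while an input that is non-empty but shorter than the 8-byte prefix is still rejected as malformed.

   ```java
   import java.nio.ByteBuffer;
   import java.nio.ByteOrder;

   /** Hypothetical guard ordering for decompress(); not the merged Arrow implementation. */
   final class DecompressGuards {
     static final int SIZE_OF_UNCOMPRESSED_LENGTH = 8;

     /** Returns the uncompressed length, or 0 for a completely empty input instead of failing. */
     static long uncompressedLengthOrEmpty(byte[] compressed) {
       if (compressed.length == 0) {
         // Zero-byte buffer: there is no prefix at all, so report "empty" rather than an error.
         return 0L;
       }
       if (compressed.length < SIZE_OF_UNCOMPRESSED_LENGTH) {
         // A partial prefix is still malformed input.
         throw new IllegalArgumentException("Not enough data to decompress.");
       }
       return ByteBuffer.wrap(compressed).order(ByteOrder.LITTLE_ENDIAN).getLong(0);
     }
   }
   ```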
   

##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+      }
+
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
+    PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (InputStream in = new ByteArrayInputStream(inBytes);
+         OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = baos.toByteArray();
+
+    ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    compressedBuffer.setLong(0, uncompressedLength);
+
+    PlatformDependent.copyMemory(
+        outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length);
+    compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,
+        "Not enough data to decompress.");
+
+    long decompressedLength = compressedBuffer.getLong(0);
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      decompressedLength = Long.reverseBytes(decompressedLength);
+    }
+
+    if (decompressedLength == 0L) {
+      // shortcut for empty buffer
+      compressedBuffer.close();
+      return allocator.getEmpty();
+    }
+
+    if (decompressedLength == CompressionUtil.NO_COMPRESSION_LENGTH) {
+      // no compression
+      return CompressionUtil.decompressRawBuffer(compressedBuffer);
+    }
+
+    try {
+      ArrowBuf decompressedBuffer = doDecompress(allocator, compressedBuffer);
+      compressedBuffer.close();

Review comment:
       `ArrowReader` calls the `close()` method after `loadRecordBatch`. If the compressedBuffer has already been closed during decompression, that second close fails with `RefCnt has gone negative`.
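   The ownership problem can be modeled with a toy reference counter. `RefCountedBuf` and `OwnershipDemo` below are hypothetical and only mimic the counting, not `ArrowBuf` or its allocator. If the codec releases its input and the reader then closes the same buffer, the count drops below zero; giving the codec its own reference first (a `retain()`) keeps the two releases balanced.

   ```java
   /** Hypothetical stand-in for a reference-counted buffer; models only the counting. */
   final class RefCountedBuf implements AutoCloseable {
     private int refCnt = 1;

     void retain() {
       refCnt++;
     }

     int refCnt() {
       return refCnt;
     }

     @Override
     public void close() {
       refCnt--;
       if (refCnt < 0) {
         throw new IllegalStateException("RefCnt has gone negative");
       }
     }
   }

   final class OwnershipDemo {
     /** Models a decompress() that takes ownership of its input and releases it. */
     static void decompressAndRelease(RefCountedBuf compressed) {
       // ... decompress into a new buffer here ...
       compressed.close();
     }

     /** Models a reader that also closes the buffer after loading the batch. */
     static void loadThenClose(RefCountedBuf buf, boolean retainFirst) {
       if (retainFirst) {
         buf.retain(); // give the codec its own reference
       }
       decompressAndRelease(buf);
       buf.close(); // the reader releases its reference
     }
   }
   ```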







[GitHub] [arrow] stczwd commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
stczwd commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r555746080



##########
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.compression;
+
+import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  private static final long SIZE_OF_MESSAGE_LENGTH = 8L;
+
+  private final LZ4Factory factory;
+
+  private LZ4Compressor compressor;
+
+  private LZ4FastDecompressor decompressor;
+
+  public Lz4CompressionCodec() {
+    factory = LZ4Factory.fastestInstance();
+  }
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_MESSAGE_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(SIZE_OF_MESSAGE_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    // create compressor lazily
+    if (compressor == null) {
+      compressor = factory.fastCompressor();
+    }
+
+    int maxCompressedLength = compressor.maxCompressedLength((int) uncompressedBuffer.writerIndex());
+
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    ArrowBuf compressedBuffer = allocator.buffer(maxCompressedLength + SIZE_OF_MESSAGE_LENGTH);
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    compressedBuffer.setLong(0, uncompressedLength);
+
+    ByteBuffer uncompressed =
+        MemoryUtil.directBuffer(uncompressedBuffer.memoryAddress(), (int) uncompressedBuffer.writerIndex());
+    ByteBuffer compressed =
+        MemoryUtil.directBuffer(compressedBuffer.memoryAddress() + SIZE_OF_MESSAGE_LENGTH, maxCompressedLength);
+
+    int compressedLength = compressor.compress(
+        uncompressed, 0, (int) uncompressedBuffer.writerIndex(), compressed, 0, maxCompressedLength);
+    compressedBuffer.writerIndex(compressedLength + SIZE_OF_MESSAGE_LENGTH);
+
+    uncompressedBuffer.close();
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() >= SIZE_OF_MESSAGE_LENGTH,
+        "Not enough data to decompress.");
+
+    long decompressedLength = compressedBuffer.getLong(0);
+    if (!LITTLE_ENDIAN) {
+      decompressedLength = Long.reverseBytes(decompressedLength);
+    }
+
+    if (decompressedLength == 0L) {
+      // shortcut for empty buffer
+      compressedBuffer.close();
+      return allocator.getEmpty();
+    }
+
+    // create decompressor lazily
+    if (decompressor == null) {
+      decompressor = factory.fastDecompressor();
+    }
+
+    ByteBuffer compressed = MemoryUtil.directBuffer(
+        compressedBuffer.memoryAddress() + SIZE_OF_MESSAGE_LENGTH, (int) compressedBuffer.writerIndex());

Review comment:
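       Should the capacity be `(int) (compressedBuffer.writerIndex() - SIZE_OF_MESSAGE_LENGTH)`? The view starts `SIZE_OF_MESSAGE_LENGTH` bytes into the buffer, so a capacity of `writerIndex()` would extend 8 bytes past the written data.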
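To illustrate the capacity question with plain `java.nio` (an analogue of the quoted `MemoryUtil.directBuffer(address, length)` call, not the Arrow code itself): the payload view starts after the 8-byte length prefix, so its length is the number of written bytes minus that prefix. `PayloadView` is a hypothetical helper.

```java
import java.nio.ByteBuffer;

final class PayloadView {
  /** Size of the int64 uncompressed-length prefix. */
  static final int SIZE_OF_MESSAGE_LENGTH = 8;

  /**
   * Returns a view of the compressed payload that follows the prefix.
   * writerIndex counts every valid byte, prefix included, so the payload
   * is writerIndex - SIZE_OF_MESSAGE_LENGTH bytes long.
   */
  static ByteBuffer payload(ByteBuffer whole, int writerIndex) {
    if (writerIndex < SIZE_OF_MESSAGE_LENGTH) {
      throw new IllegalArgumentException("Not enough data to decompress.");
    }
    ByteBuffer dup = whole.duplicate(); // leaves the caller's position/limit alone
    dup.limit(writerIndex);
    dup.position(SIZE_OF_MESSAGE_LENGTH);
    return dup.slice(); // capacity == writerIndex - SIZE_OF_MESSAGE_LENGTH
  }
}
```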







[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r546486677



##########
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.compression;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  private static final long SIZE_OF_MESSAGE_LENGTH = 8L;
+
+  private final LZ4Factory factory;
+
+  private LZ4Compressor compressor;
+
+  private LZ4FastDecompressor decompressor;
+
+  public Lz4CompressionCodec() {
+    factory = LZ4Factory.fastestInstance();
+  }
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) {
+    Preconditions.checkArgument(unCompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    // create compressor lazily
+    if (compressor == null) {
+      compressor = factory.fastCompressor();
+    }
+
+    int maxCompressedLength = compressor.maxCompressedLength((int) unCompressedBuffer.writerIndex());
+
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    ArrowBuf compressedBuffer = allocator.buffer(maxCompressedLength + SIZE_OF_MESSAGE_LENGTH);
+    compressedBuffer.setLong(0, unCompressedBuffer.writerIndex());

Review comment:
       Revised accordingly. Thanks for your kind reminder.

##########
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.compression;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  private static final long SIZE_OF_MESSAGE_LENGTH = 8L;
+
+  private final LZ4Factory factory;
+
+  private LZ4Compressor compressor;
+
+  private LZ4FastDecompressor decompressor;
+
+  public Lz4CompressionCodec() {
+    factory = LZ4Factory.fastestInstance();
+  }
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) {
+    Preconditions.checkArgument(unCompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    // create compressor lazily
+    if (compressor == null) {
+      compressor = factory.fastCompressor();
+    }
+
+    int maxCompressedLength = compressor.maxCompressedLength((int) unCompressedBuffer.writerIndex());
+
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    ArrowBuf compressedBuffer = allocator.buffer(maxCompressedLength + SIZE_OF_MESSAGE_LENGTH);
+    compressedBuffer.setLong(0, unCompressedBuffer.writerIndex());
+
+    ByteBuffer uncompressed =
+        MemoryUtil.directBuffer(unCompressedBuffer.memoryAddress(), (int) unCompressedBuffer.writerIndex());
+    ByteBuffer compressed =
+        MemoryUtil.directBuffer(compressedBuffer.memoryAddress() + SIZE_OF_MESSAGE_LENGTH, maxCompressedLength);
+
+    int compressedLength = compressor.compress(
+        uncompressed, 0, (int) unCompressedBuffer.writerIndex(), compressed, 0, maxCompressedLength);
+    compressedBuffer.writerIndex(compressedLength + SIZE_OF_MESSAGE_LENGTH);
+
+    unCompressedBuffer.close();
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() > SIZE_OF_MESSAGE_LENGTH,
+        "Not enough data to decompress.");
+
+    // create decompressor lazily
+    if (decompressor == null) {
+      decompressor = factory.fastDecompressor();
+    }
+
+    long decompressedLength = compressedBuffer.getLong(0);

Review comment:
       Revised. Thank you.







[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r592192114



##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);

Review comment:
       Sure, that sounds better. I have changed the name accordingly.
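The fallback in the hunk above (send the raw buffer when compression makes it larger) can be sketched in isolation. This assumes the raw body is marked by an uncompressed-length prefix of `-1`, which is what `CompressionUtil.NO_COMPRESSION_LENGTH` is taken to be here. `RawFallback` and the pluggable `compressor` function are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.function.UnaryOperator;

final class RawFallback {
  static final int PREFIX = 8;
  /** Assumed marker meaning "the body that follows is not compressed". */
  static final long NO_COMPRESSION_LENGTH = -1L;

  /**
   * Frames input as [int64 little-endian length][body]. If the compressor output
   * is larger than the input, stores the raw bytes with the -1 marker instead.
   */
  static byte[] frame(byte[] input, UnaryOperator<byte[]> compressor) {
    byte[] compressed = compressor.apply(input);
    boolean useRaw = compressed.length > input.length;
    byte[] body = useRaw ? input : compressed;
    long marker = useRaw ? NO_COMPRESSION_LENGTH : input.length;
    ByteBuffer out = ByteBuffer.allocate(PREFIX + body.length).order(ByteOrder.LITTLE_ENDIAN);
    out.putLong(marker).put(body);
    return out.array();
  }

  /** Returns a copy of the body if the marker says it is uncompressed, else null. */
  static byte[] rawBodyOrNull(byte[] framed) {
    long marker = ByteBuffer.wrap(framed).order(ByteOrder.LITTLE_ENDIAN).getLong(0);
    return marker == NO_COMPRESSION_LENGTH ? Arrays.copyOfRange(framed, PREFIX, framed.length) : null;
  }
}
```

On the read side, a `-1` prefix lets the decoder skip decompression entirely, which is what the `NO_COMPRESSION_LENGTH` check at the top of the first quoted hunk does.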







[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r585290009



##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+      }
+
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
+    PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (InputStream in = new ByteArrayInputStream(inBytes);
+         OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = baos.toByteArray();
+
+    ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    compressedBuffer.setLong(0, uncompressedLength);
+
+    PlatformDependent.copyMemory(
+        outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length);
+    compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,
+        "Not enough data to decompress.");
+
+    long decompressedLength = compressedBuffer.getLong(0);

Review comment:
       The statement immediately above writes the size:
   
   ```
   *reinterpret_cast<int64_t*>(result->mutable_data()) =
           BitUtil::ToLittleEndian(buffer.size());
   ```
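A Java sketch of the same little-endian int64 length prefix, assuming a plain `java.nio.ByteBuffer` rather than an `ArrowBuf`. Setting the byte order on the view yields little-endian bytes on any host; the Java diff reaches the same result by calling `Long.reverseBytes` on big-endian platforms before a native-order `setLong`. `LengthPrefix` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class LengthPrefix {
  /** Writes the length as a little-endian int64 at offset 0, regardless of host order. */
  static void writeLength(ByteBuffer buf, long length) {
    // duplicate() shares the bytes but lets us change the order without touching buf.
    buf.duplicate().order(ByteOrder.LITTLE_ENDIAN).putLong(0, length);
  }

  /** Reads the little-endian int64 length prefix at offset 0. */
  static long readLength(ByteBuffer buf) {
    return buf.duplicate().order(ByteOrder.LITTLE_ENDIAN).getLong(0);
  }
}
```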







[GitHub] [arrow] pitrou commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-789701493


   Congratulations @liyafan82 ! Do you have an idea how hard it will be to add zstd support?





[GitHub] [arrow] emkornfield commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
emkornfield commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-770157540


   @liyafan82, per the recent discussion on the mailing list: I looked into it, and the LZ4 page mentions https://commons.apache.org/proper/commons-compress/javadocs/api-release/org/apache/commons/compress/compressors/lz4/package-summary.html as a port, so that library might offer better compatibility.





[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-773186162


   Switched to the commons-compress library, according to @emkornfield's suggestion.





[GitHub] [arrow] pitrou commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-794327006


   @emkornfield I'm afraid I don't understand. What `lz4-java` simply seems to be saying is that the _caller_ must set the `BLOCK_INDEPENDENCE` flag. Why isn't that the solution?
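For context, the block-independence flag corresponds to the B.Indep bit (bit 5) of the FLG byte in the LZ4 frame header, which follows the 4-byte magic number `0x184D2204`. A minimal sketch that reads that bit from a framed buffer; `Lz4FrameFlags` is a hypothetical helper and does not use the `lz4-java` API.

```java
final class Lz4FrameFlags {
  /** LZ4 frame magic number, stored little-endian as bytes 04 22 4D 18. */
  static final int MAGIC = 0x184D2204;
  /** Bit 5 of the FLG byte: each block can be decoded without the previous one. */
  static final int FLG_BLOCK_INDEPENDENCE = 1 << 5;

  /** Returns whether the frame header declares independent blocks. */
  static boolean isBlockIndependent(byte[] frame) {
    if (frame.length < 5) {
      throw new IllegalArgumentException("Too short for an LZ4 frame header");
    }
    int magic = (frame[0] & 0xFF)
        | (frame[1] & 0xFF) << 8
        | (frame[2] & 0xFF) << 16
        | (frame[3] & 0xFF) << 24;
    if (magic != MAGIC) {
      throw new IllegalArgumentException("Not an LZ4 frame");
    }
    int flg = frame[4] & 0xFF;
    int version = flg >>> 6; // the top two bits must be 01
    if (version != 1) {
      throw new IllegalArgumentException("Unsupported LZ4 frame version " + version);
    }
    return (flg & FLG_BLOCK_INDEPENDENCE) != 0;
  }
}
```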





[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-755045402


   > When I use the changes and try to compress and decompress an empty buffer (by using a variable sized vector with only missing values) I get a SIGSEGV ([hs_err_pid10504.log](https://github.com/apache/arrow/files/5771248/hs_err_pid10504.log)):
   > 
   > ```
   > #
   > # A fatal error has been detected by the Java Runtime Environment:
   > #
   > #  SIGSEGV (0xb) at pc=0x00007f1209448fd0, pid=10504, tid=0x00007f1208bc7700
   > #
   > # JRE version: OpenJDK Runtime Environment (8.0_275-b01) (build 1.8.0_275-b01)
   > # Java VM: OpenJDK 64-Bit Server VM (25.275-b01 mixed mode linux-amd64 compressed oops)
   > # Problematic frame:
   > # V  [libjvm.so+0x6fdfd0]  jni_ThrowNew+0xc0
   > #
   > # Core dump written. Default location: /workspaces/arrow/java/vector/core or core.10504
   > ```
   > 
   > This can be reproduced by adding the following test to `TestCompressionCodec.java`:
   > 
   > ```java
   >   @Test
   >   public void testEmptyBuffer() throws Exception {
   >     final int vecLength = 10;
   >     final VarBinaryVector origVec = new VarBinaryVector("vec", allocator);
   > 
   >     origVec.allocateNew(vecLength);
   > 
   >     // Do not set any values (all missing)
   >     origVec.setValueCount(vecLength);
   > 
   >     final List<ArrowBuf> origBuffers = origVec.getFieldBuffers();
   >     final List<ArrowBuf> compressedBuffers = compressBuffers(origBuffers);
   >     final List<ArrowBuf> decompressedBuffers = deCompressBuffers(compressedBuffers);
   > 
   >     // TODO assert that the decompressed buffers are correct
   >     AutoCloseables.close(decompressedBuffers);
   >   }
   > ```
   > 
   > This looks like an error in the lz4-java library but I am not sure. I thought I should mention it here first.
   > (Note that I am using OpenJDK 8 and I haven't tried OpenJDK 11 yet)
   
   @HedgehogCode Thanks a lot for your effort and information. I will take a look at the problem. 
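The later revisions quoted in this thread avoid calling the compressor at all for an empty buffer: compression emits only an 8-byte prefix holding `0`, and decompression returns an empty buffer when it reads that `0`. A sketch of the same shortcut on byte arrays, with hypothetical names and a pluggable (de)compressor; it does not establish the cause of the crash reported above.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.function.UnaryOperator;

final class EmptyBufferShortcut {
  static final int PREFIX = 8;

  /** Never passes an empty input to the compressor; emits just a zero length prefix. */
  static byte[] compress(byte[] input, UnaryOperator<byte[]> compressor) {
    byte[] body = input.length == 0 ? new byte[0] : compressor.apply(input);
    return ByteBuffer.allocate(PREFIX + body.length)
        .order(ByteOrder.LITTLE_ENDIAN)
        .putLong(input.length)
        .put(body)
        .array();
  }

  /** A zero length prefix decodes to an empty result without calling the decompressor. */
  static byte[] decompress(byte[] framed, UnaryOperator<byte[]> decompressor) {
    if (framed.length < PREFIX) {
      throw new IllegalArgumentException("Not enough data to decompress.");
    }
    long length = ByteBuffer.wrap(framed).order(ByteOrder.LITTLE_ENDIAN).getLong(0);
    if (length == 0L) {
      return new byte[0];
    }
    return decompressor.apply(Arrays.copyOfRange(framed, PREFIX, framed.length));
  }
}
```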





[GitHub] [arrow] emkornfield commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
emkornfield commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-794317886


   I haven't investigated it fully, but https://github.com/airlift/aircompressor might be an option.





[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r588067931



##########
File path: java/compression/pom.xml
##########
@@ -0,0 +1,56 @@
+<?xml version="1.0"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  You under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.arrow</groupId>
+    <artifactId>arrow-java-root</artifactId>
+    <version>4.0.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>arrow-compression</artifactId>
+  <name>Arrow Compression</name>
+  <description>(Experimental/Contrib) A library for working with the compression/decompression of Arrow data.</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-format</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-vector</artifactId>
+      <version>${project.version}</version>
+      <classifier>${arrow.vector.classifier}</classifier>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-unsafe</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-compress</artifactId>
+      <version>1.20</version>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>

Review comment:
       The `vector` module requires this library. If we do not add it to the dependency list, the Maven build fails with this error:
   
   ```
   Used undeclared dependencies found: io.netty:netty-common:jar:4.1.48.Final:compile
   ```







[GitHub] [arrow] HedgehogCode edited a comment on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
HedgehogCode edited a comment on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-794148781


   @emkornfield, thank you for clarifying this. I missed the discussion on the mailing list.
   I am very much interested in helping out in this area.





[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r585195604



##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+      }
+
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
+    PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (InputStream in = new ByteArrayInputStream(inBytes);
+         OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = baos.toByteArray();
+
+    ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    compressedBuffer.setLong(0, uncompressedLength);
+
+    PlatformDependent.copyMemory(
+        outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length);
+    compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,
+        "Not enough data to decompress.");
+
+    long decompressedLength = compressedBuffer.getLong(0);
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      decompressedLength = Long.reverseBytes(decompressedLength);
+    }
+
+    if (decompressedLength == 0L) {
+      // shortcut for empty buffer
+      compressedBuffer.close();
+      return allocator.getEmpty();
+    }
+
+    if (decompressedLength == CompressionUtil.NO_COMPRESSION_LENGTH) {
+      // no compression
+      return CompressionUtil.decompressRawBuffer(compressedBuffer);
+    }
+
+    try {
+      ArrowBuf decompressedBuffer = doDecompress(allocator, compressedBuffer);
+      compressedBuffer.close();

Review comment:
       This is a known issue. We will fix it ASAP. 







[GitHub] [arrow] JkSelf commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
JkSelf commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r585252853



##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+      }
+
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
+    PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (InputStream in = new ByteArrayInputStream(inBytes);
+         OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = baos.toByteArray();
+
+    ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    compressedBuffer.setLong(0, uncompressedLength);
+
+    PlatformDependent.copyMemory(
+        outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length);
+    compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,
+        "Not enough data to decompress.");
+
+    long decompressedLength = compressedBuffer.getLong(0);

Review comment:
       It seems the C++ writer only reserves space for the uncompressed size but does not write it [here](https://github.com/apache/arrow/blob/7184c3f46981dd52c3c521b2676796e82f17da77/cpp/src/arrow/ipc/writer.cc#L196).
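For reference, the Arrow IPC format stores each compressed body buffer as a 64-bit little-endian signed integer holding the uncompressed length, followed by the compressed bytes. Below is a minimal sketch of writing and reading that prefix with plain `java.nio.ByteBuffer`. `LengthPrefixFraming` and its methods are hypothetical names used for illustration, not Arrow APIs.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/**
 * Hypothetical helper (not part of Arrow) sketching the per-buffer framing:
 * an 8-byte little-endian signed uncompressed length, then the compressed bytes.
 */
final class LengthPrefixFraming {
  static final int SIZE_OF_UNCOMPRESSED_LENGTH = 8;

  private LengthPrefixFraming() {}

  /** Writes the uncompressed length, then the payload, into a new array. */
  static byte[] frame(long uncompressedLength, byte[] payload) {
    ByteBuffer out = ByteBuffer.allocate(SIZE_OF_UNCOMPRESSED_LENGTH + payload.length)
        .order(ByteOrder.LITTLE_ENDIAN);  // fixed byte order, independent of the host
    out.putLong(uncompressedLength);      // the prefix must actually be written, not just reserved
    out.put(payload);
    return out.array();
  }

  /** Reads the uncompressed length stored in the first 8 bytes. */
  static long readUncompressedLength(byte[] framed) {
    if (framed.length < SIZE_OF_UNCOMPRESSED_LENGTH) {
      throw new IllegalArgumentException("Not enough data to read the length prefix");
    }
    return ByteBuffer.wrap(framed).order(ByteOrder.LITTLE_ENDIAN).getLong(0);
  }

  /** Returns a copy of the bytes that follow the prefix. */
  static byte[] payload(byte[] framed) {
    byte[] result = new byte[framed.length - SIZE_OF_UNCOMPRESSED_LENGTH];
    System.arraycopy(framed, SIZE_OF_UNCOMPRESSED_LENGTH, result, 0, result.length);
    return result;
  }
}
```

Setting `ByteOrder.LITTLE_ENDIAN` explicitly is one alternative to the `MemoryUtil.LITTLE_ENDIAN` check plus `Long.reverseBytes` used in the diff above.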







[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r586185940



##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+      }
+
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
+    PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (InputStream in = new ByteArrayInputStream(inBytes);
+         OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = baos.toByteArray();
+
+    ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    compressedBuffer.setLong(0, uncompressedLength);
+
+    PlatformDependent.copyMemory(
+        outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length);
+    compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,
+        "Not enough data to decompress.");
+
+    long decompressedLength = compressedBuffer.getLong(0);

Review comment:
       I have fixed this problem. It was due to an optimization of the C++ implementation: when an array contains no nulls, its validity buffer can be empty. 
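If an omitted validity buffer arrives as a zero-length body buffer, a reader cannot assume the 8-byte length prefix is present. Below is a minimal sketch of such a guard, assuming the fix amounts to treating a completely empty buffer as empty data before reading the prefix. `EmptyBufferGuard` and `uncompressedLengthOrZero` are hypothetical names, not the code in this PR.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical guard (not Arrow code) for body buffers that may be empty. */
final class EmptyBufferGuard {
  static final int SIZE_OF_UNCOMPRESSED_LENGTH = 8;

  private EmptyBufferGuard() {}

  /**
   * Returns the uncompressed length stored in the prefix, or 0 when the buffer
   * is completely empty (e.g. an omitted validity buffer), so the caller can
   * skip decompression instead of reading past the end of the buffer.
   */
  static long uncompressedLengthOrZero(ByteBuffer body) {
    if (body.remaining() == 0) {
      return 0L;  // nothing was written, not even the length prefix
    }
    if (body.remaining() < SIZE_OF_UNCOMPRESSED_LENGTH) {
      throw new IllegalArgumentException("Not enough data to decompress.");
    }
    // Read through a duplicate so the caller's position and byte order are untouched.
    return body.duplicate().order(ByteOrder.LITTLE_ENDIAN).getLong(body.position());
  }
}
```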







[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-778084083


   > @liyafan82 could you enable the Java integration test to confirm that reading the files generated by C++ works before we merge? (Once we verify it is working, I can take a final look.)
   
   Sure. I will do some tests for that. 





[GitHub] [arrow] emkornfield commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r595777904



##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;

Review comment:
       Ah, this is where Netty is used. Don't we have an Arrow wrapper for it?
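Here `PlatformDependent.copyMemory` only copies between the buffer's memory and a `byte[]`. One possible way to drop the Netty dependency is to copy through `ArrowBuf`'s own byte-array accessors (`getBytes`/`setBytes`), assuming they cover this use. `ArrowBuf` needs the Arrow jars, so the runnable sketch below uses a direct `java.nio.ByteBuffer` as a stand-in for off-heap memory. `OffHeapCopy` is a hypothetical name.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch: copying between off-heap memory and byte[] with JDK APIs only. */
final class OffHeapCopy {
  private OffHeapCopy() {}

  /** Copies the first {@code length} bytes of an off-heap buffer into a new array. */
  static byte[] toArray(ByteBuffer offHeap, int length) {
    byte[] out = new byte[length];
    ByteBuffer view = offHeap.duplicate();  // independent position/limit, shared memory
    view.position(0);
    view.get(out, 0, length);
    return out;
  }

  /** Allocates off-heap memory and copies {@code src} into it starting at {@code offset}. */
  static ByteBuffer fromArray(byte[] src, int offset) {
    ByteBuffer offHeap = ByteBuffer.allocateDirect(offset + src.length);  // zero-initialized
    ByteBuffer view = offHeap.duplicate();
    view.position(offset);  // e.g. leave room for an 8-byte length prefix
    view.put(src);
    return offHeap;
  }
}
```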

##########
File path: java/compression/pom.xml
##########
@@ -0,0 +1,56 @@
+<?xml version="1.0"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  You under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.arrow</groupId>
+    <artifactId>arrow-java-root</artifactId>
+    <version>4.0.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>arrow-compression</artifactId>
+  <name>Arrow Compression</name>
+  <description>(Experimental/Contrib) A library for working with the compression/decompression of Arrow data.</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-format</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-vector</artifactId>
+      <version>${project.version}</version>
+      <classifier>${arrow.vector.classifier}</classifier>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-unsafe</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-compress</artifactId>
+      <version>1.20</version>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>

Review comment:
       Hmm, I wonder why Netty is required here, though. I'll take a closer look.

##########
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java
##########
@@ -47,14 +57,21 @@ public static ArrowBodyCompression createBodyCompression(CompressionCodec codec)
   }
 
   /**
-   * Creates the {@link CompressionCodec} given the compression type.
+   * Process compression by compressing the buffer as is.
    */
-  public static CompressionCodec createCodec(byte compressionType) {
-    switch (compressionType) {
-      case NoCompressionCodec.COMPRESSION_TYPE:
-        return NoCompressionCodec.INSTANCE;
-      default:
-        throw new IllegalArgumentException("Compression type not supported: " + compressionType);
-    }
+  public static ArrowBuf compressRawBuffer(BufferAllocator allocator, ArrowBuf inputBuffer) {
+    ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex());
+    compressedBuffer.setLong(0, NO_COMPRESSION_LENGTH);
+    compressedBuffer.setBytes(SIZE_OF_UNCOMPRESSED_LENGTH, inputBuffer, 0, inputBuffer.writerIndex());
+    compressedBuffer.writerIndex(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex());
+    return compressedBuffer;
+  }
+
+  /**
+   * Process decompression by decompressing the buffer as is.

Review comment:
       Please update the docs to match, something like:
   
   "Slice the buffer to contain the uncompressed bytes"
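The suggested wording matches what the raw path does. When compression does not pay off, the writer stores -1 as the uncompressed length, followed by the original bytes. The reader then only needs to slice off the 8-byte prefix. Below is a minimal sketch with `java.nio.ByteBuffer`. `RawBufferFraming` and its methods are hypothetical stand-ins for `CompressionUtil.compressRawBuffer`/`decompressRawBuffer`, and the -1 marker follows the Arrow IPC format's convention for uncompressed data.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical sketch of the "no compression" framing used when compression does not help. */
final class RawBufferFraming {
  static final int SIZE_OF_UNCOMPRESSED_LENGTH = 8;
  static final long NO_COMPRESSION_LENGTH = -1L;  // marks the payload as stored uncompressed

  private RawBufferFraming() {}

  /** Prefixes the unchanged input with the -1 marker. */
  static ByteBuffer frameRaw(byte[] input) {
    ByteBuffer out = ByteBuffer.allocate(SIZE_OF_UNCOMPRESSED_LENGTH + input.length)
        .order(ByteOrder.LITTLE_ENDIAN);
    out.putLong(NO_COMPRESSION_LENGTH).put(input);
    out.flip();  // make the written bytes readable from position 0
    return out;
  }

  /** Slices the buffer to contain the uncompressed bytes, without copying them. */
  static ByteBuffer sliceRaw(ByteBuffer framed) {
    long length = framed.duplicate().order(ByteOrder.LITTLE_ENDIAN).getLong(framed.position());
    if (length != NO_COMPRESSION_LENGTH) {
      throw new IllegalArgumentException("Buffer is compressed; expected the -1 marker");
    }
    ByteBuffer view = framed.duplicate();
    view.position(framed.position() + SIZE_OF_UNCOMPRESSED_LENGTH);
    return view.slice();  // shares memory with the original buffer
  }
}
```

Because the slice shares memory with the framed buffer, the raw path avoids an extra copy.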

##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+      }
+
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
+    PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (InputStream in = new ByteArrayInputStream(inBytes);
+         OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = baos.toByteArray();
+
+    ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    compressedBuffer.setLong(0, uncompressedLength);
+
+    PlatformDependent.copyMemory(
+        outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length);
+    compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,
+        "Not enough data to decompress.");
+
+    long decompressedLength = compressedBuffer.getLong(0);
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      decompressedLength = Long.reverseBytes(decompressedLength);
+    }
+
+    if (decompressedLength == 0L) {
+      // shortcut for empty buffer
+      compressedBuffer.close();
+      return allocator.getEmpty();
+    }
+
+    if (decompressedLength == CompressionUtil.NO_COMPRESSION_LENGTH) {
+      // no compression
+      return CompressionUtil.decompressRawBuffer(compressedBuffer);
+    }
+
+    try {
+      ArrowBuf decompressedBuffer = doDecompress(allocator, compressedBuffer);
+      compressedBuffer.close();
+      return decompressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) throws IOException {
+    long decompressedLength = compressedBuffer.getLong(0);
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      decompressedLength = Long.reverseBytes(decompressedLength);
+    }
+
+    byte[] inBytes = new byte[(int) (compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH)];
+    PlatformDependent.copyMemory(
+        compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, inBytes, 0, inBytes.length);
+    ByteArrayOutputStream out = new ByteArrayOutputStream((int) decompressedLength);
+    try (InputStream in = new FramedLZ4CompressorInputStream(new ByteArrayInputStream(inBytes))) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = out.toByteArray();
+    ArrowBuf decompressedBuffer = allocator.buffer(outBytes.length);
+    PlatformDependent.copyMemory(outBytes, 0, decompressedBuffer.memoryAddress(), outBytes.length);
+    decompressedBuffer.writerIndex(decompressedLength);
+    return decompressedBuffer;
+  }
+
+  @Override
+  public String getCodecName() {
+    return CompressionType.name(CompressionType.LZ4_FRAME);

Review comment:
       With the new enum, maybe we can make this an accessor that returns an enum instead, and then extract the byte from it where necessary?
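A minimal sketch of the suggested shape follows. `CodecType`, `getCodecType` and `TypedCompressionCodec` are hypothetical names, not part of this PR. The bytes for `LZ4_FRAME` (0) and `ZSTD` (1) follow the `CompressionType` enum in the Arrow flatbuffer schema. Using -1 for "no compression" is an assumption for illustration.

```java
/**
 * Hypothetical enum pairing each codec with the byte written to IPC metadata.
 * LZ4_FRAME = 0 and ZSTD = 1 follow the flatbuffer CompressionType enum;
 * -1 for NO_COMPRESSION is an illustrative assumption.
 */
enum CodecType {
  NO_COMPRESSION((byte) -1),
  LZ4_FRAME((byte) 0),
  ZSTD((byte) 1);

  private final byte type;

  CodecType(byte type) {
    this.type = type;
  }

  /** The byte to write into the IPC metadata. */
  byte getType() {
    return type;
  }

  /** Maps a byte read from IPC metadata back to the enum. */
  static CodecType fromType(byte type) {
    for (CodecType codecType : values()) {
      if (codecType.type == type) {
        return codecType;
      }
    }
    throw new IllegalArgumentException("Compression type not supported: " + type);
  }
}

/** Hypothetical codec interface exposing the enum instead of a name string. */
interface TypedCompressionCodec {
  CodecType getCodecType();
}
```

Callers that need the name can still use `getCodecType().name()`, and code that writes metadata can use `getCodecType().getType()`.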







[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r792333746



##########
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+      }
+
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
+    PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (InputStream in = new ByteArrayInputStream(inBytes);
+         OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = baos.toByteArray();
+
+    ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    compressedBuffer.setLong(0, uncompressedLength);
+
+    PlatformDependent.copyMemory(
+        outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length);
+    compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() >= CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH,
+        "Not enough data to decompress.");
+
+    long decompressedLength = compressedBuffer.getLong(0);
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      decompressedLength = Long.reverseBytes(decompressedLength);
+    }
+
+    if (decompressedLength == 0L) {
+      // shortcut for empty buffer
+      compressedBuffer.close();
+      return allocator.getEmpty();
+    }
+
+    if (decompressedLength == CompressionUtil.NO_COMPRESSION_LENGTH) {
+      // no compression
+      return CompressionUtil.decompressRawBuffer(compressedBuffer);
+    }
+
+    try {
+      ArrowBuf decompressedBuffer = doDecompress(allocator, compressedBuffer);
+      compressedBuffer.close();

Review comment:
       @garyelephant Do you have an example reproducing the problem?
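      For anyone building a reproduction without the LZ4 dependency, the buffer layout that the quoted codec reads and writes can be sketched with the JDK alone. The layout is an 8-byte little-endian uncompressed length followed by the codec output. A length of 0 marks an empty buffer, and a length of -1 marks a body stored raw because compression made it larger. This is a sketch under stated assumptions. `java.util.zip.Deflater`/`Inflater` stand in for LZ4, which the JDK does not provide. The value -1 is assumed to match `CompressionUtil.NO_COMPRESSION_LENGTH`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Sketch of [8-byte LE uncompressed length][codec output]; Deflater/Inflater stand in for LZ4. */
public class FramingSketch {
  // Assumed to match CompressionUtil.NO_COMPRESSION_LENGTH: the body is stored raw.
  static final long NO_COMPRESSION_LENGTH = -1L;
  static final int SIZE_OF_UNCOMPRESSED_LENGTH = 8;

  static byte[] deflate(byte[] raw) {
    Deflater deflater = new Deflater();
    deflater.setInput(raw);
    deflater.finish();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] chunk = new byte[4096];
    while (!deflater.finished()) {
      out.write(chunk, 0, deflater.deflate(chunk));
    }
    deflater.end();
    return out.toByteArray();
  }

  static byte[] compress(byte[] raw) {
    // Empty input: only the 8-byte prefix (holding 0) is written.
    byte[] body = raw.length == 0 ? raw : deflate(raw);
    long prefix = raw.length;
    if (body.length > raw.length) {
      // Compression made the data larger: send the raw bytes behind the -1 sentinel.
      body = raw;
      prefix = NO_COMPRESSION_LENGTH;
    }
    return ByteBuffer.allocate(SIZE_OF_UNCOMPRESSED_LENGTH + body.length)
        .order(ByteOrder.LITTLE_ENDIAN) // little-endian regardless of host byte order
        .putLong(prefix)
        .put(body)
        .array();
  }

  static byte[] decompress(byte[] framed) {
    if (framed.length < SIZE_OF_UNCOMPRESSED_LENGTH) {
      throw new IllegalArgumentException("Not enough data to decompress.");
    }
    long length = ByteBuffer.wrap(framed).order(ByteOrder.LITTLE_ENDIAN).getLong(0);
    byte[] body = Arrays.copyOfRange(framed, SIZE_OF_UNCOMPRESSED_LENGTH, framed.length);
    if (length == 0L) {
      return new byte[0];
    }
    if (length == NO_COMPRESSION_LENGTH) {
      return body;
    }
    if (length < 0 || length > Integer.MAX_VALUE) {
      throw new IllegalArgumentException("Invalid uncompressed length: " + length);
    }
    Inflater inflater = new Inflater();
    inflater.setInput(body);
    byte[] out = new byte[(int) length];
    int written = 0;
    try {
      while (written < out.length) {
        int n = inflater.inflate(out, written, out.length - written);
        if (n == 0 && (inflater.finished() || inflater.needsInput() || inflater.needsDictionary())) {
          throw new IllegalArgumentException("Compressed body is shorter than the declared length");
        }
        written += n;
      }
    } catch (DataFormatException e) {
      throw new IllegalArgumentException(e);
    } finally {
      inflater.end();
    }
    return out;
  }
}
```

      The size check in `compress` mirrors the `compressedLength > uncompressedBuffer.writerIndex()` fallback in the quoted diff. The fixed little-endian prefix corresponds to the `Long.reverseBytes` handling there, which keeps the Java output consistent with the C++ implementation.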





[GitHub] [arrow] pitrou commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-772718996


   See PR #9408 for integration tests.



[GitHub] [arrow] liyafan82 commented on pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

Posted by GitBox <gi...@apache.org>.
liyafan82 commented on pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#issuecomment-784850125


   To avoid a direct dependency on the LZ4 library, I have extracted the concrete compression codec implementations into a separate module. I will continue working on the integration tests.
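   A rough sketch of the dependency split described above follows. The core module defines only a codec interface and a factory, and it ships a factory that knows the pass-through codec. An optional module supplies the real codecs and is the only code that would import a compression library. Every name below (`Codec`, `CodecFactory`, `CoreFactory`, `CompressionModuleFactory`) is hypothetical. `java.util.zip.Deflater` stands in for LZ4 so the sketch runs without external dependencies.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;

/** Hypothetical sketch of keeping codec implementations out of the core module. */
public class ModuleSplitSketch {

  // ---- core module: depends only on these interfaces, never on a codec library ----
  interface Codec {
    String name();

    byte[] compress(byte[] raw);
  }

  interface CodecFactory {
    Codec create(String codecName);
  }

  /** Factory bundled with core: it can only hand out the pass-through codec. */
  static class CoreFactory implements CodecFactory {
    @Override
    public Codec create(String codecName) {
      if (!"NONE".equals(codecName)) {
        throw new IllegalArgumentException(codecName + " needs the optional compression module");
      }
      return new Codec() {
        @Override
        public String name() {
          return "NONE";
        }

        @Override
        public byte[] compress(byte[] raw) {
          return raw;
        }
      };
    }
  }

  // ---- optional compression module: the only code that touches a codec library ----
  static class CompressionModuleFactory extends CoreFactory {
    @Override
    public Codec create(String codecName) {
      if (!"DEFLATE".equals(codecName)) {
        return super.create(codecName); // fall back to what core already supports
      }
      return new Codec() {
        @Override
        public String name() {
          return "DEFLATE"; // stand-in for LZ4_FRAME
        }

        @Override
        public byte[] compress(byte[] raw) {
          Deflater deflater = new Deflater();
          deflater.setInput(raw);
          deflater.finish();
          ByteArrayOutputStream out = new ByteArrayOutputStream();
          byte[] chunk = new byte[4096];
          while (!deflater.finished()) {
            out.write(chunk, 0, deflater.deflate(chunk));
          }
          deflater.end();
          return out.toByteArray();
        }
      };
    }
  }

  public static void main(String[] args) {
    // The application chooses the factory; core code only ever sees CodecFactory.
    CodecFactory factory = new CompressionModuleFactory();
    Codec codec = factory.create("DEFLATE");
    System.out.println(codec.name() + ": " + codec.compress(new byte[1000]).length + " bytes");
  }
}
```

   Users who never enable compression pull in only the core module. Applications that do need compression add the optional module and pass its factory to the reader or writer.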

