Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/06/13 03:13:48 UTC

[GitHub] [arrow-rs] liukun4515 opened a new pull request, #1855: support compression for IPC

liukun4515 opened a new pull request, #1855:
URL: https://github.com/apache/arrow-rs/pull/1855

   # Which issue does this PR close?
   
   <!---
   We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123.
   -->
   
   Closes #1709 #70
   
   # Rationale for this change
    
    <!---
    Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.
   -->
   
   # What changes are included in this PR?
   
   <!---
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   # Are there any user-facing changes?
   
   
   <!---
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!---
   If there are any breaking changes to public APIs, please add the `breaking change` label.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] alamb commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r937170314


##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +32,78 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &Option<CompressionCodecType>,
+) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // corner case: empty buffer
+    if buf_data.is_empty() {
+        return Buffer::from(buf_data);
+    }
+    match compression_codec {
+        Some(_decompressor) if cfg!(feature = "ipc_compression") || cfg!(test) => {
+            // 8byte + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise return as is
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // emtpy
+                let empty = Vec::<u8>::new();
+                Buffer::from(empty)
+            } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+                // not compress
+                let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                Buffer::from(data)
+            } else {
+                // decompress data using the codec
+                let mut _uncompressed_buffer =
+                    Vec::with_capacity(decompressed_length as usize);
+                let _input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];

Review Comment:
   Something still isn't quite right with this code -- instead of gating the code on `cfg!(test)`, I think the more typical pattern is to gate the entire test on the `ipc_compression` feature.
   
   So something like
   
   ```rust
       #[cfg(feature = "ipc_compression")]
       #[test]
       fn read_generated_streams_200() {
           let testdata = crate::util::test_util::arrow_test_data();
           let version = "2.0.0-compression";
   ...
   }
   ```
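
The same attribute-based gating can be applied to non-test code. The following is a minimal sketch, not the PR's implementation: `decode` is a hypothetical helper, and its body is a stand-in rather than a real lz4/zstd call. It shows how `#[cfg(...)]` selects one of two definitions at compile time, so the call site needs no `cfg!(...)` guard and no underscore-prefixed variables.

```rust
/// Hypothetical helper, compiled only when the `ipc_compression` feature is on.
/// A real build would call into lz4/zstd here; this stand-in just copies the input.
#[cfg(feature = "ipc_compression")]
fn decode(input: &[u8]) -> Result<Vec<u8>, String> {
    Ok(input.to_vec())
}

/// Fallback compiled when the feature is off: report an error instead of panicking.
#[cfg(not(feature = "ipc_compression"))]
fn decode(_input: &[u8]) -> Result<Vec<u8>, String> {
    Err("IPC compression not supported; enable feature 'ipc_compression'".to_string())
}
```

Unlike `cfg!(...)`, which only evaluates to a boolean and still requires both branches to type-check, `#[cfg(...)]` removes the unselected item entirely, so the feature-off build never refers to the optional codec crates.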



##########
arrow/Cargo.toml:
##########
@@ -83,6 +86,8 @@ rand = { version = "0.8", default-features = false, features =  ["std", "std_rng
 criterion = { version = "0.3", default-features = false }
 flate2 = { version = "1", default-features = false, features = ["rust_backend"] }
 tempfile = { version = "3", default-features = false }
+lz4 = { version = "1.23", default-features = false }
+zstd = { version = "0.11", default-features = false }

Review Comment:
   I don't think these need to be in dev-dependencies, do they, if they are already in the dependencies of the crate?



##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +32,78 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &CompressionCodecType,
+) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // corner case: empty buffer
+    if buf_data.is_empty() {
+        return Buffer::from(buf_data);
+    }
+    match compression_codec {
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd
+            if cfg!(feature = "ipc_compression") || cfg!(test) =>
+        {
+            // 8byte + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise return as is
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // emtpy
+                let empty = Vec::<u8>::new();
+                Buffer::from(empty)
+            } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+                // not compress
+                let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                Buffer::from(data)
+            } else {
+                // decompress data using the codec
+                let mut _uncompressed_buffer = Vec::new();
+                let _input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                #[cfg(any(feature = "ipc_compression", test))]
+                compression_codec
+                    .decompress(_input_data, &mut _uncompressed_buffer)
+                    .unwrap();
+                Buffer::from(_uncompressed_buffer)
+            }
+        }
+        CompressionCodecType::NoCompression => Buffer::from(buf_data),
+        _ => {

Review Comment:
   I think returning an error is the correct way, but as you have identified above, you can't do that without changing the signature to `Result<Buffer>`. Since decompression can fail, we probably need to make that change.
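
A minimal sketch of what a fallible read path could look like, assuming only the framing described in the quoted doc comment (an 8-byte little-endian signed length prefix, with -1 meaning "not compressed"). The names `read_body`, `ReadError`, `PREFIX_LEN` and `NOT_COMPRESSED` are hypothetical, the codec is passed in as a closure rather than taken from the PR's `CompressionCodecType`, and `Vec<u8>` stands in for arrow's `Buffer`.

```rust
/// Length of the little-endian i64 prefix (assumed name, mirrors the doc comment).
const PREFIX_LEN: usize = 8;
/// Prefix value meaning "the bytes that follow are not compressed".
const NOT_COMPRESSED: i64 = -1;

/// Hypothetical error type; the PR itself would use `ArrowError`.
#[derive(Debug, PartialEq)]
enum ReadError {
    TruncatedPrefix(usize),
    InvalidLength(i64),
    Codec(String),
}

/// Reads one prefixed buffer, returning an error instead of panicking.
/// `decompress` receives the compressed bytes and the expected output length.
fn read_body<F>(buf: &[u8], decompress: F) -> Result<Vec<u8>, ReadError>
where
    F: Fn(&[u8], usize) -> Result<Vec<u8>, String>,
{
    // Corner case: an empty buffer carries no prefix at all.
    if buf.is_empty() {
        return Ok(Vec::new());
    }
    if buf.len() < PREFIX_LEN {
        return Err(ReadError::TruncatedPrefix(buf.len()));
    }
    let mut prefix = [0u8; PREFIX_LEN];
    prefix.copy_from_slice(&buf[..PREFIX_LEN]);
    let body = &buf[PREFIX_LEN..];
    match i64::from_le_bytes(prefix) {
        0 => Ok(Vec::new()),
        NOT_COMPRESSED => Ok(body.to_vec()),
        n if n > 0 => decompress(body, n as usize).map_err(ReadError::Codec),
        n => Err(ReadError::InvalidLength(n)),
    }
}
```

With this shape, a codec failure or a malformed prefix propagates to the caller through `?` rather than through `.unwrap()`.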



##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +32,78 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(

Review Comment:
   Do you still intend to make this change, or is it planned for a subsequent PR?





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r939496865


##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +32,78 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &CompressionCodecType,
+) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // corner case: empty buffer
+    if buf_data.is_empty() {
+        return Buffer::from(buf_data);
+    }
+    match compression_codec {
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd
+            if cfg!(feature = "ipc_compression") || cfg!(test) =>
+        {
+            // 8byte + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise return as is
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // emtpy
+                let empty = Vec::<u8>::new();
+                Buffer::from(empty)
+            } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+                // not compress
+                let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                Buffer::from(data)
+            } else {
+                // decompress data using the codec
+                let mut _uncompressed_buffer = Vec::new();
+                let _input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                #[cfg(any(feature = "ipc_compression", test))]
+                compression_codec
+                    .decompress(_input_data, &mut _uncompressed_buffer)
+                    .unwrap();
+                Buffer::from(_uncompressed_buffer)
+            }
+        }
+        CompressionCodecType::NoCompression => Buffer::from(buf_data),
+        _ => {

Review Comment:
   @alamb 
   I have filed a new issue to track this and will submit a separate PR for it.





[GitHub] [arrow-rs] alamb commented on pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#issuecomment-1208327168

   @liukun4515 
   
   Here is my proposal (the PR to https://github.com/liukun4515/arrow-rs/pull/1 is now rendered terribly): https://github.com/apache/arrow-rs/pull/2369




[GitHub] [arrow-rs] nevi-me commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
nevi-me commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r905418959


##########
arrow/Cargo.toml:
##########
@@ -60,7 +63,7 @@ multiversion = { version = "0.6.1", default-features = false }
 bitflags = { version = "1.2.1", default-features = false }
 
 [features]
-default = ["csv", "ipc", "test_utils"]
+default = ["csv", "ipc", "test_utils", "zstd", "lz4"]

Review Comment:
   I'd be in favour of adding the deps to `ipc` rather than creating a separate `ipc_compression` feature. The inconvenience of a larger binary is a lesser problem than a runtime error caused by compression support having been left out.
   The latter would need a rebuild.





[GitHub] [arrow-rs] liukun4515 commented on pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#issuecomment-1159599967

   Hi @andygrove @alamb, I have implemented the IPC compression, but the CI on AMD 32 fails.
   I have no idea how to fix it. Can you help me?




[GitHub] [arrow-rs] martin-g commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
martin-g commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r897835928


##########
arrow/src/ipc/reader.rs:
##########
@@ -34,13 +35,62 @@ use crate::record_batch::{RecordBatch, RecordBatchReader};
 
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
+use crate::ipc::{CompressionType};
+use crate::ipc::compression::compression::CompressionCodecType;
+use crate::ipc::compression::{LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA};
+
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(buf: &ipc::Buffer, a_data: &[u8], compression_codec: &CompressionCodecType) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    match compression_codec {
+        CompressionCodecType::NoCompression => {
+            Buffer::from(buf_data)
+        }
+        CompressionCodecType::Lz4Frame | CompressionCodecType::ZSTD => {
+            // 8byte + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise decompress data.
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // emtpy
+                let empty = Vec::<u8>::new();
+                Buffer::from(empty)
+            } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+                // not compress
+                let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..(end_offset - start_offset)];
+                Buffer::from(data)
+            } else {
+                // decompress data using the codec
+                let mut uncompressed_buffer = Vec::new();
+                let input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..(end_offset - start_offset)];

Review Comment:
   ```suggestion
                   let input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
   ```



##########
arrow/src/ipc/reader.rs:
##########
@@ -34,13 +35,62 @@ use crate::record_batch::{RecordBatch, RecordBatchReader};
 
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
+use crate::ipc::{CompressionType};
+use crate::ipc::compression::compression::CompressionCodecType;
+use crate::ipc::compression::{LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA};
+
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(buf: &ipc::Buffer, a_data: &[u8], compression_codec: &CompressionCodecType) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    match compression_codec {
+        CompressionCodecType::NoCompression => {
+            Buffer::from(buf_data)
+        }
+        CompressionCodecType::Lz4Frame | CompressionCodecType::ZSTD => {
+            // 8byte + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise decompress data.

Review Comment:
   ```suggestion
               // if the data is compressed, decompress the data, otherwise return as is
   ```



##########
arrow/src/ipc/writer.rs:
##########
@@ -37,6 +38,9 @@ use crate::record_batch::RecordBatch;
 use crate::util::bit_util;
 
 use ipc::CONTINUATION_MARKER;
+use crate::ipc::{BodyCompressionMethod, CompressionType};
+use crate::ipc::compression::compression::CompressionCodecType;
+use crate::ipc::compression::{LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA};

Review Comment:
   ```suggestion
   use crate::ipc::compression::{LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA, LENGTH_NO_COMPRESSED_DATA};
   ```



##########
arrow/src/ipc/writer.rs:
##########
@@ -912,19 +945,64 @@ fn write_array_data(
 }
 
 /// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector
+/// From https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
 fn write_buffer(
     buffer: &Buffer,
     buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
     offset: i64,
+    compression_codec: &CompressionCodecType
 ) -> i64 {
-    let len = buffer.len();
-    let pad_len = pad_to_8(len as u32);
-    let total_len: i64 = (len + pad_len) as i64;
+    let origin_buffer_len = buffer.len();
+    let mut compression_buffer = Vec::<u8>::new();
+    let (data, uncompression_buffer_len) = match compression_codec {
+        CompressionCodecType::NoCompression => {
+            // this buffer_len will not used in the following logic
+            // If we don't use the compression, just write the data in the array
+            (buffer.as_slice(), origin_buffer_len as i64)
+        }
+        CompressionCodecType::Lz4Frame | CompressionCodecType::ZSTD => {
+            if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA {
+                (buffer.as_slice(), 0)
+            } else {
+                compression_codec.compress(buffer.as_slice(), &mut compression_buffer).unwrap();
+                if compression_buffer.len() > origin_buffer_len {
+                    // the length of compressed data is larger than uncompressed data
+                    // use the uncompressed data with -1
+                    // -1 indicate that we don't compress the data
+                    (buffer.as_slice(), -1)

Review Comment:
   ```suggestion
                       (buffer.as_slice(), LENGTH_NO_COMPRESSED_DATA)
   ```
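
The writer-side decision in this hunk can be sketched in isolation. This is an illustration under stated assumptions rather than the PR's code: `choose_payload` and `NOT_COMPRESSED` are hypothetical names, and the compressed bytes are passed in already computed instead of being produced by a codec.

```rust
/// Prefix value meaning "the bytes that follow are not compressed".
const NOT_COMPRESSED: i64 = -1;

/// Picks the length prefix and the bytes to write for one buffer.
/// Returns `(prefix_value, payload)`.
fn choose_payload<'a>(raw: &'a [u8], compressed: &'a [u8]) -> (i64, &'a [u8]) {
    if raw.is_empty() {
        // Empty input: prefix 0 and nothing to write.
        (0, raw)
    } else if compressed.len() > raw.len() {
        // Compression made the data larger: write the raw bytes and flag them with -1.
        (NOT_COMPRESSED, raw)
    } else {
        // Write the compressed bytes, prefixed with the uncompressed length.
        (raw.len() as i64, compressed)
    }
}
```

Using a named constant for the sentinel, as the suggestion does, keeps the writer and the reader agreeing on the same value.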



##########
arrow/src/ipc/reader.rs:
##########
@@ -34,13 +35,62 @@ use crate::record_batch::{RecordBatch, RecordBatchReader};
 
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
+use crate::ipc::{CompressionType};
+use crate::ipc::compression::compression::CompressionCodecType;
+use crate::ipc::compression::{LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA};
+
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(buf: &ipc::Buffer, a_data: &[u8], compression_codec: &CompressionCodecType) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    match compression_codec {
+        CompressionCodecType::NoCompression => {
+            Buffer::from(buf_data)
+        }
+        CompressionCodecType::Lz4Frame | CompressionCodecType::ZSTD => {
+            // 8byte + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise decompress data.
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // emtpy
+                let empty = Vec::<u8>::new();
+                Buffer::from(empty)
+            } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+                // not compress
+                let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..(end_offset - start_offset)];

Review Comment:
   ```suggestion
                   let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
   ```





[GitHub] [arrow-rs] viirya commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934114196


##########
arrow/src/ipc/writer.rs:
##########
@@ -1090,20 +1224,68 @@ fn write_array_data(
 }
 
 /// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
 fn write_buffer(
     buffer: &[u8],
     buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
     offset: i64,
+    compression_codec: &CompressionCodecType,
 ) -> i64 {
-    let len = buffer.len();
-    let pad_len = pad_to_8(len as u32);
-    let total_len: i64 = (len + pad_len) as i64;
-    // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes");
-    buffers.push(ipc::Buffer::new(offset, total_len));
-    arrow_data.extend_from_slice(buffer);
-    arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
-    offset + total_len
+    let origin_buffer_len = buffer.len();
+    let mut _compression_buffer = Vec::<u8>::new();
+    let (data, uncompression_buffer_len) = match compression_codec {
+        CompressionCodecType::NoCompression => {
+            // this buffer_len will not used in the following logic
+            // If we don't use the compression, just write the data in the array
+            (buffer, origin_buffer_len as i64)
+        }
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => {
+            if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA {
+                (buffer, 0)
+            } else if cfg!(feature = "ipc_compression") || cfg!(test) {
+                #[cfg(any(feature = "ipc_compression", test))]
+                compression_codec
+                    .compress(buffer, &mut _compression_buffer)
+                    .unwrap();
+                let compression_len = _compression_buffer.len();
+                if compression_len > origin_buffer_len {
+                    // the length of compressed data is larger than uncompressed data
+                    // use the uncompressed data with -1
+                    // -1 indicate that we don't compress the data
+                    (buffer, LENGTH_NO_COMPRESSED_DATA)
+                } else {
+                    // use the compressed data with uncompressed length
+                    (_compression_buffer.as_slice(), origin_buffer_len as i64)
+                }
+            } else {
+                panic!("IPC compression not supported. Compile with feature 'ipc_compression' to enable");
+            }
+        }
+    };
+    let len = data.len() as i64;
+    let total_len = if compression_codec == &CompressionCodecType::NoCompression {
+        buffers.push(ipc::Buffer::new(offset, len));
+        len
+    } else {
+        buffers.push(ipc::Buffer::new(offset, LENGTH_OF_PREFIX_DATA + len));
+        // write the prefix of the uncompressed length
+        let uncompression_len_buf: [u8; 8] = uncompression_buffer_len.to_le_bytes();
+        arrow_data.extend_from_slice(&uncompression_len_buf);
+        LENGTH_OF_PREFIX_DATA + len
+    };
+    arrow_data.extend_from_slice(data);
+    // padding and make offset 8 bytes aligned
+    let pad_len = pad_to_8(len as u32) as i64;

Review Comment:
   Is this padding computed from the data length, or from the total length (prefix plus data)?
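
A small sketch that may help with this question, assuming the prefix is exactly 8 bytes as in the quoted doc comment. `pad_to_8` here is a standalone reimplementation and `append_prefixed` is a hypothetical helper; neither is the crate's own function. Because the prefix length is itself a multiple of 8, padding computed from the data length equals padding computed from the total length.

```rust
/// Assumed prefix size: one little-endian i64.
const PREFIX_LEN: usize = 8;

/// Number of zero bytes needed to round `len` up to the next multiple of 8.
fn pad_to_8(len: usize) -> usize {
    (8 - len % 8) % 8
}

/// Appends prefix, data and zero padding to `out`; returns the new end offset.
fn append_prefixed(out: &mut Vec<u8>, uncompressed_len: i64, data: &[u8]) -> usize {
    out.extend_from_slice(&uncompressed_len.to_le_bytes());
    out.extend_from_slice(data);
    // Padding is computed from the total written length (prefix plus data).
    let pad = pad_to_8(PREFIX_LEN + data.len());
    out.resize(out.len() + pad, 0);
    out.len()
}
```

The equivalence only holds while the prefix stays a multiple of 8 bytes, so computing the padding from the total length is the more robust choice; either way, the returned offset has to include the padding.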





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934252792


##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +32,78 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &CompressionCodecType,
+) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // corner case: empty buffer
+    if buf_data.is_empty() {
+        return Buffer::from(buf_data);
+    }
+    match compression_codec {
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd
+            if cfg!(feature = "ipc_compression") || cfg!(test) =>
+        {
+            // 8byte + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise return as is
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // emtpy
+                let empty = Vec::<u8>::new();
+                Buffer::from(empty)
+            } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+                // not compress
+                let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                Buffer::from(data)
+            } else {
+                // decompress data using the codec
+                let mut _uncompressed_buffer = Vec::new();
+                let _input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                #[cfg(any(feature = "ipc_compression", test))]
+                compression_codec
+                    .decompress(_input_data, &mut _uncompressed_buffer)
+                    .unwrap();
+                Buffer::from(_uncompressed_buffer)
+            }
+        }
+        CompressionCodecType::NoCompression => Buffer::from(buf_data),
+        _ => {
+            panic!("IPC compression not supported. Compile with feature 'ipc_compression' to enable");
+        }
+    }
+}
+
+/// Get the uncompressed length
+/// Notes:
+///   -1: indicate that the data that follows is not compressed
+///    0: indicate that there is no data
+///   positive number: indicate the uncompressed length for the following data
+fn read_uncompressed_size(buffer: &[u8]) -> i64 {

Review Comment:
   done
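
   For reference, the prefix convention in the doc comment above (-1, 0, or a positive length) could be decoded roughly as in the sketch below. `PrefixKind` and `read_prefix` are hypothetical names chosen for illustration and are not taken from the PR; the sketch assumes the prefix is always the first 8 bytes of the buffer.

```rust
/// Length in bytes of the little-endian i64 prefix.
const LENGTH_OF_PREFIX_DATA: usize = 8;

/// How the bytes that follow the prefix should be interpreted.
/// (Hypothetical type, used only in this sketch.)
#[derive(Debug, PartialEq)]
enum PrefixKind {
    /// Prefix is -1: the body is stored as is, without compression.
    NotCompressed,
    /// Prefix is 0: there is no data.
    Empty,
    /// Prefix is positive: the uncompressed length of the body.
    Compressed(usize),
}

/// Reads the first 8 bytes as a little-endian i64 and classifies them.
/// Returns `None` if the buffer is too short or the prefix is below -1.
fn read_prefix(buffer: &[u8]) -> Option<PrefixKind> {
    let mut bytes = [0u8; LENGTH_OF_PREFIX_DATA];
    bytes.copy_from_slice(buffer.get(..LENGTH_OF_PREFIX_DATA)?);
    match i64::from_le_bytes(bytes) {
        -1 => Some(PrefixKind::NotCompressed),
        0 => Some(PrefixKind::Empty),
        n if n > 0 => Some(PrefixKind::Compressed(n as usize)),
        _ => None,
    }
}
```

   Returning `None` for a short buffer or an unexpected negative value is a design choice of this sketch, not something the PR does.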





[GitHub] [arrow-rs] liukun4515 commented on pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#issuecomment-1159647562

   The integration test failed: https://github.com/apache/arrow-rs/runs/6952125461?check_suite_focus=true#step:6:15070
   The C++ exception comes from this check: https://github.com/apache/arrow/blob/4ade394a7a0fa22ecb8ef2e3b4dc8bb42e599274/cpp/src/arrow/ipc/reader.cc#L198
   
   I'm confused about why each buffer needs to be 8-byte aligned.
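
   As a rough illustration of what 8-byte alignment means for the writer, the sketch below pads every buffer with zeros so that the next buffer starts at an offset that is a multiple of 8. It assumes that the reader rejects buffers whose offsets are not multiples of 8; `pad_to_8` here is a stand-alone version written for this example and `append_aligned` is a hypothetical helper.

```rust
/// Number of zero bytes needed to round `len` up to the next multiple of 8.
fn pad_to_8(len: usize) -> usize {
    (8 - len % 8) % 8
}

/// Appends `data` followed by zero padding and returns the offset at which
/// the next buffer would start. (Hypothetical helper for this sketch.)
fn append_aligned(out: &mut Vec<u8>, data: &[u8]) -> usize {
    out.extend_from_slice(data);
    out.resize(out.len() + pad_to_8(data.len()), 0);
    out.len()
}
```

   As long as the first buffer starts at an aligned offset, every later offset produced this way is aligned too.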




[GitHub] [arrow-rs] nevi-me commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
nevi-me commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r905418959


##########
arrow/Cargo.toml:
##########
@@ -60,7 +63,7 @@ multiversion = { version = "0.6.1", default-features = false }
 bitflags = { version = "1.2.1", default-features = false }
 
 [features]
-default = ["csv", "ipc", "test_utils"]
+default = ["csv", "ipc", "test_utils", "zstd", "lz4"]

Review Comment:
   I'd be in favour of having `ipc` as a feature, but not a separate `ipc_compression` feature. A larger binary is a lesser problem than a runtime error caused by compression support having been left out.
   Fixing the latter would need a rebuild.





[GitHub] [arrow-rs] liukun4515 closed pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 closed pull request #1855: support compression for IPC
URL: https://github.com/apache/arrow-rs/pull/1855




[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934108724


##########
arrow/src/ipc/writer.rs:
##########
@@ -1090,20 +1224,68 @@ fn write_array_data(
 }
 
 /// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
 fn write_buffer(

Review Comment:
   TODO: change the output argument and return `Result<i64>`.
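
   A minimal sketch of what an error-returning signature could look like, assuming the goal is to replace the `panic!` with an error value. `Codec`, `WriteError`, and `write_buffer_checked` are hypothetical names; the sketch models a build without compression support, so compressed codecs yield an error and only the uncompressed path writes data.

```rust
/// Hypothetical codec selector used only in this sketch.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Codec {
    None,
    Lz4Frame,
    Zstd,
}

/// Hypothetical error type; the real crate would use its own error enum.
#[derive(Debug, PartialEq)]
enum WriteError {
    CompressionUnavailable(Codec),
}

/// Writes `buffer` plus zero padding to `arrow_data` and returns the next
/// offset. Compressed codecs return an error instead of panicking.
fn write_buffer_checked(
    buffer: &[u8],
    arrow_data: &mut Vec<u8>,
    offset: i64,
    codec: Codec,
) -> Result<i64, WriteError> {
    match codec {
        Codec::None => {
            // Pad with zeros up to the next multiple of 8 bytes.
            let pad = (8 - buffer.len() % 8) % 8;
            arrow_data.extend_from_slice(buffer);
            arrow_data.resize(arrow_data.len() + pad, 0);
            Ok(offset + (buffer.len() + pad) as i64)
        }
        other => Err(WriteError::CompressionUnavailable(other)),
    }
}
```

   With this shape the caller can propagate the failure with `?` rather than aborting the process.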





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934278953


##########
arrow/src/ipc/compression/ipc_compression.rs:
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ipc::CompressionType;
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum CompressionCodecType {
+    NoCompression,
+    Lz4Frame,
+    Zstd,
+}
+
+impl From<CompressionType> for CompressionCodecType {
+    fn from(compression_type: CompressionType) -> Self {
+        match compression_type {
+            CompressionType::ZSTD => CompressionCodecType::Zstd,
+            CompressionType::LZ4_FRAME => CompressionCodecType::Lz4Frame,
+            _ => CompressionCodecType::NoCompression,
+        }
+    }
+}
+
+impl From<CompressionCodecType> for Option<CompressionType> {
+    fn from(codec: CompressionCodecType) -> Self {
+        match codec {
+            CompressionCodecType::NoCompression => None,
+            CompressionCodecType::Lz4Frame => Some(CompressionType::LZ4_FRAME),
+            CompressionCodecType::Zstd => Some(CompressionType::ZSTD),
+        }
+    }
+}
+
+#[cfg(any(feature = "ipc_compression", test))]
+mod compression_function {
+    use crate::error::Result;
+    use crate::ipc::compression::ipc_compression::CompressionCodecType;
+    use std::io::{Read, Write};
+
+    impl CompressionCodecType {
+        pub fn compress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
+            match self {
+                CompressionCodecType::Lz4Frame => {
+                    let mut encoder = lz4::EncoderBuilder::new().build(output).unwrap();
+                    encoder.write_all(input).unwrap();
+                    encoder.finish().1.unwrap();
+                    Ok(())
+                }
+                CompressionCodecType::Zstd => {
+                    let mut encoder = zstd::Encoder::new(output, 0).unwrap();
+                    encoder.write_all(input).unwrap();
+                    encoder.finish().unwrap();
+                    Ok(())
+                }
+                _ => Ok(()),
+            }
+        }
+
+        pub fn decompress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<usize> {
+            let result: Result<usize> = match self {
+                CompressionCodecType::Lz4Frame => {
+                    let mut decoder = lz4::Decoder::new(input)?;
+                    match decoder.read_to_end(output) {
+                        Ok(size) => Ok(size),
+                        Err(e) => Err(e.into()),
+                    }
+                }
+                CompressionCodecType::Zstd => {
+                    let mut decoder = zstd::Decoder::new(input)?;
+                    match decoder.read_to_end(output) {
+                        Ok(size) => Ok(size),
+                        Err(e) => Err(e.into()),
+                    }
+                }
+                _ => Ok(input.len()),

Review Comment:
   just handle the LZ4 and ZSTD branch.





[GitHub] [arrow-rs] liukun4515 commented on pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#issuecomment-1203571484

   @viirya @alamb  PTAL




[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r939497526


##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +32,78 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &Option<CompressionCodecType>,
+) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // corner case: empty buffer
+    if buf_data.is_empty() {
+        return Buffer::from(buf_data);
+    }
+    match compression_codec {
+        Some(_decompressor) if cfg!(feature = "ipc_compression") || cfg!(test) => {
+            // 8byte + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise return as is
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // emtpy
+                let empty = Vec::<u8>::new();
+                Buffer::from(empty)
+            } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+                // not compress
+                let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                Buffer::from(data)
+            } else {
+                // decompress data using the codec
+                let mut _uncompressed_buffer =
+                    Vec::with_capacity(decompressed_length as usize);
+                let _input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];

Review Comment:
   `#[cfg(any(feature = "ipc_compression", test))]` does not refer to a feature named `test`; it means either the `ipc_compression` feature or the built-in `test` configuration.
   The code is only compiled when the `ipc_compression` feature is enabled or when building tests.
   
   Parquet uses the same pattern: https://github.com/apache/arrow-rs/blob/30c94dbf1c422f81f8520b9956e96ab7b53c3f47/parquet/src/compression.rs#L76
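
   The difference between the `#[cfg(...)]` attribute and the `cfg!(...)` macro can be sketched as follows. The function names are hypothetical; the feature name `ipc_compression` is the one discussed in this PR. The attribute removes the item from the build when the condition is false, while the macro only evaluates to `true` or `false` and both branches must still compile.

```rust
/// Compiled only with the `ipc_compression` feature or when building tests.
#[cfg(any(feature = "ipc_compression", test))]
fn compression_available() -> bool {
    true
}

/// Fallback compiled in every other configuration.
#[cfg(not(any(feature = "ipc_compression", test)))]
fn compression_available() -> bool {
    false
}

/// `cfg!` expands to a boolean constant; neither branch is removed from
/// the source, so code in both branches has to type-check.
fn describe_build() -> &'static str {
    if cfg!(any(feature = "ipc_compression", test)) {
        "enabled"
    } else {
        "disabled"
    }
}
```

   This is why a call that exists only behind the attribute also needs the attribute at the call site, even inside a branch guarded by `cfg!`.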






[GitHub] [arrow-rs] liukun4515 commented on pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#issuecomment-1207202033

   > @liukun4515 -- perhaps I can find some time to try and help with this PR. I will try to do so tomorrow
   
   Thank you. You can also ping me on Slack.




[GitHub] [arrow-rs] liukun4515 commented on pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#issuecomment-1159758336

   I think I fixed the CI.




[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r903715450


##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +33,74 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &CompressionCodecType,
+) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // corner case: empty buffer
+    if buf_data.is_empty() {
+        return Buffer::from(buf_data);
+    }
+    match compression_codec {
+        CompressionCodecType::NoCompression => Buffer::from(buf_data),
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => {
+            // 8byte + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise return as is
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // emtpy
+                let empty = Vec::<u8>::new();
+                Buffer::from(empty)
+            } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+                // not compress
+                let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                Buffer::from(data)
+            } else {
+                // decompress data using the codec
+                let mut _uncompressed_buffer = Vec::new();
+                let _input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                // TODO consider the error result
+                #[cfg(any(feature = "zstd,lz4", test))]

Review Comment:
   I agree with you, @viirya. I don't know how to control this.
   Could you please give me some suggestions or advice?





[GitHub] [arrow-rs] liukun4515 commented on pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#issuecomment-1211487001

   Thanks to @alamb for the cooperation.
   I will close this PR.




[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934242035


##########
arrow/src/ipc/writer.rs:
##########
@@ -1090,20 +1224,68 @@ fn write_array_data(
 }
 
 /// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
 fn write_buffer(
     buffer: &[u8],
     buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
     offset: i64,
+    compression_codec: &CompressionCodecType,
 ) -> i64 {
-    let len = buffer.len();
-    let pad_len = pad_to_8(len as u32);
-    let total_len: i64 = (len + pad_len) as i64;
-    // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes");
-    buffers.push(ipc::Buffer::new(offset, total_len));
-    arrow_data.extend_from_slice(buffer);
-    arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
-    offset + total_len
+    let origin_buffer_len = buffer.len();
+    let mut _compression_buffer = Vec::<u8>::new();
+    let (data, uncompression_buffer_len) = match compression_codec {
+        CompressionCodecType::NoCompression => {
+            // this buffer_len will not used in the following logic
+            // If we don't use the compression, just write the data in the array
+            (buffer, origin_buffer_len as i64)
+        }
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => {
+            if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA {
+                (buffer, 0)
+            } else if cfg!(feature = "ipc_compression") || cfg!(test) {
+                #[cfg(any(feature = "ipc_compression", test))]
+                compression_codec
+                    .compress(buffer, &mut _compression_buffer)
+                    .unwrap();
+                let compression_len = _compression_buffer.len();
+                if compression_len > origin_buffer_len {
+                    // the length of compressed data is larger than uncompressed data
+                    // use the uncompressed data with -1
+                    // -1 indicate that we don't compress the data
+                    (buffer, LENGTH_NO_COMPRESSED_DATA)
+                } else {
+                    // use the compressed data with uncompressed length
+                    (_compression_buffer.as_slice(), origin_buffer_len as i64)
+                }
+            } else {
+                panic!("IPC compression not supported. Compile with feature 'ipc_compression' to enable");
+            }
+        }
+    };
+    let len = data.len() as i64;
+    let total_len = if compression_codec == &CompressionCodecType::NoCompression {
+        buffers.push(ipc::Buffer::new(offset, len));
+        len
+    } else {
+        buffers.push(ipc::Buffer::new(offset, LENGTH_OF_PREFIX_DATA + len));
+        // write the prefix of the uncompressed length
+        let uncompression_len_buf: [u8; 8] = uncompression_buffer_len.to_le_bytes();
+        arrow_data.extend_from_slice(&uncompression_len_buf);
+        LENGTH_OF_PREFIX_DATA + len
+    };
+    arrow_data.extend_from_slice(data);
+    // padding and make offset 8 bytes aligned
+    let pad_len = pad_to_8(len as u32) as i64;

Review Comment:
   Each buffer has two parts: the metadata, which stores the `offset` and the actual length of the data, and the data itself.
   The actual length is `total_len`.
   `pad_len` is only used to align the buffer.
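
   To make the layout concrete, here is a small sketch of the compressed case: an 8-byte prefix, then the body, then zero padding. `write_prefixed` is a hypothetical helper and `pad_to_8` is a stand-alone version written for this example. Because the prefix is exactly 8 bytes, padding computed from the body length and padding computed from prefix plus body are the same.

```rust
/// Length in bytes of the little-endian i64 prefix.
const LENGTH_OF_PREFIX_DATA: usize = 8;

/// Number of zero bytes needed to round `len` up to the next multiple of 8.
fn pad_to_8(len: usize) -> usize {
    (8 - len % 8) % 8
}

/// Writes prefix, body, and padding to `out`.
/// Returns `(recorded_len, next_offset)`: the length stored in the buffer
/// metadata (prefix + body, without padding) and the aligned offset at
/// which the next buffer starts.
fn write_prefixed(out: &mut Vec<u8>, uncompressed_len: i64, body: &[u8]) -> (usize, usize) {
    out.extend_from_slice(&uncompressed_len.to_le_bytes());
    out.extend_from_slice(body);
    let recorded_len = LENGTH_OF_PREFIX_DATA + body.len();
    // The padding is not part of the recorded length.
    out.resize(out.len() + pad_to_8(recorded_len), 0);
    (recorded_len, out.len())
}
```

   For a 3-byte body the metadata records 11 bytes and the next buffer starts at offset 16.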
   





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934251776


##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +32,78 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &CompressionCodecType,
+) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // corner case: empty buffer
+    if buf_data.is_empty() {
+        return Buffer::from(buf_data);
+    }
+    match compression_codec {
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd
+            if cfg!(feature = "ipc_compression") || cfg!(test) =>
+        {
+            // 8byte + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise return as is
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // emtpy
+                let empty = Vec::<u8>::new();
+                Buffer::from(empty)
+            } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+                // not compress
+                let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                Buffer::from(data)
+            } else {
+                // decompress data using the codec
+                let mut _uncompressed_buffer = Vec::new();

Review Comment:
   Good point; we can initialize the vec with that capacity.





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934253633


##########
arrow/src/ipc/reader.rs:
##########
@@ -603,6 +674,16 @@ pub fn read_record_batch(
     let field_nodes = batch.nodes().ok_or_else(|| {
         ArrowError::IoError("Unable to get field nodes from IPC RecordBatch".to_string())
     })?;
+    let option_compression = batch.compression();
+    let compression_codec = match option_compression {
+        None => CompressionCodecType::NoCompression,
+        Some(compression) => match compression.codec() {
+            CompressionType::ZSTD => CompressionCodecType::Zstd,
+            CompressionType::LZ4_FRAME => CompressionCodecType::Lz4Frame,
+            _ => CompressionCodecType::NoCompression,

Review Comment:
   No. The current version supports only LZ4 and zstd.





[GitHub] [arrow-rs] viirya commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934107248


##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +32,78 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &CompressionCodecType,
+) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // corner case: empty buffer
+    if buf_data.is_empty() {
+        return Buffer::from(buf_data);
+    }
+    match compression_codec {
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd
+            if cfg!(feature = "ipc_compression") || cfg!(test) =>
+        {
+            // 8byte + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise return as is
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // emtpy
+                let empty = Vec::<u8>::new();
+                Buffer::from(empty)
+            } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+                // not compress
+                let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                Buffer::from(data)
+            } else {
+                // decompress data using the codec
+                let mut _uncompressed_buffer = Vec::new();

Review Comment:
   We know decompressed_length at this point. Should we allocate the vector with enough capacity?
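
A minimal sketch of that idea, using only the standard library. The helper names `read_uncompressed_len` and `alloc_output` and the `MAX_PREALLOC` cap are hypothetical and not part of the PR; the 8-byte little-endian signed prefix is taken from the format comment quoted in the diff above.

```rust
/// Size of the prefix that stores the uncompressed length (8 bytes, per the
/// IPC format comment quoted in the diff).
const PREFIX_LEN: usize = 8;

/// Assumed upper bound on the up-front allocation. The prefix comes from
/// untrusted input, so this sketch does not reserve arbitrarily large sizes.
const MAX_PREALLOC: usize = 64 * 1024 * 1024;

/// Hypothetical helper: parse the 8-byte little-endian signed length prefix.
/// Returns `None` when the buffer is shorter than the prefix.
fn read_uncompressed_len(buf: &[u8]) -> Option<i64> {
    let mut prefix = [0u8; PREFIX_LEN];
    prefix.copy_from_slice(buf.get(..PREFIX_LEN)?);
    Some(i64::from_le_bytes(prefix))
}

/// Hypothetical helper: reserve space for the decompressed bytes when the
/// prefix holds a positive length; otherwise start with an empty vector.
fn alloc_output(uncompressed_len: i64) -> Vec<u8> {
    if uncompressed_len > 0 {
        // Cast through u64 so the cap is applied before narrowing to usize.
        let wanted = (uncompressed_len as u64).min(MAX_PREALLOC as u64) as usize;
        Vec::with_capacity(wanted)
    } else {
        Vec::new()
    }
}
```

The vector can still grow past the reserved capacity during decompression, so the cap only limits the initial allocation, not the final size.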





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934108184


##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +32,78 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(

Review Comment:
   TODO:
   We should change the return type to `Result<Buffer>` and return an error if the buffer can't be read.
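
A rough sketch of what a fallible read could look like, assuming a stand-in error type. `ReadError` and `read_slice` are hypothetical names used only for illustration; the real change would presumably return `ArrowError` and wrap the bytes in a `Buffer`.

```rust
/// Hypothetical error type standing in for `ArrowError` in this sketch.
#[derive(Debug, Clone, PartialEq)]
enum ReadError {
    OutOfBounds {
        offset: usize,
        length: usize,
        available: usize,
    },
}

/// Return the bytes at `offset..offset + length`, reporting an error rather
/// than panicking when the range overflows or falls outside `data`.
fn read_slice(data: &[u8], offset: usize, length: usize) -> Result<&[u8], ReadError> {
    let err = ReadError::OutOfBounds {
        offset,
        length,
        available: data.len(),
    };
    // `checked_add` guards against overflow in the end offset.
    let end = offset.checked_add(length).ok_or(err.clone())?;
    // `get` returns `None` instead of panicking on an out-of-range slice.
    data.get(offset..end).ok_or(err)
}
```

With this shape, a caller such as `read_record_batch` could propagate the failure with `?` instead of hitting an index-out-of-bounds panic on a malformed message.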





[GitHub] [arrow-rs] viirya commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934107885


##########
arrow/src/ipc/reader.rs:
##########
@@ -603,6 +674,16 @@ pub fn read_record_batch(
     let field_nodes = batch.nodes().ok_or_else(|| {
         ArrowError::IoError("Unable to get field nodes from IPC RecordBatch".to_string())
     })?;
+    let option_compression = batch.compression();
+    let compression_codec = match option_compression {
+        None => CompressionCodecType::NoCompression,
+        Some(compression) => match compression.codec() {
+            CompressionType::ZSTD => CompressionCodecType::Zstd,
+            CompressionType::LZ4_FRAME => CompressionCodecType::Lz4Frame,
+            _ => CompressionCodecType::NoCompression,

Review Comment:
   Are there any other options?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] alamb commented on pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#issuecomment-1207190526

   @liukun4515  -- perhaps I can find some time to try and help with this PR. I will try to do so tomorrow




[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934103439


##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +32,78 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &CompressionCodecType,
+) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // corner case: empty buffer
+    if buf_data.is_empty() {
+        return Buffer::from(buf_data);
+    }
+    match compression_codec {
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd
+            if cfg!(feature = "ipc_compression") || cfg!(test) =>
+        {
+            // 8byte + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise return as is
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // emtpy
+                let empty = Vec::<u8>::new();
+                Buffer::from(empty)
+            } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+                // not compress
+                let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                Buffer::from(data)
+            } else {
+                // decompress data using the codec
+                let mut _uncompressed_buffer = Vec::new();
+                let _input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                #[cfg(any(feature = "ipc_compression", test))]
+                compression_codec
+                    .decompress(_input_data, &mut _uncompressed_buffer)
+                    .unwrap();
+                Buffer::from(_uncompressed_buffer)
+            }
+        }
+        CompressionCodecType::NoCompression => Buffer::from(buf_data),
+        _ => {

Review Comment:
   How should we handle this case?
   It occurs when the Rust service is compiled without the `ipc_compression` feature and receives a message that uses IPC compression.
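
One possible answer, sketched under assumptions: report an error to the caller rather than panic or silently return the raw bytes. The `Codec` enum and `check_codec_supported` function below are hypothetical stand-ins, and the `compression_enabled` flag stands in for `cfg!(feature = "ipc_compression")` so the sketch compiles on its own.

```rust
/// Stand-in for the PR's `CompressionCodecType`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Codec {
    NoCompression,
    Lz4Frame,
    Zstd,
}

/// Hypothetical check: decide whether this build can read a buffer written
/// with `codec`. In the real crate, `compression_enabled` would be
/// `cfg!(feature = "ipc_compression")`.
fn check_codec_supported(codec: Codec, compression_enabled: bool) -> Result<(), String> {
    match codec {
        Codec::NoCompression => Ok(()),
        // The guard applies to both alternatives of the pattern.
        Codec::Lz4Frame | Codec::Zstd if compression_enabled => Ok(()),
        other => Err(format!(
            "IPC buffer is compressed with {:?}, but this build does not include the `ipc_compression` feature",
            other
        )),
    }
}
```

This relies on the reader returning a `Result`, so it pairs naturally with the TODO about changing the return type of `read_buffer`.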





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934104663


##########
arrow/src/ipc/writer.rs:
##########
@@ -55,9 +61,50 @@ pub struct IpcWriteOptions {
     /// version 2.0.0: V4, with legacy format enabled
     /// version 4.0.0: V5
     metadata_version: ipc::MetadataVersion,
+    batch_compression_type: CompressionCodecType,
 }
 
 impl IpcWriteOptions {
+    pub fn try_new_with_compression(
+        alignment: usize,
+        write_legacy_ipc_format: bool,
+        metadata_version: ipc::MetadataVersion,
+        batch_compression_type: CompressionCodecType,
+    ) -> Result<Self> {
+        if alignment == 0 || alignment % 8 != 0 {
+            return Err(ArrowError::InvalidArgumentError(
+                "Alignment should be greater than 0 and be a multiple of 8".to_string(),
+            ));
+        }
+        match batch_compression_type {
+            CompressionCodecType::NoCompression => {}
+            _ => {
+                if metadata_version != ipc::MetadataVersion::V5 {
+                    return Err(ArrowError::InvalidArgumentError(
+                        "Compress buffer just support from metadata v5".to_string(),

Review Comment:
   done





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934279186


##########
arrow/src/ipc/compression/ipc_compression.rs:
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ipc::CompressionType;
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum CompressionCodecType {
+    NoCompression,
+    Lz4Frame,
+    Zstd,
+}
+
+impl From<CompressionType> for CompressionCodecType {
+    fn from(compression_type: CompressionType) -> Self {
+        match compression_type {
+            CompressionType::ZSTD => CompressionCodecType::Zstd,
+            CompressionType::LZ4_FRAME => CompressionCodecType::Lz4Frame,
+            _ => CompressionCodecType::NoCompression,
+        }
+    }
+}
+
+impl From<CompressionCodecType> for Option<CompressionType> {
+    fn from(codec: CompressionCodecType) -> Self {
+        match codec {
+            CompressionCodecType::NoCompression => None,
+            CompressionCodecType::Lz4Frame => Some(CompressionType::LZ4_FRAME),
+            CompressionCodecType::Zstd => Some(CompressionType::ZSTD),
+        }
+    }
+}
+
+#[cfg(any(feature = "ipc_compression", test))]
+mod compression_function {
+    use crate::error::Result;
+    use crate::ipc::compression::ipc_compression::CompressionCodecType;
+    use std::io::{Read, Write};
+
+    impl CompressionCodecType {
+        pub fn compress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
+            match self {
+                CompressionCodecType::Lz4Frame => {
+                    let mut encoder = lz4::EncoderBuilder::new().build(output).unwrap();
+                    encoder.write_all(input).unwrap();
+                    encoder.finish().1.unwrap();
+                    Ok(())
+                }
+                CompressionCodecType::Zstd => {
+                    let mut encoder = zstd::Encoder::new(output, 0).unwrap();

Review Comment:
   Fixed; the error is now passed out.





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934102744


##########
arrow/src/ipc/mod.rs:
##########
@@ -22,6 +22,7 @@ pub mod convert;
 pub mod reader;
 pub mod writer;
 
+mod compression;

Review Comment:
   Some attributes should be public; they will be used for checks when reading IPC messages, which may be generated by implementations in other languages.





[GitHub] [arrow-rs] viirya commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r903077863


##########
arrow/Cargo.toml:
##########
@@ -60,7 +63,7 @@ multiversion = { version = "0.6.1", default-features = false }
 bitflags = { version = "1.2.1", default-features = false }
 
 [features]
-default = ["csv", "ipc", "test_utils"]
+default = ["csv", "ipc", "test_utils", "zstd", "lz4"]

Review Comment:
   So by default we will include the zstd and lz4 dependencies? If we don't use ipc, or don't use compressed ipc, these dependencies seem unnecessary.





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r903704042


##########
arrow/Cargo.toml:
##########
@@ -60,7 +63,7 @@ multiversion = { version = "0.6.1", default-features = false }
 bitflags = { version = "1.2.1", default-features = false }
 
 [features]
-default = ["csv", "ipc", "test_utils"]
+default = ["csv", "ipc", "test_utils", "zstd", "lz4"]

Review Comment:
   Yes, include the dependencies by default.
   The buffer compression protocol is common to all languages in the Arrow ecosystem. If compression/decompression is optional, we can't read a file or stream produced by a side that compresses.
   The 2.0.0 compression has been implemented in Java, C++, and other languages. If your Rust server receives a message compressed with this protocol, we can read it by default.





[GitHub] [arrow-rs] viirya commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r905492582


##########
arrow/Cargo.toml:
##########
@@ -60,7 +63,7 @@ multiversion = { version = "0.6.1", default-features = false }
 bitflags = { version = "1.2.1", default-features = false }
 
 [features]
-default = ["csv", "ipc", "test_utils"]
+default = ["csv", "ipc", "test_utils", "zstd", "lz4"]

Review Comment:
   I have encountered an issue cross-compiling the lz4 crate on some platforms. We don't use ipc, so we can choose not to include the ipc feature at all. I'm wondering whether, if ipc compression can't be excluded, there might be issues like that, although this sounds like a corner case.





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r903719721


##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +33,74 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &CompressionCodecType,
+) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // corner case: empty buffer
+    if buf_data.is_empty() {
+        return Buffer::from(buf_data);
+    }
+    match compression_codec {
+        CompressionCodecType::NoCompression => Buffer::from(buf_data),
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => {
+            // 8byte + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise return as is
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // emtpy
+                let empty = Vec::<u8>::new();
+                Buffer::from(empty)
+            } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+                // not compress
+                let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                Buffer::from(data)
+            } else {
+                // decompress data using the codec
+                let mut _uncompressed_buffer = Vec::new();
+                let _input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                // TODO consider the error result
+                #[cfg(any(feature = "zstd,lz4", test))]

Review Comment:
   I follow the `#[cfg(any(feature` usage, as other modules do, to resolve this error from the CI:
   ```
   cargo build --features=csv,ipc,simd,lz4,zstd --target wasm32-unknown-unknown
   ```
   Without this approach, I can't pass the CI for target wasm32-unknown-unknown.
   I searched for the reason and found the issues https://github.com/apache/arrow-rs/issues/180 and https://github.com/apache/arrow-rs/issues/180, which have the same problem.
   @alamb  





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r939500379


##########
arrow/Cargo.toml:
##########
@@ -83,6 +86,8 @@ rand = { version = "0.8", default-features = false, features =  ["std", "std_rng
 criterion = { version = "0.3", default-features = false }
 flate2 = { version = "1", default-features = false, features = ["rust_backend"] }
 tempfile = { version = "3", default-features = false }
+lz4 = { version = "1.23", default-features = false }
+zstd = { version = "0.11", default-features = false }

Review Comment:
   We can't remove lz4 and zstd from dev-dependencies.
   `ipc_compression` is not a default feature, so a plain `cargo test` would otherwise run without the `lz4` and `zstd` libs.
   But we need to run the IPC compression unit tests in CI.





[GitHub] [arrow-rs] viirya commented on pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
viirya commented on PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#issuecomment-1165726310

   I think some comments have not been addressed yet?




[GitHub] [arrow-rs] alamb commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r903701041


##########
arrow/Cargo.toml:
##########
@@ -60,7 +63,7 @@ multiversion = { version = "0.6.1", default-features = false }
 bitflags = { version = "1.2.1", default-features = false }
 
 [features]
-default = ["csv", "ipc", "test_utils"]
+default = ["csv", "ipc", "test_utils", "zstd", "lz4"]
 csv = ["csv_crate"]
 ipc = ["flatbuffers"]

Review Comment:
   I also then think we would need to add an entry for `byteorder` under `features`
   
   So if we want `ipc` to always support compression (seems reasonable to me?) it would be like
   
   ```suggestion
   ipc = ["flatbuffers", "zstd", "lz4", "byte_order"]
   ```
   
   If we want another specific feature to control ipc compression (e.g. `ipc_compression`) perhaps we could do something like
   
   ```toml
   [features]
   ipc_compression = ["zstd", "lz4", "byte_order"]
   ```
   ?



##########
arrow/src/ipc/compression/ipc_compression.rs:
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ipc::CompressionType;
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum CompressionCodecType {
+    NoCompression,
+    Lz4Frame,
+    Zstd,
+}
+
+impl From<CompressionType> for CompressionCodecType {
+    fn from(compression_type: CompressionType) -> Self {
+        match compression_type {
+            CompressionType::ZSTD => CompressionCodecType::Zstd,
+            CompressionType::LZ4_FRAME => CompressionCodecType::Lz4Frame,
+            _ => CompressionCodecType::NoCompression,
+        }
+    }
+}
+
+impl From<CompressionCodecType> for Option<CompressionType> {
+    fn from(codec: CompressionCodecType) -> Self {
+        match codec {
+            CompressionCodecType::NoCompression => None,
+            CompressionCodecType::Lz4Frame => Some(CompressionType::LZ4_FRAME),
+            CompressionCodecType::Zstd => Some(CompressionType::ZSTD),
+        }
+    }
+}
+
+#[cfg(any(feature = "zstd,lz4", test))]
+mod compression_function {
+    use crate::error::Result;
+    use crate::ipc::compression::ipc_compression::CompressionCodecType;
+    use std::io::{Read, Write};
+
+    impl CompressionCodecType {
+        pub fn compress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
+            match self {
+                CompressionCodecType::Lz4Frame => {
+                    let mut encoder = lz4::EncoderBuilder::new().build(output).unwrap();
+                    encoder.write_all(input).unwrap();

Review Comment:
   I suggest passing the errors back out of here directly (as `compress` returns a `Result`) rather than `unwrap()` which will `panic` on error
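
As an illustration of the pattern only, here is a standard-library sketch that forwards I/O errors with `?` instead of calling `unwrap()`. `CodecError`, `write_all_checked`, and `FailingWriter` are hypothetical names; a generic `Write` stands in for the lz4/zstd encoders, and the real code would presumably convert into `ArrowError`.

```rust
use std::io::{self, Write};

/// Hypothetical error type standing in for `ArrowError` in this sketch.
#[derive(Debug)]
enum CodecError {
    Io(String),
}

impl From<io::Error> for CodecError {
    fn from(e: io::Error) -> Self {
        CodecError::Io(e.to_string())
    }
}

/// Write `input` through `encoder`, returning any failure to the caller.
/// The `?` operator converts `io::Error` into `CodecError` via `From`.
fn write_all_checked<W: Write>(mut encoder: W, input: &[u8]) -> Result<(), CodecError> {
    encoder.write_all(input)?;
    encoder.flush()?;
    Ok(())
}

/// A writer whose `write` always fails, used to exercise the error path.
struct FailingWriter;

impl Write for FailingWriter {
    fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
        Err(io::Error::new(io::ErrorKind::Other, "encoder failed"))
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

Calling `write_all_checked(FailingWriter, b"abc")` yields an `Err` that the caller can handle, where the `unwrap()` version would have panicked.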



##########
arrow/src/ipc/compression/ipc_compression.rs:
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ipc::CompressionType;
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum CompressionCodecType {
+    NoCompression,
+    Lz4Frame,
+    Zstd,
+}
+
+impl From<CompressionType> for CompressionCodecType {
+    fn from(compression_type: CompressionType) -> Self {
+        match compression_type {
+            CompressionType::ZSTD => CompressionCodecType::Zstd,
+            CompressionType::LZ4_FRAME => CompressionCodecType::Lz4Frame,
+            _ => CompressionCodecType::NoCompression,
+        }
+    }
+}
+
+impl From<CompressionCodecType> for Option<CompressionType> {
+    fn from(codec: CompressionCodecType) -> Self {
+        match codec {
+            CompressionCodecType::NoCompression => None,
+            CompressionCodecType::Lz4Frame => Some(CompressionType::LZ4_FRAME),
+            CompressionCodecType::Zstd => Some(CompressionType::ZSTD),
+        }
+    }
+}
+
+#[cfg(any(feature = "zstd,lz4", test))]

Review Comment:
   I wonder if you can put this guard on the entire `mod ipc_compression` statement so that the entire module (including the test) is not compiled unless that feature is active 



##########
arrow/src/ipc/writer.rs:
##########
@@ -55,9 +61,50 @@ pub struct IpcWriteOptions {
     /// version 2.0.0: V4, with legacy format enabled
     /// version 4.0.0: V5
     metadata_version: ipc::MetadataVersion,
+    batch_compression_type: CompressionCodecType,
 }
 
 impl IpcWriteOptions {
+    pub fn try_new_with_compression(
+        alignment: usize,
+        write_legacy_ipc_format: bool,
+        metadata_version: ipc::MetadataVersion,
+        batch_compression_type: CompressionCodecType,
+    ) -> Result<Self> {
+        if alignment == 0 || alignment % 8 != 0 {
+            return Err(ArrowError::InvalidArgumentError(
+                "Alignment should be greater than 0 and be a multiple of 8".to_string(),
+            ));
+        }
+        match batch_compression_type {
+            CompressionCodecType::NoCompression => {}
+            _ => {
+                if metadata_version != ipc::MetadataVersion::V5 {
+                    return Err(ArrowError::InvalidArgumentError(
+                        "Compress buffer just support from metadata v5".to_string(),

Review Comment:
   ```suggestion
                           "Compression only supported in metadata v5 and above".to_string(),
   ```



##########
arrow/src/ipc/writer.rs:
##########
@@ -55,9 +61,50 @@ pub struct IpcWriteOptions {
     /// version 2.0.0: V4, with legacy format enabled
     /// version 4.0.0: V5
     metadata_version: ipc::MetadataVersion,
+    batch_compression_type: CompressionCodecType,
 }
 
 impl IpcWriteOptions {
+    pub fn try_new_with_compression(
+        alignment: usize,
+        write_legacy_ipc_format: bool,
+        metadata_version: ipc::MetadataVersion,
+        batch_compression_type: CompressionCodecType,
+    ) -> Result<Self> {
+        if alignment == 0 || alignment % 8 != 0 {
+            return Err(ArrowError::InvalidArgumentError(
+                "Alignment should be greater than 0 and be a multiple of 8".to_string(),
+            ));
+        }
+        match batch_compression_type {
+            CompressionCodecType::NoCompression => {}
+            _ => {
+                if metadata_version != ipc::MetadataVersion::V5 {

Review Comment:
   should this check be `<` instead of `!=` to cover future versions?
   
   ```suggestion
                   if metadata_version < ipc::MetadataVersion::V5 {
   ```
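
To illustrate the difference between the two checks, here is a small sketch. `Version` is a hypothetical enum standing in for `ipc::MetadataVersion`, and `V6` is an invented future version used only to show the behaviour; neither is taken from the PR.

```rust
/// Hypothetical stand-in for `ipc::MetadataVersion`. Variants are declared in
/// ascending order, so the derived `PartialOrd` compares them by version.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Version {
    V4,
    V5,
    /// Invented future version, for illustration only.
    V6,
}

/// Mirrors the `!=` check: only exactly V5 is accepted.
pub fn allowed_with_ne(version: Version) -> bool {
    !(version != Version::V5)
}

/// Mirrors the suggested `<` check: V5 and anything newer is accepted.
pub fn allowed_with_lt(version: Version) -> bool {
    !(version < Version::V5)
}
```

Both checks reject V4, but only the `<` form would keep accepting a later version.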



##########
arrow/src/ipc/writer.rs:
##########
@@ -922,20 +1021,67 @@ fn write_array_data(
 }
 
 /// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
 fn write_buffer(
     buffer: &Buffer,
     buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
     offset: i64,
+    compression_codec: &CompressionCodecType,
 ) -> i64 {
-    let len = buffer.len();
-    let pad_len = pad_to_8(len as u32);
-    let total_len: i64 = (len + pad_len) as i64;
-    // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes");
-    buffers.push(ipc::Buffer::new(offset, total_len));
-    arrow_data.extend_from_slice(buffer.as_slice());
-    arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
-    offset + total_len
+    let origin_buffer_len = buffer.len();
+    let mut _compression_buffer = Vec::<u8>::new();
+    let (data, uncompression_buffer_len) = match compression_codec {
+        CompressionCodecType::NoCompression => {
+            // this buffer_len will not used in the following logic
+            // If we don't use the compression, just write the data in the array
+            (buffer.as_slice(), origin_buffer_len as i64)
+        }
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => {
+            if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA {

Review Comment:
   It seems like this code should throw a "Not supported" error (or `panic` if it is encountered without support compiled in)
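
One possible shape for such a check, as a sketch under assumptions: `Codec` and `check_codec_available` are hypothetical names, the error is a plain `String` rather than `ArrowError`, and `compiled_in` stands for whatever the real code would derive from its feature flag (for example via `cfg!`).

```rust
/// Hypothetical stand-in for `CompressionCodecType`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Codec {
    NoCompression,
    Lz4Frame,
    Zstd,
}

/// Returns an error instead of silently falling through when a compressing
/// codec is requested but compression support was not compiled in.
pub fn check_codec_available(codec: Codec, compiled_in: bool) -> Result<(), String> {
    match codec {
        // Writing uncompressed data never needs the optional dependencies.
        Codec::NoCompression => Ok(()),
        // The guard applies to both alternatives of the or-pattern.
        Codec::Lz4Frame | Codec::Zstd if compiled_in => Ok(()),
        other => Err(format!(
            "IPC compression codec {:?} is not supported in this build",
            other
        )),
    }
}
```

Calling this once up front would let the write path report the problem as an error before any buffer is touched.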



##########
arrow/src/ipc/reader.rs:
##########
@@ -1414,6 +1493,53 @@ mod tests {
         });
     }
 
+    #[test]
+    fn read_generated_streams_200() {
+        let testdata = crate::util::test_util::arrow_test_data();
+        let version = "2.0.0-compression";
+
+        // the test is repetitive, thus we can read all supported files at once
+        let paths = vec!["generated_lz4", "generated_zstd"];
+        paths.iter().for_each(|path| {
+            let file = File::open(format!(
+                "{}/arrow-ipc-stream/integration/{}/{}.stream",
+                testdata, version, path
+            ))
+            .unwrap();
+
+            let mut reader = StreamReader::try_new(file, None).unwrap();
+
+            // read expected JSON output
+            let arrow_json = read_gzip_json(version, path);
+            assert!(arrow_json.equals_reader(&mut reader));
+            // the next batch must be empty
+            assert!(reader.next().is_none());
+            // the stream must indicate that it's finished
+            assert!(reader.is_finished());
+        });
+    }
+
+    #[test]
+    fn read_generated_files_200() {

Review Comment:
   nice!



##########
arrow/src/ipc/writer.rs:
##########
@@ -55,9 +61,50 @@ pub struct IpcWriteOptions {
     /// version 2.0.0: V4, with legacy format enabled
     /// version 4.0.0: V5
     metadata_version: ipc::MetadataVersion,
+    batch_compression_type: CompressionCodecType,
 }
 
 impl IpcWriteOptions {
+    pub fn try_new_with_compression(

Review Comment:
   I wonder if you could avoid the duplication here with more of a `Builder` style:
   
   ```rust
   impl IpcWriteOptions {
     pub fn with_compression(mut self, batch_compression_type: CompressionCodecType) -> Result<Self> {
       // ... do checks here
       self.batch_compression_type = batch_compression_type;
       Ok(self)
     }
   ...
   }
   ```
   
   Then one could use it like:
   
   ```rust
     let options = IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)?
       .with_compression(CompressionCodecType::Lz4Frame)?;
   ...
   ```
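
For reference, a self-contained sketch of this builder style that compiles on its own. `WriteOptions`, `Version`, and `Codec` are hypothetical stand-ins for `IpcWriteOptions`, `ipc::MetadataVersion`, and `CompressionCodecType`, and errors are plain `String`s; the real types may differ.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Version {
    V4,
    V5,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Codec {
    NoCompression,
    Lz4Frame,
    Zstd,
}

#[derive(Debug, Clone, PartialEq)]
pub struct WriteOptions {
    alignment: usize,
    version: Version,
    codec: Codec,
}

impl WriteOptions {
    /// Validates the shared options once; compression defaults to off.
    pub fn try_new(alignment: usize, version: Version) -> Result<Self, String> {
        if alignment == 0 || alignment % 8 != 0 {
            return Err("Alignment should be greater than 0 and be a multiple of 8".to_string());
        }
        Ok(Self { alignment, version, codec: Codec::NoCompression })
    }

    /// Consumes `self`, checks the compression-specific constraint, and
    /// returns the updated options so calls can be chained.
    pub fn with_compression(mut self, codec: Codec) -> Result<Self, String> {
        if codec != Codec::NoCompression && self.version < Version::V5 {
            return Err("Compression only supported in metadata v5 and above".to_string());
        }
        self.codec = codec;
        Ok(self)
    }

    pub fn alignment(&self) -> usize {
        self.alignment
    }

    pub fn codec(&self) -> Codec {
        self.codec
    }
}
```

The alignment check then lives in one constructor only, and `with_compression` adds just the version check.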



##########
arrow/Cargo.toml:
##########
@@ -60,7 +63,7 @@ multiversion = { version = "0.6.1", default-features = false }
 bitflags = { version = "1.2.1", default-features = false }
 
 [features]
-default = ["csv", "ipc", "test_utils"]
+default = ["csv", "ipc", "test_utils", "zstd", "lz4"]

Review Comment:
   I agree -- I suggest something like
   
   ```suggestion
   default = ["csv", "ipc", "test_utils"]
   ```
   



##########
arrow/src/ipc/mod.rs:
##########
@@ -22,6 +22,7 @@ pub mod convert;
 pub mod reader;
 pub mod writer;
 
+mod compression;

Review Comment:
   I recommend guarding this entire thing via `#[cfg(...)]` rather than more granular guards within the module



##########
arrow/src/ipc/writer.rs:
##########
@@ -55,9 +61,50 @@ pub struct IpcWriteOptions {
     /// version 2.0.0: V4, with legacy format enabled
     /// version 4.0.0: V5
     metadata_version: ipc::MetadataVersion,
+    batch_compression_type: CompressionCodecType,
 }
 
 impl IpcWriteOptions {
+    pub fn try_new_with_compression(
+        alignment: usize,
+        write_legacy_ipc_format: bool,
+        metadata_version: ipc::MetadataVersion,
+        batch_compression_type: CompressionCodecType,
+    ) -> Result<Self> {
+        if alignment == 0 || alignment % 8 != 0 {
+            return Err(ArrowError::InvalidArgumentError(
+                "Alignment should be greater than 0 and be a multiple of 8".to_string(),
+            ));
+        }
+        match batch_compression_type {
+            CompressionCodecType::NoCompression => {}
+            _ => {
+                if metadata_version != ipc::MetadataVersion::V5 {
+                    return Err(ArrowError::InvalidArgumentError(
+                        "Compress buffer just support from metadata v5".to_string(),
+                    ));
+                }
+            }
+        };
+        match metadata_version {
+            ipc::MetadataVersion::V5 => {
+                if write_legacy_ipc_format {
+                    Err(ArrowError::InvalidArgumentError(
+                        "Legacy IPC format only supported on metadata version 4"
+                            .to_string(),
+                    ))
+                } else {
+                    Ok(Self {
+                        alignment,
+                        write_legacy_ipc_format,
+                        metadata_version,
+                        batch_compression_type,
+                    })
+                }
+            }
+            z => panic!("Unsupported ipc::MetadataVersion {:?}", z),

Review Comment:
   Also given that this function returns a `Result` it seems like we could return a proper error here rather than `panic`ing
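
A sketch of the same match with the catch-all arm returning an error. `Version` and `validate` are hypothetical names, and the error type is a plain `String` standing in for `ArrowError::InvalidArgumentError`.

```rust
/// Hypothetical stand-in for `ipc::MetadataVersion`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Version {
    V1,
    V4,
    V5,
}

/// Mirrors the shape of the final `match` in the diff, but every arm
/// produces a `Result` instead of the last one panicking.
pub fn validate(version: Version, write_legacy_ipc_format: bool) -> Result<(), String> {
    match version {
        Version::V5 if write_legacy_ipc_format => {
            Err("Legacy IPC format only supported on metadata version 4".to_string())
        }
        Version::V5 => Ok(()),
        other => Err(format!("Unsupported ipc::MetadataVersion {:?}", other)),
    }
}
```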



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r903740918


##########
arrow/src/ipc/writer.rs:
##########
@@ -922,20 +1021,67 @@ fn write_array_data(
 }
 
 /// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
 fn write_buffer(
     buffer: &Buffer,
     buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
     offset: i64,
+    compression_codec: &CompressionCodecType,
 ) -> i64 {
-    let len = buffer.len();
-    let pad_len = pad_to_8(len as u32);
-    let total_len: i64 = (len + pad_len) as i64;
-    // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes");
-    buffers.push(ipc::Buffer::new(offset, total_len));
-    arrow_data.extend_from_slice(buffer.as_slice());
-    arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
-    offset + total_len
+    let origin_buffer_len = buffer.len();
+    let mut _compression_buffer = Vec::<u8>::new();
+    let (data, uncompression_buffer_len) = match compression_codec {
+        CompressionCodecType::NoCompression => {
+            // this buffer_len will not used in the following logic
+            // If we don't use the compression, just write the data in the array
+            (buffer.as_slice(), origin_buffer_len as i64)
+        }
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => {
+            if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA {
+                (buffer.as_slice(), 0)
+            } else {
+                #[cfg(any(feature = "zstd,lz4", test))]
+                compression_codec
+                    .compress(buffer.as_slice(), &mut _compression_buffer)
+                    .unwrap();

Review Comment:
   @viirya As in my reply above: maybe I can resolve this optional-feature / compile issue in a follow-up issue or pull request.
   In this pull request, we can focus on the protocol of IPC compression.





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934242035


##########
arrow/src/ipc/writer.rs:
##########
@@ -1090,20 +1224,68 @@ fn write_array_data(
 }
 
 /// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
 fn write_buffer(
     buffer: &[u8],
     buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
     offset: i64,
+    compression_codec: &CompressionCodecType,
 ) -> i64 {
-    let len = buffer.len();
-    let pad_len = pad_to_8(len as u32);
-    let total_len: i64 = (len + pad_len) as i64;
-    // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes");
-    buffers.push(ipc::Buffer::new(offset, total_len));
-    arrow_data.extend_from_slice(buffer);
-    arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
-    offset + total_len
+    let origin_buffer_len = buffer.len();
+    let mut _compression_buffer = Vec::<u8>::new();
+    let (data, uncompression_buffer_len) = match compression_codec {
+        CompressionCodecType::NoCompression => {
+            // this buffer_len will not used in the following logic
+            // If we don't use the compression, just write the data in the array
+            (buffer, origin_buffer_len as i64)
+        }
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => {
+            if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA {
+                (buffer, 0)
+            } else if cfg!(feature = "ipc_compression") || cfg!(test) {
+                #[cfg(any(feature = "ipc_compression", test))]
+                compression_codec
+                    .compress(buffer, &mut _compression_buffer)
+                    .unwrap();
+                let compression_len = _compression_buffer.len();
+                if compression_len > origin_buffer_len {
+                    // the length of compressed data is larger than uncompressed data
+                    // use the uncompressed data with -1
+                    // -1 indicate that we don't compress the data
+                    (buffer, LENGTH_NO_COMPRESSED_DATA)
+                } else {
+                    // use the compressed data with uncompressed length
+                    (_compression_buffer.as_slice(), origin_buffer_len as i64)
+                }
+            } else {
+                panic!("IPC compression not supported. Compile with feature 'ipc_compression' to enable");
+            }
+        }
+    };
+    let len = data.len() as i64;
+    let total_len = if compression_codec == &CompressionCodecType::NoCompression {
+        buffers.push(ipc::Buffer::new(offset, len));
+        len
+    } else {
+        buffers.push(ipc::Buffer::new(offset, LENGTH_OF_PREFIX_DATA + len));
+        // write the prefix of the uncompressed length
+        let uncompression_len_buf: [u8; 8] = uncompression_buffer_len.to_le_bytes();
+        arrow_data.extend_from_slice(&uncompression_len_buf);
+        LENGTH_OF_PREFIX_DATA + len
+    };
+    arrow_data.extend_from_slice(data);
+    // padding and make offset 8 bytes aligned
+    let pad_len = pad_to_8(len as u32) as i64;

Review Comment:
   Each buffer involves two structures; one is the metadata, which stores the `offset` and the actual length of the data.
   The actual length is `total_len`.
   The `pad_len` is only used to align the buffer.
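
A small sketch of that layout, using only the standard library. `write_prefixed_buffer` and `PREFIX_LEN` are hypothetical names, and the function only models the compressed path described in the doc comment above (8-byte little-endian length prefix, then data, then padding); it is not the PR's `write_buffer`.

```rust
/// Size of the little-endian i64 prefix holding the uncompressed length.
const PREFIX_LEN: usize = 8;

/// Number of zero bytes needed to round `len` up to a multiple of 8.
fn pad_to_8(len: usize) -> usize {
    (8 - len % 8) % 8
}

/// Appends prefix + data + padding to `out`.
/// Returns `(recorded_len, next_offset)`: the length stored in the buffer
/// metadata excludes the padding, while the next offset includes it.
pub fn write_prefixed_buffer(
    data: &[u8],
    uncompressed_len: i64,
    out: &mut Vec<u8>,
    offset: i64,
) -> (i64, i64) {
    out.extend_from_slice(&uncompressed_len.to_le_bytes());
    out.extend_from_slice(data);
    let recorded_len = (PREFIX_LEN + data.len()) as i64;
    // The prefix is already 8 bytes, so only the data length affects padding.
    let pad = pad_to_8(data.len());
    out.extend(std::iter::repeat(0u8).take(pad));
    (recorded_len, offset + recorded_len + pad as i64)
}
```

For 5 bytes of data written at offset 0, the metadata would record a length of 13 while the next buffer starts at offset 16.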
   





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934253633


##########
arrow/src/ipc/reader.rs:
##########
@@ -603,6 +674,16 @@ pub fn read_record_batch(
     let field_nodes = batch.nodes().ok_or_else(|| {
         ArrowError::IoError("Unable to get field nodes from IPC RecordBatch".to_string())
     })?;
+    let option_compression = batch.compression();
+    let compression_codec = match option_compression {
+        None => CompressionCodecType::NoCompression,
+        Some(compression) => match compression.codec() {
+            CompressionType::ZSTD => CompressionCodecType::Zstd,
+            CompressionType::LZ4_FRAME => CompressionCodecType::Lz4Frame,
+            _ => CompressionCodecType::NoCompression,

Review Comment:
   No, the current version supports only LZ4 and zstd.





[GitHub] [arrow-rs] viirya commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934107579


##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +32,78 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &CompressionCodecType,
+) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // corner case: empty buffer
+    if buf_data.is_empty() {
+        return Buffer::from(buf_data);
+    }
+    match compression_codec {
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd
+            if cfg!(feature = "ipc_compression") || cfg!(test) =>
+        {
+            // 8byte + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise return as is
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // emtpy
+                let empty = Vec::<u8>::new();
+                Buffer::from(empty)
+            } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+                // not compress
+                let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                Buffer::from(data)
+            } else {
+                // decompress data using the codec
+                let mut _uncompressed_buffer = Vec::new();
+                let _input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                #[cfg(any(feature = "ipc_compression", test))]
+                compression_codec
+                    .decompress(_input_data, &mut _uncompressed_buffer)
+                    .unwrap();
+                Buffer::from(_uncompressed_buffer)
+            }
+        }
+        CompressionCodecType::NoCompression => Buffer::from(buf_data),
+        _ => {
+            panic!("IPC compression not supported. Compile with feature 'ipc_compression' to enable");
+        }
+    }
+}
+
+/// Get the uncompressed length
+/// Notes:
+///   -1: indicate that the data that follows is not compressed
+///    0: indicate that there is no data
+///   positive number: indicate the uncompressed length for the following data
+fn read_uncompressed_size(buffer: &[u8]) -> i64 {

Review Comment:
   `#[inline]`?
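
For context, a sketch of what such a helper could look like with the attribute applied. The body is an assumption (the diff does not show it), `Prefix` and `classify` are hypothetical names, and `#[inline]` is only a hint to the compiler.

```rust
/// Interpretation of the 8-byte prefix, following the notes in the doc comment.
#[derive(Debug, PartialEq)]
pub enum Prefix {
    /// -1: the data that follows is not compressed.
    Uncompressed,
    /// 0: there is no data.
    Empty,
    /// Positive: uncompressed length of the data that follows.
    Compressed(usize),
}

/// Reads the first 8 bytes as a little-endian i64.
/// Returns `None` if the buffer is shorter than the prefix.
#[inline]
pub fn read_uncompressed_size(buffer: &[u8]) -> Option<i64> {
    let mut bytes = [0u8; 8];
    bytes.copy_from_slice(buffer.get(..8)?);
    Some(i64::from_le_bytes(bytes))
}

/// Maps the raw prefix value onto the three documented cases.
pub fn classify(buffer: &[u8]) -> Option<Prefix> {
    match read_uncompressed_size(buffer)? {
        -1 => Some(Prefix::Uncompressed),
        0 => Some(Prefix::Empty),
        n if n > 0 => Some(Prefix::Compressed(n as usize)),
        // Any other negative value is not a documented prefix.
        _ => None,
    }
}
```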





[GitHub] [arrow-rs] liukun4515 commented on pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#issuecomment-1159663814

   > Hi @andygrove @alamb, I have implemented the IPC compression, but the CI on AMD 32 fails. I have no idea how to fix it; can you help me?
   
   @martin-g Could you please take a look at this CI problem?




[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r939496168


##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +32,78 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(

Review Comment:
   https://github.com/apache/arrow-rs/issues/2342





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r900994929


##########
arrow/src/ipc/writer.rs:
##########
@@ -922,19 +1021,67 @@ fn write_array_data(
 }
 
 /// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector
+/// From https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
 fn write_buffer(
     buffer: &Buffer,
     buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
     offset: i64,
+    compression_codec: &CompressionCodecType,
 ) -> i64 {
-    let len = buffer.len();
-    let pad_len = pad_to_8(len as u32);
-    let total_len: i64 = (len + pad_len) as i64;
+    let origin_buffer_len = buffer.len();
+    let mut compression_buffer = Vec::<u8>::new();
+    let (data, uncompression_buffer_len) = match compression_codec {
+        CompressionCodecType::NoCompression => {
+            // this buffer_len will not used in the following logic
+            // If we don't use the compression, just write the data in the array
+            (buffer.as_slice(), origin_buffer_len as i64)
+        }
+        CompressionCodecType::Lz4Frame | CompressionCodecType::ZSTD => {
+            if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA {
+                (buffer.as_slice(), 0)
+            } else {
+                compression_codec
+                    .compress(buffer.as_slice(), &mut compression_buffer)
+                    .unwrap();
+                if compression_buffer.len() > origin_buffer_len {
+                    // the length of compressed data is larger than uncompressed data
+                    // use the uncompressed data with -1
+                    // -1 indicate that we don't compress the data
+                    (buffer.as_slice(), LENGTH_NO_COMPRESSED_DATA)
+                } else {
+                    // use the compressed data with uncompressed length
+                    (compression_buffer.as_slice(), origin_buffer_len as i64)
+                }
+            }
+        }
+    };
+    let len = data.len() as i64;
+    // TODO: don't need to pad each buffer, and just need to pad the tail of the message body
+    // let pad_len = pad_to_8(len as u32);

Review Comment:
   If this makes sense to you, I will drop this commented-out code.





[GitHub] [arrow-rs] alamb commented on pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#issuecomment-1162268676

   Thanks @liukun4515  -- I will try and find time tomorrow to review this PR 




[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r903732476


##########
arrow/src/ipc/writer.rs:
##########
@@ -517,17 +610,20 @@ impl<W: Write> FileWriter<W> {
         let data_gen = IpcDataGenerator::default();
         let mut writer = BufWriter::new(writer);
         // write magic to header
+        let mut header_size: usize = 0;
         writer.write_all(&super::ARROW_MAGIC[..])?;
+        header_size += super::ARROW_MAGIC.len();
         // create an 8-byte boundary after the header
         writer.write_all(&[0, 0])?;
+        header_size += 2;
         // write the schema, set the written bytes to the schema + header
         let encoded_message = data_gen.schema_to_bytes(schema, &write_options);
         let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
         Ok(Self {
             writer,
             write_options,
             schema: schema.clone(),
-            block_offsets: meta + data + 8,
+            block_offsets: meta + data + header_size,

Review Comment:
   `try_new_with_options` writes the `schema` to the file or stream.
   The layout of the Arrow IPC file format is described in the docs: https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format
   
   First, we write:
   <magic number "ARROW1">
   <empty padding bytes [to 8 byte boundary]>
   Together these take 8 bytes, which is the value of `header_size`.
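
The same arithmetic as a standalone sketch. `write_file_header` is a hypothetical helper (the PR does this inline in the `FileWriter` constructor), and `ARROW_MAGIC` is redefined locally here as the 6-byte string from the format docs.

```rust
use std::io::{self, Write};

/// The 6-byte magic string from the IPC file format docs.
const ARROW_MAGIC: [u8; 6] = *b"ARROW1";

/// Writes the magic string plus padding up to an 8-byte boundary and
/// returns the number of header bytes written.
pub fn write_file_header<W: Write>(writer: &mut W) -> io::Result<usize> {
    let mut header_size = 0;
    writer.write_all(&ARROW_MAGIC)?;
    header_size += ARROW_MAGIC.len();
    // Two zero bytes bring the 6-byte magic up to 8 bytes.
    let padding = [0u8; 2];
    writer.write_all(&padding)?;
    header_size += padding.len();
    Ok(header_size)
}
```

Tracking the count next to each write keeps the offset in step with what was actually written, instead of relying on a literal `8`.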





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r939500556


##########
arrow/Cargo.toml:
##########
@@ -83,6 +86,8 @@ rand = { version = "0.8", default-features = false, features =  ["std", "std_rng
 criterion = { version = "0.3", default-features = false }
 flate2 = { version = "1", default-features = false, features = ["rust_backend"] }
 tempfile = { version = "3", default-features = false }
+lz4 = { version = "1.23", default-features = false }
+zstd = { version = "0.11", default-features = false }

Review Comment:
   I tried removing these from the dev-dependencies and running `cargo test`,
   but I got the following compile errors.
   ```
   error[E0433]: failed to resolve: use of undeclared crate or module `lz4`
     --> arrow/src/ipc/compression/ipc_compression.rs:57:39
      |
   57 |                     let mut encoder = lz4::EncoderBuilder::new().build(output)?;
      |                                       ^^^ use of undeclared crate or module `lz4`
   
   error[E0433]: failed to resolve: use of undeclared crate or module `zstd`
     --> arrow/src/ipc/compression/ipc_compression.rs:65:39
      |
   65 |                     let mut encoder = zstd::Encoder::new(output, 0)?;
      |                                       ^^^^ use of undeclared crate or module `zstd`
      |
   help: there is a crate or module with a similar name
      |
   65 |                     let mut encoder = std::Encoder::new(output, 0)?;
      |                                       ~~~
   
   ```
   @alamb 





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r939497022


##########
arrow/src/ipc/writer.rs:
##########
@@ -1090,20 +1224,68 @@ fn write_array_data(
 }
 
 /// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
 fn write_buffer(

Review Comment:
   #2342
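
The framing described in the doc comment quoted above can be sketched as follows. This is an illustration under assumptions, not the code from the PR: `frame_buffer`, `split_frame`, `PREFIX_LEN` and `NOT_COMPRESSED` are hypothetical names, and padding is left out.

```rust
/// Size of the length prefix in bytes (one 64-bit integer).
const PREFIX_LEN: usize = 8;
/// Prefix value meaning "the body that follows is not compressed".
const NOT_COMPRESSED: i64 = -1;

/// Frames `body` by prepending `uncompressed_len` as a little-endian i64.
fn frame_buffer(uncompressed_len: i64, body: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(PREFIX_LEN + body.len());
    out.extend_from_slice(&uncompressed_len.to_le_bytes());
    out.extend_from_slice(body);
    out
}

/// Splits a framed buffer into its prefix value and body.
/// Returns `None` if the input is shorter than the prefix.
fn split_frame(framed: &[u8]) -> Option<(i64, &[u8])> {
    if framed.len() < PREFIX_LEN {
        return None;
    }
    let (prefix, body) = framed.split_at(PREFIX_LEN);
    let mut raw = [0u8; PREFIX_LEN];
    raw.copy_from_slice(prefix);
    Some((i64::from_le_bytes(raw), body))
}
```

A reader would call `split_frame` first and only hand the body to a codec when the prefix is not `NOT_COMPRESSED`.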





[GitHub] [arrow-rs] viirya commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934106051


##########
arrow/src/ipc/compression/ipc_compression.rs:
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ipc::CompressionType;
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum CompressionCodecType {
+    NoCompression,
+    Lz4Frame,
+    Zstd,
+}
+
+impl From<CompressionType> for CompressionCodecType {
+    fn from(compression_type: CompressionType) -> Self {
+        match compression_type {
+            CompressionType::ZSTD => CompressionCodecType::Zstd,
+            CompressionType::LZ4_FRAME => CompressionCodecType::Lz4Frame,
+            _ => CompressionCodecType::NoCompression,
+        }
+    }
+}
+
+impl From<CompressionCodecType> for Option<CompressionType> {
+    fn from(codec: CompressionCodecType) -> Self {
+        match codec {
+            CompressionCodecType::NoCompression => None,
+            CompressionCodecType::Lz4Frame => Some(CompressionType::LZ4_FRAME),
+            CompressionCodecType::Zstd => Some(CompressionType::ZSTD),
+        }
+    }
+}
+
+#[cfg(any(feature = "ipc_compression", test))]
+mod compression_function {
+    use crate::error::Result;
+    use crate::ipc::compression::ipc_compression::CompressionCodecType;
+    use std::io::{Read, Write};
+
+    impl CompressionCodecType {
+        pub fn compress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
+            match self {
+                CompressionCodecType::Lz4Frame => {
+                    let mut encoder = lz4::EncoderBuilder::new().build(output).unwrap();
+                    encoder.write_all(input).unwrap();
+                    encoder.finish().1.unwrap();
+                    Ok(())
+                }
+                CompressionCodecType::Zstd => {
+                    let mut encoder = zstd::Encoder::new(output, 0).unwrap();
+                    encoder.write_all(input).unwrap();
+                    encoder.finish().unwrap();
+                    Ok(())
+                }
+                _ => Ok(()),
+            }
+        }
+
+        pub fn decompress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<usize> {
+            let result: Result<usize> = match self {
+                CompressionCodecType::Lz4Frame => {
+                    let mut decoder = lz4::Decoder::new(input)?;
+                    match decoder.read_to_end(output) {
+                        Ok(size) => Ok(size),
+                        Err(e) => Err(e.into()),
+                    }
+                }
+                CompressionCodecType::Zstd => {
+                    let mut decoder = zstd::Decoder::new(input)?;
+                    match decoder.read_to_end(output) {
+                        Ok(size) => Ok(size),
+                        Err(e) => Err(e.into()),
+                    }
+                }
+                _ => Ok(input.len()),

Review Comment:
   Is this for `CompressionCodecType::NoCompression`? If so, do we need to copy from input to output?
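
If the answer is yes, one option is to copy the input so that `output` holds the data whichever codec is selected. The sketch below uses a simplified, hypothetical `Codec` enum rather than the `CompressionCodecType` from the diff.

```rust
/// Simplified stand-in for the codec enum in the diff (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Codec {
    NoCompression,
}

impl Codec {
    /// For the no-compression case, copy `input` into `output` so that the
    /// returned length matches what the caller finds in `output`.
    fn decompress(&self, input: &[u8], output: &mut Vec<u8>) -> usize {
        match self {
            Codec::NoCompression => {
                output.extend_from_slice(input);
                input.len()
            }
        }
    }
}
```

Without the copy, the function would report `input.len()` bytes while leaving `output` empty, which is the mismatch the question points at.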





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934104555


##########
arrow/src/ipc/writer.rs:
##########
@@ -55,9 +61,50 @@ pub struct IpcWriteOptions {
     /// version 2.0.0: V4, with legacy format enabled
     /// version 4.0.0: V5
     metadata_version: ipc::MetadataVersion,
+    batch_compression_type: CompressionCodecType,
 }
 
 impl IpcWriteOptions {
+    pub fn try_new_with_compression(
+        alignment: usize,
+        write_legacy_ipc_format: bool,
+        metadata_version: ipc::MetadataVersion,
+        batch_compression_type: CompressionCodecType,
+    ) -> Result<Self> {
+        if alignment == 0 || alignment % 8 != 0 {
+            return Err(ArrowError::InvalidArgumentError(
+                "Alignment should be greater than 0 and be a multiple of 8".to_string(),
+            ));
+        }
+        match batch_compression_type {
+            CompressionCodecType::NoCompression => {}
+            _ => {
+                if metadata_version != ipc::MetadataVersion::V5 {

Review Comment:
   done





[GitHub] [arrow-rs] liukun4515 commented on pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#issuecomment-1208232311

   > @liukun4515 -- I made good progress on sorting out the feature flags. A draft PR is here [liukun4515#1](https://github.com/liukun4515/arrow-rs/pull/1) in case you want to see the direction I am headed.
   > 
   > I sadly ran out of time today, but I will try and finish it up tomorrow.
   
   thanks for your help!!!
   I will review your work tomorrow, because I am currently busy working on and discussing the decimal optimization.




[GitHub] [arrow-rs] alamb commented on pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#issuecomment-1165617976

   What is the status of this PR? Is it ready to go? Do we need to mess with the feature flags more?




[GitHub] [arrow-rs] pitrou commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r908515907


##########
arrow/src/ipc/compression/ipc_compression.rs:
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ipc::CompressionType;
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum CompressionCodecType {
+    NoCompression,
+    Lz4Frame,
+    Zstd,
+}
+
+impl From<CompressionType> for CompressionCodecType {
+    fn from(compression_type: CompressionType) -> Self {
+        match compression_type {
+            CompressionType::ZSTD => CompressionCodecType::Zstd,
+            CompressionType::LZ4_FRAME => CompressionCodecType::Lz4Frame,
+            _ => CompressionCodecType::NoCompression,
+        }
+    }
+}
+
+impl From<CompressionCodecType> for Option<CompressionType> {
+    fn from(codec: CompressionCodecType) -> Self {
+        match codec {
+            CompressionCodecType::NoCompression => None,
+            CompressionCodecType::Lz4Frame => Some(CompressionType::LZ4_FRAME),
+            CompressionCodecType::Zstd => Some(CompressionType::ZSTD),
+        }
+    }
+}
+
+#[cfg(any(feature = "zstd,lz4", test))]
+mod compression_function {
+    use crate::error::Result;
+    use crate::ipc::compression::ipc_compression::CompressionCodecType;
+    use std::io::{Read, Write};
+
+    impl CompressionCodecType {
+        pub fn compress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
+            match self {
+                CompressionCodecType::Lz4Frame => {
+                    let mut encoder = lz4::EncoderBuilder::new().build(output).unwrap();

Review Comment:
   As [the doc](https://docs.rs/lz4/1.23.3/lz4/) seems to say, "This module provides access to the block mode functions of the lz4 C library".
   LZ4_FRAME compression requires the frame functions of the lz4 C library. Apparently you can use https://docs.rs/lz4_flex/latest/lz4_flex/ instead.





[GitHub] [arrow-rs] liukun4515 commented on pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#issuecomment-1159606293

   After this PR is merged, I will enable the integration test for 2.0.0-compression.




[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r939497526


##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +32,78 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &Option<CompressionCodecType>,
+) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // corner case: empty buffer
+    if buf_data.is_empty() {
+        return Buffer::from(buf_data);
+    }
+    match compression_codec {
+        Some(_decompressor) if cfg!(feature = "ipc_compression") || cfg!(test) => {
+            // 8byte + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise return as is
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // empty
+                let empty = Vec::<u8>::new();
+                Buffer::from(empty)
+            } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+                // not compress
+                let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                Buffer::from(data)
+            } else {
+                // decompress data using the codec
+                let mut _uncompressed_buffer =
+                    Vec::with_capacity(decompressed_length as usize);
+                let _input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];

Review Comment:
   `#[cfg(any(feature = "ipc_compression", test))]` does not refer to a feature named `test`; it matches either the `ipc_compression` feature or the built-in `test` configuration.
   The code is compiled only when ```feature="ipc_compression"``` or `test` is set.
   
   Parquet uses the same pattern: https://github.com/apache/arrow-rs/blob/30c94dbf1c422f81f8520b9956e96ab7b53c3f47/parquet/src/compression.rs#L76
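
To make the distinction concrete, here is a small sketch of the `#[cfg(...)]` attribute next to the `cfg!(...)` macro. The function names are hypothetical; the predicate is the one quoted in this comment.

```rust
/// Compiled only when the `ipc_compression` feature is enabled or when the
/// crate is built for tests; otherwise this item does not exist at all.
#[cfg(any(feature = "ipc_compression", test))]
fn compression_compiled_in() -> bool {
    true
}

/// Fallback compiled in every other configuration.
#[cfg(not(any(feature = "ipc_compression", test)))]
fn compression_compiled_in() -> bool {
    false
}

/// `cfg!` evaluates the same predicate to a constant `bool`, but the code in
/// both branches of the `if` must still compile in every configuration.
fn describe() -> &'static str {
    if cfg!(any(feature = "ipc_compression", test)) {
        "compression available"
    } else {
        "compression not available"
    }
}
```

Because `test` is part of the predicate, code behind the attribute is also compiled by `cargo test`, so any crates it uses need to be resolvable in test builds as well.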





[GitHub] [arrow-rs] alamb commented on pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#issuecomment-1208270593

   > I will review your work tomorrow, because I am currently busy working on and discussing the decimal optimization.
   
   Thank you -- I hope to push up an update shortly. It turned into a larger change than I was expecting




[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934103867


##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +32,78 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &CompressionCodecType,
+) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // corner case: empty buffer
+    if buf_data.is_empty() {
+        return Buffer::from(buf_data);
+    }
+    match compression_codec {
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd
+            if cfg!(feature = "ipc_compression") || cfg!(test) =>

Review Comment:
   We need to check the compile options here, in case we receive a message (or IPC message) that uses compression.
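
One way to handle that case is to return an error when a compressed buffer arrives but codec support was not compiled in. This is a sketch under assumptions: `WireCodec` and `check_readable` are hypothetical names, and in real code the `compiled_in` flag could come from something like `cfg!(feature = "ipc_compression")`.

```rust
/// Simplified stand-in for the codec enum in the diff (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq)]
enum WireCodec {
    NoCompression,
    Lz4Frame,
    Zstd,
}

/// Decides whether a buffer written with `codec` can be read, given whether
/// compression support was compiled in. Returns a message on failure.
fn check_readable(codec: WireCodec, compiled_in: bool) -> Result<(), String> {
    match codec {
        WireCodec::NoCompression => Ok(()),
        // The guard applies to both alternatives of the or-pattern.
        WireCodec::Lz4Frame | WireCodec::Zstd if compiled_in => Ok(()),
        other => Err(format!(
            "buffer uses {:?} but compression support is not compiled in",
            other
        )),
    }
}
```

Returning an error keeps the reader from silently treating compressed bytes as plain data.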





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934100673


##########
arrow/Cargo.toml:
##########
@@ -60,7 +63,7 @@ multiversion = { version = "0.6.1", default-features = false }
 bitflags = { version = "1.2.1", default-features = false }
 
 [features]
-default = ["csv", "ipc", "test_utils"]
+default = ["csv", "ipc", "test_utils", "zstd", "lz4"]

Review Comment:
   I have added the feature `ipc_compression = "lz4,zstd"` as an option for compiling with IPC compression support.





[GitHub] [arrow-rs] viirya commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934105519


##########
arrow/src/ipc/compression/ipc_compression.rs:
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ipc::CompressionType;
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum CompressionCodecType {
+    NoCompression,
+    Lz4Frame,
+    Zstd,
+}
+
+impl From<CompressionType> for CompressionCodecType {
+    fn from(compression_type: CompressionType) -> Self {
+        match compression_type {
+            CompressionType::ZSTD => CompressionCodecType::Zstd,
+            CompressionType::LZ4_FRAME => CompressionCodecType::Lz4Frame,
+            _ => CompressionCodecType::NoCompression,
+        }
+    }
+}
+
+impl From<CompressionCodecType> for Option<CompressionType> {
+    fn from(codec: CompressionCodecType) -> Self {
+        match codec {
+            CompressionCodecType::NoCompression => None,
+            CompressionCodecType::Lz4Frame => Some(CompressionType::LZ4_FRAME),
+            CompressionCodecType::Zstd => Some(CompressionType::ZSTD),
+        }
+    }
+}
+
+#[cfg(any(feature = "ipc_compression", test))]
+mod compression_function {
+    use crate::error::Result;
+    use crate::ipc::compression::ipc_compression::CompressionCodecType;
+    use std::io::{Read, Write};
+
+    impl CompressionCodecType {
+        pub fn compress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
+            match self {
+                CompressionCodecType::Lz4Frame => {
+                    let mut encoder = lz4::EncoderBuilder::new().build(output).unwrap();
+                    encoder.write_all(input).unwrap();
+                    encoder.finish().1.unwrap();
+                    Ok(())
+                }
+                CompressionCodecType::Zstd => {
+                    let mut encoder = zstd::Encoder::new(output, 0).unwrap();

Review Comment:
   Should we pass the error out?
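
A sketch of what passing the error out could look like, using only `std::io`. `write_through` and `FailingWriter` are hypothetical names for illustration; the point is the `?` operator in place of `unwrap()`, which a later revision quoted in this thread also applies to the encoder calls.

```rust
use std::io::{self, Write};

/// Writes `input` through `sink`, returning any I/O error to the caller
/// instead of panicking via `unwrap`.
fn write_through<W: Write>(mut sink: W, input: &[u8]) -> io::Result<()> {
    sink.write_all(input)?;
    sink.flush()?;
    Ok(())
}

/// A writer that always fails, used to show the error path.
struct FailingWriter;

impl Write for FailingWriter {
    fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
        Err(io::Error::new(io::ErrorKind::Other, "sink is broken"))
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

With this shape, a failing sink surfaces as an `Err` that the caller can convert into its own error type, rather than aborting the process.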





[GitHub] [arrow-rs] viirya commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934113693


##########
arrow/src/ipc/writer.rs:
##########
@@ -1090,20 +1224,68 @@ fn write_array_data(
 }
 
 /// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
 fn write_buffer(
     buffer: &[u8],
     buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
     offset: i64,
+    compression_codec: &CompressionCodecType,
 ) -> i64 {
-    let len = buffer.len();
-    let pad_len = pad_to_8(len as u32);
-    let total_len: i64 = (len + pad_len) as i64;
-    // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes");
-    buffers.push(ipc::Buffer::new(offset, total_len));
-    arrow_data.extend_from_slice(buffer);
-    arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
-    offset + total_len
+    let origin_buffer_len = buffer.len();
+    let mut _compression_buffer = Vec::<u8>::new();
+    let (data, uncompression_buffer_len) = match compression_codec {
+        CompressionCodecType::NoCompression => {
+            // this buffer_len will not used in the following logic
+            // If we don't use the compression, just write the data in the array
+            (buffer, origin_buffer_len as i64)
+        }
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => {
+            if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA {
+                (buffer, 0)

Review Comment:
   ```suggestion
                   (buffer, LENGTH_EMPTY_COMPRESSED_DATA)
   ```
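
For context, the choice of length prefix can be sketched with named constants throughout. The constant values are assumptions: `0` is inferred from the suggestion replacing a literal `0`, and `-1` comes from the format notes quoted in the doc comment. `length_prefix` is a hypothetical helper, and falling back to uncompressed storage when compression does not shrink the buffer is one possible policy, not necessarily the one in the PR.

```rust
/// Prefix value for an empty buffer (assumed to be 0).
const LENGTH_EMPTY_COMPRESSED_DATA: i64 = 0;
/// Prefix value meaning "stored uncompressed", per the quoted format notes.
const LENGTH_NO_COMPRESSED_DATA: i64 = -1;

/// Picks the length prefix for a buffer (hypothetical helper).
/// `compressed_len` is the size the codec produced for this buffer.
fn length_prefix(original_len: usize, compressed_len: usize) -> i64 {
    if original_len == 0 {
        LENGTH_EMPTY_COMPRESSED_DATA
    } else if compressed_len >= original_len {
        // Compression did not help; store the raw bytes instead.
        LENGTH_NO_COMPRESSED_DATA
    } else {
        original_len as i64
    }
}
```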





[GitHub] [arrow-rs] liukun4515 commented on pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#issuecomment-1159896948

   > > hi @andygrove @alamb , I have implemented the IPC compression, but the ci on AMD 32 fails. I have no idea to fix it, can you help me?
   > 
   > @martin-g Could you please take a look this ci problem?
   
   I think the issue has been resolved




[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r903726865


##########
arrow/src/ipc/writer.rs:
##########
@@ -55,9 +61,50 @@ pub struct IpcWriteOptions {
     /// version 2.0.0: V4, with legacy format enabled
     /// version 4.0.0: V5
     metadata_version: ipc::MetadataVersion,
+    batch_compression_type: CompressionCodecType,
 }
 
 impl IpcWriteOptions {
+    pub fn try_new_with_compression(
+        alignment: usize,
+        write_legacy_ipc_format: bool,
+        metadata_version: ipc::MetadataVersion,
+        batch_compression_type: CompressionCodecType,
+    ) -> Result<Self> {
+        if alignment == 0 || alignment % 8 != 0 {
+            return Err(ArrowError::InvalidArgumentError(
+                "Alignment should be greater than 0 and be a multiple of 8".to_string(),
+            ));
+        }
+        match batch_compression_type {
+            CompressionCodecType::NoCompression => {}
+            _ => {
+                if metadata_version != ipc::MetadataVersion::V5 {
+                    return Err(ArrowError::InvalidArgumentError(
+                        "Compress buffer just support from metadata v5".to_string(),
+                    ));
+                }
+            }
+        };
+        match metadata_version {
+            ipc::MetadataVersion::V5 => {
+                if write_legacy_ipc_format {
+                    Err(ArrowError::InvalidArgumentError(
+                        "Legacy IPC format only supported on metadata version 4"
+                            .to_string(),
+                    ))
+                } else {
+                    Ok(Self {
+                        alignment,
+                        write_legacy_ipc_format,
+                        metadata_version,
+                        batch_compression_type,
+                    })
+                }
+            }
+            z => panic!("Unsupported ipc::MetadataVersion {:?}", z),

Review Comment:
   I just copied this code from `try_new`.
   `try_new` uses `CompressionCodecType::NoCompression`, so it behaves the same as in older versions.
   `try_new_with_compression` is meant to enable compression, but I think I made a mistake:
   I should use `CompressionType` instead of `CompressionCodecType` and make sure compression is enabled.
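
A sketch of how such a constructor could validate its arguments and report every failure as an `Err`. All types here are simplified, hypothetical stand-ins for the ones in the diff, and using `Option` to express "compression enabled" is an assumption of this sketch, not something the PR confirms.

```rust
/// Simplified stand-ins for the types in the diff (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq)]
enum MetadataVersion {
    V4,
    V5,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Compression {
    Lz4Frame,
    Zstd,
}

#[derive(Debug, PartialEq)]
struct WriteOptions {
    alignment: usize,
    metadata_version: MetadataVersion,
    compression: Option<Compression>,
}

/// Validates the arguments and reports every failure as an `Err`.
fn try_new_with_compression(
    alignment: usize,
    metadata_version: MetadataVersion,
    compression: Option<Compression>,
) -> Result<WriteOptions, String> {
    if alignment == 0 || alignment % 8 != 0 {
        return Err("alignment must be a positive multiple of 8".to_string());
    }
    if compression.is_some() && metadata_version != MetadataVersion::V5 {
        return Err("buffer compression requires metadata V5".to_string());
    }
    Ok(WriteOptions { alignment, metadata_version, compression })
}
```

Since the function already returns a `Result`, an unsupported combination can be reported through it like any other invalid argument.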





[GitHub] [arrow-rs] liukun4515 commented on pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#issuecomment-1166293569

   Hi @alamb @viirya 
   I will continue this work next week, after I have learned about feature flags and a few other things.
   I will mark the PR as a draft.
   Once the PR is ready and all the comments are addressed, I will reopen it.
   Sorry about that.




[GitHub] [arrow-rs] alamb commented on pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#issuecomment-1166516364

   Thanks @liukun4515 . 




[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934101360


##########
arrow/Cargo.toml:
##########
@@ -60,7 +63,7 @@ multiversion = { version = "0.6.1", default-features = false }
 bitflags = { version = "1.2.1", default-features = false }
 
 [features]
-default = ["csv", "ipc", "test_utils"]
+default = ["csv", "ipc", "test_utils", "zstd", "lz4"]
 csv = ["csv_crate"]
 ipc = ["flatbuffers"]

Review Comment:
   @alamb I have added an optional compile-time feature flag: `ipc_compression = ["zstd","lz4","byte_order"]`.



##########
arrow/Cargo.toml:
##########
@@ -60,7 +63,7 @@ multiversion = { version = "0.6.1", default-features = false }
 bitflags = { version = "1.2.1", default-features = false }
 
 [features]
-default = ["csv", "ipc", "test_utils"]
+default = ["csv", "ipc", "test_utils", "zstd", "lz4"]
 csv = ["csv_crate"]
 ipc = ["flatbuffers"]

Review Comment:
   @alamb I have added an optional compile-time feature flag: `ipc_compression = ["zstd","lz4"]`.





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934250693


##########
arrow/src/ipc/compression/ipc_compression.rs:
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ipc::CompressionType;
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum CompressionCodecType {
+    NoCompression,
+    Lz4Frame,
+    Zstd,
+}
+
+impl From<CompressionType> for CompressionCodecType {
+    fn from(compression_type: CompressionType) -> Self {
+        match compression_type {
+            CompressionType::ZSTD => CompressionCodecType::Zstd,
+            CompressionType::LZ4_FRAME => CompressionCodecType::Lz4Frame,
+            _ => CompressionCodecType::NoCompression,
+        }
+    }
+}
+
+impl From<CompressionCodecType> for Option<CompressionType> {
+    fn from(codec: CompressionCodecType) -> Self {
+        match codec {
+            CompressionCodecType::NoCompression => None,
+            CompressionCodecType::Lz4Frame => Some(CompressionType::LZ4_FRAME),
+            CompressionCodecType::Zstd => Some(CompressionType::ZSTD),
+        }
+    }
+}
+
+#[cfg(any(feature = "ipc_compression", test))]
+mod compression_function {
+    use crate::error::Result;
+    use crate::ipc::compression::ipc_compression::CompressionCodecType;
+    use std::io::{Read, Write};
+
+    impl CompressionCodecType {
+        pub fn compress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
+            match self {
+                CompressionCodecType::Lz4Frame => {
+                    let mut encoder = lz4::EncoderBuilder::new().build(output).unwrap();
+                    encoder.write_all(input).unwrap();
+                    encoder.finish().1.unwrap();
+                    Ok(())
+                }
+                CompressionCodecType::Zstd => {
+                    let mut encoder = zstd::Encoder::new(output, 0).unwrap();
+                    encoder.write_all(input).unwrap();
+                    encoder.finish().unwrap();
+                    Ok(())
+                }
+                _ => Ok(()),
+            }
+        }
+
+        pub fn decompress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<usize> {
+            let result: Result<usize> = match self {
+                CompressionCodecType::Lz4Frame => {
+                    let mut decoder = lz4::Decoder::new(input)?;
+                    match decoder.read_to_end(output) {
+                        Ok(size) => Ok(size),
+                        Err(e) => Err(e.into()),
+                    }
+                }
+                CompressionCodecType::Zstd => {
+                    let mut decoder = zstd::Decoder::new(input)?;
+                    match decoder.read_to_end(output) {
+                        Ok(size) => Ok(size),
+                        Err(e) => Err(e.into()),
+                    }
+                }
+                _ => Ok(input.len()),

Review Comment:
   Remove the `NoCompression` variant from `CompressionCodecType`.
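
One way to read this suggestion, as a minimal sketch rather than the PR's final code: keep only real codecs in the enum and let `Option` carry the "no compression" case. The names `Codec`, `from_wire` and `describe` are hypothetical, and the wire values 0 and 1 are assumed to follow the `CompressionType` enum order in Arrow's `Message.fbs`. An unknown value becomes an error here instead of silently falling back to "no compression".

```rust
/// Codecs that can actually be applied to a buffer; "no compression" is
/// expressed as `Option::None` rather than as an enum variant.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Codec {
    Lz4Frame,
    Zstd,
}

impl Codec {
    /// Map a wire value to a codec (hypothetical helper).
    /// Assumption: 0 = LZ4_FRAME and 1 = ZSTD, as in `Message.fbs`.
    fn from_wire(value: i8) -> Result<Codec, String> {
        match value {
            0 => Ok(Codec::Lz4Frame),
            1 => Ok(Codec::Zstd),
            other => Err(format!("unsupported compression type {}", other)),
        }
    }
}

/// Human-readable description of an optional codec.
fn describe(codec: Option<Codec>) -> &'static str {
    match codec {
        None => "uncompressed",
        Some(Codec::Lz4Frame) => "lz4 frame",
        Some(Codec::Zstd) => "zstd",
    }
}
```

With this shape, callers that never compress pass `None` and never touch the codec enum at all.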





[GitHub] [arrow-rs] codecov-commenter commented on pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#issuecomment-1159518675

   # [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1855?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1855](https://codecov.io/gh/apache/arrow-rs/pull/1855?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (58488a3) into [master](https://codecov.io/gh/apache/arrow-rs/commit/c7f89e1662cf5376df46886e11d0db0800233163?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c7f89e1) will **increase** coverage by `0.04%`.
   > The diff coverage is `83.26%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #1855      +/-   ##
   ==========================================
   + Coverage   83.42%   83.47%   +0.04%     
   ==========================================
     Files         214      215       +1     
     Lines       57015    57262     +247     
   ==========================================
   + Hits        47567    47798     +231     
   - Misses       9448     9464      +16     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-rs/pull/1855?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [arrow/src/ipc/writer.rs](https://codecov.io/gh/apache/arrow-rs/pull/1855/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2lwYy93cml0ZXIucnM=) | `81.08% <79.19%> (-0.71%)` | :arrow_down: |
   | [arrow/src/ipc/compression/compression.rs](https://codecov.io/gh/apache/arrow-rs/pull/1855/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2lwYy9jb21wcmVzc2lvbi9jb21wcmVzc2lvbi5ycw==) | `79.24% <79.24%> (ø)` | |
   | [arrow/src/ipc/reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1855/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2lwYy9yZWFkZXIucnM=) | `91.16% <98.18%> (+0.33%)` | :arrow_up: |
   | [parquet\_derive/src/parquet\_field.rs](https://codecov.io/gh/apache/arrow-rs/pull/1855/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGFycXVldF9kZXJpdmUvc3JjL3BhcnF1ZXRfZmllbGQucnM=) | `65.53% <0.00%> (-0.69%)` | :arrow_down: |
   | [arrow/src/record\_batch.rs](https://codecov.io/gh/apache/arrow-rs/pull/1855/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL3JlY29yZF9iYXRjaC5ycw==) | `93.89% <0.00%> (-0.11%)` | :arrow_down: |
   | [arrow/src/json/reader.rs](https://codecov.io/gh/apache/arrow-rs/pull/1855/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2pzb24vcmVhZGVyLnJz) | `84.58% <0.00%> (ø)` | |
   | [arrow/src/ipc/gen/Message.rs](https://codecov.io/gh/apache/arrow-rs/pull/1855/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YXJyb3cvc3JjL2lwYy9nZW4vTWVzc2FnZS5ycw==) | `43.06% <0.00%> (+7.67%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1855?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-rs/pull/1855?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [c7f89e1...58488a3](https://codecov.io/gh/apache/arrow-rs/pull/1855?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r900994846


##########
arrow/src/ipc/writer.rs:
##########
@@ -922,19 +1021,67 @@ fn write_array_data(
 }
 
 /// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector
+/// From https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
 fn write_buffer(
     buffer: &Buffer,
     buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
     offset: i64,
+    compression_codec: &CompressionCodecType,
 ) -> i64 {
-    let len = buffer.len();
-    let pad_len = pad_to_8(len as u32);
-    let total_len: i64 = (len + pad_len) as i64;
+    let origin_buffer_len = buffer.len();
+    let mut compression_buffer = Vec::<u8>::new();
+    let (data, uncompression_buffer_len) = match compression_codec {
+        CompressionCodecType::NoCompression => {
+            // this buffer_len will not used in the following logic
+            // If we don't use the compression, just write the data in the array
+            (buffer.as_slice(), origin_buffer_len as i64)
+        }
+        CompressionCodecType::Lz4Frame | CompressionCodecType::ZSTD => {
+            if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA {
+                (buffer.as_slice(), 0)
+            } else {
+                compression_codec
+                    .compress(buffer.as_slice(), &mut compression_buffer)
+                    .unwrap();
+                if compression_buffer.len() > origin_buffer_len {
+                    // the length of compressed data is larger than uncompressed data
+                    // use the uncompressed data with -1
+                    // -1 indicate that we don't compress the data
+                    (buffer.as_slice(), LENGTH_NO_COMPRESSED_DATA)
+                } else {
+                    // use the compressed data with uncompressed length
+                    (compression_buffer.as_slice(), origin_buffer_len as i64)
+                }
+            }
+        }
+    };
+    let len = data.len() as i64;
+    // TODO: don't need to pad each buffer, and just need to pad the tail of the message body
+    // let pad_len = pad_to_8(len as u32);

Review Comment:
   I read the IPC format [spec](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc) and my understanding is that we don't need to align each buffer and append `0` padding to it; we only need to append `0` padding to the end of the message body.
   @martin-g @alamb 
   Is there any mistake in my understanding of the IPC format?
   Please feel free to point it out.
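
For reference, the per-buffer framing described in the doc comment quoted above (an 8-byte little-endian signed length prefix, with -1 meaning "body is not compressed") can be sketched with the standard library alone. This is an illustration under stated assumptions, not the PR's code: `frame_buffer` and `split_frame` are hypothetical names, the `compress` closure stands in for a real lz4/zstd codec, and padding is deliberately left out because it is the open question in this thread.

```rust
// Hypothetical constants mirroring `LENGTH_NO_COMPRESSED_DATA` and
// `LENGTH_OF_PREFIX_DATA` from the diff.
const NOT_COMPRESSED: i64 = -1;
const PREFIX_LEN: usize = 8;

/// Frame `raw` as: 8-byte little-endian i64 prefix, then the body.
/// `compress` stands in for a real codec (lz4 or zstd in the PR).
fn frame_buffer(raw: &[u8], compress: impl Fn(&[u8]) -> Vec<u8>) -> Vec<u8> {
    let compressed = compress(raw);
    // If compression made the data larger, store the raw bytes with -1.
    let (prefix, body) = if compressed.len() > raw.len() {
        (NOT_COMPRESSED, raw)
    } else {
        (raw.len() as i64, compressed.as_slice())
    };
    let mut out = Vec::with_capacity(PREFIX_LEN + body.len());
    out.extend_from_slice(&prefix.to_le_bytes());
    out.extend_from_slice(body);
    out
}

/// Split a framed buffer into (uncompressed length or -1, body bytes).
/// Returns `None` if the input is shorter than the prefix.
fn split_frame(framed: &[u8]) -> Option<(i64, &[u8])> {
    if framed.len() < PREFIX_LEN {
        return None;
    }
    let (head, body) = framed.split_at(PREFIX_LEN);
    let mut bytes = [0u8; PREFIX_LEN];
    bytes.copy_from_slice(head);
    Some((i64::from_le_bytes(bytes), body))
}
```

A reader would decompress the body only when the prefix is positive; a prefix of -1 means the body can be used as is.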





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r934611685


##########
arrow/src/ipc/writer.rs:
##########
@@ -55,9 +61,50 @@ pub struct IpcWriteOptions {
     /// version 2.0.0: V4, with legacy format enabled
     /// version 4.0.0: V5
     metadata_version: ipc::MetadataVersion,
+    batch_compression_type: CompressionCodecType,
 }
 
 impl IpcWriteOptions {
+    pub fn try_new_with_compression(

Review Comment:
   The constructor may be refactored in a follow-up PR.





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r939497526


##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +32,78 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &Option<CompressionCodecType>,
+) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // corner case: empty buffer
+    if buf_data.is_empty() {
+        return Buffer::from(buf_data);
+    }
+    match compression_codec {
+        Some(_decompressor) if cfg!(feature = "ipc_compression") || cfg!(test) => {
+            // 8byte + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise return as is
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // emtpy
+                let empty = Vec::<u8>::new();
+                Buffer::from(empty)
+            } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+                // not compress
+                let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                Buffer::from(data)
+            } else {
+                // decompress data using the codec
+                let mut _uncompressed_buffer =
+                    Vec::with_capacity(decompressed_length as usize);
+                let _input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];

Review Comment:
   `#[cfg(any(feature = "ipc_compression", test))]` does not refer to a feature named `test`; it matches either the `ipc_compression` feature or the built-in `test` configuration.
   The code is only compiled with the compile flag ```feature="ipc_compression"``` or under `test`.
   
   There is similar usage in parquet: https://github.com/apache/arrow-rs/blob/30c94dbf1c422f81f8520b9956e96ab7b53c3f47/parquet/src/compression.rs#L76
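
A minimal sketch of the distinction being discussed, using a hypothetical function `codec_support`: the `#[cfg(...)]` attribute removes an item from the build entirely when the predicate is false, so a complementary `#[cfg(not(...))]` item is needed if callers must still compile. The `cfg!(...)` macro, by contrast, only evaluates to a `bool`, and both branches around it still have to compile.

```rust
/// Compiled only when the `ipc_compression` feature is enabled or when
/// building tests; otherwise this item does not exist at all.
#[cfg(any(feature = "ipc_compression", test))]
fn codec_support() -> bool {
    true
}

/// Fallback compiled in every other configuration, so callers of
/// `codec_support` still build without the feature.
#[cfg(not(any(feature = "ipc_compression", test)))]
fn codec_support() -> bool {
    false
}
```

Exactly one of the two definitions is present in any given build, so `codec_support()` always agrees with `cfg!(any(feature = "ipc_compression", test))`.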





[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r939500379


##########
arrow/Cargo.toml:
##########
@@ -83,6 +86,8 @@ rand = { version = "0.8", default-features = false, features =  ["std", "std_rng
 criterion = { version = "0.3", default-features = false }
 flate2 = { version = "1", default-features = false, features = ["rust_backend"] }
 tempfile = { version = "3", default-features = false }
+lz4 = { version = "1.23", default-features = false }
+zstd = { version = "0.11", default-features = false }

Review Comment:
   We can't remove lz4 and zstd from the dev-dependencies.
   `ipc_compression` is the default feature; we can run `cargo test` with the `lz4` and `zstd` libraries.
   But we need to run the IPC compression unit tests in CI.





[GitHub] [arrow-rs] alamb commented on pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#issuecomment-1207217704

   > Thank you, you can also ping me through the slack.
   
   Thanks @liukun4515  -- I think I have gotten to the point where I can't offer any more specific suggestions on structure without trying it myself. I hope to try and rearrange the feature-flags and make a proposed PR to your branch.
   
   I won't have a chance to do it until tomorrow however.
   
   




[GitHub] [arrow-rs] viirya commented on a diff in pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r903077863


##########
arrow/Cargo.toml:
##########
@@ -60,7 +63,7 @@ multiversion = { version = "0.6.1", default-features = false }
 bitflags = { version = "1.2.1", default-features = false }
 
 [features]
-default = ["csv", "ipc", "test_utils"]
+default = ["csv", "ipc", "test_utils", "zstd", "lz4"]

Review Comment:
   So by default we will include zstd, lz4 dependencies?



##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +33,74 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &CompressionCodecType,
+) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // corner case: empty buffer
+    if buf_data.is_empty() {
+        return Buffer::from(buf_data);
+    }
+    match compression_codec {
+        CompressionCodecType::NoCompression => Buffer::from(buf_data),
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => {
+            // 8byte + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise return as is
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // emtpy
+                let empty = Vec::<u8>::new();
+                Buffer::from(empty)
+            } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+                // not compress
+                let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                Buffer::from(data)
+            } else {
+                // decompress data using the codec
+                let mut _uncompressed_buffer = Vec::new();
+                let _input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                // TODO consider the error result
+                #[cfg(any(feature = "zstd,lz4", test))]

Review Comment:
   Hmm, if these features are not used, what will happen? I think an explicit error is necessary.
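
To illustrate the kind of explicit error meant here, a minimal sketch under stated assumptions: `read_body` is a hypothetical helper, and the optional `decompress` callback models whether a codec was compiled in (`None` plays the role of a build without the compression features). It returns an error instead of silently yielding an empty or still-compressed buffer.

```rust
/// Decode one framed buffer: 8-byte little-endian i64 prefix, then body.
/// `decompress` is `None` when no codec is available in this build.
fn read_body(
    framed: &[u8],
    decompress: Option<&dyn Fn(&[u8]) -> Vec<u8>>,
) -> Result<Vec<u8>, String> {
    if framed.len() < 8 {
        return Err("buffer is shorter than the 8-byte length prefix".to_string());
    }
    let mut head = [0u8; 8];
    head.copy_from_slice(&framed[..8]);
    let body = &framed[8..];
    match i64::from_le_bytes(head) {
        // -1: the body was written without compression.
        -1 => Ok(body.to_vec()),
        // 0: the original buffer was empty.
        0 => Ok(Vec::new()),
        // Anything else needs a codec; fail loudly if none is available.
        _ => match decompress {
            Some(codec) => Ok(codec(body)),
            None => Err(
                "buffer is compressed but compression support is not enabled"
                    .to_string(),
            ),
        },
    }
}
```

Returning `Result` from the read path also removes the need for `unwrap` calls around the codec.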



##########
arrow/src/ipc/writer.rs:
##########
@@ -922,20 +1021,67 @@ fn write_array_data(
 }
 
 /// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
 fn write_buffer(
     buffer: &Buffer,
     buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
     offset: i64,
+    compression_codec: &CompressionCodecType,
 ) -> i64 {
-    let len = buffer.len();
-    let pad_len = pad_to_8(len as u32);
-    let total_len: i64 = (len + pad_len) as i64;
-    // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes");
-    buffers.push(ipc::Buffer::new(offset, total_len));
-    arrow_data.extend_from_slice(buffer.as_slice());
-    arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
-    offset + total_len
+    let origin_buffer_len = buffer.len();
+    let mut _compression_buffer = Vec::<u8>::new();
+    let (data, uncompression_buffer_len) = match compression_codec {
+        CompressionCodecType::NoCompression => {
+            // this buffer_len will not used in the following logic
+            // If we don't use the compression, just write the data in the array
+            (buffer.as_slice(), origin_buffer_len as i64)
+        }
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => {
+            if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA {
+                (buffer.as_slice(), 0)
+            } else {
+                #[cfg(any(feature = "zstd,lz4", test))]
+                compression_codec
+                    .compress(buffer.as_slice(), &mut _compression_buffer)
+                    .unwrap();

Review Comment:
   I think we can prevent use of the compression option in `try_new_with_compression` if these features are not included.



##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +33,74 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &CompressionCodecType,
+) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // corner case: empty buffer
+    if buf_data.is_empty() {
+        return Buffer::from(buf_data);
+    }
+    match compression_codec {
+        CompressionCodecType::NoCompression => Buffer::from(buf_data),
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => {
+            // 8byte + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise return as is
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // emtpy

Review Comment:
   ```suggestion
                   // empty
   ```



##########
arrow/src/ipc/writer.rs:
##########
@@ -517,17 +610,20 @@ impl<W: Write> FileWriter<W> {
         let data_gen = IpcDataGenerator::default();
         let mut writer = BufWriter::new(writer);
         // write magic to header
+        let mut header_size: usize = 0;
         writer.write_all(&super::ARROW_MAGIC[..])?;
+        header_size += super::ARROW_MAGIC.len();
         // create an 8-byte boundary after the header
         writer.write_all(&[0, 0])?;
+        header_size += 2;
         // write the schema, set the written bytes to the schema + header
         let encoded_message = data_gen.schema_to_bytes(schema, &write_options);
         let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
         Ok(Self {
             writer,
             write_options,
             schema: schema.clone(),
-            block_offsets: meta + data + 8,
+            block_offsets: meta + data + header_size,

Review Comment:
   What is this for?



##########
arrow/src/ipc/writer.rs:
##########
@@ -55,9 +61,50 @@ pub struct IpcWriteOptions {
     /// version 2.0.0: V4, with legacy format enabled
     /// version 4.0.0: V5
     metadata_version: ipc::MetadataVersion,
+    batch_compression_type: CompressionCodecType,
 }
 
 impl IpcWriteOptions {
+    pub fn try_new_with_compression(
+        alignment: usize,
+        write_legacy_ipc_format: bool,
+        metadata_version: ipc::MetadataVersion,
+        batch_compression_type: CompressionCodecType,
+    ) -> Result<Self> {
+        if alignment == 0 || alignment % 8 != 0 {
+            return Err(ArrowError::InvalidArgumentError(
+                "Alignment should be greater than 0 and be a multiple of 8".to_string(),
+            ));
+        }
+        match batch_compression_type {
+            CompressionCodecType::NoCompression => {}
+            _ => {
+                if metadata_version != ipc::MetadataVersion::V5 {
+                    return Err(ArrowError::InvalidArgumentError(
+                        "Compress buffer just support from metadata v5".to_string(),
+                    ));
+                }
+            }
+        };
+        match metadata_version {
+            ipc::MetadataVersion::V5 => {
+                if write_legacy_ipc_format {
+                    Err(ArrowError::InvalidArgumentError(
+                        "Legacy IPC format only supported on metadata version 4"
+                            .to_string(),
+                    ))
+                } else {
+                    Ok(Self {
+                        alignment,
+                        write_legacy_ipc_format,
+                        metadata_version,
+                        batch_compression_type,
+                    })
+                }
+            }
+            z => panic!("Unsupported ipc::MetadataVersion {:?}", z),

Review Comment:
   `ipc::MetadataVersion::V4` also seems to hit this panic, even when `CompressionCodecType::NoCompression` is passed and no compression is used.
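
   One possible shape for the validation, as a rough sketch: accept V4 when no compression is requested and return an error rather than panicking. The `MetadataVersion` and `Codec` enums and the `validate` function are simplified stand-ins written for this example, not the crate's types, and only V4 and V5 are modelled.

   ```rust
   // Simplified stand-ins for the real types (assumption: only V4 and V5 matter here).
   #[derive(Debug, Clone, Copy, PartialEq)]
   enum MetadataVersion {
       V4,
       V5,
   }

   #[derive(Debug, Clone, Copy, PartialEq)]
   enum Codec {
       NoCompression,
       Lz4Frame,
       Zstd,
   }

   /// Checks the option combination and reports problems as errors, never panics.
   fn validate(
       alignment: usize,
       write_legacy_ipc_format: bool,
       version: MetadataVersion,
       codec: Codec,
   ) -> Result<(), String> {
       if alignment == 0 || alignment % 8 != 0 {
           return Err("Alignment should be greater than 0 and be a multiple of 8".into());
       }
       // Compression requires V5; V4 stays valid as long as no codec is requested.
       if codec != Codec::NoCompression && version != MetadataVersion::V5 {
           return Err("Buffer compression is only supported from metadata V5".into());
       }
       // The legacy format cannot be combined with V5.
       if write_legacy_ipc_format && version == MetadataVersion::V5 {
           return Err("Legacy IPC format only supported on metadata version 4".into());
       }
       Ok(())
   }
   ```

   With this ordering, `validate(8, false, MetadataVersion::V4, Codec::NoCompression)` succeeds, while the same call with `Codec::Zstd` returns an error.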





[GitHub] [arrow-rs] alamb commented on pull request #1855: support compression for IPC

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#issuecomment-1207390129

   @liukun4515  -- I made good progress on sorting out the feature flags. A draft PR is here https://github.com/liukun4515/arrow-rs/pull/1 in case you want to see the direction I am headed.
   
   I sadly ran out of time today, but I will try and finish it up tomorrow. 
   
   

