Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/08/03 21:56:18 UTC

[GitHub] [arrow-rs] alamb commented on a diff in pull request #1855: support compression for IPC

alamb commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r937170314


##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +32,78 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &Option<CompressionCodecType>,
+) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // corner case: empty buffer
+    if buf_data.is_empty() {
+        return Buffer::from(buf_data);
+    }
+    match compression_codec {
+        Some(_decompressor) if cfg!(feature = "ipc_compression") || cfg!(test) => {
+            // 8 bytes (uncompressed length) + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise return as is
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // empty
+                let empty = Vec::<u8>::new();
+                Buffer::from(empty)
+            } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+                // not compressed
+                let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                Buffer::from(data)
+            } else {
+                // decompress data using the codec
+                let mut _uncompressed_buffer =
+                    Vec::with_capacity(decompressed_length as usize);
+                let _input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];

Review Comment:
   Something still isn't quite right with this code -- instead of gating the code on `cfg!(test)`, I think the more typical pattern is to gate the entire test on the `ipc_compression` feature
   
   So something like
   
   ```rust
    #[cfg(feature = "ipc_compression")]
       #[test]
       fn read_generated_streams_200() {
           let testdata = crate::util::test_util::arrow_test_data();
           let version = "2.0.0-compression";
   ...
   }
   ```
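
   As background on the buffer layout the doc comment in the diff above quotes from Message.fbs: each compressed buffer body begins with its uncompressed length as a 64-bit little-endian signed integer, and a value of -1 means the bytes that follow are not compressed. A minimal sketch of reading that prefix, reusing the `read_uncompressed_size` name from the diff (the PR's actual implementation may differ), could look like:

    ```rust
    /// Read the 8-byte uncompressed-length prefix of an IPC buffer body.
    /// Per Message.fbs it is a little-endian i64; -1 means "not compressed".
    fn read_uncompressed_size(buf_data: &[u8]) -> i64 {
        let mut len_bytes = [0u8; 8];
        len_bytes.copy_from_slice(&buf_data[..8]);
        i64::from_le_bytes(len_bytes)
    }
    ```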



##########
arrow/Cargo.toml:
##########
@@ -83,6 +86,8 @@ rand = { version = "0.8", default-features = false, features =  ["std", "std_rng
 criterion = { version = "0.3", default-features = false }
 flate2 = { version = "1", default-features = false, features = ["rust_backend"] }
 tempfile = { version = "3", default-features = false }
+lz4 = { version = "1.23", default-features = false }
+zstd = { version = "0.11", default-features = false }

Review Comment:
   I don't think these need to be in dev-dependencies if they are already in the dependencies of the crate, do they?



##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +32,78 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &CompressionCodecType,
+) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // corner case: empty buffer
+    if buf_data.is_empty() {
+        return Buffer::from(buf_data);
+    }
+    match compression_codec {
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd
+            if cfg!(feature = "ipc_compression") || cfg!(test) =>
+        {
+            // 8 bytes (uncompressed length) + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise return as is
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // empty
+                let empty = Vec::<u8>::new();
+                Buffer::from(empty)
+            } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+                // not compressed
+                let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                Buffer::from(data)
+            } else {
+                // decompress data using the codec
+                let mut _uncompressed_buffer = Vec::new();
+                let _input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                #[cfg(any(feature = "ipc_compression", test))]
+                compression_codec
+                    .decompress(_input_data, &mut _uncompressed_buffer)
+                    .unwrap();
+                Buffer::from(_uncompressed_buffer)
+            }
+        }
+        CompressionCodecType::NoCompression => Buffer::from(buf_data),
+        _ => {

Review Comment:
   I think returning an error is the correct approach, but as you have identified above you can't do that without changing the signature to `Result<Buffer>`. Since decompression can fail, we probably need to make that change.
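
   For illustration, a rough sketch of the fallible version, reusing the names and the `decompress(input, &mut output)` call shown in the diff above (a sketch under those assumptions only, with the feature gating elided, not necessarily what the PR should end up with):

    ```rust
    fn read_buffer(
        buf: &ipc::Buffer,
        a_data: &[u8],
        compression_codec: &CompressionCodecType,
    ) -> Result<Buffer> {
        let start_offset = buf.offset() as usize;
        let end_offset = start_offset + buf.length() as usize;
        let buf_data = &a_data[start_offset..end_offset];
        if buf_data.is_empty() {
            return Ok(Buffer::from(buf_data));
        }
        match compression_codec {
            CompressionCodecType::NoCompression => Ok(Buffer::from(buf_data)),
            codec => match read_uncompressed_size(buf_data) {
                // The prefix indicates an empty (zero-length) buffer.
                LENGTH_EMPTY_COMPRESSED_DATA => Ok(Buffer::from(Vec::<u8>::new())),
                // A -1 prefix means the body is stored uncompressed.
                LENGTH_NO_COMPRESSED_DATA => {
                    Ok(Buffer::from(&buf_data[LENGTH_OF_PREFIX_DATA as usize..]))
                }
                uncompressed_len => {
                    let mut uncompressed = Vec::with_capacity(uncompressed_len as usize);
                    // `decompress` returns a `Result`, so `?` propagates the error
                    // instead of `unwrap()`ing it.
                    codec.decompress(
                        &buf_data[LENGTH_OF_PREFIX_DATA as usize..],
                        &mut uncompressed,
                    )?;
                    Ok(Buffer::from(uncompressed))
                }
            },
        }
    }
    ```

   Callers would then need to propagate the `Result` (e.g. with `?`) rather than receiving a `Buffer` directly.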



##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +32,78 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(

Review Comment:
   do you still intend to make this change? Or is it planned for a subsequent PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org