Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/06/21 22:13:47 UTC

[GitHub] [arrow-rs] viirya commented on a diff in pull request #1855: support compression for IPC

viirya commented on code in PR #1855:
URL: https://github.com/apache/arrow-rs/pull/1855#discussion_r903077863


##########
arrow/Cargo.toml:
##########
@@ -60,7 +63,7 @@ multiversion = { version = "0.6.1", default-features = false }
 bitflags = { version = "1.2.1", default-features = false }
 
 [features]
-default = ["csv", "ipc", "test_utils"]
+default = ["csv", "ipc", "test_utils", "zstd", "lz4"]

Review Comment:
   So by default we will include the zstd and lz4 dependencies?



##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +33,74 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &CompressionCodecType,
+) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // corner case: empty buffer
+    if buf_data.is_empty() {
+        return Buffer::from(buf_data);
+    }
+    match compression_codec {
+        CompressionCodecType::NoCompression => Buffer::from(buf_data),
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => {
+            // 8byte + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise return as is
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // emtpy
+                let empty = Vec::<u8>::new();
+                Buffer::from(empty)
+            } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+                // not compress
+                let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                Buffer::from(data)
+            } else {
+                // decompress data using the codec
+                let mut _uncompressed_buffer = Vec::new();
+                let _input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                // TODO consider the error result
+                #[cfg(any(feature = "zstd,lz4", test))]

Review Comment:
   Hmm, if these features are not enabled, what will happen? I think an explicit error is necessary.
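
   Something along these lines could make the failure explicit (just a sketch; the helper name is made up, and the `decompress` call assumes a counterpart to the `compress` method used in writer.rs):

   ```rust
   use crate::error::{ArrowError, Result};
   use crate::ipc::compression::ipc_compression::CompressionCodecType;

   // Hypothetical helper: decompress one IPC body buffer when a codec feature is
   // compiled in.
   #[cfg(any(feature = "lz4", feature = "zstd"))]
   fn decompress_buffer(codec: &CompressionCodecType, input: &[u8]) -> Result<Vec<u8>> {
       let mut output = Vec::new();
       codec.decompress(input, &mut output)?;
       Ok(output)
   }

   // Without either feature, fail loudly instead of leaving `_uncompressed_buffer`
   // empty and silently producing a truncated buffer.
   #[cfg(not(any(feature = "lz4", feature = "zstd")))]
   fn decompress_buffer(_codec: &CompressionCodecType, _input: &[u8]) -> Result<Vec<u8>> {
       Err(ArrowError::InvalidArgumentError(
           "decompressing an IPC buffer requires the `lz4` or `zstd` feature".to_string(),
       ))
   }
   ```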



##########
arrow/src/ipc/writer.rs:
##########
@@ -922,20 +1021,67 @@ fn write_array_data(
 }
 
 /// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
 fn write_buffer(
     buffer: &Buffer,
     buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
     offset: i64,
+    compression_codec: &CompressionCodecType,
 ) -> i64 {
-    let len = buffer.len();
-    let pad_len = pad_to_8(len as u32);
-    let total_len: i64 = (len + pad_len) as i64;
-    // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes");
-    buffers.push(ipc::Buffer::new(offset, total_len));
-    arrow_data.extend_from_slice(buffer.as_slice());
-    arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
-    offset + total_len
+    let origin_buffer_len = buffer.len();
+    let mut _compression_buffer = Vec::<u8>::new();
+    let (data, uncompression_buffer_len) = match compression_codec {
+        CompressionCodecType::NoCompression => {
+            // this buffer_len will not used in the following logic
+            // If we don't use the compression, just write the data in the array
+            (buffer.as_slice(), origin_buffer_len as i64)
+        }
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => {
+            if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA {
+                (buffer.as_slice(), 0)
+            } else {
+                #[cfg(any(feature = "zstd,lz4", test))]
+                compression_codec
+                    .compress(buffer.as_slice(), &mut _compression_buffer)
+                    .unwrap();

Review Comment:
   I think we can prevent the compression option from being used in `try_new_with_compression` if these features are not included.
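
   For example, a guard like this could run inside `try_new_with_compression` (illustrative only, not the PR's API):

   ```rust
   use crate::error::{ArrowError, Result};
   use crate::ipc::compression::ipc_compression::CompressionCodecType;

   // Hypothetical up-front check: refuse a compression codec when neither codec
   // feature is compiled in, instead of failing later inside `write_buffer`.
   fn check_compression_support(codec: &CompressionCodecType) -> Result<()> {
       if matches!(codec, CompressionCodecType::NoCompression) {
           return Ok(());
       }
       if cfg!(any(feature = "lz4", feature = "zstd")) {
           Ok(())
       } else {
           Err(ArrowError::InvalidArgumentError(
               "IPC compression requires the `lz4` or `zstd` feature to be enabled"
                   .to_string(),
           ))
       }
   }
   ```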



##########
arrow/src/ipc/reader.rs:
##########
@@ -32,15 +33,74 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::ipc_compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &CompressionCodecType,
+) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // corner case: empty buffer
+    if buf_data.is_empty() {
+        return Buffer::from(buf_data);
+    }
+    match compression_codec {
+        CompressionCodecType::NoCompression => Buffer::from(buf_data),
+        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => {
+            // 8byte + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise return as is
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // emtpy

Review Comment:
   ```suggestion
                   // empty
   ```
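
   For context, the 8-byte prefix described in the doc comment above is presumably read along these lines (a sketch; the PR's `read_uncompressed_size` may differ):

   ```rust
   // Read the little-endian i64 uncompressed-length prefix of a compressed IPC
   // buffer; -1 (LENGTH_NO_COMPRESSED_DATA) means the remaining bytes are raw.
   fn read_uncompressed_size(buf_data: &[u8]) -> i64 {
       let mut len_bytes = [0u8; 8];
       len_bytes.copy_from_slice(&buf_data[..8]);
       i64::from_le_bytes(len_bytes)
   }
   ```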



##########
arrow/src/ipc/writer.rs:
##########
@@ -517,17 +610,20 @@ impl<W: Write> FileWriter<W> {
         let data_gen = IpcDataGenerator::default();
         let mut writer = BufWriter::new(writer);
         // write magic to header
+        let mut header_size: usize = 0;
         writer.write_all(&super::ARROW_MAGIC[..])?;
+        header_size += super::ARROW_MAGIC.len();
         // create an 8-byte boundary after the header
         writer.write_all(&[0, 0])?;
+        header_size += 2;
         // write the schema, set the written bytes to the schema + header
         let encoded_message = data_gen.schema_to_bytes(schema, &write_options);
         let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
         Ok(Self {
             writer,
             write_options,
             schema: schema.clone(),
-            block_offsets: meta + data + 8,
+            block_offsets: meta + data + header_size,

Review Comment:
   What is this for?
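
   If I read the hunk right, `header_size` just makes the old hard-coded `8` explicit (assuming `ARROW_MAGIC` is the 6-byte `b"ARROW1"` marker):

   ```rust
   // 6-byte magic plus the 2 padding bytes written for the 8-byte boundary.
   assert_eq!(super::ARROW_MAGIC.len() + 2, 8);
   ```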



##########
arrow/src/ipc/writer.rs:
##########
@@ -55,9 +61,50 @@ pub struct IpcWriteOptions {
     /// version 2.0.0: V4, with legacy format enabled
     /// version 4.0.0: V5
     metadata_version: ipc::MetadataVersion,
+    batch_compression_type: CompressionCodecType,
 }
 
 impl IpcWriteOptions {
+    pub fn try_new_with_compression(
+        alignment: usize,
+        write_legacy_ipc_format: bool,
+        metadata_version: ipc::MetadataVersion,
+        batch_compression_type: CompressionCodecType,
+    ) -> Result<Self> {
+        if alignment == 0 || alignment % 8 != 0 {
+            return Err(ArrowError::InvalidArgumentError(
+                "Alignment should be greater than 0 and be a multiple of 8".to_string(),
+            ));
+        }
+        match batch_compression_type {
+            CompressionCodecType::NoCompression => {}
+            _ => {
+                if metadata_version != ipc::MetadataVersion::V5 {
+                    return Err(ArrowError::InvalidArgumentError(
+                        "Compress buffer just support from metadata v5".to_string(),
+                    ));
+                }
+            }
+        };
+        match metadata_version {
+            ipc::MetadataVersion::V5 => {
+                if write_legacy_ipc_format {
+                    Err(ArrowError::InvalidArgumentError(
+                        "Legacy IPC format only supported on metadata version 4"
+                            .to_string(),
+                    ))
+                } else {
+                    Ok(Self {
+                        alignment,
+                        write_legacy_ipc_format,
+                        metadata_version,
+                        batch_compression_type,
+                    })
+                }
+            }
+            z => panic!("Unsupported ipc::MetadataVersion {:?}", z),

Review Comment:
   `ipc::MetadataVersion::V4` also seems to hit this panic, even when `CompressionCodecType::NoCompression` indicates that no compression is used.
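
   Maybe the match could mirror the existing `try_new` and accept V4 explicitly (a sketch based on the hunk above; the earlier check has already rejected any compression codec for metadata versions below V5):

   ```rust
   match metadata_version {
       ipc::MetadataVersion::V4 => Ok(Self {
           alignment,
           write_legacy_ipc_format,
           metadata_version,
           batch_compression_type,
       }),
       ipc::MetadataVersion::V5 => {
           if write_legacy_ipc_format {
               Err(ArrowError::InvalidArgumentError(
                   "Legacy IPC format only supported on metadata version 4"
                       .to_string(),
               ))
           } else {
               Ok(Self {
                   alignment,
                   write_legacy_ipc_format,
                   metadata_version,
                   batch_compression_type,
               })
           }
       }
       z => panic!("Unsupported ipc::MetadataVersion {:?}", z),
   }
   ```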



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
