Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/08/12 08:13:41 UTC

[GitHub] [arrow-rs] tustvold commented on a diff in pull request #2369: support compression for IPC with revamped feature flags

tustvold commented on code in PR #2369:
URL: https://github.com/apache/arrow-rs/pull/2369#discussion_r941184959


##########
arrow/src/ipc/compression/codec.rs:
##########
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::buffer::Buffer;
+use crate::error::{ArrowError, Result};
+use crate::ipc::CompressionType;
+use std::io::{Read, Write};
+
+const LENGTH_NO_COMPRESSED_DATA: i64 = -1;
+const LENGTH_OF_PREFIX_DATA: i64 = 8;
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+/// Represents compressing an IPC stream using a particular compression algorithm
+pub enum CompressionCodec {
+    Lz4Frame,
+    Zstd,
+}
+
+impl TryFrom<CompressionType> for CompressionCodec {
+    type Error = ArrowError;
+
+    fn try_from(compression_type: CompressionType) -> Result<Self> {
+        match compression_type {
+            CompressionType::ZSTD => Ok(CompressionCodec::Zstd),
+            CompressionType::LZ4_FRAME => Ok(CompressionCodec::Lz4Frame),
+            other_type => Err(ArrowError::NotYetImplemented(format!(
+                "compression type {:?} not supported ",
+                other_type
+            ))),
+        }
+    }
+}
+
+impl CompressionCodec {
+    /// Compresses the data in `input` to `output` and appends the
+    /// data using the specified compression mechanism.
+    ///
+    /// returns the number of bytes written to the stream
+    ///
+    /// Writes this format to output:
+    /// ```text
+    /// [8 bytes]:         uncompressed length
+    /// [remaining bytes]: compressed data stream
+    /// ```
+    pub(crate) fn compress_to_vec(
+        &self,
+        input: &[u8],
+        output: &mut Vec<u8>,
+    ) -> Result<usize> {
+        let uncompressed_data_len = input.len();
+        let original_output_len = output.len();
+
+        if uncompressed_data_len == 0 {

Review Comment:
   ```suggestion
           if input.is_empty() {
   ```
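The framing documented on `compress_to_vec` (an 8-byte little-endian uncompressed length, then the compressed bytes, with `-1` marking a body stored as-is) can be sketched without the lz4/zstd crates. This is a hypothetical stand-alone version, not the arrow-rs implementation: `compress` is an assumed closure standing in for `CompressionCodec::compress`, the prefix is written as `i64` so it is always 8 bytes, and the early return uses the `is_empty` form from the suggestion above.

```rust
/// Sentinel prefix meaning "body stored uncompressed" (mirrors LENGTH_NO_COMPRESSED_DATA).
const LENGTH_NO_COMPRESSED_DATA: i64 = -1;

/// Hypothetical stand-in for `CompressionCodec::compress_to_vec`.
/// `compress` is assumed to append the compressed form of its first
/// argument to its second argument.
fn compress_to_vec<F>(input: &[u8], output: &mut Vec<u8>, compress: F) -> usize
where
    F: Fn(&[u8], &mut Vec<u8>),
{
    let original_output_len = output.len();
    if input.is_empty() {
        // Empty input: nothing is written, not even a prefix.
        return 0;
    }
    // 8-byte little-endian uncompressed length, then the compressed bytes.
    output.extend_from_slice(&(input.len() as i64).to_le_bytes());
    compress(input, output);

    // Count only the bytes this call appended; `output` may already
    // hold earlier buffers.
    let written = output.len() - original_output_len;
    if written > input.len() {
        // Compression did not pay off: roll back and store the raw
        // bytes behind the -1 sentinel instead.
        output.truncate(original_output_len);
        output.extend_from_slice(&LENGTH_NO_COMPRESSED_DATA.to_le_bytes());
        output.extend_from_slice(input);
    }
    output.len() - original_output_len
}
```

Measuring `written` relative to `original_output_len` matters because the writer accumulates every buffer of a batch into one `Vec`, so `output.len()` alone would also count unrelated earlier bytes.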



##########
arrow/src/ipc/writer.rs:
##########
@@ -322,13 +350,28 @@ impl IpcDataGenerator {
         &self,
         batch: &RecordBatch,
         write_options: &IpcWriteOptions,
-    ) -> EncodedData {
+    ) -> Result<EncodedData> {
         let mut fbb = FlatBufferBuilder::new();
 
         let mut nodes: Vec<ipc::FieldNode> = vec![];
         let mut buffers: Vec<ipc::Buffer> = vec![];
         let mut arrow_data: Vec<u8> = vec![];
         let mut offset = 0;
+
+        // get the type of compression
+        let batch_compression_type = write_options.batch_compression_type;
+
+        let compression = batch_compression_type.map(|batch_compression_type| {
+            let mut c = ipc::BodyCompressionBuilder::new(&mut fbb);
+            c.add_method(ipc::BodyCompressionMethod::BUFFER);
+            c.add_codec(batch_compression_type);
+            c.finish()
+        });
+
+        let compression_codec: Option<CompressionCodec> = batch_compression_type
+            .map(|batch_compression_type| batch_compression_type.try_into())

Review Comment:
   ```suggestion
               .map(TryInto::try_into)
   ```
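The suggestion relies on `TryInto::try_into` being usable as a plain function path in place of the closure. A self-contained sketch of the whole conversion, assuming the chain continues with `.transpose()?` (the diff is cut off before that point) and using hypothetical stand-ins: a newtype for the flatbuffers-generated `CompressionType` and `String` in place of `ArrowError`.

```rust
use std::convert::{TryFrom, TryInto};

/// Stand-in for the flatbuffers-generated `ipc::CompressionType`
/// (assumed here to be a newtype over `i8` with associated constants).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CompressionType(pub i8);

impl CompressionType {
    pub const LZ4_FRAME: CompressionType = CompressionType(0);
    pub const ZSTD: CompressionType = CompressionType(1);
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CompressionCodec {
    Lz4Frame,
    Zstd,
}

impl TryFrom<CompressionType> for CompressionCodec {
    type Error = String;

    fn try_from(t: CompressionType) -> Result<Self, String> {
        match t {
            CompressionType::LZ4_FRAME => Ok(CompressionCodec::Lz4Frame),
            CompressionType::ZSTD => Ok(CompressionCodec::Zstd),
            other => Err(format!("compression type {:?} not supported", other)),
        }
    }
}

/// `None` (no compression requested) stays `None`, a supported type
/// becomes `Some(codec)`, and anything else is an error.
fn codec_for(
    batch_compression_type: Option<CompressionType>,
) -> Result<Option<CompressionCodec>, String> {
    // `TryInto::try_into` is the point-free form of `|t| t.try_into()`;
    // `transpose` turns `Option<Result<_, _>>` into `Result<Option<_>, _>`
    // so that `?` can propagate the error.
    let codec: Option<CompressionCodec> =
        batch_compression_type.map(TryInto::try_into).transpose()?;
    Ok(codec)
}
```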



##########
arrow/src/ipc/writer.rs:
##########
@@ -339,19 +382,26 @@ impl IpcDataGenerator {
                 offset,
                 array.len(),
                 array.null_count(),
+                &compression_codec,
                 write_options,
-            );
+            )?;
         }
+        // pad the tail of body data

Review Comment:
   I'm guessing this previously wasn't needed because the buffers were already 8-byte aligned
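The Arrow IPC format expects body buffers to be padded to a multiple of 8 bytes, and one plausible reason the tail padding now matters is that a compressed buffer can end at any byte offset. A minimal sketch of computing and appending that padding; `padding_needed` and `pad_to_alignment` are hypothetical helper names, not functions from the PR.

```rust
/// Zero bytes needed to round `len` up to the next multiple of
/// `alignment` (`alignment` must be non-zero).
fn padding_needed(len: usize, alignment: usize) -> usize {
    (alignment - len % alignment) % alignment
}

/// Appends zero bytes so that `buf.len()` becomes a multiple of `alignment`.
fn pad_to_alignment(buf: &mut Vec<u8>, alignment: usize) {
    let new_len = buf.len() + padding_needed(buf.len(), alignment);
    buf.resize(new_len, 0);
}
```

The outer `% alignment` keeps an already aligned length from receiving a full extra block of padding.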



##########
arrow/src/ipc/writer.rs:
##########
@@ -520,17 +593,20 @@ impl<W: Write> FileWriter<W> {
         let data_gen = IpcDataGenerator::default();
         let mut writer = BufWriter::new(writer);
         // write magic to header
+        let mut header_size: usize = 0;

Review Comment:
   ```suggestion
           let header_size: usize = ARROW_MAGIC.len() + 2;
   ```
   
   ?
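The suggested constant follows from the file layout: an Arrow IPC file begins with the 6-byte magic string `ARROW1` followed by 2 bytes of padding, so the header is always 8 bytes and needs no mutable counter. A sketch, assuming `ARROW_MAGIC` holds those 6 bytes and using a hypothetical `write_file_header` helper.

```rust
use std::io::{self, Write};

/// The 6-byte magic string that starts (and ends) an Arrow IPC file.
const ARROW_MAGIC: [u8; 6] = *b"ARROW1";

/// Hypothetical helper: writes the magic plus 2 zero bytes of padding
/// and returns the header size, which is the same for every file.
fn write_file_header<W: Write>(writer: &mut W) -> io::Result<usize> {
    writer.write_all(&ARROW_MAGIC)?;
    writer.write_all(&[0, 0])?;
    Ok(ARROW_MAGIC.len() + 2)
}
```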



##########
arrow/src/ipc/compression/stub.rs:
##########
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stubs that implement the same interface as ipc_compression
+//! but always error.
+
+use crate::buffer::Buffer;
+use crate::error::{ArrowError, Result};
+use crate::ipc::CompressionType;
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum CompressionCodec {}
+
+impl TryFrom<CompressionCodec> for CompressionType {
+    type Error = ArrowError;
+    fn try_from(codec: CompressionCodec) -> Result<Self> {
+        return Err(ArrowError::InvalidArgumentError(
+            format!("codec type {:?} not supported because arrow was not compiled with the ipc_compression feature", codec)));
+    }
+}
+
+impl TryFrom<CompressionType> for CompressionCodec {
+    type Error = ArrowError;
+
+    fn try_from(compression_type: CompressionType) -> Result<Self> {
+        Err(ArrowError::InvalidArgumentError(
+            format!("compression type {:?} not supported because arrow was not compiled with the ipc_compression feature", compression_type))
+            )
+    }
+}
+
+impl CompressionCodec {
+    #[allow(clippy::ptr_arg)]

Review Comment:
   I might be misunderstanding this lint, but I don't think it should be firing for this method
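Because the stub `CompressionCodec` is an empty enum, no value of it can ever exist, so its methods can never run and the `TryFrom` conversion is the only place a caller actually sees the error. A sketch of how a stub method can keep the real signature with a body the compiler knows is unreachable; `CompressionType` is a stand-in newtype for the flatbuffers type and `String` stands in for `ArrowError`.

```rust
use std::convert::TryFrom;

/// Stand-in for the flatbuffers-generated `ipc::CompressionType`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct CompressionType(pub i8);

/// Uninhabited: with the compression feature off there is no codec value.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CompressionCodec {}

impl TryFrom<CompressionType> for CompressionCodec {
    type Error = String;

    fn try_from(compression_type: CompressionType) -> Result<Self, String> {
        Err(format!(
            "compression type {:?} not supported because arrow was not compiled with the ipc_compression feature",
            compression_type
        ))
    }
}

impl CompressionCodec {
    /// Same shape as the real method. `match *self {}` type-checks because
    /// an empty enum has no variants, so this body can never execute.
    pub fn compress_to_vec(
        &self,
        _input: &[u8],
        _output: &mut Vec<u8>,
    ) -> Result<usize, String> {
        match *self {}
    }
}
```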



##########
arrow/src/ipc/compression/codec.rs:
##########
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::buffer::Buffer;
+use crate::error::{ArrowError, Result};
+use crate::ipc::CompressionType;
+use std::io::{Read, Write};
+
+const LENGTH_NO_COMPRESSED_DATA: i64 = -1;
+const LENGTH_OF_PREFIX_DATA: i64 = 8;
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+/// Represents compressing an IPC stream using a particular compression algorithm
+pub enum CompressionCodec {
+    Lz4Frame,
+    Zstd,
+}
+
+impl TryFrom<CompressionType> for CompressionCodec {
+    type Error = ArrowError;
+
+    fn try_from(compression_type: CompressionType) -> Result<Self> {
+        match compression_type {
+            CompressionType::ZSTD => Ok(CompressionCodec::Zstd),
+            CompressionType::LZ4_FRAME => Ok(CompressionCodec::Lz4Frame),
+            other_type => Err(ArrowError::NotYetImplemented(format!(
+                "compression type {:?} not supported ",
+                other_type
+            ))),
+        }
+    }
+}
+
+impl CompressionCodec {
+    /// Compresses the data in `input` to `output` and appends the
+    /// data using the specified compression mechanism.
+    ///
+    /// returns the number of bytes written to the stream
+    ///
+    /// Writes this format to output:
+    /// ```text
+    /// [8 bytes]:         uncompressed length
+    /// [remaining bytes]: compressed data stream
+    /// ```
+    pub(crate) fn compress_to_vec(
+        &self,
+        input: &[u8],
+        output: &mut Vec<u8>,
+    ) -> Result<usize> {
+        let uncompressed_data_len = input.len();
+        let original_output_len = output.len();
+
+        if uncompressed_data_len == 0 {
+            // empty input, nothing to do
+        } else {
+            // write compressed data directly into the output buffer
+            output.extend_from_slice(&uncompressed_data_len.to_le_bytes());
+            self.compress(input, output)?;
+
+            let compression_len = output.len() - original_output_len;
+            if compression_len > uncompressed_data_len {
+                // length of compressed data was larger than
+                // uncompressed data, use the uncompressed data with
+                // length -1 to indicate that we don't compress the
+                // data
+                output.truncate(original_output_len);
+                output.extend_from_slice(&LENGTH_NO_COMPRESSED_DATA.to_le_bytes());
+                output.extend_from_slice(input);
+            }
+        }
+        Ok(output.len() - original_output_len)
+    }
+
+    /// Decompresses the input into a [`Buffer`]
+    ///
+    /// The input should look like:
+    /// ```text
+    /// [8 bytes]:         uncompressed length
+    /// [remaining bytes]: compressed data stream
+    /// ```
+    pub(crate) fn decompress_to_buffer(&self, input: &[u8]) -> Result<Buffer> {
+        // read the first 8 bytes to determine if the data is
+        // compressed
+        let decompressed_length = read_uncompressed_size(input);
+        let buffer = if decompressed_length == 0 {
+            // empty
+            let empty = Vec::<u8>::new();
+            Buffer::from(empty)
+        } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+            // no compression
+            let data = &input[(LENGTH_OF_PREFIX_DATA as usize)..];
+            Buffer::from(data)

Review Comment:
   Just an observation, but this copy is kind of unfortunate (although it existed before)
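One way to avoid that copy, at least where the caller can keep borrowing the input, is to hand back a borrowed slice for the `-1` (uncompressed) case and allocate only when real decompression happens. This is a sketch with `std::borrow::Cow`, not how arrow-rs's `Buffer` works: `decompress` is an assumed closure standing in for the codec, and `read_uncompressed_size` here is a hypothetical reimplementation of the helper the diff calls, whose body is not shown.

```rust
use std::borrow::Cow;

const LENGTH_NO_COMPRESSED_DATA: i64 = -1;
const LENGTH_OF_PREFIX_DATA: usize = 8;

/// Hypothetical: reads the 8-byte little-endian length prefix.
/// Panics if `input` is shorter than 8 bytes.
fn read_uncompressed_size(input: &[u8]) -> i64 {
    let mut prefix = [0u8; LENGTH_OF_PREFIX_DATA];
    prefix.copy_from_slice(&input[..LENGTH_OF_PREFIX_DATA]);
    i64::from_le_bytes(prefix)
}

/// Returns the body without copying when it was stored uncompressed.
/// `decompress(compressed, expected_len)` is an assumed codec stand-in.
fn decompress_body<'a, F>(input: &'a [u8], decompress: F) -> Cow<'a, [u8]>
where
    F: Fn(&[u8], usize) -> Vec<u8>,
{
    let empty: &[u8] = &[];
    if input.is_empty() {
        // Empty buffers are written with no prefix at all.
        return Cow::Borrowed(empty);
    }
    let body = &input[LENGTH_OF_PREFIX_DATA..];
    match read_uncompressed_size(input) {
        0 => Cow::Borrowed(empty),
        // Stored as-is: return a view into `input` instead of copying it.
        LENGTH_NO_COMPRESSED_DATA => Cow::Borrowed(body),
        // Any other value is the decompressed length; negative values other
        // than -1 are invalid and not handled in this sketch.
        len => Cow::Owned(decompress(body, len as usize)),
    }
}
```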



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org