Posted to commits@arrow.apache.org by tu...@apache.org on 2022/10/29 03:24:32 UTC

[arrow-rs] branch master updated: Pass decompressed size to parquet Codec::decompress (#2956) (#2959)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 344c552d7 Pass decompressed size to parquet Codec::decompress (#2956) (#2959)
344c552d7 is described below

commit 344c552d701374582ac1aff198e62acb9907afb6
Author: Adrián Gallego Castellanos <ku...@openmailbox.org>
AuthorDate: Sat Oct 29 05:24:26 2022 +0200

    Pass decompressed size to parquet Codec::decompress (#2956) (#2959)
    
    * Pass decompressed size to parquet Codec::decompress (#2956)
    
    Added an optional `uncompress_size` argument to `Codec::decompress` so that
    implementations can allocate exactly the required output size instead of
    estimating it. Per-codec impact (see the sketch after the list):
    
    * snappy: Probably not much improvement, as `decompress_len` is already accurate.
    * gzip: No improvement. Ignores the size hint.
    * brotli: Probably not much improvement. The buffer size will be equal to `uncompress_size`.
    * lz4: No improvement. The buffer lives on the stack, so there are no extra allocations and it is better left working as it is.
    * zstd: No improvement. Ignores the size hint.
    * lz4_raw: Improvement. The estimation method over-estimates, so knowing the uncompressed size reduces allocations.
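
    For context, these implementations share one buffer-sizing pattern, roughly
    the sketch below. `estimate_len` is a hypothetical stand-in for each codec's
    own estimator (snappy's `decompress_len`, lz4_raw's `max_uncompressed_size`,
    and so on), not a function in the crate:

    ```rust
    // Prefer the exact size from the page header; otherwise fall back to a
    // (possibly over-estimating) codec-specific bound.
    fn sized_output(uncompress_size: Option<usize>, input_len: usize) -> usize {
        uncompress_size.unwrap_or_else(|| estimate_len(input_len))
    }

    // Hypothetical stand-in for a codec's estimator; real codecs differ.
    // This uses an LZ4-style worst-case bound purely for illustration.
    fn estimate_len(input_len: usize) -> usize {
        input_len * 255 + 16
    }

    fn main() {
        // With the hint: allocate exactly 1024 bytes.
        assert_eq!(sized_output(Some(1024), 100), 1024);
        // Without it: fall back to the conservative over-estimate.
        assert_eq!(sized_output(None, 100), 25516);
    }
    ```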
    
    * Do not include the header size in `uncompress_size`.

    A page may contain a header, and the page's uncompressed size includes that
    header's size. The `decompress` method expects to receive the
    `uncompress_size` of the compressed block alone, i.e. without the page
    header, as the sketch below illustrates.
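
    A minimal sketch of that caller-side arithmetic (example numbers only,
    mirroring the `decode_page` change in serialized_reader.rs below):

    ```rust
    fn main() {
        // `uncompressed_size` comes from the page header; `offset` is the
        // length of the uncompressed level data stored before the compressed
        // block (non-zero for data page v2). Values here are illustrative.
        let uncompressed_size: usize = 4096;
        let offset: usize = 96;
        // The hint passed to Codec::decompress excludes those prefix bytes:
        let hint = Some(uncompressed_size - offset);
        assert_eq!(hint, Some(4000));
    }
    ```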
    
    Co-authored-by: Adrián Gallego Castellanos <ku...@gmail.com>
---
 parquet/src/compression.rs            | 67 +++++++++++++++++++++++++----------
 parquet/src/file/serialized_reader.rs |  6 +++-
 2 files changed, 54 insertions(+), 19 deletions(-)

diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs
index f110e3d82..310dbd34f 100644
--- a/parquet/src/compression.rs
+++ b/parquet/src/compression.rs
@@ -38,7 +38,7 @@ let mut compressed = vec![];
 codec.compress(&data[..], &mut compressed).unwrap();
 
 let mut output = vec![];
-codec.decompress(&compressed[..], &mut output).unwrap();
+codec.decompress(&compressed[..], &mut output, None).unwrap();
 
 assert_eq!(output, data);
 ```
@@ -57,9 +57,18 @@ pub trait Codec: Send {
     fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()>;
 
     /// Decompresses data stored in slice `input_buf` and appends output to `output_buf`.
+    ///
+    /// If `uncompress_size` is provided, the implementation will allocate exactly
+    /// that amount of memory. Otherwise, it will estimate the uncompressed size,
+    /// allocating an amount of memory greater than or equal to the real size.
+    ///
     /// Returns the total number of bytes written.
-    fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>)
-        -> Result<usize>;
+    fn decompress(
+        &mut self,
+        input_buf: &[u8],
+        output_buf: &mut Vec<u8>,
+        uncompress_size: Option<usize>,
+    ) -> Result<usize>;
 }
 
 /// Given the compression type `codec`, returns a codec used to compress and decompress
@@ -112,8 +121,12 @@ mod snappy_codec {
             &mut self,
             input_buf: &[u8],
             output_buf: &mut Vec<u8>,
+            uncompress_size: Option<usize>,
         ) -> Result<usize> {
-            let len = decompress_len(input_buf)?;
+            let len = match uncompress_size {
+                Some(size) => size,
+                None => decompress_len(input_buf)?,
+            };
             let offset = output_buf.len();
             output_buf.resize(offset + len, 0);
             self.decoder
@@ -161,6 +174,7 @@ mod gzip_codec {
             &mut self,
             input_buf: &[u8],
             output_buf: &mut Vec<u8>,
+            _uncompress_size: Option<usize>,
         ) -> Result<usize> {
             let mut decoder = read::GzDecoder::new(input_buf);
             decoder.read_to_end(output_buf).map_err(|e| e.into())
@@ -203,8 +217,10 @@ mod brotli_codec {
             &mut self,
             input_buf: &[u8],
             output_buf: &mut Vec<u8>,
+            uncompress_size: Option<usize>,
         ) -> Result<usize> {
-            brotli::Decompressor::new(input_buf, BROTLI_DEFAULT_BUFFER_SIZE)
+            let buffer_size = uncompress_size.unwrap_or(BROTLI_DEFAULT_BUFFER_SIZE);
+            brotli::Decompressor::new(input_buf, buffer_size)
                 .read_to_end(output_buf)
                 .map_err(|e| e.into())
         }
@@ -248,6 +264,7 @@ mod lz4_codec {
             &mut self,
             input_buf: &[u8],
             output_buf: &mut Vec<u8>,
+            _uncompress_size: Option<usize>,
         ) -> Result<usize> {
             let mut decoder = lz4::Decoder::new(input_buf)?;
             let mut buffer: [u8; LZ4_BUFFER_SIZE] = [0; LZ4_BUFFER_SIZE];
@@ -306,6 +323,7 @@ mod zstd_codec {
             &mut self,
             input_buf: &[u8],
             output_buf: &mut Vec<u8>,
+            _uncompress_size: Option<usize>,
         ) -> Result<usize> {
             let mut decoder = zstd::Decoder::new(input_buf)?;
             match io::copy(&mut decoder, output_buf) {
@@ -353,16 +371,23 @@ mod lz4_raw_codec {
             &mut self,
             input_buf: &[u8],
             output_buf: &mut Vec<u8>,
+            uncompress_size: Option<usize>,
         ) -> Result<usize> {
             let offset = output_buf.len();
-            let required_len = max_uncompressed_size(input_buf.len());
+            let required_len =
+                uncompress_size.unwrap_or_else(|| max_uncompressed_size(input_buf.len()));
             output_buf.resize(offset + required_len, 0);
-            let required_len: i32 = required_len.try_into().unwrap();
-            match lz4::block::decompress_to_buffer(input_buf, Some(required_len), &mut output_buf[offset..]) {
+            match lz4::block::decompress_to_buffer(
+                input_buf,
+                Some(required_len.try_into().unwrap()),
+                &mut output_buf[offset..],
+            ) {
                 Ok(n) => {
-                    output_buf.truncate(offset + n);
-                    Ok(n)   
-                },
+                    if n < required_len {
+                        output_buf.truncate(offset + n);
+                    }
+                    Ok(n)
+                }
                 Err(e) => Err(e.into()),
             }
         }
@@ -371,11 +396,16 @@ mod lz4_raw_codec {
             let offset = output_buf.len();
             let required_len = lz4::block::compress_bound(input_buf.len())?;
             output_buf.resize(offset + required_len, 0);
-            match lz4::block::compress_to_buffer(input_buf, None, false, &mut output_buf[offset..]) {
+            match lz4::block::compress_to_buffer(
+                input_buf,
+                None,
+                false,
+                &mut output_buf[offset..],
+            ) {
                 Ok(n) => {
                     output_buf.truncate(offset + n);
                     Ok(())
-                },
+                }
                 Err(e) => Err(e.into()),
             }
         }
@@ -390,7 +420,7 @@ mod tests {
 
     use crate::util::test_common::rand_gen::random_bytes;
 
-    fn test_roundtrip(c: CodecType, data: &[u8]) {
+    fn test_roundtrip(c: CodecType, data: &[u8], uncompress_size: Option<usize>) {
         let mut c1 = create_codec(c).unwrap().unwrap();
         let mut c2 = create_codec(c).unwrap().unwrap();
 
@@ -402,7 +432,7 @@ mod tests {
 
         // Decompress with c2
         let decompressed_size = c2
-            .decompress(compressed.as_slice(), &mut decompressed)
+            .decompress(compressed.as_slice(), &mut decompressed, uncompress_size)
             .expect("Error when decompressing");
         assert_eq!(data.len(), decompressed_size);
         assert_eq!(data, decompressed.as_slice());
@@ -416,7 +446,7 @@ mod tests {
 
         // Decompress with c1
         let decompressed_size = c1
-            .decompress(compressed.as_slice(), &mut decompressed)
+            .decompress(compressed.as_slice(), &mut decompressed, uncompress_size)
             .expect("Error when decompressing");
         assert_eq!(data.len(), decompressed_size);
         assert_eq!(data, decompressed.as_slice());
@@ -435,7 +465,7 @@ mod tests {
         assert_eq!(&compressed[..4], prefix);
 
         let decompressed_size = c2
-            .decompress(&compressed[4..], &mut decompressed)
+            .decompress(&compressed[4..], &mut decompressed, uncompress_size)
             .expect("Error when decompressing");
 
         assert_eq!(data.len(), decompressed_size);
@@ -447,7 +477,8 @@ mod tests {
         let sizes = vec![100, 10000, 100000];
         for size in sizes {
             let data = random_bytes(size);
-            test_roundtrip(c, &data);
+            test_roundtrip(c, &data, None);
+            test_roundtrip(c, &data, Some(data.len()));
         }
     }
 
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 6b416e34d..854ae1ef6 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -420,7 +420,11 @@ pub(crate) fn decode_page(
             let mut decompressed = Vec::with_capacity(uncompressed_size);
             let compressed = &buffer.as_ref()[offset..];
             decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
-            decompressor.decompress(compressed, &mut decompressed)?;
+            decompressor.decompress(
+                compressed,
+                &mut decompressed,
+                Some(uncompressed_size - offset),
+            )?;
 
             if decompressed.len() != uncompressed_size {
                 return Err(general_err!(
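
For reference, a round-trip with the updated signature, following the doc
example in compression.rs above (a sketch only: the module paths assume the
crate's public `compression` API, which may sit behind the `experimental`
feature):

```rust
use parquet::basic::Compression as CodecType;
use parquet::compression::create_codec;

fn main() {
    let data = vec![b'p'; 128];
    let mut codec = create_codec(CodecType::SNAPPY).unwrap().unwrap();

    let mut compressed = vec![];
    codec.compress(&data[..], &mut compressed).unwrap();

    // Without a hint the codec estimates the output size...
    let mut output = vec![];
    codec.decompress(&compressed[..], &mut output, None).unwrap();
    assert_eq!(output, data);

    // ...and with Some(len) it can allocate exactly `len` bytes up front.
    let mut output = vec![];
    codec
        .decompress(&compressed[..], &mut output, Some(data.len()))
        .unwrap();
    assert_eq!(output, data);
}
```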