You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/10/29 03:24:32 UTC
[arrow-rs] branch master updated: Pass decompressed size to parquet Codec::decompress (#2956) (#2959)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 344c552d7 Pass decompressed size to parquet Codec::decompress (#2956) (#2959)
344c552d7 is described below
commit 344c552d701374582ac1aff198e62acb9907afb6
Author: Adrián Gallego Castellanos <ku...@openmailbox.org>
AuthorDate: Sat Oct 29 05:24:26 2022 +0200
Pass decompressed size to parquet Codec::decompress (#2956) (#2959)
* Pass decompressed size to parquet Codec::decompress (#2956)
Added an optional argument `uncompress_size` to `Codec::decompress` to allow a better
estimation of the required uncompressed buffer size.
* snappy: Probably not much improvement, as `decompress_len` is already accurate.
* gzip: No improvement. Ignores the size hint.
* brotli: Probably not much improvement. The buffer size will be equal to the uncompressed size.
* lz4: No improvement. As the buffer is allocated on the stack, there are no extra heap allocations, so it is probably better to keep it working as it is.
* zstd: No improvement. Ignores the size hint.
* lz4_raw: Improvement. The estimation method over-estimates, so knowing the uncompressed size reduces allocations.
* Do not include header size in uncompressed_size.
A page may contain a header, and the page's uncompressed size includes that header's size. The `decompress` method expects to receive the `uncompress_size` of the compressed block only, that is, without the page headers.
Co-authored-by: Adrián Gallego Castellanos <ku...@gmail.com>
---
parquet/src/compression.rs | 67 +++++++++++++++++++++++++----------
parquet/src/file/serialized_reader.rs | 6 +++-
2 files changed, 54 insertions(+), 19 deletions(-)
diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs
index f110e3d82..310dbd34f 100644
--- a/parquet/src/compression.rs
+++ b/parquet/src/compression.rs
@@ -38,7 +38,7 @@ let mut compressed = vec![];
codec.compress(&data[..], &mut compressed).unwrap();
let mut output = vec![];
-codec.decompress(&compressed[..], &mut output).unwrap();
+codec.decompress(&compressed[..], &mut output, None).unwrap();
assert_eq!(output, data);
```
@@ -57,9 +57,18 @@ pub trait Codec: Send {
fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()>;
/// Decompresses data stored in slice `input_buf` and appends output to `output_buf`.
+ ///
+ /// If the uncompress_size is provided it will allocate the exact amount of memory.
+ /// Otherwise, it will estimate the uncompressed size, allocating an amount of memory
+ /// greater or equal to the real uncompress_size.
+ ///
/// Returns the total number of bytes written.
- fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>)
- -> Result<usize>;
+ fn decompress(
+ &mut self,
+ input_buf: &[u8],
+ output_buf: &mut Vec<u8>,
+ uncompress_size: Option<usize>,
+ ) -> Result<usize>;
}
/// Given the compression type `codec`, returns a codec used to compress and decompress
@@ -112,8 +121,12 @@ mod snappy_codec {
&mut self,
input_buf: &[u8],
output_buf: &mut Vec<u8>,
+ uncompress_size: Option<usize>,
) -> Result<usize> {
- let len = decompress_len(input_buf)?;
+ let len = match uncompress_size {
+ Some(size) => size,
+ None => decompress_len(input_buf)?,
+ };
let offset = output_buf.len();
output_buf.resize(offset + len, 0);
self.decoder
@@ -161,6 +174,7 @@ mod gzip_codec {
&mut self,
input_buf: &[u8],
output_buf: &mut Vec<u8>,
+ _uncompress_size: Option<usize>,
) -> Result<usize> {
let mut decoder = read::GzDecoder::new(input_buf);
decoder.read_to_end(output_buf).map_err(|e| e.into())
@@ -203,8 +217,10 @@ mod brotli_codec {
&mut self,
input_buf: &[u8],
output_buf: &mut Vec<u8>,
+ uncompress_size: Option<usize>,
) -> Result<usize> {
- brotli::Decompressor::new(input_buf, BROTLI_DEFAULT_BUFFER_SIZE)
+ let buffer_size = uncompress_size.unwrap_or(BROTLI_DEFAULT_BUFFER_SIZE);
+ brotli::Decompressor::new(input_buf, buffer_size)
.read_to_end(output_buf)
.map_err(|e| e.into())
}
@@ -248,6 +264,7 @@ mod lz4_codec {
&mut self,
input_buf: &[u8],
output_buf: &mut Vec<u8>,
+ _uncompress_size: Option<usize>,
) -> Result<usize> {
let mut decoder = lz4::Decoder::new(input_buf)?;
let mut buffer: [u8; LZ4_BUFFER_SIZE] = [0; LZ4_BUFFER_SIZE];
@@ -306,6 +323,7 @@ mod zstd_codec {
&mut self,
input_buf: &[u8],
output_buf: &mut Vec<u8>,
+ _uncompress_size: Option<usize>,
) -> Result<usize> {
let mut decoder = zstd::Decoder::new(input_buf)?;
match io::copy(&mut decoder, output_buf) {
@@ -353,16 +371,23 @@ mod lz4_raw_codec {
&mut self,
input_buf: &[u8],
output_buf: &mut Vec<u8>,
+ uncompress_size: Option<usize>,
) -> Result<usize> {
let offset = output_buf.len();
- let required_len = max_uncompressed_size(input_buf.len());
+ let required_len =
+ uncompress_size.unwrap_or_else(|| max_uncompressed_size(input_buf.len()));
output_buf.resize(offset + required_len, 0);
- let required_len: i32 = required_len.try_into().unwrap();
- match lz4::block::decompress_to_buffer(input_buf, Some(required_len), &mut output_buf[offset..]) {
+ match lz4::block::decompress_to_buffer(
+ input_buf,
+ Some(required_len.try_into().unwrap()),
+ &mut output_buf[offset..],
+ ) {
Ok(n) => {
- output_buf.truncate(offset + n);
- Ok(n)
- },
+ if n < required_len {
+ output_buf.truncate(offset + n);
+ }
+ Ok(n)
+ }
Err(e) => Err(e.into()),
}
}
@@ -371,11 +396,16 @@ mod lz4_raw_codec {
let offset = output_buf.len();
let required_len = lz4::block::compress_bound(input_buf.len())?;
output_buf.resize(offset + required_len, 0);
- match lz4::block::compress_to_buffer(input_buf, None, false, &mut output_buf[offset..]) {
+ match lz4::block::compress_to_buffer(
+ input_buf,
+ None,
+ false,
+ &mut output_buf[offset..],
+ ) {
Ok(n) => {
output_buf.truncate(offset + n);
Ok(())
- },
+ }
Err(e) => Err(e.into()),
}
}
@@ -390,7 +420,7 @@ mod tests {
use crate::util::test_common::rand_gen::random_bytes;
- fn test_roundtrip(c: CodecType, data: &[u8]) {
+ fn test_roundtrip(c: CodecType, data: &[u8], uncompress_size: Option<usize>) {
let mut c1 = create_codec(c).unwrap().unwrap();
let mut c2 = create_codec(c).unwrap().unwrap();
@@ -402,7 +432,7 @@ mod tests {
// Decompress with c2
let decompressed_size = c2
- .decompress(compressed.as_slice(), &mut decompressed)
+ .decompress(compressed.as_slice(), &mut decompressed, uncompress_size)
.expect("Error when decompressing");
assert_eq!(data.len(), decompressed_size);
assert_eq!(data, decompressed.as_slice());
@@ -416,7 +446,7 @@ mod tests {
// Decompress with c1
let decompressed_size = c1
- .decompress(compressed.as_slice(), &mut decompressed)
+ .decompress(compressed.as_slice(), &mut decompressed, uncompress_size)
.expect("Error when decompressing");
assert_eq!(data.len(), decompressed_size);
assert_eq!(data, decompressed.as_slice());
@@ -435,7 +465,7 @@ mod tests {
assert_eq!(&compressed[..4], prefix);
let decompressed_size = c2
- .decompress(&compressed[4..], &mut decompressed)
+ .decompress(&compressed[4..], &mut decompressed, uncompress_size)
.expect("Error when decompressing");
assert_eq!(data.len(), decompressed_size);
@@ -447,7 +477,8 @@ mod tests {
let sizes = vec![100, 10000, 100000];
for size in sizes {
let data = random_bytes(size);
- test_roundtrip(c, &data);
+ test_roundtrip(c, &data, None);
+ test_roundtrip(c, &data, Some(data.len()));
}
}
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 6b416e34d..854ae1ef6 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -420,7 +420,11 @@ pub(crate) fn decode_page(
let mut decompressed = Vec::with_capacity(uncompressed_size);
let compressed = &buffer.as_ref()[offset..];
decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
- decompressor.decompress(compressed, &mut decompressed)?;
+ decompressor.decompress(
+ compressed,
+ &mut decompressed,
+ Some(uncompressed_size - offset),
+ )?;
if decompressed.len() != uncompressed_size {
return Err(general_err!(