You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/20 14:48:49 UTC
[arrow-rs] branch master updated: Split out byte array decoders (#2318) (#2527)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 9921cd579 Split out byte array decoders (#2318) (#2527)
9921cd579 is described below
commit 9921cd579acddf618c6a9bcaf78b0606920c460a
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Sat Aug 20 15:48:45 2022 +0100
Split out byte array decoders (#2318) (#2527)
---
parquet/src/arrow/array_reader/byte_array.rs | 186 +++-----------------------
parquet/src/arrow/decoder/delta_byte_array.rs | 140 +++++++++++++++++++
parquet/src/arrow/decoder/dictionary_index.rs | 125 +++++++++++++++++
parquet/src/arrow/decoder/mod.rs | 24 ++++
parquet/src/arrow/mod.rs | 1 +
5 files changed, 309 insertions(+), 167 deletions(-)
diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs
index 172aeb96d..00d69d050 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -17,6 +17,7 @@
use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
use crate::arrow::buffer::offset_buffer::OffsetBuffer;
+use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
use crate::arrow::record_reader::buffer::ScalarValue;
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
@@ -24,10 +25,7 @@ use crate::basic::{ConvertedType, Encoding};
use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::data_type::Int32Type;
-use crate::encodings::{
- decoding::{Decoder, DeltaBitPackDecoder},
- rle::RleDecoder,
-};
+use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use crate::util::memory::ByteBufferPtr;
@@ -486,45 +484,14 @@ impl ByteArrayDecoderDeltaLength {
/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`OffsetBuffer`]
pub struct ByteArrayDecoderDelta {
- prefix_lengths: Vec<i32>,
- suffix_lengths: Vec<i32>,
- data: ByteBufferPtr,
- length_offset: usize,
- data_offset: usize,
- last_value: Vec<u8>,
+ decoder: DeltaByteArrayDecoder,
validate_utf8: bool,
}
impl ByteArrayDecoderDelta {
fn new(data: ByteBufferPtr, validate_utf8: bool) -> Result<Self> {
- let mut prefix = DeltaBitPackDecoder::<Int32Type>::new();
- prefix.set_data(data.all(), 0)?;
-
- let num_prefix = prefix.values_left();
- let mut prefix_lengths = vec![0; num_prefix];
- assert_eq!(prefix.get(&mut prefix_lengths)?, num_prefix);
-
- let mut suffix = DeltaBitPackDecoder::<Int32Type>::new();
- suffix.set_data(data.start_from(prefix.get_offset()), 0)?;
-
- let num_suffix = suffix.values_left();
- let mut suffix_lengths = vec![0; num_suffix];
- assert_eq!(suffix.get(&mut suffix_lengths)?, num_suffix);
-
- if num_prefix != num_suffix {
- return Err(general_err!(format!(
- "inconsistent DELTA_BYTE_ARRAY lengths, prefixes: {}, suffixes: {}",
- num_prefix, num_suffix
- )));
- }
-
Ok(Self {
- prefix_lengths,
- suffix_lengths,
- data,
- length_offset: 0,
- data_offset: prefix.get_offset() + suffix.get_offset(),
- last_value: vec![],
+ decoder: DeltaByteArrayDecoder::new(data)?,
validate_utf8,
})
}
@@ -535,104 +502,32 @@ impl ByteArrayDecoderDelta {
len: usize,
) -> Result<usize> {
let initial_values_length = output.values.len();
- assert_eq!(self.prefix_lengths.len(), self.suffix_lengths.len());
-
- let to_read = len.min(self.prefix_lengths.len() - self.length_offset);
-
- output.offsets.reserve(to_read);
+ output.offsets.reserve(len.min(self.decoder.remaining()));
- let length_range = self.length_offset..self.length_offset + to_read;
- let iter = self.prefix_lengths[length_range.clone()]
- .iter()
- .zip(&self.suffix_lengths[length_range]);
-
- let data = self.data.as_ref();
-
- for (prefix_length, suffix_length) in iter {
- let prefix_length = *prefix_length as usize;
- let suffix_length = *suffix_length as usize;
-
- if self.data_offset + suffix_length > self.data.len() {
- return Err(ParquetError::EOF("eof decoding byte array".into()));
- }
-
- self.last_value.truncate(prefix_length);
- self.last_value.extend_from_slice(
- &data[self.data_offset..self.data_offset + suffix_length],
- );
- output.try_push(&self.last_value, self.validate_utf8)?;
-
- self.data_offset += suffix_length;
- }
-
- self.length_offset += to_read;
+ let read = self
+ .decoder
+ .read(len, |bytes| output.try_push(bytes, self.validate_utf8))?;
if self.validate_utf8 {
output.check_valid_utf8(initial_values_length)?;
}
- Ok(to_read)
+ Ok(read)
}
fn skip(&mut self, to_skip: usize) -> Result<usize> {
- let to_skip = to_skip.min(self.prefix_lengths.len() - self.length_offset);
-
- let length_range = self.length_offset..self.length_offset + to_skip;
- let iter = self.prefix_lengths[length_range.clone()]
- .iter()
- .zip(&self.suffix_lengths[length_range]);
-
- let data = self.data.as_ref();
-
- for (prefix_length, suffix_length) in iter {
- let prefix_length = *prefix_length as usize;
- let suffix_length = *suffix_length as usize;
-
- if self.data_offset + suffix_length > self.data.len() {
- return Err(ParquetError::EOF("eof decoding byte array".into()));
- }
-
- self.last_value.truncate(prefix_length);
- self.last_value.extend_from_slice(
- &data[self.data_offset..self.data_offset + suffix_length],
- );
- self.data_offset += suffix_length;
- }
- self.length_offset += to_skip;
- Ok(to_skip)
+ self.decoder.skip(to_skip)
}
}
/// Decoder from [`Encoding::RLE_DICTIONARY`] to [`OffsetBuffer`]
pub struct ByteArrayDecoderDictionary {
- /// Decoder for the dictionary offsets array
- decoder: RleDecoder,
-
- /// We want to decode the offsets in chunks so we will maintain an internal buffer of decoded
- /// offsets
- index_buf: Box<[i32; 1024]>,
- /// Current length of `index_buf`
- index_buf_len: usize,
- /// Current offset into `index_buf`. If `index_buf_offset` == `index_buf_len` then we've consumed
- /// the entire buffer and need to decode another chunk of offsets.
- index_offset: usize,
-
- /// This is a maximum as the null count is not always known, e.g. value data from
- /// a v1 data page
- max_remaining_values: usize,
+ decoder: DictIndexDecoder,
}
impl ByteArrayDecoderDictionary {
fn new(data: ByteBufferPtr, num_levels: usize, num_values: Option<usize>) -> Self {
- let bit_width = data[0];
- let mut decoder = RleDecoder::new(bit_width);
- decoder.set_data(data.start_from(1));
-
Self {
- decoder,
- index_buf: Box::new([0; 1024]),
- index_buf_len: 0,
- index_offset: 0,
- max_remaining_values: num_values.unwrap_or(num_levels),
+ decoder: DictIndexDecoder::new(data, num_levels, num_values),
}
}
@@ -642,38 +537,18 @@ impl ByteArrayDecoderDictionary {
dict: &OffsetBuffer<I>,
len: usize,
) -> Result<usize> {
+ // All data must be NULL
if dict.is_empty() {
- return Ok(0); // All data must be NULL
+ return Ok(0);
}
- let mut values_read = 0;
-
- while values_read != len && self.max_remaining_values != 0 {
- if self.index_offset == self.index_buf_len {
- // We've consumed the entire index buffer so we need to reload it before proceeding
- let read = self.decoder.get_batch(self.index_buf.as_mut())?;
- if read == 0 {
- break;
- }
- self.index_buf_len = read;
- self.index_offset = 0;
- }
-
- let to_read = (len - values_read)
- .min(self.index_buf_len - self.index_offset)
- .min(self.max_remaining_values);
-
+ self.decoder.read(len, |keys| {
output.extend_from_dictionary(
- &self.index_buf[self.index_offset..self.index_offset + to_read],
+ keys,
dict.offsets.as_slice(),
dict.values.as_slice(),
- )?;
-
- self.index_offset += to_read;
- values_read += to_read;
- self.max_remaining_values -= to_read;
- }
- Ok(values_read)
+ )
+ })
}
fn skip<I: OffsetSizeTrait + ScalarValue>(
@@ -681,35 +556,12 @@ impl ByteArrayDecoderDictionary {
dict: &OffsetBuffer<I>,
to_skip: usize,
) -> Result<usize> {
- let to_skip = to_skip.min(self.max_remaining_values);
// All data must be NULL
if dict.is_empty() {
return Ok(0);
}
- let mut values_skip = 0;
- while values_skip < to_skip {
- if self.index_offset == self.index_buf_len {
- // Instead of reloading the buffer, just skip in the decoder
- let skip = self.decoder.skip(to_skip - values_skip)?;
-
- if skip == 0 {
- break;
- }
-
- self.max_remaining_values -= skip;
- values_skip += skip;
- } else {
- // We still have indices buffered, so skip within the buffer
- let skip =
- (to_skip - values_skip).min(self.index_buf_len - self.index_offset);
-
- self.index_offset += skip;
- self.max_remaining_values -= skip;
- values_skip += skip;
- }
- }
- Ok(values_skip)
+ self.decoder.skip(to_skip)
}
}
diff --git a/parquet/src/arrow/decoder/delta_byte_array.rs b/parquet/src/arrow/decoder/delta_byte_array.rs
new file mode 100644
index 000000000..af73f4f25
--- /dev/null
+++ b/parquet/src/arrow/decoder/delta_byte_array.rs
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::data_type::Int32Type;
+use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
+use crate::errors::{ParquetError, Result};
+use crate::util::memory::ByteBufferPtr;
+
+/// Decoder for `Encoding::DELTA_BYTE_ARRAY`
+///
+/// Each value is encoded as the length of the prefix it shares with the previous
+/// value plus a suffix. The page holds a `DELTA_BINARY_PACKED` block of prefix
+/// lengths, then a `DELTA_BINARY_PACKED` block of suffix lengths, then the
+/// concatenated suffix bytes
+pub struct DeltaByteArrayDecoder {
+    /// Length of the prefix each value shares with the previous value
+    prefix_lengths: Vec<i32>,
+    /// Length of the suffix appended to the shared prefix for each value
+    suffix_lengths: Vec<i32>,
+    /// The complete data page
+    data: ByteBufferPtr,
+    /// Index of the next value in `prefix_lengths` / `suffix_lengths`
+    length_offset: usize,
+    /// Byte offset into `data` of the next value's suffix
+    data_offset: usize,
+    /// The most recently decoded value, from which the next prefix is taken
+    last_value: Vec<u8>,
+}
+
+impl DeltaByteArrayDecoder {
+    /// Create a new [`DeltaByteArrayDecoder`] with the provided data page
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if either length block cannot be decoded, if the two blocks
+    /// contain a different number of values, or if any length is negative
+    pub fn new(data: ByteBufferPtr) -> Result<Self> {
+        let mut prefix = DeltaBitPackDecoder::<Int32Type>::new();
+        prefix.set_data(data.all(), 0)?;
+        let prefix_lengths = decode_lengths(&mut prefix, "prefix")?;
+
+        let mut suffix = DeltaBitPackDecoder::<Int32Type>::new();
+        suffix.set_data(data.start_from(prefix.get_offset()), 0)?;
+        let suffix_lengths = decode_lengths(&mut suffix, "suffix")?;
+
+        if prefix_lengths.len() != suffix_lengths.len() {
+            return Err(general_err!(format!(
+                "inconsistent DELTA_BYTE_ARRAY lengths, prefixes: {}, suffixes: {}",
+                prefix_lengths.len(),
+                suffix_lengths.len()
+            )));
+        }
+
+        Ok(Self {
+            prefix_lengths,
+            suffix_lengths,
+            data,
+            length_offset: 0,
+            data_offset: prefix.get_offset() + suffix.get_offset(),
+            last_value: vec![],
+        })
+    }
+
+    /// Returns the number of values that have not yet been read or skipped
+    pub fn remaining(&self) -> usize {
+        self.prefix_lengths.len() - self.length_offset
+    }
+
+    /// Read up to `len` values, returning the number of values read
+    ///
+    /// `f` is called once per value with that value's reconstructed bytes.
+    /// Decoding short-circuits on the first error, including one returned by `f`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if a prefix length exceeds the length of the previous value,
+    /// if a suffix extends past the end of the page, or if `f` returns an error
+    pub fn read<F>(&mut self, len: usize, mut f: F) -> Result<usize>
+    where
+        F: FnMut(&[u8]) -> Result<()>,
+    {
+        let to_read = len.min(self.remaining());
+
+        let length_range = self.length_offset..self.length_offset + to_read;
+        let iter = self.prefix_lengths[length_range.clone()]
+            .iter()
+            .zip(&self.suffix_lengths[length_range]);
+
+        let data = self.data.as_ref();
+
+        for (prefix_length, suffix_length) in iter {
+            // Both lengths were checked to be non-negative in `new`
+            let prefix_length = *prefix_length as usize;
+            let suffix_length = *suffix_length as usize;
+
+            // A value cannot share more bytes with its predecessor than the
+            // predecessor has; `truncate` below would silently ignore this
+            if prefix_length > self.last_value.len() {
+                return Err(general_err!(format!(
+                    "DELTA_BYTE_ARRAY prefix length {} exceeds previous value length {}",
+                    prefix_length,
+                    self.last_value.len()
+                )));
+            }
+
+            if self.data_offset + suffix_length > self.data.len() {
+                return Err(ParquetError::EOF("eof decoding byte array".into()));
+            }
+
+            self.last_value.truncate(prefix_length);
+            self.last_value.extend_from_slice(
+                &data[self.data_offset..self.data_offset + suffix_length],
+            );
+            f(&self.last_value)?;
+
+            self.data_offset += suffix_length;
+        }
+
+        self.length_offset += to_read;
+        Ok(to_read)
+    }
+
+    /// Skip up to `to_skip` values, returning the number of values skipped
+    ///
+    /// Skipped values must still be reconstructed, because each value's prefix is
+    /// taken from its predecessor, so this shares its implementation and validation
+    /// with [`Self::read`]
+    pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
+        self.read(to_skip, |_| Ok(()))
+    }
+}
+
+/// Decodes every value remaining in `decoder` as a length
+///
+/// `kind` names the lengths in error messages. Returns an error if fewer values
+/// than advertised by [`Decoder::values_left`] are decoded, or if any is negative
+fn decode_lengths(
+    decoder: &mut DeltaBitPackDecoder<Int32Type>,
+    kind: &str,
+) -> Result<Vec<i32>> {
+    let expected = decoder.values_left();
+    let mut lengths = vec![0; expected];
+    let decoded = decoder.get(&mut lengths)?;
+    if decoded != expected {
+        return Err(general_err!(format!(
+            "DELTA_BYTE_ARRAY {} lengths: expected {} values, decoded {}",
+            kind, expected, decoded
+        )));
+    }
+    if let Some(negative) = lengths.iter().find(|&&l| l < 0) {
+        return Err(general_err!(format!(
+            "negative DELTA_BYTE_ARRAY {} length: {}",
+            kind, negative
+        )));
+    }
+    Ok(lengths)
+}
diff --git a/parquet/src/arrow/decoder/dictionary_index.rs b/parquet/src/arrow/decoder/dictionary_index.rs
new file mode 100644
index 000000000..3d258309d
--- /dev/null
+++ b/parquet/src/arrow/decoder/dictionary_index.rs
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::encodings::rle::RleDecoder;
+use crate::errors::Result;
+use crate::util::memory::ByteBufferPtr;
+
+/// Decoder for `Encoding::RLE_DICTIONARY` indices
+///
+/// Indices are decoded from the underlying [`RleDecoder`] in chunks of up to 1024
+/// into an internal buffer, from which callers consume them as slices
+pub struct DictIndexDecoder {
+    /// Decoder for the index data that follows the leading bit-width byte
+    decoder: RleDecoder,
+
+    /// Chunk of decoded indices, so that indices are decoded in batches rather
+    /// than one at a time
+    index_buf: Box<[i32; 1024]>,
+    /// Number of valid indices currently held in `index_buf`
+    index_buf_len: usize,
+    /// Position of the next unconsumed index in `index_buf`; when it equals
+    /// `index_buf_len` the buffer is exhausted and must be refilled
+    index_offset: usize,
+
+    /// Upper bound on the number of values left to decode. It is only an upper bound
+    /// because the null count is not always known, e.g. for value data from a v1 data
+    /// page, in which case the number of levels is used instead
+    max_remaining_values: usize,
+}
+
+impl DictIndexDecoder {
+    /// Create a new [`DictIndexDecoder`] for the provided data page
+    ///
+    /// The first byte of `data` is the bit width handed to the [`RleDecoder`], which
+    /// decodes the remaining bytes. `num_levels` is the number of levels in the page
+    /// and `num_values` the number of non-null values, if known; the latter takes
+    /// precedence as the bound on how many indices will be returned
+    ///
+    /// # Panics
+    ///
+    /// Indexes `data[0]`, so `data` must not be empty
+    pub fn new(
+        data: ByteBufferPtr,
+        num_levels: usize,
+        num_values: Option<usize>,
+    ) -> Self {
+        let mut decoder = RleDecoder::new(data[0]);
+        decoder.set_data(data.start_from(1));
+
+        let max_remaining_values = match num_values {
+            Some(known) => known,
+            None => num_levels,
+        };
+
+        Self {
+            decoder,
+            index_buf: Box::new([0; 1024]),
+            index_buf_len: 0,
+            index_offset: 0,
+            max_remaining_values,
+        }
+    }
+
+    /// Read up to `len` indices, returning the number of indices read
+    ///
+    /// `f` is invoked with consecutive, non-empty slices of decoded indices whose
+    /// combined length equals the returned count. Reading stops early once
+    /// `max_remaining_values` reaches zero or the underlying decoder is exhausted,
+    /// and short-circuits on the first error from `f` or the decoder
+    pub fn read<F: FnMut(&[i32]) -> Result<()>>(
+        &mut self,
+        len: usize,
+        mut f: F,
+    ) -> Result<usize> {
+        let mut total = 0;
+        while total < len && self.max_remaining_values > 0 {
+            if !self.refill_if_exhausted()? {
+                break;
+            }
+
+            let start = self.index_offset;
+            let count = (len - total)
+                .min(self.index_buf_len - start)
+                .min(self.max_remaining_values);
+
+            f(&self.index_buf[start..start + count])?;
+
+            self.index_offset = start + count;
+            self.max_remaining_values -= count;
+            total += count;
+        }
+        Ok(total)
+    }
+
+    /// Skip up to `to_skip` indices, returning the number of indices skipped
+    ///
+    /// Buffered indices are consumed first; once the buffer is exhausted, indices are
+    /// skipped directly in the underlying decoder without being decoded into the buffer
+    pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
+        let target = to_skip.min(self.max_remaining_values);
+        let mut skipped = 0;
+        while skipped < target {
+            let wanted = target - skipped;
+            let step = if self.index_offset == self.index_buf_len {
+                // Nothing buffered: skip in the decoder rather than reloading the buffer
+                self.decoder.skip(wanted)?
+            } else {
+                // Indices are already buffered, so skip within the buffer
+                let step = wanted.min(self.index_buf_len - self.index_offset);
+                self.index_offset += step;
+                step
+            };
+            if step == 0 {
+                break;
+            }
+            self.max_remaining_values -= step;
+            skipped += step;
+        }
+        Ok(skipped)
+    }
+
+    /// Ensures `index_buf` has unconsumed indices, decoding the next chunk from
+    /// `decoder` if the current one has been fully consumed. Returns `false` if the
+    /// buffer was exhausted and the decoder produced no further indices
+    fn refill_if_exhausted(&mut self) -> Result<bool> {
+        if self.index_offset != self.index_buf_len {
+            return Ok(true);
+        }
+        let decoded = self.decoder.get_batch(self.index_buf.as_mut())?;
+        if decoded == 0 {
+            return Ok(false);
+        }
+        self.index_buf_len = decoded;
+        self.index_offset = 0;
+        Ok(true)
+    }
+}
diff --git a/parquet/src/arrow/decoder/mod.rs b/parquet/src/arrow/decoder/mod.rs
new file mode 100644
index 000000000..dc1000ffd
--- /dev/null
+++ b/parquet/src/arrow/decoder/mod.rs
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Specialized decoders optimised for decoding to arrow format
+
+mod delta_byte_array;
+mod dictionary_index;
+
+pub use delta_byte_array::DeltaByteArrayDecoder;
+pub use dictionary_index::DictIndexDecoder;
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index dda602589..c0de656bf 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -123,6 +123,7 @@ experimental!(mod array_reader);
pub mod arrow_reader;
pub mod arrow_writer;
mod buffer;
+mod decoder;
#[cfg(feature = "async")]
pub mod async_reader;