You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/20 14:48:49 UTC

[arrow-rs] branch master updated: Split out byte array decoders (#2318) (#2527)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 9921cd579 Split out byte array decoders (#2318) (#2527)
9921cd579 is described below

commit 9921cd579acddf618c6a9bcaf78b0606920c460a
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Sat Aug 20 15:48:45 2022 +0100

    Split out byte array decoders (#2318) (#2527)
---
 parquet/src/arrow/array_reader/byte_array.rs  | 186 +++-----------------------
 parquet/src/arrow/decoder/delta_byte_array.rs | 140 +++++++++++++++++++
 parquet/src/arrow/decoder/dictionary_index.rs | 125 +++++++++++++++++
 parquet/src/arrow/decoder/mod.rs              |  24 ++++
 parquet/src/arrow/mod.rs                      |   1 +
 5 files changed, 309 insertions(+), 167 deletions(-)

diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs
index 172aeb96d..00d69d050 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -17,6 +17,7 @@
 
 use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
 use crate::arrow::buffer::offset_buffer::OffsetBuffer;
+use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
 use crate::arrow::record_reader::buffer::ScalarValue;
 use crate::arrow::record_reader::GenericRecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
@@ -24,10 +25,7 @@ use crate::basic::{ConvertedType, Encoding};
 use crate::column::page::PageIterator;
 use crate::column::reader::decoder::ColumnValueDecoder;
 use crate::data_type::Int32Type;
-use crate::encodings::{
-    decoding::{Decoder, DeltaBitPackDecoder},
-    rle::RleDecoder,
-};
+use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
 use crate::util::memory::ByteBufferPtr;
@@ -486,45 +484,14 @@ impl ByteArrayDecoderDeltaLength {
 
 /// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`OffsetBuffer`]
 pub struct ByteArrayDecoderDelta {
-    prefix_lengths: Vec<i32>,
-    suffix_lengths: Vec<i32>,
-    data: ByteBufferPtr,
-    length_offset: usize,
-    data_offset: usize,
-    last_value: Vec<u8>,
+    decoder: DeltaByteArrayDecoder,
     validate_utf8: bool,
 }
 
 impl ByteArrayDecoderDelta {
     fn new(data: ByteBufferPtr, validate_utf8: bool) -> Result<Self> {
-        let mut prefix = DeltaBitPackDecoder::<Int32Type>::new();
-        prefix.set_data(data.all(), 0)?;
-
-        let num_prefix = prefix.values_left();
-        let mut prefix_lengths = vec![0; num_prefix];
-        assert_eq!(prefix.get(&mut prefix_lengths)?, num_prefix);
-
-        let mut suffix = DeltaBitPackDecoder::<Int32Type>::new();
-        suffix.set_data(data.start_from(prefix.get_offset()), 0)?;
-
-        let num_suffix = suffix.values_left();
-        let mut suffix_lengths = vec![0; num_suffix];
-        assert_eq!(suffix.get(&mut suffix_lengths)?, num_suffix);
-
-        if num_prefix != num_suffix {
-            return Err(general_err!(format!(
-                "inconsistent DELTA_BYTE_ARRAY lengths, prefixes: {}, suffixes: {}",
-                num_prefix, num_suffix
-            )));
-        }
-
         Ok(Self {
-            prefix_lengths,
-            suffix_lengths,
-            data,
-            length_offset: 0,
-            data_offset: prefix.get_offset() + suffix.get_offset(),
-            last_value: vec![],
+            decoder: DeltaByteArrayDecoder::new(data)?,
             validate_utf8,
         })
     }
@@ -535,104 +502,32 @@ impl ByteArrayDecoderDelta {
         len: usize,
     ) -> Result<usize> {
         let initial_values_length = output.values.len();
-        assert_eq!(self.prefix_lengths.len(), self.suffix_lengths.len());
-
-        let to_read = len.min(self.prefix_lengths.len() - self.length_offset);
-
-        output.offsets.reserve(to_read);
+        output.offsets.reserve(len.min(self.decoder.remaining()));
 
-        let length_range = self.length_offset..self.length_offset + to_read;
-        let iter = self.prefix_lengths[length_range.clone()]
-            .iter()
-            .zip(&self.suffix_lengths[length_range]);
-
-        let data = self.data.as_ref();
-
-        for (prefix_length, suffix_length) in iter {
-            let prefix_length = *prefix_length as usize;
-            let suffix_length = *suffix_length as usize;
-
-            if self.data_offset + suffix_length > self.data.len() {
-                return Err(ParquetError::EOF("eof decoding byte array".into()));
-            }
-
-            self.last_value.truncate(prefix_length);
-            self.last_value.extend_from_slice(
-                &data[self.data_offset..self.data_offset + suffix_length],
-            );
-            output.try_push(&self.last_value, self.validate_utf8)?;
-
-            self.data_offset += suffix_length;
-        }
-
-        self.length_offset += to_read;
+        let read = self
+            .decoder
+            .read(len, |bytes| output.try_push(bytes, self.validate_utf8))?;
 
         if self.validate_utf8 {
             output.check_valid_utf8(initial_values_length)?;
         }
-        Ok(to_read)
+        Ok(read)
     }
 
     fn skip(&mut self, to_skip: usize) -> Result<usize> {
-        let to_skip = to_skip.min(self.prefix_lengths.len() - self.length_offset);
-
-        let length_range = self.length_offset..self.length_offset + to_skip;
-        let iter = self.prefix_lengths[length_range.clone()]
-            .iter()
-            .zip(&self.suffix_lengths[length_range]);
-
-        let data = self.data.as_ref();
-
-        for (prefix_length, suffix_length) in iter {
-            let prefix_length = *prefix_length as usize;
-            let suffix_length = *suffix_length as usize;
-
-            if self.data_offset + suffix_length > self.data.len() {
-                return Err(ParquetError::EOF("eof decoding byte array".into()));
-            }
-
-            self.last_value.truncate(prefix_length);
-            self.last_value.extend_from_slice(
-                &data[self.data_offset..self.data_offset + suffix_length],
-            );
-            self.data_offset += suffix_length;
-        }
-        self.length_offset += to_skip;
-        Ok(to_skip)
+        self.decoder.skip(to_skip)
     }
 }
 
 /// Decoder from [`Encoding::RLE_DICTIONARY`] to [`OffsetBuffer`]
 pub struct ByteArrayDecoderDictionary {
-    /// Decoder for the dictionary offsets array
-    decoder: RleDecoder,
-
-    /// We want to decode the offsets in chunks so we will maintain an internal buffer of decoded
-    /// offsets
-    index_buf: Box<[i32; 1024]>,
-    /// Current length of `index_buf`
-    index_buf_len: usize,
-    /// Current offset into `index_buf`. If `index_buf_offset` == `index_buf_len` then we've consumed
-    /// the entire buffer and need to decode another chunk of offsets.
-    index_offset: usize,
-
-    /// This is a maximum as the null count is not always known, e.g. value data from
-    /// a v1 data page
-    max_remaining_values: usize,
+    decoder: DictIndexDecoder,
 }
 
 impl ByteArrayDecoderDictionary {
     fn new(data: ByteBufferPtr, num_levels: usize, num_values: Option<usize>) -> Self {
-        let bit_width = data[0];
-        let mut decoder = RleDecoder::new(bit_width);
-        decoder.set_data(data.start_from(1));
-
         Self {
-            decoder,
-            index_buf: Box::new([0; 1024]),
-            index_buf_len: 0,
-            index_offset: 0,
-            max_remaining_values: num_values.unwrap_or(num_levels),
+            decoder: DictIndexDecoder::new(data, num_levels, num_values),
         }
     }
 
@@ -642,38 +537,18 @@ impl ByteArrayDecoderDictionary {
         dict: &OffsetBuffer<I>,
         len: usize,
     ) -> Result<usize> {
+        // All data must be NULL
         if dict.is_empty() {
-            return Ok(0); // All data must be NULL
+            return Ok(0);
         }
 
-        let mut values_read = 0;
-
-        while values_read != len && self.max_remaining_values != 0 {
-            if self.index_offset == self.index_buf_len {
-                // We've consumed the entire index buffer so we need to reload it before proceeding
-                let read = self.decoder.get_batch(self.index_buf.as_mut())?;
-                if read == 0 {
-                    break;
-                }
-                self.index_buf_len = read;
-                self.index_offset = 0;
-            }
-
-            let to_read = (len - values_read)
-                .min(self.index_buf_len - self.index_offset)
-                .min(self.max_remaining_values);
-
+        self.decoder.read(len, |keys| {
             output.extend_from_dictionary(
-                &self.index_buf[self.index_offset..self.index_offset + to_read],
+                keys,
                 dict.offsets.as_slice(),
                 dict.values.as_slice(),
-            )?;
-
-            self.index_offset += to_read;
-            values_read += to_read;
-            self.max_remaining_values -= to_read;
-        }
-        Ok(values_read)
+            )
+        })
     }
 
     fn skip<I: OffsetSizeTrait + ScalarValue>(
@@ -681,35 +556,12 @@ impl ByteArrayDecoderDictionary {
         dict: &OffsetBuffer<I>,
         to_skip: usize,
     ) -> Result<usize> {
-        let to_skip = to_skip.min(self.max_remaining_values);
         // All data must be NULL
         if dict.is_empty() {
             return Ok(0);
         }
 
-        let mut values_skip = 0;
-        while values_skip < to_skip {
-            if self.index_offset == self.index_buf_len {
-                // Instead of reloading the buffer, just skip in the decoder
-                let skip = self.decoder.skip(to_skip - values_skip)?;
-
-                if skip == 0 {
-                    break;
-                }
-
-                self.max_remaining_values -= skip;
-                values_skip += skip;
-            } else {
-                // We still have indices buffered, so skip within the buffer
-                let skip =
-                    (to_skip - values_skip).min(self.index_buf_len - self.index_offset);
-
-                self.index_offset += skip;
-                self.max_remaining_values -= skip;
-                values_skip += skip;
-            }
-        }
-        Ok(values_skip)
+        self.decoder.skip(to_skip)
     }
 }
 
diff --git a/parquet/src/arrow/decoder/delta_byte_array.rs b/parquet/src/arrow/decoder/delta_byte_array.rs
new file mode 100644
index 000000000..af73f4f25
--- /dev/null
+++ b/parquet/src/arrow/decoder/delta_byte_array.rs
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::data_type::Int32Type;
+use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
+use crate::errors::{ParquetError, Result};
+use crate::util::memory::ByteBufferPtr;
+
+/// Decoder for `Encoding::DELTA_BYTE_ARRAY`
+pub struct DeltaByteArrayDecoder {
+    prefix_lengths: Vec<i32>,
+    suffix_lengths: Vec<i32>,
+    data: ByteBufferPtr,
+    length_offset: usize,
+    data_offset: usize,
+    last_value: Vec<u8>,
+}
+
+impl DeltaByteArrayDecoder {
+    /// Create a new [`DeltaByteArrayDecoder`] with the provided data page
+    pub fn new(data: ByteBufferPtr) -> Result<Self> {
+        let mut prefix = DeltaBitPackDecoder::<Int32Type>::new();
+        prefix.set_data(data.all(), 0)?;
+
+        let num_prefix = prefix.values_left();
+        let mut prefix_lengths = vec![0; num_prefix];
+        assert_eq!(prefix.get(&mut prefix_lengths)?, num_prefix);
+
+        let mut suffix = DeltaBitPackDecoder::<Int32Type>::new();
+        suffix.set_data(data.start_from(prefix.get_offset()), 0)?;
+
+        let num_suffix = suffix.values_left();
+        let mut suffix_lengths = vec![0; num_suffix];
+        assert_eq!(suffix.get(&mut suffix_lengths)?, num_suffix);
+
+        if num_prefix != num_suffix {
+            return Err(general_err!(format!(
+                "inconsistent DELTA_BYTE_ARRAY lengths, prefixes: {}, suffixes: {}",
+                num_prefix, num_suffix
+            )));
+        }
+
+        assert_eq!(prefix_lengths.len(), suffix_lengths.len());
+
+        Ok(Self {
+            prefix_lengths,
+            suffix_lengths,
+            data,
+            length_offset: 0,
+            data_offset: prefix.get_offset() + suffix.get_offset(),
+            last_value: vec![],
+        })
+    }
+
+    /// Returns the number of values remaining
+    pub fn remaining(&self) -> usize {
+        self.prefix_lengths.len() - self.length_offset
+    }
+
+    /// Read up to `len` values, returning the number of values read
+    /// and calling `f` with each decoded byte slice
+    ///
+    /// Will short-circuit and return on error
+    pub fn read<F>(&mut self, len: usize, mut f: F) -> Result<usize>
+    where
+        F: FnMut(&[u8]) -> Result<()>,
+    {
+        let to_read = len.min(self.remaining());
+
+        let length_range = self.length_offset..self.length_offset + to_read;
+        let iter = self.prefix_lengths[length_range.clone()]
+            .iter()
+            .zip(&self.suffix_lengths[length_range]);
+
+        let data = self.data.as_ref();
+
+        for (prefix_length, suffix_length) in iter {
+            let prefix_length = *prefix_length as usize;
+            let suffix_length = *suffix_length as usize;
+
+            if self.data_offset + suffix_length > self.data.len() {
+                return Err(ParquetError::EOF("eof decoding byte array".into()));
+            }
+
+            self.last_value.truncate(prefix_length);
+            self.last_value.extend_from_slice(
+                &data[self.data_offset..self.data_offset + suffix_length],
+            );
+            f(&self.last_value)?;
+
+            self.data_offset += suffix_length;
+        }
+
+        self.length_offset += to_read;
+        Ok(to_read)
+    }
+
+    /// Skip up to `to_skip` values, returning the number of values skipped
+    pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
+        let to_skip = to_skip.min(self.prefix_lengths.len() - self.length_offset);
+
+        let length_range = self.length_offset..self.length_offset + to_skip;
+        let iter = self.prefix_lengths[length_range.clone()]
+            .iter()
+            .zip(&self.suffix_lengths[length_range]);
+
+        let data = self.data.as_ref();
+
+        for (prefix_length, suffix_length) in iter {
+            let prefix_length = *prefix_length as usize;
+            let suffix_length = *suffix_length as usize;
+
+            if self.data_offset + suffix_length > self.data.len() {
+                return Err(ParquetError::EOF("eof decoding byte array".into()));
+            }
+
+            self.last_value.truncate(prefix_length);
+            self.last_value.extend_from_slice(
+                &data[self.data_offset..self.data_offset + suffix_length],
+            );
+            self.data_offset += suffix_length;
+        }
+        self.length_offset += to_skip;
+        Ok(to_skip)
+    }
+}
diff --git a/parquet/src/arrow/decoder/dictionary_index.rs b/parquet/src/arrow/decoder/dictionary_index.rs
new file mode 100644
index 000000000..3d258309d
--- /dev/null
+++ b/parquet/src/arrow/decoder/dictionary_index.rs
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::encodings::rle::RleDecoder;
+use crate::errors::Result;
+use crate::util::memory::ByteBufferPtr;
+
+/// Decoder for `Encoding::RLE_DICTIONARY` indices
+pub struct DictIndexDecoder {
+    /// Decoder for the dictionary offsets array
+    decoder: RleDecoder,
+
+    /// We want to decode the offsets in chunks so we will maintain an internal buffer of decoded
+    /// offsets
+    index_buf: Box<[i32; 1024]>,
+    /// Current length of `index_buf`
+    index_buf_len: usize,
+    /// Current offset into `index_buf`. If `index_offset` == `index_buf_len` then we've consumed
+    /// the entire buffer and need to decode another chunk of offsets.
+    index_offset: usize,
+
+    /// This is a maximum as the null count is not always known, e.g. value data from
+    /// a v1 data page
+    max_remaining_values: usize,
+}
+
+impl DictIndexDecoder {
+    /// Create a new [`DictIndexDecoder`] with the provided data page, the number of levels
+    /// associated with this data page, and the number of non-null values (if known)
+    pub fn new(
+        data: ByteBufferPtr,
+        num_levels: usize,
+        num_values: Option<usize>,
+    ) -> Self {
+        let bit_width = data[0];
+        let mut decoder = RleDecoder::new(bit_width);
+        decoder.set_data(data.start_from(1));
+
+        Self {
+            decoder,
+            index_buf: Box::new([0; 1024]),
+            index_buf_len: 0,
+            index_offset: 0,
+            max_remaining_values: num_values.unwrap_or(num_levels),
+        }
+    }
+
+    /// Read up to `len` values, returning the number of values read
+    /// and calling `f` with each batch of decoded dictionary indices
+    ///
+    /// Will short-circuit and return on error
+    pub fn read<F: FnMut(&[i32]) -> Result<()>>(
+        &mut self,
+        len: usize,
+        mut f: F,
+    ) -> Result<usize> {
+        let mut values_read = 0;
+
+        while values_read != len && self.max_remaining_values != 0 {
+            if self.index_offset == self.index_buf_len {
+                // We've consumed the entire index buffer so we need to reload it before proceeding
+                let read = self.decoder.get_batch(self.index_buf.as_mut())?;
+                if read == 0 {
+                    break;
+                }
+                self.index_buf_len = read;
+                self.index_offset = 0;
+            }
+
+            let to_read = (len - values_read)
+                .min(self.index_buf_len - self.index_offset)
+                .min(self.max_remaining_values);
+
+            f(&self.index_buf[self.index_offset..self.index_offset + to_read])?;
+
+            self.index_offset += to_read;
+            values_read += to_read;
+            self.max_remaining_values -= to_read;
+        }
+        Ok(values_read)
+    }
+
+    /// Skip up to `to_skip` values, returning the number of values skipped
+    pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
+        let to_skip = to_skip.min(self.max_remaining_values);
+
+        let mut values_skip = 0;
+        while values_skip < to_skip {
+            if self.index_offset == self.index_buf_len {
+                // Instead of reloading the buffer, just skip in the decoder
+                let skip = self.decoder.skip(to_skip - values_skip)?;
+
+                if skip == 0 {
+                    break;
+                }
+
+                self.max_remaining_values -= skip;
+                values_skip += skip;
+            } else {
+                // We still have indices buffered, so skip within the buffer
+                let skip =
+                    (to_skip - values_skip).min(self.index_buf_len - self.index_offset);
+
+                self.index_offset += skip;
+                self.max_remaining_values -= skip;
+                values_skip += skip;
+            }
+        }
+        Ok(values_skip)
+    }
+}
diff --git a/parquet/src/arrow/decoder/mod.rs b/parquet/src/arrow/decoder/mod.rs
new file mode 100644
index 000000000..dc1000ffd
--- /dev/null
+++ b/parquet/src/arrow/decoder/mod.rs
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Specialized decoders optimised for decoding to arrow format
+
+mod delta_byte_array;
+mod dictionary_index;
+
+pub use delta_byte_array::DeltaByteArrayDecoder;
+pub use dictionary_index::DictIndexDecoder;
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index dda602589..c0de656bf 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -123,6 +123,7 @@ experimental!(mod array_reader);
 pub mod arrow_reader;
 pub mod arrow_writer;
 mod buffer;
+mod decoder;
 
 #[cfg(feature = "async")]
 pub mod async_reader;