You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/06/08 18:07:56 UTC
[arrow-rs] branch master updated: Move record delimiting into ColumnReader (#4365) (#4376)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new ab5669398 Move record delimiting into ColumnReader (#4365) (#4376)
ab5669398 is described below
commit ab56693985826bb8caea30558b8c25db286a5e37
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Jun 8 19:07:50 2023 +0100
Move record delimiting into ColumnReader (#4365) (#4376)
* Move record delimiting into ColumnReader (#4365)
* Misc tweaks
* More tests
* Clippy
* Review feedback
---
.../src/arrow/array_reader/fixed_len_byte_array.rs | 4 +-
parquet/src/arrow/arrow_reader/mod.rs | 182 ++++++++-
parquet/src/arrow/buffer/dictionary_buffer.rs | 30 +-
parquet/src/arrow/buffer/offset_buffer.rs | 29 +-
parquet/src/arrow/record_reader/buffer.rs | 38 +-
.../src/arrow/record_reader/definition_levels.rs | 88 ++---
parquet/src/arrow/record_reader/mod.rs | 209 ++--------
parquet/src/column/mod.rs | 11 +-
parquet/src/column/reader.rs | 240 ++++++-----
parquet/src/column/reader/decoder.rs | 439 +++++++++++----------
parquet/src/column/writer/mod.rs | 6 +-
parquet/src/file/writer.rs | 4 +-
parquet/src/record/triplet.rs | 11 +-
13 files changed, 649 insertions(+), 642 deletions(-)
diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
index 47bd03a73..b06091b6b 100644
--- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
+++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -247,8 +247,8 @@ impl BufferQueue for FixedLenByteArrayBuffer {
type Output = Buffer;
type Slice = Self;
- fn split_off(&mut self, len: usize) -> Self::Output {
- self.buffer.split_off(len * self.byte_length)
+ fn consume(&mut self) -> Self::Output {
+ self.buffer.consume()
}
fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 432b00399..988738dac 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -548,12 +548,14 @@ mod tests {
use tempfile::tempfile;
use arrow_array::builder::*;
+ use arrow_array::cast::AsArray;
use arrow_array::types::{Decimal128Type, Decimal256Type, DecimalType};
use arrow_array::*;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_buffer::{i256, ArrowNativeType, Buffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{DataType as ArrowDataType, Field, Fields, Schema};
+ use arrow_select::concat::concat_batches;
use crate::arrow::arrow_reader::{
ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReader,
@@ -562,6 +564,7 @@ mod tests {
use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
use crate::arrow::{ArrowWriter, ProjectionMask};
use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
+ use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
use crate::data_type::{
BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray,
FixedLenByteArrayType, Int32Type, Int64Type, Int96Type,
@@ -2131,15 +2134,15 @@ mod tests {
#[test]
fn test_row_group_exact_multiple() {
- use crate::arrow::record_reader::MIN_BATCH_SIZE;
+ const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
test_row_group_batch(8, 8);
test_row_group_batch(10, 8);
test_row_group_batch(8, 10);
- test_row_group_batch(MIN_BATCH_SIZE, MIN_BATCH_SIZE);
- test_row_group_batch(MIN_BATCH_SIZE + 1, MIN_BATCH_SIZE);
- test_row_group_batch(MIN_BATCH_SIZE, MIN_BATCH_SIZE + 1);
- test_row_group_batch(MIN_BATCH_SIZE, MIN_BATCH_SIZE - 1);
- test_row_group_batch(MIN_BATCH_SIZE - 1, MIN_BATCH_SIZE);
+ test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
+ test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
+ test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
+ test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
+ test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
}
/// Given a RecordBatch containing all the column data, return the expected batches given
@@ -2610,4 +2613,171 @@ mod tests {
test_decimal_roundtrip::<Decimal128Type>();
test_decimal_roundtrip::<Decimal256Type>();
}
+
+ #[test]
+ fn test_list_selection() {
+ let schema = Arc::new(Schema::new(vec![Field::new_list(
+ "list",
+ Field::new("item", ArrowDataType::Utf8, true),
+ false,
+ )]));
+ let mut buf = Vec::with_capacity(1024);
+
+ let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
+
+ for i in 0..2 {
+ let mut list_a_builder = ListBuilder::new(StringBuilder::new());
+ for j in 0..1024 {
+ list_a_builder.values().append_value(format!("{i} {j}"));
+ list_a_builder.append(true);
+ }
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(list_a_builder.finish())],
+ )
+ .unwrap();
+ writer.write(&batch).unwrap();
+ }
+ let _metadata = writer.close().unwrap();
+
+ let buf = Bytes::from(buf);
+ let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
+ .unwrap()
+ .with_row_selection(RowSelection::from(vec![
+ RowSelector::skip(100),
+ RowSelector::select(924),
+ RowSelector::skip(100),
+ RowSelector::select(924),
+ ]))
+ .build()
+ .unwrap();
+
+ let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
+ let batch = concat_batches(&schema, &batches).unwrap();
+
+ assert_eq!(batch.num_rows(), 924 * 2);
+ let list = batch.column(0).as_list::<i32>();
+
+ for w in list.value_offsets().windows(2) {
+ assert_eq!(w[0] + 1, w[1])
+ }
+ let mut values = list.values().as_string::<i32>().iter();
+
+ for i in 0..2 {
+ for j in 100..1024 {
+ let expected = format!("{i} {j}");
+ assert_eq!(values.next().unwrap().unwrap(), &expected);
+ }
+ }
+ }
+
+ #[test]
+ fn test_list_selection_fuzz() {
+ let mut rng = thread_rng();
+ let schema = Arc::new(Schema::new(vec![Field::new_list(
+ "list",
+ Field::new_list("item", Field::new("item", ArrowDataType::Int32, true), true),
+ true,
+ )]));
+ let mut buf = Vec::with_capacity(1024);
+ let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
+
+ let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
+
+ for _ in 0..2048 {
+ if rng.gen_bool(0.2) {
+ list_a_builder.append(false);
+ continue;
+ }
+
+ let list_a_len = rng.gen_range(0..10);
+ let list_b_builder = list_a_builder.values();
+
+ for _ in 0..list_a_len {
+ if rng.gen_bool(0.2) {
+ list_b_builder.append(false);
+ continue;
+ }
+
+ let list_b_len = rng.gen_range(0..10);
+ let int_builder = list_b_builder.values();
+ for _ in 0..list_b_len {
+ match rng.gen_bool(0.2) {
+ true => int_builder.append_null(),
+ false => int_builder.append_value(rng.gen()),
+ }
+ }
+ list_b_builder.append(true)
+ }
+ list_a_builder.append(true);
+ }
+
+ let array = Arc::new(list_a_builder.finish());
+ let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
+
+ writer.write(&batch).unwrap();
+ let _metadata = writer.close().unwrap();
+
+ let buf = Bytes::from(buf);
+
+ let cases = [
+ vec![
+ RowSelector::skip(100),
+ RowSelector::select(924),
+ RowSelector::skip(100),
+ RowSelector::select(924),
+ ],
+ vec![
+ RowSelector::select(924),
+ RowSelector::skip(100),
+ RowSelector::select(924),
+ RowSelector::skip(100),
+ ],
+ vec![
+ RowSelector::skip(1023),
+ RowSelector::select(1),
+ RowSelector::skip(1023),
+ RowSelector::select(1),
+ ],
+ vec![
+ RowSelector::select(1),
+ RowSelector::skip(1023),
+ RowSelector::select(1),
+ RowSelector::skip(1023),
+ ],
+ ];
+
+ for batch_size in [100, 1024, 2048] {
+ for selection in &cases {
+ let selection = RowSelection::from(selection.clone());
+ let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
+ .unwrap()
+ .with_row_selection(selection.clone())
+ .with_batch_size(batch_size)
+ .build()
+ .unwrap();
+
+ let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
+ let actual = concat_batches(&batch.schema(), &batches).unwrap();
+ assert_eq!(actual.num_rows(), selection.row_count());
+
+ let mut batch_offset = 0;
+ let mut actual_offset = 0;
+ for selector in selection.iter() {
+ if selector.skip {
+ batch_offset += selector.row_count;
+ continue;
+ }
+
+ assert_eq!(
+ batch.slice(batch_offset, selector.row_count),
+ actual.slice(actual_offset, selector.row_count)
+ );
+
+ batch_offset += selector.row_count;
+ actual_offset += selector.row_count;
+ }
+ }
+ }
+ }
}
diff --git a/parquet/src/arrow/buffer/dictionary_buffer.rs b/parquet/src/arrow/buffer/dictionary_buffer.rs
index 529c28872..6344d9dd3 100644
--- a/parquet/src/arrow/buffer/dictionary_buffer.rs
+++ b/parquet/src/arrow/buffer/dictionary_buffer.rs
@@ -227,14 +227,14 @@ impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> BufferQueue
type Output = Self;
type Slice = Self;
- fn split_off(&mut self, len: usize) -> Self::Output {
+ fn consume(&mut self) -> Self::Output {
match self {
Self::Dict { keys, values } => Self::Dict {
- keys: keys.take(len),
+ keys: std::mem::take(keys),
values: values.clone(),
},
Self::Values { values } => Self::Values {
- values: values.split_off(len),
+ values: values.consume(),
},
}
}
@@ -275,20 +275,6 @@ mod tests {
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
buffer.pad_nulls(0, values.len(), valid.len(), valid_buffer.as_slice());
- // Split off some data
-
- let split = buffer.split_off(4);
- let null_buffer = Buffer::from_iter(valid.drain(0..4));
- let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
- assert_eq!(array.data_type(), &dict_type);
-
- let strings = cast(&array, &ArrowType::Utf8).unwrap();
- let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
- assert_eq!(
- strings.iter().collect::<Vec<_>>(),
- vec![None, None, Some("world"), Some("hello")]
- );
-
// Read some data not preserving the dictionary
let values = buffer.spill_values().unwrap();
@@ -300,8 +286,8 @@ mod tests {
let null_buffer = Buffer::from_iter(valid.iter().cloned());
buffer.pad_nulls(read_offset, 2, 5, null_buffer.as_slice());
- assert_eq!(buffer.len(), 9);
- let split = buffer.split_off(9);
+ assert_eq!(buffer.len(), 13);
+ let split = buffer.consume();
let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
assert_eq!(array.data_type(), &dict_type);
@@ -311,6 +297,10 @@ mod tests {
assert_eq!(
strings.iter().collect::<Vec<_>>(),
vec![
+ None,
+ None,
+ Some("world"),
+ Some("hello"),
None,
Some("a"),
Some(""),
@@ -332,7 +322,7 @@ mod tests {
.unwrap()
.extend_from_slice(&[0, 1, 0, 1]);
- let array = buffer.split_off(4).into_array(None, &dict_type).unwrap();
+ let array = buffer.consume().into_array(None, &dict_type).unwrap();
assert_eq!(array.data_type(), &dict_type);
let strings = cast(&array, &ArrowType::Utf8).unwrap();
diff --git a/parquet/src/arrow/buffer/offset_buffer.rs b/parquet/src/arrow/buffer/offset_buffer.rs
index df96996e3..c8732bc4e 100644
--- a/parquet/src/arrow/buffer/offset_buffer.rs
+++ b/parquet/src/arrow/buffer/offset_buffer.rs
@@ -151,25 +151,8 @@ impl<I: OffsetSizeTrait + ScalarValue> BufferQueue for OffsetBuffer<I> {
type Output = Self;
type Slice = Self;
- fn split_off(&mut self, len: usize) -> Self::Output {
- assert!(self.offsets.len() > len, "{} > {}", self.offsets.len(), len);
- let remaining_offsets = self.offsets.len() - len - 1;
- let offsets = self.offsets.as_slice();
-
- let end_offset = offsets[len];
-
- let mut new_offsets = ScalarBuffer::new();
- new_offsets.reserve(remaining_offsets + 1);
- for v in &offsets[len..] {
- new_offsets.push(*v - end_offset)
- }
-
- self.offsets.resize(len + 1);
-
- Self {
- offsets: std::mem::replace(&mut self.offsets, new_offsets),
- values: self.values.take(end_offset.as_usize()),
- }
+ fn consume(&mut self) -> Self::Output {
+ std::mem::take(self)
}
fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
@@ -267,18 +250,18 @@ mod tests {
}
#[test]
- fn test_offset_buffer_split() {
+ fn test_offset_buffer() {
let mut buffer = OffsetBuffer::<i32>::default();
for v in ["hello", "world", "cupcakes", "a", "b", "c"] {
buffer.try_push(v.as_bytes(), false).unwrap()
}
- let split = buffer.split_off(3);
+ let split = buffer.consume();
let array = split.into_array(None, ArrowType::Utf8);
let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(
strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
- vec!["hello", "world", "cupcakes"]
+ vec!["hello", "world", "cupcakes", "a", "b", "c"]
);
buffer.try_push("test".as_bytes(), false).unwrap();
@@ -286,7 +269,7 @@ mod tests {
let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(
strings.iter().map(|x| x.unwrap()).collect::<Vec<_>>(),
- vec!["a", "b", "c", "test"]
+ vec!["test"]
);
}
diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs
index 404989493..4a0fc2a2f 100644
--- a/parquet/src/arrow/record_reader/buffer.rs
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -30,13 +30,8 @@ pub trait BufferQueue: Sized {
type Slice: ?Sized;
- /// Split out the first `len` items
- ///
- /// # Panics
- ///
- /// Implementations must panic if `len` is beyond the length of [`BufferQueue`]
- ///
- fn split_off(&mut self, len: usize) -> Self::Output;
+ /// Consumes the contents of this [`BufferQueue`]
+ fn consume(&mut self) -> Self::Output;
/// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used
/// to append data to the end of this [`BufferQueue`]
@@ -146,31 +141,6 @@ impl<T: ScalarValue> ScalarBuffer<T> {
assert!(prefix.is_empty() && suffix.is_empty());
buf
}
-
- pub fn take(&mut self, len: usize) -> Self {
- assert!(len <= self.len);
-
- let num_bytes = len * std::mem::size_of::<T>();
- let remaining_bytes = self.buffer.len() - num_bytes;
- // TODO: Optimize to reduce the copy
- // create an empty buffer, as it will be resized below
- let mut remaining = MutableBuffer::new(0);
- remaining.resize(remaining_bytes, 0);
-
- let new_records = remaining.as_slice_mut();
-
- new_records[0..remaining_bytes]
- .copy_from_slice(&self.buffer.as_slice()[num_bytes..]);
-
- self.buffer.resize(num_bytes, 0);
- self.len -= len;
-
- Self {
- buffer: std::mem::replace(&mut self.buffer, remaining),
- len,
- _phantom: Default::default(),
- }
- }
}
impl<T: ScalarValue + ArrowNativeType> ScalarBuffer<T> {
@@ -196,8 +166,8 @@ impl<T: ScalarValue> BufferQueue for ScalarBuffer<T> {
type Slice = [T];
- fn split_off(&mut self, len: usize) -> Self::Output {
- self.take(len).into()
+ fn consume(&mut self) -> Self::Output {
+ std::mem::take(self).into()
}
fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice {
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs
index 272716caf..5be0ac84d 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -22,16 +22,16 @@ use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk;
use arrow_buffer::Buffer;
use crate::arrow::buffer::bit_util::count_set_bits;
-use crate::arrow::record_reader::buffer::BufferQueue;
use crate::basic::Encoding;
use crate::column::reader::decoder::{
- ColumnLevelDecoder, ColumnLevelDecoderImpl, DefinitionLevelDecoder, LevelsBufferSlice,
+ ColumnLevelDecoder, DefinitionLevelDecoder, DefinitionLevelDecoderImpl,
+ LevelsBufferSlice,
};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use crate::util::memory::ByteBufferPtr;
-use super::{buffer::ScalarBuffer, MIN_BATCH_SIZE};
+use super::buffer::ScalarBuffer;
enum BufferInner {
/// Compute levels and null mask
@@ -87,13 +87,10 @@ impl DefinitionLevelBuffer {
Self { inner, len: 0 }
}
- pub fn split_levels(&mut self, len: usize) -> Option<Buffer> {
+ /// Returns the built level data
+ pub fn consume_levels(&mut self) -> Option<Buffer> {
match &mut self.inner {
- BufferInner::Full { levels, .. } => {
- let out = levels.split_off(len);
- self.len = levels.len();
- Some(out)
- }
+ BufferInner::Full { levels, .. } => Some(std::mem::take(levels).into()),
BufferInner::Mask { .. } => None,
}
}
@@ -103,27 +100,13 @@ impl DefinitionLevelBuffer {
self.len = len;
}
- /// Split `len` levels out of `self`
- pub fn split_bitmask(&mut self, len: usize) -> Buffer {
- let old_builder = match &mut self.inner {
- BufferInner::Full { nulls, .. } => nulls,
- BufferInner::Mask { nulls } => nulls,
- };
-
- // Compute the number of values left behind
- let num_left_values = old_builder.len() - len;
- let mut new_builder =
- BooleanBufferBuilder::new(MIN_BATCH_SIZE.max(num_left_values));
-
- // Copy across remaining values
- new_builder.append_packed_range(len..old_builder.len(), old_builder.as_slice());
-
- // Truncate buffer
- old_builder.resize(len);
-
- // Swap into self
- self.len = new_builder.len();
- std::mem::replace(old_builder, new_builder).into()
+ /// Returns the built null bitmask
+ pub fn consume_bitmask(&mut self) -> Buffer {
+ self.len = 0;
+ match &mut self.inner {
+ BufferInner::Full { nulls, .. } => nulls.finish().into_inner(),
+ BufferInner::Mask { nulls } => nulls.finish().into_inner(),
+ }
}
pub fn nulls(&self) -> &BooleanBufferBuilder {
@@ -148,7 +131,7 @@ impl LevelsBufferSlice for DefinitionLevelBuffer {
enum MaybePacked {
Packed(PackedDecoder),
- Fallback(ColumnLevelDecoderImpl),
+ Fallback(DefinitionLevelDecoderImpl),
}
pub struct DefinitionLevelBufferDecoder {
@@ -160,7 +143,7 @@ impl DefinitionLevelBufferDecoder {
pub fn new(max_level: i16, packed: bool) -> Self {
let decoder = match packed {
true => MaybePacked::Packed(PackedDecoder::new()),
- false => MaybePacked::Fallback(ColumnLevelDecoderImpl::new(max_level)),
+ false => MaybePacked::Fallback(DefinitionLevelDecoderImpl::new(max_level)),
};
Self { max_level, decoder }
@@ -176,8 +159,14 @@ impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
MaybePacked::Fallback(d) => d.set_data(encoding, data),
}
}
+}
- fn read(&mut self, writer: &mut Self::Slice, range: Range<usize>) -> Result<usize> {
+impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
+ fn read_def_levels(
+ &mut self,
+ writer: &mut Self::Slice,
+ range: Range<usize>,
+ ) -> Result<usize> {
match (&mut writer.inner, &mut self.decoder) {
(
BufferInner::Full {
@@ -193,7 +182,7 @@ impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
levels.resize(range.end + writer.len);
let slice = &mut levels.as_slice_mut()[writer.len..];
- let levels_read = decoder.read(slice, range.clone())?;
+ let levels_read = decoder.read_def_levels(slice, range.clone())?;
nulls.reserve(levels_read);
for i in &slice[range.start..range.start + levels_read] {
@@ -211,9 +200,7 @@ impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
_ => unreachable!("inconsistent null mask"),
}
}
-}
-impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
fn skip_def_levels(
&mut self,
num_levels: usize,
@@ -391,11 +378,8 @@ impl PackedDecoder {
#[cfg(test)]
mod tests {
use super::*;
- use std::sync::Arc;
- use crate::basic::Type as PhysicalType;
use crate::encodings::rle::RleEncoder;
- use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
use rand::{thread_rng, Rng};
#[test]
@@ -492,30 +476,4 @@ mod tests {
assert_eq!(read_level + skip_level, len);
assert_eq!(read_value + skip_value, total_value);
}
-
- #[test]
- fn test_split_off() {
- let t = Type::primitive_type_builder("col", PhysicalType::INT32)
- .build()
- .unwrap();
-
- let descriptor = Arc::new(ColumnDescriptor::new(
- Arc::new(t),
- 1,
- 0,
- ColumnPath::new(vec![]),
- ));
-
- let mut buffer = DefinitionLevelBuffer::new(&descriptor, true);
- match &mut buffer.inner {
- BufferInner::Mask { nulls } => nulls.append_n(100, false),
- _ => unreachable!(),
- };
-
- let bitmap = buffer.split_bitmask(19);
-
- // Should have split off 19 records leaving, 81 behind
- assert_eq!(bitmap.len(), 3); // Note: bitmask only tracks bytes not bits
- assert_eq!(buffer.nulls().len(), 81);
- }
}
diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs
index e47bdee1c..35933e6e1 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -15,18 +15,17 @@
// specific language governing permissions and limitations
// under the License.
-use std::cmp::{max, min};
-
use arrow_buffer::Buffer;
use crate::arrow::record_reader::{
buffer::{BufferQueue, ScalarBuffer, ValuesBuffer},
definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder},
};
+use crate::column::reader::decoder::RepetitionLevelDecoderImpl;
use crate::column::{
page::PageReader,
reader::{
- decoder::{ColumnLevelDecoderImpl, ColumnValueDecoder, ColumnValueDecoderImpl},
+ decoder::{ColumnValueDecoder, ColumnValueDecoderImpl},
GenericColumnReader,
},
};
@@ -37,15 +36,12 @@ use crate::schema::types::ColumnDescPtr;
pub(crate) mod buffer;
mod definition_levels;
-/// The minimum number of levels read when reading a repeated field
-pub(crate) const MIN_BATCH_SIZE: usize = 1024;
-
/// A `RecordReader` is a stateful column reader that delimits semantic records.
pub type RecordReader<T> =
GenericRecordReader<ScalarBuffer<<T as DataType>::T>, ColumnValueDecoderImpl<T>>;
pub(crate) type ColumnReader<CV> =
- GenericColumnReader<ColumnLevelDecoderImpl, DefinitionLevelBufferDecoder, CV>;
+ GenericColumnReader<RepetitionLevelDecoderImpl, DefinitionLevelBufferDecoder, CV>;
/// A generic stateful column reader that delimits semantic records
///
@@ -55,19 +51,14 @@ pub(crate) type ColumnReader<CV> =
pub struct GenericRecordReader<V, CV> {
column_desc: ColumnDescPtr,
- records: V,
+ values: V,
def_levels: Option<DefinitionLevelBuffer>,
rep_levels: Option<ScalarBuffer<i16>>,
column_reader: Option<ColumnReader<CV>>,
-
- /// Number of records accumulated in records
- num_records: usize,
-
- /// Number of values `num_records` contains.
+ /// Number of buffered levels / null-padded values
num_values: usize,
-
- /// Starts from 1, number of values have been written to buffer
- values_written: usize,
+ /// Number of buffered records
+ num_records: usize,
}
impl<V, CV> GenericRecordReader<V, CV>
@@ -93,14 +84,13 @@ where
let rep_levels = (desc.max_rep_level() > 0).then(ScalarBuffer::new);
Self {
- records,
+ values: records,
def_levels,
rep_levels,
column_reader: None,
column_desc: desc,
- num_records: 0,
num_values: 0,
- values_written: 0,
+ num_records: 0,
}
}
@@ -117,7 +107,7 @@ where
});
let rep_level_decoder = (descr.max_rep_level() != 0)
- .then(|| ColumnLevelDecoderImpl::new(descr.max_rep_level()));
+ .then(|| RepetitionLevelDecoderImpl::new(descr.max_rep_level()));
self.column_reader = Some(GenericColumnReader::new_with_decoders(
self.column_desc.clone(),
@@ -142,56 +132,14 @@ where
let mut records_read = 0;
loop {
- // Try to find some records from buffers that has been read into memory
- // but not counted as seen records.
-
- // Check to see if the column is exhausted. Only peek the next page since in
- // case we are reading to a page boundary and do not actually need to read
- // the next page.
- let end_of_column = !self.column_reader.as_mut().unwrap().peek_next()?;
-
- let (record_count, value_count) =
- self.count_records(num_records - records_read, end_of_column);
-
- self.num_records += record_count;
- self.num_values += value_count;
- records_read += record_count;
-
+ let records_to_read = num_records - records_read;
+ records_read += self.read_one_batch(records_to_read)?;
if records_read == num_records
|| !self.column_reader.as_mut().unwrap().has_next()?
{
break;
}
-
- // If repetition levels present, we don't know how much more to read
- // in order to read the requested number of records, therefore read at least
- // MIN_BATCH_SIZE, otherwise read **exactly** what was requested. This helps
- // to avoid a degenerate case where the buffers are never fully drained.
- //
- // Consider the scenario where the user is requesting batches of MIN_BATCH_SIZE.
- //
- // When transitioning across a row group boundary, this will read some remainder
- // from the row group `r`, before reading MIN_BATCH_SIZE from the next row group,
- // leaving `MIN_BATCH_SIZE + r` in the buffer.
- //
- // The client will then only split off the `MIN_BATCH_SIZE` they actually wanted,
- // leaving behind `r`. This will continue indefinitely.
- //
- // Aside from wasting cycles splitting and shuffling buffers unnecessarily, this
- // prevents dictionary preservation from functioning correctly as the buffer
- // will never be emptied, allowing a new dictionary to be registered.
- //
- // This degenerate case can still occur for repeated fields, but
- // it is avoided for the more common case of a non-repeated field
- let batch_size = match &self.rep_levels {
- Some(_) => max(num_records - records_read, MIN_BATCH_SIZE),
- None => num_records - records_read,
- };
-
- // Try to more value from parquet pages
- self.read_one_batch(batch_size)?;
}
-
Ok(records_read)
}
@@ -201,31 +149,10 @@ where
///
/// Number of records skipped
pub fn skip_records(&mut self, num_records: usize) -> Result<usize> {
- // First need to clear the buffer
- let end_of_column = match self.column_reader.as_mut() {
- Some(reader) => !reader.peek_next()?,
- None => return Ok(0),
- };
-
- let (buffered_records, buffered_values) =
- self.count_records(num_records, end_of_column);
-
- self.num_records += buffered_records;
- self.num_values += buffered_values;
-
- let remaining = num_records - buffered_records;
-
- if remaining == 0 {
- return Ok(buffered_records);
+ match self.column_reader.as_mut() {
+ Some(reader) => reader.skip_records(num_records),
+ None => Ok(0),
}
-
- let skipped = self
- .column_reader
- .as_mut()
- .unwrap()
- .skip_records(remaining)?;
-
- Ok(skipped + buffered_records)
}
/// Returns number of records stored in buffer.
@@ -246,25 +173,19 @@ where
/// definition level values that have already been read into memory but not counted
/// as record values, e.g. those from `self.num_values` to `self.values_written`.
pub fn consume_def_levels(&mut self) -> Option<Buffer> {
- match self.def_levels.as_mut() {
- Some(x) => x.split_levels(self.num_values),
- None => None,
- }
+ self.def_levels.as_mut().and_then(|x| x.consume_levels())
}
/// Return repetition level data.
/// The side effect is similar to `consume_def_levels`.
pub fn consume_rep_levels(&mut self) -> Option<Buffer> {
- match self.rep_levels.as_mut() {
- Some(x) => Some(x.split_off(self.num_values)),
- None => None,
- }
+ self.rep_levels.as_mut().map(|x| x.consume())
}
/// Returns currently stored buffer data.
/// The side effect is similar to `consume_def_levels`.
pub fn consume_record_data(&mut self) -> V::Output {
- self.records.split_off(self.num_values)
+ self.values.consume()
}
/// Returns currently stored null bitmap data.
@@ -277,34 +198,31 @@ where
/// Should be called after consuming data, e.g. `consume_rep_levels`,
/// `consume_rep_levels`, `consume_record_data` and `consume_bitmap_buffer`.
pub fn reset(&mut self) {
- self.values_written -= self.num_values;
- self.num_records = 0;
self.num_values = 0;
+ self.num_records = 0;
}
/// Returns bitmap data.
pub fn consume_bitmap(&mut self) -> Option<Buffer> {
self.def_levels
.as_mut()
- .map(|levels| levels.split_bitmask(self.num_values))
+ .map(|levels| levels.consume_bitmask())
}
- /// Try to read one batch of data.
+ /// Try to read one batch of data returning the number of records read
fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
let rep_levels = self
.rep_levels
.as_mut()
.map(|levels| levels.spare_capacity_mut(batch_size));
-
let def_levels = self.def_levels.as_mut();
+ let values = self.values.spare_capacity_mut(batch_size);
- let values = self.records.spare_capacity_mut(batch_size);
-
- let (values_read, levels_read) = self
+ let (records_read, values_read, levels_read) = self
.column_reader
.as_mut()
.unwrap()
- .read_batch(batch_size, def_levels, rep_levels, values)?;
+ .read_records(batch_size, def_levels, rep_levels, values)?;
if values_read < levels_read {
let def_levels = self.def_levels.as_ref().ok_or_else(|| {
@@ -313,90 +231,29 @@ where
)
})?;
- self.records.pad_nulls(
- self.values_written,
+ self.values.pad_nulls(
+ self.num_values,
values_read,
levels_read,
def_levels.nulls().as_slice(),
);
}
- let values_read = max(levels_read, values_read);
- self.set_values_written(self.values_written + values_read);
- Ok(values_read)
- }
-
- /// Inspects the buffered repetition levels in the range `self.num_values..self.values_written`
- /// and returns the number of "complete" records along with the corresponding number of values
- ///
- /// If `end_of_column` is true it indicates that there are no further values for this
- /// column chunk beyond what is currently in the buffers
- ///
- /// A "complete" record is one where the buffer contains a subsequent repetition level of 0
- fn count_records(
- &self,
- records_to_read: usize,
- end_of_column: bool,
- ) -> (usize, usize) {
- match self.rep_levels.as_ref() {
- Some(buf) => {
- let buf = buf.as_slice();
-
- let mut records_read = 0;
- let mut end_of_last_record = self.num_values;
-
- for (current, item) in buf
- .iter()
- .enumerate()
- .take(self.values_written)
- .skip(self.num_values)
- {
- if *item == 0 && current != self.num_values {
- records_read += 1;
- end_of_last_record = current;
-
- if records_read == records_to_read {
- break;
- }
- }
- }
-
- // If reached end of column chunk => end of a record
- if records_read != records_to_read
- && end_of_column
- && self.values_written != self.num_values
- {
- records_read += 1;
- end_of_last_record = self.values_written;
- }
-
- (records_read, end_of_last_record - self.num_values)
- }
- None => {
- let records_read =
- min(records_to_read, self.values_written - self.num_values);
-
- (records_read, records_read)
- }
- }
- }
-
- fn set_values_written(&mut self, new_values_written: usize) {
- self.values_written = new_values_written;
- self.records.set_len(self.values_written);
-
+ self.num_records += records_read;
+ self.num_values += levels_read;
+ self.values.set_len(self.num_values);
if let Some(ref mut buf) = self.rep_levels {
- buf.set_len(self.values_written)
+ buf.set_len(self.num_values)
};
-
if let Some(ref mut buf) = self.def_levels {
- buf.set_len(self.values_written)
+ buf.set_len(self.num_values)
};
+ Ok(records_read)
}
}
/// Returns true if we do not need to unpack the nullability for this column, this is
-/// only possible if the max defiition level is 1, and corresponds to nulls at the
+/// only possible if the max definition level is 1, and corresponds to nulls at the
/// leaf level, as opposed to a nullable parent nested type
fn packed_null_mask(descr: &ColumnDescPtr) -> bool {
descr.max_def_level() == 1
diff --git a/parquet/src/column/mod.rs b/parquet/src/column/mod.rs
index a68127a4e..c81d6290a 100644
--- a/parquet/src/column/mod.rs
+++ b/parquet/src/column/mod.rs
@@ -84,7 +84,6 @@
//! let reader = SerializedFileReader::new(file).unwrap();
//! let metadata = reader.metadata();
//!
-//! let mut res = Ok((0, 0));
//! let mut values = vec![0; 8];
//! let mut def_levels = vec![0; 8];
//! let mut rep_levels = vec![0; 8];
@@ -98,19 +97,21 @@
//! match column_reader {
//! // You can also use `get_typed_column_reader` method to extract typed reader.
//! ColumnReader::Int32ColumnReader(ref mut typed_reader) => {
-//! res = typed_reader.read_batch(
-//! 8, // batch size
+//! let (records, values, levels) = typed_reader.read_records(
+//! 8, // maximum records to read
//! Some(&mut def_levels),
//! Some(&mut rep_levels),
//! &mut values,
-//! );
+//! ).unwrap();
+//! assert_eq!(records, 2);
+//! assert_eq!(levels, 5);
+//! assert_eq!(values, 3);
//! }
//! _ => {}
//! }
//! }
//! }
//!
-//! assert_eq!(res.unwrap(), (3, 5));
//! assert_eq!(values, vec![1, 2, 3, 0, 0, 0, 0, 0]);
//! assert_eq!(def_levels, vec![3, 3, 3, 2, 2, 0, 0, 0]);
//! assert_eq!(rep_levels, vec![0, 1, 0, 1, 1, 0, 0, 0]);
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 991ec2c54..88967e179 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -17,13 +17,12 @@
//! Contains column reader API.
-use std::cmp::min;
-
use super::page::{Page, PageReader};
use crate::basic::*;
use crate::column::reader::decoder::{
- ColumnLevelDecoderImpl, ColumnValueDecoder, ColumnValueDecoderImpl,
- DefinitionLevelDecoder, LevelsBufferSlice, RepetitionLevelDecoder, ValuesBufferSlice,
+ ColumnValueDecoder, ColumnValueDecoderImpl, DefinitionLevelDecoder,
+ DefinitionLevelDecoderImpl, LevelsBufferSlice, RepetitionLevelDecoder,
+ RepetitionLevelDecoderImpl, ValuesBufferSlice,
};
use crate::data_type::*;
use crate::errors::{ParquetError, Result};
@@ -103,8 +102,8 @@ pub fn get_typed_column_reader<T: DataType>(
/// Typed value reader for a particular primitive column.
pub type ColumnReaderImpl<T> = GenericColumnReader<
- ColumnLevelDecoderImpl,
- ColumnLevelDecoderImpl,
+ RepetitionLevelDecoderImpl,
+ DefinitionLevelDecoderImpl,
ColumnValueDecoderImpl<T>,
>;
@@ -119,11 +118,14 @@ pub struct GenericColumnReader<R, D, V> {
page_reader: Box<dyn PageReader>,
/// The total number of values stored in the data page.
- num_buffered_values: u32,
+ num_buffered_values: usize,
/// The number of values from the current data page that has been decoded into memory
/// so far.
- num_decoded_values: u32,
+ num_decoded_values: usize,
+
+ /// True if the end of the current data page denotes the end of a record
+ has_record_delimiter: bool,
/// The decoder for the definition levels if any
def_level_decoder: Option<D>,
@@ -135,7 +137,7 @@ pub struct GenericColumnReader<R, D, V> {
values_decoder: V,
}
-impl<V> GenericColumnReader<ColumnLevelDecoderImpl, ColumnLevelDecoderImpl, V>
+impl<V> GenericColumnReader<RepetitionLevelDecoderImpl, DefinitionLevelDecoderImpl, V>
where
V: ColumnValueDecoder,
{
@@ -144,10 +146,10 @@ where
let values_decoder = V::new(&descr);
let def_level_decoder = (descr.max_def_level() != 0)
- .then(|| ColumnLevelDecoderImpl::new(descr.max_def_level()));
+ .then(|| DefinitionLevelDecoderImpl::new(descr.max_def_level()));
let rep_level_decoder = (descr.max_rep_level() != 0)
- .then(|| ColumnLevelDecoderImpl::new(descr.max_rep_level()));
+ .then(|| RepetitionLevelDecoderImpl::new(descr.max_rep_level()));
Self::new_with_decoders(
descr,
@@ -180,6 +182,7 @@ where
num_buffered_values: 0,
num_decoded_values: 0,
values_decoder,
+ has_record_delimiter: false,
}
}
@@ -195,99 +198,126 @@ where
///
/// `values` will be contiguously populated with the non-null values. Note that if the column
/// is not required, this may be less than either `batch_size` or the number of levels read
+ #[deprecated(note = "Use read_records")]
pub fn read_batch(
&mut self,
batch_size: usize,
- mut def_levels: Option<&mut D::Slice>,
- mut rep_levels: Option<&mut R::Slice>,
+ def_levels: Option<&mut D::Slice>,
+ rep_levels: Option<&mut R::Slice>,
values: &mut V::Slice,
) -> Result<(usize, usize)> {
- let mut values_read = 0;
- let mut levels_read = 0;
+ let (_, values, levels) =
+ self.read_records(batch_size, def_levels, rep_levels, values)?;
+
+ Ok((values, levels))
+ }
- // Compute the smallest batch size we can read based on provided slices
- let mut batch_size = min(batch_size, values.capacity());
+    /// Read up to `max_records` returning the number of complete records, non-null
+ /// values and levels decoded
+ ///
+ /// If the max definition level is 0, `def_levels` will be ignored, otherwise it will be
+ /// populated with the number of levels read, with an error returned if it is `None`.
+ ///
+ /// If the max repetition level is 0, `rep_levels` will be ignored, otherwise it will be
+ /// populated with the number of levels read, with an error returned if it is `None`.
+ ///
+ /// `values` will be contiguously populated with the non-null values. Note that if the column
+ /// is not required, this may be less than either `max_records` or the number of levels read
+ pub fn read_records(
+ &mut self,
+ max_records: usize,
+ mut def_levels: Option<&mut D::Slice>,
+ mut rep_levels: Option<&mut R::Slice>,
+ values: &mut V::Slice,
+ ) -> Result<(usize, usize, usize)> {
+ let mut max_levels = values.capacity().min(max_records);
if let Some(ref levels) = def_levels {
- batch_size = min(batch_size, levels.capacity());
+ max_levels = max_levels.min(levels.capacity());
}
if let Some(ref levels) = rep_levels {
- batch_size = min(batch_size, levels.capacity());
+ max_levels = max_levels.min(levels.capacity())
}
- // Read exhaustively all pages until we read all batch_size values/levels
- // or there are no more values/levels to read.
- while levels_read < batch_size {
- if !self.has_next()? {
- break;
- }
+ let mut total_records_read = 0;
+ let mut total_levels_read = 0;
+ let mut total_values_read = 0;
- // Batch size for the current iteration
- let iter_batch_size = (batch_size - levels_read)
- .min((self.num_buffered_values - self.num_decoded_values) as usize);
+ while total_records_read < max_records
+ && total_levels_read < max_levels
+ && self.has_next()?
+ {
+ let remaining_records = max_records - total_records_read;
+ let remaining_levels = self.num_buffered_values - self.num_decoded_values;
+ let levels_to_read = remaining_levels.min(max_levels - total_levels_read);
- // If the field is required and non-repeated, there are no definition levels
- let null_count = match self.descr.max_def_level() > 0 {
- true => {
- let levels = def_levels
+ let (records_read, levels_read) = match self.rep_level_decoder.as_mut() {
+ Some(reader) => {
+ let out = rep_levels
.as_mut()
- .ok_or_else(|| general_err!("must specify definition levels"))?;
+ .ok_or_else(|| general_err!("must specify repetition levels"))?;
+
+ let (mut records_read, levels_read) = reader.read_rep_levels(
+ out,
+ total_levels_read..total_levels_read + levels_to_read,
+ remaining_records,
+ )?;
+
+ if levels_read == remaining_levels && self.has_record_delimiter {
+ // Reached end of page, which implies records_read < remaining_records
+                        // as otherwise reading would have stopped before reaching the end
+ assert!(records_read < remaining_records); // Sanity check
+ records_read += 1;
+ }
+ (records_read, levels_read)
+ }
+ None => {
+ let min = remaining_records.min(levels_to_read);
+ (min, min)
+ }
+ };
- let num_def_levels = self
- .def_level_decoder
+ let values_to_read = match self.def_level_decoder.as_mut() {
+ Some(reader) => {
+ let out = def_levels
.as_mut()
- .expect("def_level_decoder be set")
- .read(levels, levels_read..levels_read + iter_batch_size)?;
+ .ok_or_else(|| general_err!("must specify definition levels"))?;
+
+ let read = reader.read_def_levels(
+ out,
+ total_levels_read..total_levels_read + levels_read,
+ )?;
- if num_def_levels != iter_batch_size {
- return Err(general_err!("insufficient definition levels read from column - expected {}, got {}", iter_batch_size, num_def_levels));
+ if read != levels_read {
+                        return Err(general_err!("insufficient definition levels read from column - expected {}, got {}", levels_read, read));
}
- levels.count_nulls(
- levels_read..levels_read + num_def_levels,
+ let null_count = out.count_nulls(
+ total_levels_read..total_levels_read + read,
self.descr.max_def_level(),
- )
+ );
+ levels_read - null_count
}
- false => 0,
+ None => levels_read,
};
- if self.descr.max_rep_level() > 0 {
- let levels = rep_levels
- .as_mut()
- .ok_or_else(|| general_err!("must specify repetition levels"))?;
-
- let rep_levels = self
- .rep_level_decoder
- .as_mut()
- .expect("rep_level_decoder be set")
- .read(levels, levels_read..levels_read + iter_batch_size)?;
-
- if rep_levels != iter_batch_size {
- return Err(general_err!("insufficient repetition levels read from column - expected {}, got {}", iter_batch_size, rep_levels));
- }
- }
-
- let values_to_read = iter_batch_size - null_count;
- let curr_values_read = self
- .values_decoder
- .read(values, values_read..values_read + values_to_read)?;
+ let values_read = self.values_decoder.read(
+ values,
+ total_values_read..total_values_read + values_to_read,
+ )?;
- if curr_values_read != values_to_read {
+ if values_read != values_to_read {
return Err(general_err!(
- "insufficient values read from column - expected: {}, got: {}",
- values_to_read,
- curr_values_read
+ "insufficient values read from column - expected: {values_to_read}, got: {values_read}",
));
}
- // Update all "return" counters and internal state.
-
- // This is to account for when def or rep levels are not provided
- self.num_decoded_values += iter_batch_size as u32;
- levels_read += iter_batch_size;
- values_read += curr_values_read;
+ self.num_decoded_values += levels_read;
+ total_records_read += records_read;
+ total_levels_read += levels_read;
+ total_values_read += values_read;
}
- Ok((values_read, levels_read))
+ Ok((total_records_read, total_values_read, total_levels_read))
}
/// Skips over `num_records` records, where records are delimited by repetition levels of 0
@@ -336,21 +366,30 @@ where
// start skip values in page level
// The number of levels in the current data page
- let buffered_levels =
- (self.num_buffered_values - self.num_decoded_values) as usize;
+ let remaining_levels = self.num_buffered_values - self.num_decoded_values;
let (records_read, rep_levels_read) = match self.rep_level_decoder.as_mut() {
Some(decoder) => {
- decoder.skip_rep_levels(remaining_records, buffered_levels)?
+ let (mut records_read, levels_read) =
+ decoder.skip_rep_levels(remaining_records, remaining_levels)?;
+
+ if levels_read == remaining_levels && self.has_record_delimiter {
+ // Reached end of page, which implies records_read < remaining_records
+                    // as otherwise reading would have stopped before reaching the end
+ assert!(records_read < remaining_records); // Sanity check
+ records_read += 1;
+ }
+
+ (records_read, levels_read)
}
None => {
// No repetition levels, so each level corresponds to a row
- let levels = buffered_levels.min(remaining_records);
+ let levels = remaining_levels.min(remaining_records);
(levels, levels)
}
};
- self.num_decoded_values += rep_levels_read as u32;
+ self.num_decoded_values += rep_levels_read;
remaining_records -= records_read;
if self.num_buffered_values == self.num_decoded_values {
@@ -431,7 +470,7 @@ where
rep_level_encoding,
statistics: _,
} => {
- self.num_buffered_values = num_values;
+ self.num_buffered_values = num_values as _;
self.num_decoded_values = 0;
let max_rep_level = self.descr.max_rep_level();
@@ -448,6 +487,9 @@ where
)?;
offset += bytes_read;
+ self.has_record_delimiter =
+ self.page_reader.peek_next_page()?.is_none();
+
self.rep_level_decoder
.as_mut()
.unwrap()
@@ -493,12 +535,18 @@ where
return Err(general_err!("more nulls than values in page, contained {} values and {} nulls", num_values, num_nulls));
}
- self.num_buffered_values = num_values;
+ self.num_buffered_values = num_values as _;
self.num_decoded_values = 0;
// DataPage v2 only supports RLE encoding for repetition
// levels
if self.descr.max_rep_level() > 0 {
+ // Technically a DataPage v2 should not write a record
+ // across multiple pages, however, the parquet writer
+ // used to do this so we preserve backwards compatibility
+ self.has_record_delimiter =
+ self.page_reader.peek_next_page()?.is_none();
+
self.rep_level_decoder.as_mut().unwrap().set_data(
Encoding::RLE,
buf.range(0, rep_levels_byte_len as usize),
@@ -533,21 +581,6 @@ where
}
}
- /// Check whether there is more data to read from this column,
- /// If the current page is fully decoded, this will NOT load the next page
- /// into the buffer
- #[inline]
- #[cfg(feature = "arrow")]
- pub(crate) fn peek_next(&mut self) -> Result<bool> {
- if self.num_buffered_values == 0
- || self.num_buffered_values == self.num_decoded_values
- {
- Ok(self.page_reader.peek_next_page()?.is_some())
- } else {
- Ok(true)
- }
- }
-
/// Check whether there is more data to read from this column,
/// If the current page is fully decoded, this will load the next page
/// (if it exists) into the buffer
@@ -1359,15 +1392,14 @@ mod tests {
let mut curr_values_read = 0;
let mut curr_levels_read = 0;
- let mut done = false;
- while !done {
+ loop {
let actual_def_levels =
def_levels.as_mut().map(|vec| &mut vec[curr_levels_read..]);
let actual_rep_levels =
rep_levels.as_mut().map(|vec| &mut vec[curr_levels_read..]);
- let (values_read, levels_read) = typed_column_reader
- .read_batch(
+ let (_, values_read, levels_read) = typed_column_reader
+ .read_records(
batch_size,
actual_def_levels,
actual_rep_levels,
@@ -1375,12 +1407,12 @@ mod tests {
)
.expect("read_batch() should be OK");
- if values_read == 0 && levels_read == 0 {
- done = true;
- }
-
curr_values_read += values_read;
curr_levels_read += levels_read;
+
+ if values_read == 0 && levels_read == 0 {
+ break;
+ }
}
assert!(
diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs
index 3a6795c8c..369b335dc 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -68,24 +68,35 @@ pub trait ColumnLevelDecoder {
/// Set data for this [`ColumnLevelDecoder`]
fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr);
+}
- /// Read level data into `out[range]` returning the number of levels read
+pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
+ /// Read up to `max_records` of repetition level data into `out[range]` returning the number
+ /// of complete records and levels read
///
/// `range` is provided by the caller to allow for types such as default-initialized `[T]`
/// that only track capacity and not length
///
+    /// A record only ends when the data contains a subsequent repetition level of 0;
+    /// it is therefore left to the caller to delimit the final record in a column
+ ///
/// # Panics
///
/// Implementations may panic if `range` overlaps with already written data
- ///
- fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize>;
-}
+ fn read_rep_levels(
+ &mut self,
+ out: &mut Self::Slice,
+ range: Range<usize>,
+ max_records: usize,
+ ) -> Result<(usize, usize)>;
-pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
/// Skips over up to `num_levels` repetition levels corresponding to `num_records` records,
/// where a record is delimited by a repetition level of 0
///
/// Returns the number of records skipped, and the number of levels skipped
+ ///
+    /// A record only ends when the data contains a subsequent repetition level of 0;
+    /// it is therefore left to the caller to delimit the final record in a column
fn skip_rep_levels(
&mut self,
num_records: usize,
@@ -94,6 +105,22 @@ pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
}
pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
+ /// Read definition level data into `out[range]` returning the number of levels read
+ ///
+ /// `range` is provided by the caller to allow for types such as default-initialized `[T]`
+ /// that only track capacity and not length
+ ///
+ /// # Panics
+ ///
+ /// Implementations may panic if `range` overlaps with already written data
+ ///
+    // TODO: Should this return the number of nulls?
+ fn read_def_levels(
+ &mut self,
+ out: &mut Self::Slice,
+ range: Range<usize>,
+ ) -> Result<usize>;
+
/// Skips over `num_levels` definition levels
///
/// Returns the number of values skipped, and the number of levels skipped
@@ -270,101 +297,67 @@ impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
const SKIP_BUFFER_SIZE: usize = 1024;
-/// An implementation of [`ColumnLevelDecoder`] for `[i16]`
-pub struct ColumnLevelDecoderImpl {
- decoder: Option<LevelDecoderInner>,
- /// Temporary buffer populated when skipping values
- buffer: Vec<i16>,
- bit_width: u8,
+enum LevelDecoder {
+ Packed(BitReader, u8),
+ Rle(RleDecoder),
}
-impl ColumnLevelDecoderImpl {
- pub fn new(max_level: i16) -> Self {
- let bit_width = num_required_bits(max_level as u64);
- Self {
- decoder: None,
- buffer: vec![],
- bit_width,
+impl LevelDecoder {
+ fn new(encoding: Encoding, data: ByteBufferPtr, bit_width: u8) -> Self {
+ match encoding {
+ Encoding::RLE => {
+ let mut decoder = RleDecoder::new(bit_width);
+ decoder.set_data(data);
+ Self::Rle(decoder)
+ }
+ Encoding::BIT_PACKED => Self::Packed(BitReader::new(data), bit_width),
+ _ => unreachable!("invalid level encoding: {}", encoding),
}
}
- /// Drops the first `len` values from the internal buffer
- fn split_off_buffer(&mut self, len: usize) {
- match self.buffer.len() == len {
- true => self.buffer.clear(),
- false => {
- // Move to_read elements to end of slice
- self.buffer.rotate_left(len);
- // Truncate buffer
- self.buffer.truncate(self.buffer.len() - len);
+ fn read(&mut self, out: &mut [i16]) -> Result<usize> {
+ match self {
+ Self::Packed(reader, bit_width) => {
+ Ok(reader.get_batch::<i16>(out, *bit_width as usize))
}
+ Self::Rle(reader) => Ok(reader.get_batch(out)?),
}
}
+}
- /// Reads up to `to_read` values to the internal buffer
- fn read_to_buffer(&mut self, to_read: usize) -> Result<()> {
- let mut buf = std::mem::take(&mut self.buffer);
-
- // Repopulate buffer
- buf.resize(to_read, 0);
- let actual = self.read(&mut buf, 0..to_read)?;
- buf.truncate(actual);
-
- self.buffer = buf;
- Ok(())
- }
+/// An implementation of [`DefinitionLevelDecoder`] for `[i16]`
+pub struct DefinitionLevelDecoderImpl {
+ decoder: Option<LevelDecoder>,
+ bit_width: u8,
}
-enum LevelDecoderInner {
- Packed(BitReader, u8),
- Rle(RleDecoder),
+impl DefinitionLevelDecoderImpl {
+ pub fn new(max_level: i16) -> Self {
+ let bit_width = num_required_bits(max_level as u64);
+ Self {
+ decoder: None,
+ bit_width,
+ }
+ }
}
-impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
+impl ColumnLevelDecoder for DefinitionLevelDecoderImpl {
type Slice = [i16];
fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
- self.buffer.clear();
- match encoding {
- Encoding::RLE => {
- let mut decoder = RleDecoder::new(self.bit_width);
- decoder.set_data(data);
- self.decoder = Some(LevelDecoderInner::Rle(decoder));
- }
- Encoding::BIT_PACKED => {
- self.decoder = Some(LevelDecoderInner::Packed(
- BitReader::new(data),
- self.bit_width,
- ));
- }
- _ => unreachable!("invalid level encoding: {}", encoding),
- }
+ self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width))
}
+}
- fn read(&mut self, out: &mut Self::Slice, mut range: Range<usize>) -> Result<usize> {
- let read_from_buffer = match self.buffer.is_empty() {
- true => 0,
- false => {
- let read_from_buffer = self.buffer.len().min(range.end - range.start);
- out[range.start..range.start + read_from_buffer]
- .copy_from_slice(&self.buffer[0..read_from_buffer]);
- self.split_off_buffer(read_from_buffer);
- read_from_buffer
- }
- };
- range.start += read_from_buffer;
-
- match self.decoder.as_mut().unwrap() {
- LevelDecoderInner::Packed(reader, bit_width) => Ok(read_from_buffer
- + reader.get_batch::<i16>(&mut out[range], *bit_width as usize)),
- LevelDecoderInner::Rle(reader) => {
- Ok(read_from_buffer + reader.get_batch(&mut out[range])?)
- }
- }
+impl DefinitionLevelDecoder for DefinitionLevelDecoderImpl {
+ fn read_def_levels(
+ &mut self,
+ out: &mut Self::Slice,
+ range: Range<usize>,
+ ) -> Result<usize> {
+ self.decoder.as_mut().unwrap().read(&mut out[range])
}
-}
-impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
fn skip_def_levels(
&mut self,
num_levels: usize,
@@ -372,80 +365,159 @@ impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
) -> Result<(usize, usize)> {
let mut level_skip = 0;
let mut value_skip = 0;
+ let mut buf: Vec<i16> = vec![];
while level_skip < num_levels {
let remaining_levels = num_levels - level_skip;
- if self.buffer.is_empty() {
- // Only read number of needed values
- self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
- if self.buffer.is_empty() {
- // Reached end of page
- break;
- }
+ let to_read = remaining_levels.min(SKIP_BUFFER_SIZE);
+ buf.resize(to_read, 0);
+ let read = self.read_def_levels(&mut buf, 0..to_read)?;
+ if read == 0 {
+ // Reached end of page
+ break;
}
- let to_read = self.buffer.len().min(remaining_levels);
- level_skip += to_read;
- value_skip += self.buffer[..to_read]
- .iter()
- .filter(|x| **x == max_def_level)
- .count();
-
- self.split_off_buffer(to_read)
+ level_skip += read;
+ value_skip += buf[..read].iter().filter(|x| **x == max_def_level).count();
}
Ok((value_skip, level_skip))
}
}
-impl RepetitionLevelDecoder for ColumnLevelDecoderImpl {
- fn skip_rep_levels(
+pub(crate) const REPETITION_LEVELS_BATCH_SIZE: usize = 1024;
+
+/// An implementation of [`RepetitionLevelDecoder`] for `[i16]`
+pub struct RepetitionLevelDecoderImpl {
+ decoder: Option<LevelDecoder>,
+ bit_width: u8,
+ buffer: Box<[i16; REPETITION_LEVELS_BATCH_SIZE]>,
+ buffer_len: usize,
+ buffer_offset: usize,
+ has_partial: bool,
+}
+
+impl RepetitionLevelDecoderImpl {
+ pub fn new(max_level: i16) -> Self {
+ let bit_width = num_required_bits(max_level as u64);
+ Self {
+ decoder: None,
+ bit_width,
+ buffer: Box::new([0; REPETITION_LEVELS_BATCH_SIZE]),
+ buffer_offset: 0,
+ buffer_len: 0,
+ has_partial: false,
+ }
+ }
+
+ fn fill_buf(&mut self) -> Result<()> {
+ let read = self.decoder.as_mut().unwrap().read(self.buffer.as_mut())?;
+ self.buffer_offset = 0;
+ self.buffer_len = read;
+ Ok(())
+ }
+
+ /// Inspects the buffered repetition levels in the range `self.buffer_offset..self.buffer_len`
+ /// and returns the number of "complete" records along with the corresponding number of values
+ ///
+ /// A "complete" record is one where the buffer contains a subsequent repetition level of 0
+ fn count_records(
&mut self,
- num_records: usize,
+ records_to_read: usize,
num_levels: usize,
- ) -> Result<(usize, usize)> {
- let mut level_skip = 0;
- let mut record_skip = 0;
+ ) -> (bool, usize, usize) {
+ let mut records_read = 0;
- while level_skip < num_levels {
- let remaining_levels = num_levels - level_skip;
+ let levels = num_levels.min(self.buffer_len - self.buffer_offset);
+ let buf = self.buffer.iter().skip(self.buffer_offset);
+ for (idx, item) in buf.take(levels).enumerate() {
+ if *item == 0 && (idx != 0 || self.has_partial) {
+ records_read += 1;
- if self.buffer.is_empty() {
- // Only read number of needed values
- self.read_to_buffer(remaining_levels.min(SKIP_BUFFER_SIZE))?;
- if self.buffer.is_empty() {
- // Reached end of page
- break;
+ if records_read == records_to_read {
+ return (false, records_read, idx);
}
}
+ }
+ // Either ran out of space in `num_levels` or data in `self.buffer`
+ (true, records_read, levels)
+ }
+}
+
+impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
+ type Slice = [i16];
- let max_skip = self.buffer.len().min(remaining_levels);
+ fn set_data(&mut self, encoding: Encoding, data: ByteBufferPtr) {
+ self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width));
+ self.buffer_len = 0;
+ self.buffer_offset = 0;
+ }
+}
- let mut to_skip = 0;
- while to_skip < max_skip && record_skip != num_records {
- if self.buffer[to_skip] == 0 {
- record_skip += 1;
+impl RepetitionLevelDecoder for RepetitionLevelDecoderImpl {
+ fn read_rep_levels(
+ &mut self,
+ out: &mut Self::Slice,
+ range: Range<usize>,
+ max_records: usize,
+ ) -> Result<(usize, usize)> {
+ let output = &mut out[range];
+ let max_levels = output.len();
+ let mut total_records_read = 0;
+ let mut total_levels_read = 0;
+
+ while total_records_read < max_records && total_levels_read < max_levels {
+ if self.buffer_len == self.buffer_offset {
+ self.fill_buf()?;
+ if self.buffer_len == 0 {
+ break;
}
- to_skip += 1;
}
- // Find end of record
- while to_skip < max_skip && self.buffer[to_skip] != 0 {
- to_skip += 1;
- }
+ let (partial, records_read, levels_read) = self.count_records(
+ max_records - total_records_read,
+ max_levels - total_levels_read,
+ );
- level_skip += to_skip;
- if to_skip == self.buffer.len() {
- // Need to to read more values
- self.buffer.clear();
- continue;
- }
+ output[total_levels_read..total_levels_read + levels_read].copy_from_slice(
+ &self.buffer[self.buffer_offset..self.buffer_offset + levels_read],
+ );
- self.split_off_buffer(to_skip);
- break;
+ total_levels_read += levels_read;
+ total_records_read += records_read;
+ self.buffer_offset += levels_read;
+ self.has_partial = partial;
}
+ Ok((total_records_read, total_levels_read))
+ }
+
+ fn skip_rep_levels(
+ &mut self,
+ num_records: usize,
+ num_levels: usize,
+ ) -> Result<(usize, usize)> {
+ let mut total_records_read = 0;
+ let mut total_levels_read = 0;
- Ok((record_skip, level_skip))
+ while total_records_read < num_records && total_levels_read < num_levels {
+ if self.buffer_len == self.buffer_offset {
+ self.fill_buf()?;
+ if self.buffer_len == 0 {
+ break;
+ }
+ }
+
+ let (partial, records_read, levels_read) = self.count_records(
+ num_records - total_records_read,
+ num_levels - total_levels_read,
+ );
+
+ total_levels_read += levels_read;
+ total_records_read += records_read;
+ self.buffer_offset += levels_read;
+ self.has_partial = partial;
+ }
+ Ok((total_records_read, total_levels_read))
}
}
@@ -455,35 +527,6 @@ mod tests {
use crate::encodings::rle::RleEncoder;
use rand::prelude::*;
- fn test_skip_levels<F>(encoded: &[i16], data: ByteBufferPtr, skip: F)
- where
- F: Fn(&mut ColumnLevelDecoderImpl, &mut usize, usize),
- {
- let mut rng = thread_rng();
- let mut decoder = ColumnLevelDecoderImpl::new(5);
- decoder.set_data(Encoding::RLE, data);
-
- let mut read = 0;
- let mut decoded = vec![];
- let mut expected = vec![];
- while read < encoded.len() {
- let to_read = rng.gen_range(0..(encoded.len() - read).min(100)) + 1;
-
- if rng.gen_bool(0.5) {
- skip(&mut decoder, &mut read, to_read)
- } else {
- let start = decoded.len();
- let end = decoded.len() + to_read;
- decoded.resize(end, 0);
- let actual_read = decoder.read(&mut decoded, start..end).unwrap();
- assert_eq!(actual_read, to_read);
- expected.extend_from_slice(&encoded[read..read + to_read]);
- read += to_read;
- }
- }
- assert_eq!(decoded, expected);
- }
-
#[test]
fn test_skip_padding() {
let mut encoder = RleEncoder::new(1, 1024);
@@ -491,67 +534,67 @@ mod tests {
(0..3).for_each(|_| encoder.put(1));
let data = ByteBufferPtr::new(encoder.consume());
- let mut decoder = ColumnLevelDecoderImpl::new(1);
+ let mut decoder = RepetitionLevelDecoderImpl::new(1);
decoder.set_data(Encoding::RLE, data.clone());
- let (records, levels) = decoder.skip_rep_levels(100, 4).unwrap();
- assert_eq!(records, 1);
+ let (_, levels) = decoder.skip_rep_levels(100, 4).unwrap();
assert_eq!(levels, 4);
// The length of the final bit packed run is ambiguous, so without the correct
// levels limit, it will decode zero padding
- let mut decoder = ColumnLevelDecoderImpl::new(1);
+ let mut decoder = RepetitionLevelDecoderImpl::new(1);
decoder.set_data(Encoding::RLE, data);
- let (records, levels) = decoder.skip_rep_levels(100, 6).unwrap();
- assert_eq!(records, 3);
+ let (_, levels) = decoder.skip_rep_levels(100, 6).unwrap();
assert_eq!(levels, 6);
}
#[test]
- fn test_skip() {
+ fn test_skip_rep_levels() {
for _ in 0..10 {
let mut rng = thread_rng();
let total_len = 10000_usize;
- let encoded: Vec<i16> = (0..total_len).map(|_| rng.gen_range(0..5)).collect();
+ let mut encoded: Vec<i16> =
+ (0..total_len).map(|_| rng.gen_range(0..5)).collect();
+ encoded[0] = 0;
let mut encoder = RleEncoder::new(3, 1024);
for v in &encoded {
encoder.put(*v as _)
}
let data = ByteBufferPtr::new(encoder.consume());
- test_skip_levels(&encoded, data.clone(), |decoder, read, to_read| {
- let (values_skipped, levels_skipped) =
- decoder.skip_def_levels(to_read, 5).unwrap();
- assert_eq!(levels_skipped, to_read);
-
- let expected = &encoded[*read..*read + to_read];
- let expected_values_skipped =
- expected.iter().filter(|x| **x == 5).count();
- assert_eq!(values_skipped, expected_values_skipped);
- *read += to_read;
- });
-
- test_skip_levels(&encoded, data.clone(), |decoder, read, to_read| {
- let remaining_levels = total_len - *read;
- let (records_skipped, levels_skipped) =
- decoder.skip_rep_levels(to_read, remaining_levels).unwrap();
-
- assert!(levels_skipped <= remaining_levels);
-
- // If not run out of values
- if levels_skipped + *read != encoded.len() {
- // Should have read correct number of records
- assert_eq!(records_skipped, to_read);
- // Next value should be start of record
- assert_eq!(encoded[levels_skipped + *read], 0);
+ let mut decoder = RepetitionLevelDecoderImpl::new(5);
+ decoder.set_data(Encoding::RLE, data);
+
+ let total_records = encoded.iter().filter(|x| **x == 0).count();
+ let mut remaining_records = total_records;
+ let mut remaining_levels = encoded.len();
+ loop {
+ let skip = rng.gen_bool(0.5);
+ let records = rng.gen_range(1..=remaining_records.min(5));
+ let (records_read, levels_read) = if skip {
+ decoder.skip_rep_levels(records, remaining_levels).unwrap()
+ } else {
+ let mut decoded = vec![0; remaining_levels];
+ let (records_read, levels_read) = decoder
+ .read_rep_levels(&mut decoded, 0..remaining_levels, records)
+ .unwrap();
+
+ assert_eq!(
+ decoded[..levels_read],
+ encoded[encoded.len() - remaining_levels..][..levels_read]
+ );
+ (records_read, levels_read)
+ };
+
+ remaining_levels = remaining_levels.checked_sub(levels_read).unwrap();
+ if remaining_levels == 0 {
+ assert_eq!(records_read + 1, records);
+ assert_eq!(records, remaining_records);
+ break;
}
-
- let expected = &encoded[*read..*read + levels_skipped];
- let expected_records_skipped =
- expected.iter().filter(|x| **x == 0).count();
- assert_eq!(records_skipped, expected_records_skipped);
-
- *read += levels_skipped;
- });
+ assert_eq!(records_read, records);
+ remaining_records -= records;
+ assert_ne!(remaining_records, 0);
+ }
}
}
}
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index fc5e29b03..93dff1b46 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -2332,7 +2332,7 @@ mod tests {
let mut actual_def_levels = def_levels.map(|_| vec![0i16; max_batch_size]);
let mut actual_rep_levels = rep_levels.map(|_| vec![0i16; max_batch_size]);
- let (values_read, levels_read) = read_fully(
+ let (_, values_read, levels_read) = read_fully(
reader,
max_batch_size,
actual_def_levels.as_mut(),
@@ -2409,11 +2409,11 @@ mod tests {
mut def_levels: Option<&mut Vec<i16>>,
mut rep_levels: Option<&mut Vec<i16>>,
values: &mut [T::T],
- ) -> (usize, usize) {
+ ) -> (usize, usize, usize) {
let actual_def_levels = def_levels.as_mut().map(|vec| &mut vec[..]);
let actual_rep_levels = rep_levels.as_mut().map(|vec| &mut vec[..]);
reader
- .read_batch(batch_size, actual_def_levels, actual_rep_levels, values)
+ .read_records(batch_size, actual_def_levels, actual_rep_levels, values)
.unwrap()
}
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index defdaad32..15240e33c 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -1632,12 +1632,12 @@ mod tests {
let mut out = [0; 4];
let c1 = row_group.get_column_reader(0).unwrap();
let mut c1 = get_typed_column_reader::<Int32Type>(c1);
- c1.read_batch(4, None, None, &mut out).unwrap();
+ c1.read_records(4, None, None, &mut out).unwrap();
assert_eq!(out, column_data[0]);
let c2 = row_group.get_column_reader(1).unwrap();
let mut c2 = get_typed_column_reader::<Int32Type>(c2);
- c2.read_batch(4, None, None, &mut out).unwrap();
+ c2.read_records(4, None, None, &mut out).unwrap();
assert_eq!(out, column_data[1]);
};
diff --git a/parquet/src/record/triplet.rs b/parquet/src/record/triplet.rs
index 14a4a3945..67c407b3a 100644
--- a/parquet/src/record/triplet.rs
+++ b/parquet/src/record/triplet.rs
@@ -295,8 +295,11 @@ impl<T: DataType> TypedTripletIter<T> {
fn read_next(&mut self) -> Result<bool> {
self.curr_triplet_index += 1;
- if self.curr_triplet_index >= self.triplets_left {
- let (values_read, levels_read) = {
+ // A loop is required to handle the case of a batch size of 1, as in such a case
+ // on reaching the end of a record, read_records will return `Ok((1, 0, 0))`
+ // and therefore not advance `self.triplets_left`
+ while self.curr_triplet_index >= self.triplets_left {
+ let (records_read, values_read, levels_read) = {
// Get slice of definition levels, if available
let def_levels = self.def_levels.as_mut().map(|vec| &mut vec[..]);
@@ -304,7 +307,7 @@ impl<T: DataType> TypedTripletIter<T> {
let rep_levels = self.rep_levels.as_mut().map(|vec| &mut vec[..]);
// Buffer triplets
- self.reader.read_batch(
+ self.reader.read_records(
self.batch_size,
def_levels,
rep_levels,
@@ -313,7 +316,7 @@ impl<T: DataType> TypedTripletIter<T> {
};
// No more values or levels to read
- if values_read == 0 && levels_read == 0 {
+ if records_read == 0 && values_read == 0 && levels_read == 0 {
self.has_next = false;
return Ok(false);
}