You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by su...@apache.org on 2021/12/13 21:44:53 UTC
[arrow-rs] branch master updated: Simplify parquet arrow `RecordReader` (#1021)
This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 07660c6 Simplify parquet arrow `RecordReader` (#1021)
07660c6 is described below
commit 07660c61680220ac54b7bf4c42a64c840872cc43
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Mon Dec 13 21:44:47 2021 +0000
Simplify parquet arrow `RecordReader` (#1021)
---
parquet/src/arrow/record_reader.rs | 73 +++++++++++++++++---------------------
1 file changed, 33 insertions(+), 40 deletions(-)
diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs
index 4dd7da9..a5c0b47 100644
--- a/parquet/src/arrow/record_reader.rs
+++ b/parquet/src/arrow/record_reader.rs
@@ -43,10 +43,8 @@ pub struct RecordReader<T: DataType> {
/// Number of values `num_records` contains.
num_values: usize,
- values_seen: usize,
/// Starts from 1, number of values that have been written to buffer
values_written: usize,
- in_middle_of_record: bool,
}
impl<T: DataType> RecordReader<T> {
@@ -75,9 +73,7 @@ impl<T: DataType> RecordReader<T> {
column_desc: column_schema,
num_records: 0,
num_values: 0,
- values_seen: 0,
values_written: 0,
- in_middle_of_record: false,
}
}
@@ -107,21 +103,25 @@ impl<T: DataType> RecordReader<T> {
loop {
// Try to find some records from buffers that have been read into memory
// but not yet counted as seen records.
- records_read += self.split_records(num_records - records_read)?;
-
- // Since page reader contains complete records, so if we reached end of a
- // page reader, we should reach the end of a record
- if end_of_column
- && self.values_seen >= self.values_written
- && self.in_middle_of_record
- {
- self.num_records += 1;
- self.num_values = self.values_seen;
- self.in_middle_of_record = false;
- records_read += 1;
+ let (record_count, value_count) =
+ self.count_records(num_records - records_read);
+
+ self.num_records += record_count;
+ self.num_values += value_count;
+ records_read += record_count;
+
+ if records_read == num_records {
+ break;
}
- if (records_read >= num_records) || end_of_column {
+ if end_of_column {
+ // Since a page reader contains complete records, reaching the end of a
+ // page reader means we have also reached the end of a record
+ if self.rep_levels.is_some() {
+ self.num_records += 1;
+ self.num_values = self.values_written;
+ records_read += 1;
+ }
break;
}
@@ -265,8 +265,6 @@ impl<T: DataType> RecordReader<T> {
self.values_written -= self.num_values;
self.num_records = 0;
self.num_values = 0;
- self.values_seen = 0;
- self.in_middle_of_record = false;
}
/// Returns bitmap data.
@@ -367,10 +365,11 @@ impl<T: DataType> RecordReader<T> {
Ok(values_read)
}
- /// Split values into records according repetition definition and returns number of
- /// records read.
- #[allow(clippy::unnecessary_wraps)]
- fn split_records(&mut self, records_to_read: usize) -> Result<usize> {
+ /// Inspects the buffered repetition levels in the range `self.num_values..self.values_written`
+ /// and returns the number of "complete" records along with the corresponding number of values
+ ///
+ /// A "complete" record is one where the buffer contains a subsequent repetition level of 0
+ fn count_records(&self, records_to_read: usize) -> (usize, usize) {
let rep_levels = self.rep_levels.as_ref().map(|buf| {
let (prefix, rep_levels, suffix) =
unsafe { buf.as_slice().align_to::<i16>() };
@@ -381,32 +380,26 @@ impl<T: DataType> RecordReader<T> {
match rep_levels {
Some(buf) => {
let mut records_read = 0;
+ let mut end_of_last_record = self.num_values;
+
+ for current in self.num_values..self.values_written {
+ if buf[current] == 0 && current != self.num_values {
+ records_read += 1;
+ end_of_last_record = current;
- while (self.values_seen < self.values_written)
- && (records_read < records_to_read)
- {
- if buf[self.values_seen] == 0 {
- if self.in_middle_of_record {
- records_read += 1;
- self.num_records += 1;
- self.num_values = self.values_seen;
+ if records_read == records_to_read {
+ break;
}
- self.in_middle_of_record = true;
}
- self.values_seen += 1;
}
- Ok(records_read)
+ (records_read, end_of_last_record - self.num_values)
}
None => {
let records_read =
- min(records_to_read, self.values_written - self.values_seen);
- self.num_records += records_read;
- self.num_values += records_read;
- self.values_seen += records_read;
- self.in_middle_of_record = false;
+ min(records_to_read, self.values_written - self.num_values);
- Ok(records_read)
+ (records_read, records_read)
}
}
}