You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by su...@apache.org on 2021/12/13 21:44:53 UTC

[arrow-rs] branch master updated: Simplify parquet arrow `RecordReader` (#1021)

This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 07660c6  Simplify parquet arrow `RecordReader` (#1021)
07660c6 is described below

commit 07660c61680220ac54b7bf4c42a64c840872cc43
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Mon Dec 13 21:44:47 2021 +0000

    Simplify parquet arrow `RecordReader` (#1021)
---
 parquet/src/arrow/record_reader.rs | 73 +++++++++++++++++---------------------
 1 file changed, 33 insertions(+), 40 deletions(-)

diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs
index 4dd7da9..a5c0b47 100644
--- a/parquet/src/arrow/record_reader.rs
+++ b/parquet/src/arrow/record_reader.rs
@@ -43,10 +43,8 @@ pub struct RecordReader<T: DataType> {
     /// Number of values `num_records` contains.
     num_values: usize,
 
-    values_seen: usize,
     /// Starts from 1, number of values have been written to buffer
     values_written: usize,
-    in_middle_of_record: bool,
 }
 
 impl<T: DataType> RecordReader<T> {
@@ -75,9 +73,7 @@ impl<T: DataType> RecordReader<T> {
             column_desc: column_schema,
             num_records: 0,
             num_values: 0,
-            values_seen: 0,
             values_written: 0,
-            in_middle_of_record: false,
         }
     }
 
@@ -107,21 +103,25 @@ impl<T: DataType> RecordReader<T> {
         loop {
             // Try to find some records from buffers that has been read into memory
             // but not counted as seen records.
-            records_read += self.split_records(num_records - records_read)?;
-
-            // Since page reader contains complete records, so if we reached end of a
-            // page reader, we should reach the end of a record
-            if end_of_column
-                && self.values_seen >= self.values_written
-                && self.in_middle_of_record
-            {
-                self.num_records += 1;
-                self.num_values = self.values_seen;
-                self.in_middle_of_record = false;
-                records_read += 1;
+            let (record_count, value_count) =
+                self.count_records(num_records - records_read);
+
+            self.num_records += record_count;
+            self.num_values += value_count;
+            records_read += record_count;
+
+            if records_read == num_records {
+                break;
             }
 
-            if (records_read >= num_records) || end_of_column {
+            if end_of_column {
+                // Since page reader contains complete records, if we reached end of a
+                // page reader, we should reach the end of a record
+                if self.rep_levels.is_some() {
+                    self.num_records += 1;
+                    self.num_values = self.values_written;
+                    records_read += 1;
+                }
                 break;
             }
 
@@ -265,8 +265,6 @@ impl<T: DataType> RecordReader<T> {
         self.values_written -= self.num_values;
         self.num_records = 0;
         self.num_values = 0;
-        self.values_seen = 0;
-        self.in_middle_of_record = false;
     }
 
     /// Returns bitmap data.
@@ -367,10 +365,11 @@ impl<T: DataType> RecordReader<T> {
         Ok(values_read)
     }
 
-    /// Split values into records according repetition definition and returns number of
-    /// records read.
-    #[allow(clippy::unnecessary_wraps)]
-    fn split_records(&mut self, records_to_read: usize) -> Result<usize> {
+    /// Inspects the buffered repetition levels in the range `self.num_values..self.values_written`
+    /// and returns the number of "complete" records along with the corresponding number of values
+    ///
+    /// A "complete" record is one where the buffer contains a subsequent repetition level of 0
+    fn count_records(&self, records_to_read: usize) -> (usize, usize) {
         let rep_levels = self.rep_levels.as_ref().map(|buf| {
             let (prefix, rep_levels, suffix) =
                 unsafe { buf.as_slice().align_to::<i16>() };
@@ -381,32 +380,26 @@ impl<T: DataType> RecordReader<T> {
         match rep_levels {
             Some(buf) => {
                 let mut records_read = 0;
+                let mut end_of_last_record = self.num_values;
+
+                for current in self.num_values..self.values_written {
+                    if buf[current] == 0 && current != self.num_values {
+                        records_read += 1;
+                        end_of_last_record = current;
 
-                while (self.values_seen < self.values_written)
-                    && (records_read < records_to_read)
-                {
-                    if buf[self.values_seen] == 0 {
-                        if self.in_middle_of_record {
-                            records_read += 1;
-                            self.num_records += 1;
-                            self.num_values = self.values_seen;
+                        if records_read == records_to_read {
+                            break;
                         }
-                        self.in_middle_of_record = true;
                     }
-                    self.values_seen += 1;
                 }
 
-                Ok(records_read)
+                (records_read, end_of_last_record - self.num_values)
             }
             None => {
                 let records_read =
-                    min(records_to_read, self.values_written - self.values_seen);
-                self.num_records += records_read;
-                self.num_values += records_read;
-                self.values_seen += records_read;
-                self.in_middle_of_record = false;
+                    min(records_to_read, self.values_written - self.num_values);
 
-                Ok(records_read)
+                (records_read, records_read)
             }
         }
     }