You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/08 09:32:01 UTC

[arrow-rs] branch master updated: Combine multiple selections into the same batch size in skip_records (#2359)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new ce2bd1efe Combine multiple selections into the same batch size in skip_records (#2359)
ce2bd1efe is described below

commit ce2bd1efe61b0ac755e64021d99f6169203c8e54
Author: Yang Jiang <ji...@163.com>
AuthorDate: Mon Aug 8 17:31:57 2022 +0800

    Combine multiple selections into the same batch size in skip_records (#2359)
    
    * Combine multiple selections into the same batch size in skip_records
    
    * Apply suggestions from code review
    
    Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
    
    Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
---
 parquet/src/arrow/arrow_reader.rs      | 86 ++++++++++++++++++----------------
 parquet/src/arrow/record_reader/mod.rs |  6 ---
 2 files changed, 46 insertions(+), 46 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 1f53c2abc..594b41676 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -287,43 +287,55 @@ impl Iterator for ParquetRecordBatchReader {
     type Item = ArrowResult<RecordBatch>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        let to_read = match self.selection.as_mut() {
-            Some(selection) => loop {
-                let front = selection.pop_front()?;
-                if front.skip {
-                    let skipped = match self.array_reader.skip_records(front.row_count) {
-                        Ok(skipped) => skipped,
-                        Err(e) => return Some(Err(e.into())),
-                    };
-
-                    if skipped != front.row_count {
-                        return Some(Err(general_err!(
-                            "failed to skip rows, expected {}, got {}",
-                            front.row_count,
-                            skipped
-                        )
-                        .into()));
+        let mut read_records = 0;
+        match self.selection.as_mut() {
+            Some(selection) => {
+                while read_records < self.batch_size && !selection.is_empty() {
+                    let front = selection.pop_front().unwrap();
+                    if front.skip {
+                        let skipped =
+                            match self.array_reader.skip_records(front.row_count) {
+                                Ok(skipped) => skipped,
+                                Err(e) => return Some(Err(e.into())),
+                            };
+
+                        if skipped != front.row_count {
+                            return Some(Err(general_err!(
+                                "failed to skip rows, expected {}, got {}",
+                                front.row_count,
+                                skipped
+                            )
+                            .into()));
+                        }
+                        continue;
                     }
-                    continue;
-                }
 
-                // try to read record
-                let to_read = match front.row_count.checked_sub(self.batch_size) {
-                    Some(remaining) if remaining != 0 => {
-                        // if page row count less than batch_size we must set batch size to page row count.
-                        // add check avoid dead loop
-                        selection.push_front(RowSelection::select(remaining));
-                        self.batch_size
+                    // try to read record
+                    let need_read = self.batch_size - read_records;
+                    let to_read = match front.row_count.checked_sub(need_read) {
+                        Some(remaining) if remaining != 0 => {
+                            // if page row count less than batch_size we must set batch size to page row count.
+                            // add check avoid dead loop
+                            selection.push_front(RowSelection::select(remaining));
+                            need_read
+                        }
+                        _ => front.row_count,
+                    };
+                    match self.array_reader.read_records(to_read) {
+                        Ok(0) => break,
+                        Ok(rec) => read_records += rec,
+                        Err(error) => return Some(Err(error.into())),
                     }
-                    _ => front.row_count,
-                };
-
-                break to_read;
-            },
-            None => self.batch_size,
+                }
+            }
+            None => {
+                if let Err(error) = self.array_reader.read_records(self.batch_size) {
+                    return Some(Err(error.into()));
+                }
+            }
         };
 
-        match self.array_reader.next_batch(to_read) {
+        match self.array_reader.consume_batch() {
             Err(error) => Some(Err(error.into())),
             Ok(array) => {
                 let struct_array =
@@ -1212,15 +1224,9 @@ mod tests {
         loop {
             let maybe_batch = record_reader.next();
             if total_read < expected_data.len() {
-                let mut end =
-                    min(total_read + opts.record_batch_size, expected_data.len());
+                let end = min(total_read + opts.record_batch_size, expected_data.len());
                 let batch = maybe_batch.unwrap().unwrap();
-                //TODO remove this after implement https://github.com/apache/arrow-rs/issues/2197
-                if opts.row_selections.is_none() {
-                    assert_eq!(end - total_read, batch.num_rows());
-                } else {
-                    end = end.min(total_read + batch.num_rows())
-                }
+                assert_eq!(end - total_read, batch.num_rows());
 
                 let mut data = vec![];
                 data.extend_from_slice(&expected_data[total_read..end]);
diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs
index b68f59d51..60fe4cdc9 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -198,12 +198,6 @@ where
         self.num_records += buffered_records;
         self.num_values += buffered_values;
 
-        self.consume_def_levels();
-        self.consume_rep_levels();
-        self.consume_record_data();
-        self.consume_bitmap();
-        self.reset();
-
         let remaining = num_records - buffered_records;
 
         if remaining == 0 {