You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/08 09:32:01 UTC
[arrow-rs] branch master updated: Combine multiple selections into the same batch size in skip_records (#2359)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new ce2bd1efe Combine multiple selections into the same batch size in skip_records (#2359)
ce2bd1efe is described below
commit ce2bd1efe61b0ac755e64021d99f6169203c8e54
Author: Yang Jiang <ji...@163.com>
AuthorDate: Mon Aug 8 17:31:57 2022 +0800
Combine multiple selections into the same batch size in skip_records (#2359)
* Combine multiple selections into the same batch size in skip_records
* Apply suggestions from code review
Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
---
parquet/src/arrow/arrow_reader.rs | 86 ++++++++++++++++++----------------
parquet/src/arrow/record_reader/mod.rs | 6 ---
2 files changed, 46 insertions(+), 46 deletions(-)
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 1f53c2abc..594b41676 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -287,43 +287,55 @@ impl Iterator for ParquetRecordBatchReader {
type Item = ArrowResult<RecordBatch>;
fn next(&mut self) -> Option<Self::Item> {
- let to_read = match self.selection.as_mut() {
- Some(selection) => loop {
- let front = selection.pop_front()?;
- if front.skip {
- let skipped = match self.array_reader.skip_records(front.row_count) {
- Ok(skipped) => skipped,
- Err(e) => return Some(Err(e.into())),
- };
-
- if skipped != front.row_count {
- return Some(Err(general_err!(
- "failed to skip rows, expected {}, got {}",
- front.row_count,
- skipped
- )
- .into()));
+ let mut read_records = 0;
+ match self.selection.as_mut() {
+ Some(selection) => {
+ while read_records < self.batch_size && !selection.is_empty() {
+ let front = selection.pop_front().unwrap();
+ if front.skip {
+ let skipped =
+ match self.array_reader.skip_records(front.row_count) {
+ Ok(skipped) => skipped,
+ Err(e) => return Some(Err(e.into())),
+ };
+
+ if skipped != front.row_count {
+ return Some(Err(general_err!(
+ "failed to skip rows, expected {}, got {}",
+ front.row_count,
+ skipped
+ )
+ .into()));
+ }
+ continue;
}
- continue;
- }
- // try to read record
- let to_read = match front.row_count.checked_sub(self.batch_size) {
- Some(remaining) if remaining != 0 => {
- // if page row count less than batch_size we must set batch size to page row count.
- // add check avoid dead loop
- selection.push_front(RowSelection::select(remaining));
- self.batch_size
+ // try to read record
+ let need_read = self.batch_size - read_records;
+ let to_read = match front.row_count.checked_sub(need_read) {
+ Some(remaining) if remaining != 0 => {
+ // if page row count less than batch_size we must set batch size to page row count.
+ // add check avoid dead loop
+ selection.push_front(RowSelection::select(remaining));
+ need_read
+ }
+ _ => front.row_count,
+ };
+ match self.array_reader.read_records(to_read) {
+ Ok(0) => break,
+ Ok(rec) => read_records += rec,
+ Err(error) => return Some(Err(error.into())),
}
- _ => front.row_count,
- };
-
- break to_read;
- },
- None => self.batch_size,
+ }
+ }
+ None => {
+ if let Err(error) = self.array_reader.read_records(self.batch_size) {
+ return Some(Err(error.into()));
+ }
+ }
};
- match self.array_reader.next_batch(to_read) {
+ match self.array_reader.consume_batch() {
Err(error) => Some(Err(error.into())),
Ok(array) => {
let struct_array =
@@ -1212,15 +1224,9 @@ mod tests {
loop {
let maybe_batch = record_reader.next();
if total_read < expected_data.len() {
- let mut end =
- min(total_read + opts.record_batch_size, expected_data.len());
+ let end = min(total_read + opts.record_batch_size, expected_data.len());
let batch = maybe_batch.unwrap().unwrap();
- //TODO remove this after implement https://github.com/apache/arrow-rs/issues/2197
- if opts.row_selections.is_none() {
- assert_eq!(end - total_read, batch.num_rows());
- } else {
- end = end.min(total_read + batch.num_rows())
- }
+ assert_eq!(end - total_read, batch.num_rows());
let mut data = vec![];
data.extend_from_slice(&expected_data[total_read..end]);
diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs
index b68f59d51..60fe4cdc9 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -198,12 +198,6 @@ where
self.num_records += buffered_records;
self.num_values += buffered_values;
- self.consume_def_levels();
- self.consume_rep_levels();
- self.consume_record_data();
- self.consume_bitmap();
- self.reset();
-
let remaining = num_records - buffered_records;
if remaining == 0 {