You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/04 09:59:10 UTC
[arrow-datafusion] branch main updated: Remove `avro_to_arrow::reader::Reader::next` in favor of Iterator implementation. (#6538)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new aae7ec3bdb Remove `avro_to_arrow::reader::Reader::next` in favor of Iterator implementation. (#6538)
aae7ec3bdb is described below
commit aae7ec3bdb64bf0346249ccb9e44abdc29880904
Author: Louis Gariépy <68...@users.noreply.github.com>
AuthorDate: Sun Jun 4 05:59:03 2023 -0400
Remove `avro_to_arrow::reader::Reader::next` in favor of Iterator implementation. (#6538)
Co-authored-by: Louis Gariépy <lo...@gmail.com>
---
.../core/src/avro_to_arrow/arrow_array_reader.rs | 50 ++++++++++------------
datafusion/core/src/avro_to_arrow/reader.rs | 19 ++------
2 files changed, 25 insertions(+), 44 deletions(-)
diff --git a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
index 921f2150e8..311e199f28 100644
--- a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
+++ b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
@@ -88,8 +88,8 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
}
/// Read the next batch of records
- pub fn next_batch(&mut self, batch_size: usize) -> ArrowResult<Option<RecordBatch>> {
- let rows = self
+ pub fn next_batch(&mut self, batch_size: usize) -> Option<ArrowResult<RecordBatch>> {
+ let rows_result = self
.reader
.by_ref()
.take(batch_size)
@@ -102,15 +102,19 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
"Row needs to be of type object, got: {other:?}"
))),
})
- .collect::<ArrowResult<Vec<Vec<(String, Value)>>>>()?;
- if rows.is_empty() {
- // reached end of file
- return Ok(None);
- }
+ .collect::<ArrowResult<Vec<Vec<(String, Value)>>>>();
+
+ let rows = match rows_result {
+ // Return error early
+ Err(e) => return Some(Err(e)),
+ // No rows: return None early
+ Ok(rows) if rows.is_empty() => return None,
+ Ok(rows) => rows,
+ };
+
let rows = rows.iter().collect::<Vec<&Vec<(String, Value)>>>();
let projection = self.projection.clone().unwrap_or_default();
- let arrays =
- self.build_struct_array(rows.as_slice(), self.schema.fields(), &projection);
+ let arrays = self.build_struct_array(&rows, self.schema.fields(), &projection);
let projected_fields = if projection.is_empty() {
self.schema.fields().clone()
} else {
@@ -121,7 +125,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
.collect()
};
let projected_schema = Arc::new(Schema::new(projected_fields));
- arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr).map(Some))
+ Some(arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr)))
}
fn build_boolean_array(&self, rows: RecordSlice, col_name: &str) -> ArrayRef {
@@ -416,14 +420,13 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
let num_list_bytes = bit_util::ceil(list_len, 8);
let mut offsets = Vec::with_capacity(list_len + 1);
let mut list_nulls = MutableBuffer::from_len_zeroed(num_list_bytes);
- let list_nulls = list_nulls.as_slice_mut();
offsets.push(cur_offset);
rows.iter().enumerate().for_each(|(i, v)| {
// TODO: unboxing Union(Array(Union(...))) should probably be done earlier
let v = maybe_resolve_union(v);
if let Value::Array(a) = v {
cur_offset += OffsetSize::from_usize(a.len()).unwrap();
- bit_util::set_bit(list_nulls, i);
+ bit_util::set_bit(&mut list_nulls, i);
} else if let Value::Null = v {
// value is null, not incremented
} else {
@@ -446,17 +449,11 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
if let Value::Boolean(child) = value {
// if valid boolean, append value
if *child {
- bit_util::set_bit(
- bool_values.as_slice_mut(),
- curr_index,
- );
+ bit_util::set_bit(&mut bool_values, curr_index);
}
} else {
// null slot
- bit_util::unset_bit(
- bool_nulls.as_slice_mut(),
- curr_index,
- );
+ bit_util::unset_bit(&mut bool_nulls, curr_index);
}
curr_index += 1;
});
@@ -526,10 +523,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
.map(|row| {
if let Value::Array(values) = row {
values.iter().for_each(|_| {
- bit_util::set_bit(
- null_buffer.as_slice_mut(),
- struct_index,
- );
+ bit_util::set_bit(&mut null_buffer, struct_index);
struct_index += 1;
});
values
@@ -543,7 +537,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
})
.collect();
let rows = rows.iter().collect::<Vec<&Vec<(String, Value)>>>();
- let arrays = self.build_struct_array(rows.as_slice(), fields, &[])?;
+ let arrays = self.build_struct_array(&rows, fields, &[])?;
let data_type = DataType::Struct(fields.clone());
ArrayDataBuilder::new(data_type)
.len(rows.len())
@@ -719,7 +713,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
})
.collect::<Vec<&Value>>();
self.build_nested_list_array::<i32>(
- extracted_rows.as_slice(),
+ &extracted_rows,
list_field,
)?
}
@@ -742,7 +736,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
.map(|(i, row)| (i, self.field_lookup(field.name(), row)))
.map(|(i, v)| {
if let Some(Value::Record(value)) = v {
- bit_util::set_bit(null_buffer.as_slice_mut(), i);
+ bit_util::set_bit(&mut null_buffer, i);
value
} else {
panic!("expected struct got {v:?}");
@@ -750,7 +744,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
})
.collect::<Vec<&Vec<(String, Value)>>>();
let arrays =
- self.build_struct_array(struct_rows.as_slice(), fields, &[])?;
+ self.build_struct_array(&struct_rows, fields, &[])?;
// construct a struct array's data in order to set null buffer
let data_type = DataType::Struct(fields.clone());
let data = ArrayDataBuilder::new(data_type)
diff --git a/datafusion/core/src/avro_to_arrow/reader.rs b/datafusion/core/src/avro_to_arrow/reader.rs
index ddf8cd6982..c5dab22a2d 100644
--- a/datafusion/core/src/avro_to_arrow/reader.rs
+++ b/datafusion/core/src/avro_to_arrow/reader.rs
@@ -151,28 +151,15 @@ impl<'a, R: Read> Reader<'a, R> {
pub fn schema(&self) -> SchemaRef {
self.schema.clone()
}
-
- /// Returns the next batch of results (defined by `self.batch_size`), or `None` if there
- /// are no more results.
- //
- // TODO(clippy): The clippy `allow` could be removed by renaming this method to `next_batch`.
- // This would also make the intent of the method clearer.
- //
- // Another option could be to rework `AvroArrowArrayReader::next_batch` so it returns an
- // `Option<ArrowResult<RecordBatch>>` instead of a `ArrowResult<Option<RecordBatch>>`.
- // This would make it possible to remove this method entirely and move its body into the
- // `Iterator` implementation.
- #[allow(clippy::should_implement_trait)]
- pub fn next(&mut self) -> ArrowResult<Option<RecordBatch>> {
- self.array_reader.next_batch(self.batch_size)
- }
}
impl<'a, R: Read> Iterator for Reader<'a, R> {
type Item = ArrowResult<RecordBatch>;
+ /// Returns the next batch of results (defined by `self.batch_size`), or `None` if there
+ /// are no more results.
fn next(&mut self) -> Option<Self::Item> {
- self.next().transpose()
+ self.array_reader.next_batch(self.batch_size)
}
}