You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/06/04 09:59:10 UTC

[arrow-datafusion] branch main updated: Remove `avro_to_arrow::reader::Reader::next` in favor of Iterator implementation. (#6538)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new aae7ec3bdb Remove `avro_to_arrow::reader::Reader::next` in favor of Iterator implementation. (#6538)
aae7ec3bdb is described below

commit aae7ec3bdb64bf0346249ccb9e44abdc29880904
Author: Louis Gariépy <68...@users.noreply.github.com>
AuthorDate: Sun Jun 4 05:59:03 2023 -0400

    Remove `avro_to_arrow::reader::Reader::next` in favor of Iterator implementation. (#6538)
    
    Co-authored-by: Louis Gariépy <lo...@gmail.com>
---
 .../core/src/avro_to_arrow/arrow_array_reader.rs   | 50 ++++++++++------------
 datafusion/core/src/avro_to_arrow/reader.rs        | 19 ++------
 2 files changed, 25 insertions(+), 44 deletions(-)

diff --git a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
index 921f2150e8..311e199f28 100644
--- a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
+++ b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
@@ -88,8 +88,8 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
     }
 
     /// Read the next batch of records
-    pub fn next_batch(&mut self, batch_size: usize) -> ArrowResult<Option<RecordBatch>> {
-        let rows = self
+    pub fn next_batch(&mut self, batch_size: usize) -> Option<ArrowResult<RecordBatch>> {
+        let rows_result = self
             .reader
             .by_ref()
             .take(batch_size)
@@ -102,15 +102,19 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                     "Row needs to be of type object, got: {other:?}"
                 ))),
             })
-            .collect::<ArrowResult<Vec<Vec<(String, Value)>>>>()?;
-        if rows.is_empty() {
-            // reached end of file
-            return Ok(None);
-        }
+            .collect::<ArrowResult<Vec<Vec<(String, Value)>>>>();
+
+        let rows = match rows_result {
+            // Return error early
+            Err(e) => return Some(Err(e)),
+            // No rows: return None early
+            Ok(rows) if rows.is_empty() => return None,
+            Ok(rows) => rows,
+        };
+
         let rows = rows.iter().collect::<Vec<&Vec<(String, Value)>>>();
         let projection = self.projection.clone().unwrap_or_default();
-        let arrays =
-            self.build_struct_array(rows.as_slice(), self.schema.fields(), &projection);
+        let arrays = self.build_struct_array(&rows, self.schema.fields(), &projection);
         let projected_fields = if projection.is_empty() {
             self.schema.fields().clone()
         } else {
@@ -121,7 +125,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                 .collect()
         };
         let projected_schema = Arc::new(Schema::new(projected_fields));
-        arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr).map(Some))
+        Some(arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr)))
     }
 
     fn build_boolean_array(&self, rows: RecordSlice, col_name: &str) -> ArrayRef {
@@ -416,14 +420,13 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
         let num_list_bytes = bit_util::ceil(list_len, 8);
         let mut offsets = Vec::with_capacity(list_len + 1);
         let mut list_nulls = MutableBuffer::from_len_zeroed(num_list_bytes);
-        let list_nulls = list_nulls.as_slice_mut();
         offsets.push(cur_offset);
         rows.iter().enumerate().for_each(|(i, v)| {
             // TODO: unboxing Union(Array(Union(...))) should probably be done earlier
             let v = maybe_resolve_union(v);
             if let Value::Array(a) = v {
                 cur_offset += OffsetSize::from_usize(a.len()).unwrap();
-                bit_util::set_bit(list_nulls, i);
+                bit_util::set_bit(&mut list_nulls, i);
             } else if let Value::Null = v {
                 // value is null, not incremented
             } else {
@@ -446,17 +449,11 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                             if let Value::Boolean(child) = value {
                                 // if valid boolean, append value
                                 if *child {
-                                    bit_util::set_bit(
-                                        bool_values.as_slice_mut(),
-                                        curr_index,
-                                    );
+                                    bit_util::set_bit(&mut bool_values, curr_index);
                                 }
                             } else {
                                 // null slot
-                                bit_util::unset_bit(
-                                    bool_nulls.as_slice_mut(),
-                                    curr_index,
-                                );
+                                bit_util::unset_bit(&mut bool_nulls, curr_index);
                             }
                             curr_index += 1;
                         });
@@ -526,10 +523,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                     .map(|row| {
                         if let Value::Array(values) = row {
                             values.iter().for_each(|_| {
-                                bit_util::set_bit(
-                                    null_buffer.as_slice_mut(),
-                                    struct_index,
-                                );
+                                bit_util::set_bit(&mut null_buffer, struct_index);
                                 struct_index += 1;
                             });
                             values
@@ -543,7 +537,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                     })
                     .collect();
                 let rows = rows.iter().collect::<Vec<&Vec<(String, Value)>>>();
-                let arrays = self.build_struct_array(rows.as_slice(), fields, &[])?;
+                let arrays = self.build_struct_array(&rows, fields, &[])?;
                 let data_type = DataType::Struct(fields.clone());
                 ArrayDataBuilder::new(data_type)
                     .len(rows.len())
@@ -719,7 +713,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                                     })
                                     .collect::<Vec<&Value>>();
                                 self.build_nested_list_array::<i32>(
-                                    extracted_rows.as_slice(),
+                                    &extracted_rows,
                                     list_field,
                                 )?
                             }
@@ -742,7 +736,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                             .map(|(i, row)| (i, self.field_lookup(field.name(), row)))
                             .map(|(i, v)| {
                                 if let Some(Value::Record(value)) = v {
-                                    bit_util::set_bit(null_buffer.as_slice_mut(), i);
+                                    bit_util::set_bit(&mut null_buffer, i);
                                     value
                                 } else {
                                     panic!("expected struct got {v:?}");
@@ -750,7 +744,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
                             })
                             .collect::<Vec<&Vec<(String, Value)>>>();
                         let arrays =
-                            self.build_struct_array(struct_rows.as_slice(), fields, &[])?;
+                            self.build_struct_array(&struct_rows, fields, &[])?;
                         // construct a struct array's data in order to set null buffer
                         let data_type = DataType::Struct(fields.clone());
                         let data = ArrayDataBuilder::new(data_type)
diff --git a/datafusion/core/src/avro_to_arrow/reader.rs b/datafusion/core/src/avro_to_arrow/reader.rs
index ddf8cd6982..c5dab22a2d 100644
--- a/datafusion/core/src/avro_to_arrow/reader.rs
+++ b/datafusion/core/src/avro_to_arrow/reader.rs
@@ -151,28 +151,15 @@ impl<'a, R: Read> Reader<'a, R> {
     pub fn schema(&self) -> SchemaRef {
         self.schema.clone()
     }
-
-    /// Returns the next batch of results (defined by `self.batch_size`), or `None` if there
-    /// are no more results.
-    //
-    // TODO(clippy): The clippy `allow` could be removed by renaming this method to `next_batch`.
-    // This would also make the intent of the method clearer.
-    //
-    // Another option could be to rework `AvroArrowArrayReader::next_batch` so it returns an
-    // `Option<ArrowResult<RecordBatch>>` instead of a  `ArrowResult<Option<RecordBatch>>`.
-    // This would make it possible to remove this method entirely and move its body into the
-    // `Iterator` implementation.
-    #[allow(clippy::should_implement_trait)]
-    pub fn next(&mut self) -> ArrowResult<Option<RecordBatch>> {
-        self.array_reader.next_batch(self.batch_size)
-    }
 }
 
 impl<'a, R: Read> Iterator for Reader<'a, R> {
     type Item = ArrowResult<RecordBatch>;
 
+    /// Returns the next batch of results (defined by `self.batch_size`), or `None` if there
+    /// are no more results.
     fn next(&mut self) -> Option<Self::Item> {
-        self.next().transpose()
+        self.array_reader.next_batch(self.batch_size)
     }
 }