Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/10/01 15:31:01 UTC

[GitHub] [arrow] alamb commented on a change in pull request #8225: ARROW-10046: [Rust] [DataFusion] Made `RecordBatchReader` implement Iterator

alamb commented on a change in pull request #8225:
URL: https://github.com/apache/arrow/pull/8225#discussion_r498325552



##########
File path: rust/arrow-flight/src/utils.rs
##########
@@ -129,21 +129,29 @@ impl TryFrom<&SchemaResult> for Schema {
 pub fn flight_data_to_arrow_batch(
     data: &FlightData,
     schema: SchemaRef,
-) -> Result<Option<RecordBatch>> {
+) -> Option<Result<RecordBatch>> {

Review comment:
       Is the reason for this change that a Rust iterator has to return an `Option` (so it can signal the end with `None`)?
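For context, `Iterator::next` returns `Option<Self::Item>`. Once `Item = Result<RecordBatch>`, the return type has the shape `Option<Result<RecordBatch>>`, and `None` (rather than `Ok(None)`) marks the end of the stream. The standard library's `transpose` methods convert between the two shapes. This is a minimal sketch that assumes a hypothetical `Batch` stand-in for `RecordBatch` and uses `String` as the error type:

```rust
// Hypothetical stand-in for arrow's RecordBatch; only the shape matters here.
#[derive(Debug, PartialEq)]
struct Batch(u32);

// Old shape: `Ok(None)` means "no more batches".
fn old_style(n: Option<u32>) -> Result<Option<Batch>, String> {
    Ok(n.map(Batch))
}

// New shape, matching `Iterator::next`: `None` means "no more batches".
fn new_style(n: Option<u32>) -> Option<Result<Batch, String>> {
    // `Result::transpose` turns `Result<Option<T>, E>` into `Option<Result<T, E>>`.
    old_style(n).transpose()
}

fn main() {
    assert_eq!(new_style(Some(1)), Some(Ok(Batch(1))));
    assert_eq!(new_style(None), None);
    // `Option::transpose` converts back the other way.
    assert_eq!(new_style(Some(2)).transpose(), Ok(Some(Batch(2))));
}
```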

##########
File path: rust/arrow/src/record_batch.rs
##########
@@ -217,15 +217,12 @@ impl Into<StructArray> for RecordBatch {
 }
 
 /// Trait for types that can read `RecordBatch`'s.
-pub trait RecordBatchReader {
+pub trait RecordBatchReader: Iterator<Item = Result<RecordBatch>> {
     /// Returns the schema of this `RecordBatchReader`.
     ///
     /// Implementation of this trait should guarantee that all `RecordBatch`'s returned by this
     /// reader should have the same schema as returned from this method.
     fn schema(&self) -> SchemaRef;
-
-    /// Reads the next `RecordBatch`.

Review comment:
       I wonder if you could leave this function in the trait (and provide a default implementation of it in terms of `self.next()`) if you wanted to reduce the changes required for existing code (maybe it could be marked as deprecated?)
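Here is a minimal sketch of that suggestion. To keep it self-contained, it assumes hypothetical stand-ins for Arrow's `RecordBatch`, `Result`, and `SchemaRef`. `next_batch` keeps its old `Result<Option<RecordBatch>>` signature, is marked `#[deprecated]`, and is implemented as `self.next().transpose()`. `VecReader` is an illustrative reader and is not part of the PR.

```rust
// Hypothetical stand-ins for arrow's types, to keep the sketch self-contained.
#[derive(Debug, Clone, PartialEq)]
pub struct RecordBatch(pub Vec<i32>);
pub type Result<T> = std::result::Result<T, String>;

/// Reader trait with `Iterator` as a supertrait, as in the PR.
pub trait RecordBatchReader: Iterator<Item = Result<RecordBatch>> {
    /// Returns the schema (a plain string stands in for `SchemaRef` here).
    fn schema(&self) -> String;

    /// Kept for backwards compatibility; implemented in terms of `Iterator::next`.
    #[deprecated(note = "use `Iterator::next` instead")]
    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        self.next().transpose()
    }
}

// A toy reader over an in-memory list of batches (hypothetical).
pub struct VecReader {
    batches: std::vec::IntoIter<RecordBatch>,
}

impl Iterator for VecReader {
    type Item = Result<RecordBatch>;
    fn next(&mut self) -> Option<Self::Item> {
        self.batches.next().map(Ok)
    }
}

impl RecordBatchReader for VecReader {
    fn schema(&self) -> String {
        "a: Int32".to_string()
    }
}

#[allow(deprecated)]
fn main() {
    let mut reader = VecReader { batches: vec![RecordBatch(vec![1, 2])].into_iter() };
    // Existing callers keep compiling (with a deprecation warning unless allowed).
    assert_eq!(reader.next_batch(), Ok(Some(RecordBatch(vec![1, 2]))));
    assert_eq!(reader.next_batch(), Ok(None));
}
```

Because `Iterator` is a supertrait, the default method can call `self.next()` directly. Implementors only provide `next` and `schema`.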

##########
File path: rust/arrow/src/ipc/reader.rs
##########
@@ -945,7 +959,7 @@ mod tests {
             let arrow_json = read_gzip_json(path);
             assert!(arrow_json.equals_reader(&mut reader));
             // the next batch must be empty
-            assert!(reader.next_batch().unwrap().is_none());
+            assert!(reader.next().is_none());

Review comment:
       👍 

##########
File path: rust/datafusion/src/datasource/memory.rs
##########
@@ -64,10 +65,7 @@ impl MemTable {
         for partition in 0..exec.output_partitioning().partition_count() {
             let it = exec.execute(partition).await?;
             let mut it = it.lock().unwrap();
-            let mut partition_batches = vec![];
-            while let Ok(Some(batch)) = it.next_batch() {
-                partition_batches.push(batch);
-            }
+            let partition_batches = it.into_iter().collect::<ArrowResult<Vec<_>>>()?;

Review comment:
       this is so much nicer. ❤️ 
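There is one behavioral difference worth noting. The old loop `while let Ok(Some(batch)) = it.next_batch()` ends silently on the first error and drops it. Collecting into `Result<Vec<_>>` stops at the first `Err` and returns it, so the `?` propagates the error. This is a minimal sketch with hypothetical stand-in types (batches as `Vec<i32>`, errors as `String`):

```rust
// Hypothetical stand-in types: batches are plain vectors, errors are strings.
type Batch = Vec<i32>;
type BatchResult = Result<Batch, String>;

// Mirrors the old `while let Ok(Some(batch)) = it.next_batch()` loop:
// the loop simply ends on the first error, and the error is discarded.
fn collect_while_ok(mut it: impl Iterator<Item = BatchResult>) -> Vec<Batch> {
    let mut out = vec![];
    while let Some(Ok(batch)) = it.next() {
        out.push(batch);
    }
    out
}

// Mirrors the new code: `FromIterator` for `Result<Vec<_>, E>`
// short-circuits on the first `Err` and returns it to the caller.
fn collect_all(it: impl Iterator<Item = BatchResult>) -> Result<Vec<Batch>, String> {
    it.collect::<Result<Vec<_>, _>>()
}

fn main() {
    let input = || -> std::vec::IntoIter<BatchResult> {
        vec![Ok(vec![1]), Err("bad batch".to_string()), Ok(vec![2])].into_iter()
    };
    assert_eq!(collect_while_ok(input()), vec![vec![1]]);
    assert_eq!(collect_all(input()), Err("bad batch".to_string()));
}
```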

##########
File path: rust/arrow/src/record_batch.rs
##########
@@ -217,15 +217,12 @@ impl Into<StructArray> for RecordBatch {
 }
 
 /// Trait for types that can read `RecordBatch`'s.
-pub trait RecordBatchReader {
+pub trait RecordBatchReader: Iterator<Item = Result<RecordBatch>> {

Review comment:
       🎉 

##########
File path: rust/datafusion/tests/user_defined_plan.rs
##########
@@ -450,73 +449,69 @@ impl TopKReader {
             self.top_values.remove(&smallest_revenue);
         }
     }
-}
 
-impl RecordBatchReader for TopKReader {
-    fn schema(&self) -> SchemaRef {
-        self.input.lock().expect("locked input reader").schema()
+    // how we process a whole batch
+    fn accumulate_batch(&mut self, input_batch: &RecordBatch) -> Result<()> {

Review comment:
       👍 
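This is a simplified sketch of how a reader like `TopKReader` can implement `Iterator` directly, with a per-batch `accumulate_batch` helper. Everything here is an assumption for illustration. Batches are `(customer, revenue)` rows, `top_values` is a `BTreeMap<i64, String>`, and the whole top-k result is emitted by the first call to `next()`. The real test's types and logic may differ.

```rust
use std::collections::BTreeMap;

// Hypothetical, simplified stand-ins: a "batch" is a list of (customer, revenue) rows.
type Batch = Vec<(String, i64)>;
type Result<T> = std::result::Result<T, String>;

/// Simplified sketch: drains its input and keeps the `k` largest revenues.
struct TopKReader<I: Iterator<Item = Result<Batch>>> {
    input: I,
    k: usize,
    top_values: BTreeMap<i64, String>, // revenue -> customer (ties overwrite, for brevity)
    done: bool,
}

impl<I: Iterator<Item = Result<Batch>>> TopKReader<I> {
    // how we process a whole batch
    fn accumulate_batch(&mut self, input_batch: &Batch) -> Result<()> {
        for (customer, revenue) in input_batch {
            self.top_values.insert(*revenue, customer.clone());
            if self.top_values.len() > self.k {
                // Evict the smallest revenue (the first key in the ordered map).
                let smallest_revenue = *self.top_values.keys().next().unwrap();
                self.top_values.remove(&smallest_revenue);
            }
        }
        Ok(())
    }
}

impl<I: Iterator<Item = Result<Batch>>> Iterator for TopKReader<I> {
    type Item = Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        // The whole result is produced by the first call; later calls signal the end.
        if self.done {
            return None;
        }
        self.done = true;
        // Drain the input, surfacing the first error as `Some(Err(..))`.
        while let Some(batch) = self.input.next() {
            let batch = match batch {
                Ok(b) => b,
                Err(e) => return Some(Err(e)),
            };
            if let Err(e) = self.accumulate_batch(&batch) {
                return Some(Err(e));
            }
        }
        // Emit the survivors, largest revenue first.
        let out = self.top_values.iter().rev().map(|(r, c)| (c.clone(), *r)).collect();
        Some(Ok(out))
    }
}

fn main() {
    let input: Vec<Result<Batch>> = vec![
        Ok(vec![("a".to_string(), 10), ("b".to_string(), 50)]),
        Ok(vec![("c".to_string(), 30), ("d".to_string(), 20)]),
    ];
    let mut reader = TopKReader { input: input.into_iter(), k: 2, top_values: BTreeMap::new(), done: false };
    let top = reader.next().unwrap().unwrap();
    assert_eq!(top, vec![("b".to_string(), 50), ("c".to_string(), 30)]);
    assert!(reader.next().is_none());
}
```

Because the reader is now an `Iterator`, callers can drain it with `collect` like any other `RecordBatchReader`.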




----------------------------------------------------------------
This is an automated message from the Apache Git Service.