You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2020/11/21 23:15:32 UTC

[arrow] branch master updated: ARROW-10672: [Rust] [DataFusion] Made Limit be computed on the stream.

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 78e1214  ARROW-10672: [Rust] [DataFusion] Made Limit be computed on the stream.
78e1214 is described below

commit 78e1214510d264087cd1e7c93c112d1289be3a57
Author: Jorge C. Leitao <jo...@gmail.com>
AuthorDate: Sat Nov 21 18:15:01 2020 -0500

    ARROW-10672: [Rust] [DataFusion] Made Limit be computed on the stream.
    
    Currently, the `Limit` operator in DataFusion collects all batches `ExecutionPlan::execute` to memory and returns a `MemoryExec` that mimics a stream. This can cause an expensive execution to happen in `ExecutionPlan::execute`.
    
    This makes the limit operator in DataFusion to postpone the collection of RecordBatches to the stream, thereby avoiding the potentially expensive execution to happen inside of the `Plan::execute` and instead happen on the stream, as other operators already do.
    
    This also makes the `kernel::limit` to not return a `Result`, as there is no need for it (the operation is infalible).
    
    Closes #8729 from jorgecarleitao/improve_limit
    
    Authored-by: Jorge C. Leitao <jo...@gmail.com>
    Signed-off-by: Andrew Lamb <an...@nerdnetworks.org>
---
 rust/arrow/benches/arithmetic_kernels.rs   |   2 +-
 rust/arrow/src/compute/kernels/limit.rs    |  17 +++--
 rust/datafusion/src/physical_plan/limit.rs | 110 ++++++++++++++++-------------
 3 files changed, 71 insertions(+), 58 deletions(-)

diff --git a/rust/arrow/benches/arithmetic_kernels.rs b/rust/arrow/benches/arithmetic_kernels.rs
index b3272d1..40d5bbf 100644
--- a/rust/arrow/benches/arithmetic_kernels.rs
+++ b/rust/arrow/benches/arithmetic_kernels.rs
@@ -69,7 +69,7 @@ fn bench_divide(arr_a: &ArrayRef, arr_b: &ArrayRef) {
 }
 
 fn bench_limit(arr_a: &ArrayRef, max: usize) {
-    criterion::black_box(limit(arr_a, max).unwrap());
+    criterion::black_box(limit(arr_a, max));
 }
 
 fn add_benchmark(c: &mut Criterion) {
diff --git a/rust/arrow/src/compute/kernels/limit.rs b/rust/arrow/src/compute/kernels/limit.rs
index 65f66bc..911dbf2 100644
--- a/rust/arrow/src/compute/kernels/limit.rs
+++ b/rust/arrow/src/compute/kernels/limit.rs
@@ -18,7 +18,6 @@
 //! Defines miscellaneous array kernels.
 
 use crate::array::ArrayRef;
-use crate::error::Result;
 
 /// Returns the array, taking only the number of elements specified
 ///
@@ -26,9 +25,9 @@ use crate::error::Result;
 /// where:
 /// * it performs a bounds-check on the array
 /// * it slices from offset 0
-pub fn limit(array: &ArrayRef, num_elements: usize) -> Result<ArrayRef> {
+pub fn limit(array: &ArrayRef, num_elements: usize) -> ArrayRef {
     let lim = num_elements.min(array.len());
-    Ok(array.slice(0, lim))
+    array.slice(0, lim)
 }
 
 #[cfg(test)]
@@ -44,7 +43,7 @@ mod tests {
     #[test]
     fn test_limit_array() {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9]));
-        let b = limit(&a, 3).unwrap();
+        let b = limit(&a, 3);
         let c = b.as_ref().as_any().downcast_ref::<Int32Array>().unwrap();
         assert_eq!(3, c.len());
         assert_eq!(5, c.value(0));
@@ -55,7 +54,7 @@ mod tests {
     #[test]
     fn test_limit_string_array() {
         let a: ArrayRef = Arc::new(StringArray::from(vec!["hello", " ", "world", "!"]));
-        let b = limit(&a, 2).unwrap();
+        let b = limit(&a, 2);
         let c = b.as_ref().as_any().downcast_ref::<StringArray>().unwrap();
         assert_eq!(2, c.len());
         assert_eq!("hello", c.value(0));
@@ -65,7 +64,7 @@ mod tests {
     #[test]
     fn test_limit_array_with_null() {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![None, Some(5)]));
-        let b = limit(&a, 1).unwrap();
+        let b = limit(&a, 1);
         let c = b.as_ref().as_any().downcast_ref::<Int32Array>().unwrap();
         assert_eq!(1, c.len());
         assert_eq!(true, c.is_null(0));
@@ -75,7 +74,7 @@ mod tests {
     fn test_limit_array_with_limit_too_large() {
         let a = Int32Array::from(vec![5, 6, 7, 8, 9]);
         let a_ref: ArrayRef = Arc::new(a);
-        let b = limit(&a_ref, 6).unwrap();
+        let b = limit(&a_ref, 6);
         let c = b.as_ref().as_any().downcast_ref::<Int32Array>().unwrap();
 
         assert_eq!(5, c.len());
@@ -120,7 +119,7 @@ mod tests {
             .build();
         let list_array: ArrayRef = Arc::new(ListArray::from(list_data));
 
-        let limit_array = limit(&list_array, 6).unwrap();
+        let limit_array = limit(&list_array, 6);
         assert_eq!(6, limit_array.len());
         assert_eq!(0, limit_array.offset());
         assert_eq!(3, limit_array.null_count());
@@ -172,7 +171,7 @@ mod tests {
 
         let array: ArrayRef = Arc::new(struct_array);
 
-        let sliced_array = limit(&array, 3).unwrap();
+        let sliced_array = limit(&array, 3);
         let sliced_array = sliced_array.as_any().downcast_ref::<StructArray>().unwrap();
         assert_eq!(3, sliced_array.len());
         assert_eq!(0, sliced_array.offset());
diff --git a/rust/datafusion/src/physical_plan/limit.rs b/rust/datafusion/src/physical_plan/limit.rs
index b685ca4..c6b32fb 100644
--- a/rust/datafusion/src/physical_plan/limit.rs
+++ b/rust/datafusion/src/physical_plan/limit.rs
@@ -18,18 +18,22 @@
 //! Defines the LIMIT plan
 
 use std::any::Any;
+use std::pin::Pin;
 use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use futures::stream::Stream;
+use futures::stream::StreamExt;
 
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::memory::MemoryStream;
 use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning};
 use arrow::array::ArrayRef;
 use arrow::compute::limit;
 use arrow::datatypes::SchemaRef;
+use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
-use futures::StreamExt;
 
-use super::SendableRecordBatchStream;
+use super::{RecordBatchStream, SendableRecordBatchStream};
 
 use async_trait::async_trait;
 
@@ -111,12 +115,8 @@ impl ExecutionPlan for GlobalLimitExec {
             ));
         }
 
-        let mut it = self.input.execute(0).await?;
-        Ok(Box::pin(MemoryStream::try_new(
-            collect_with_limit(&mut it, self.limit).await?,
-            self.input.schema(),
-            None,
-        )?))
+        let stream = self.input.execute(0).await?;
+        Ok(Box::pin(LimitStream::new(stream, self.limit)))
     }
 }
 
@@ -169,58 +169,72 @@ impl ExecutionPlan for LocalLimitExec {
     }
 
     async fn execute(&self, _: usize) -> Result<SendableRecordBatchStream> {
-        let mut it = self.input.execute(0).await?;
-        Ok(Box::pin(MemoryStream::try_new(
-            collect_with_limit(&mut it, self.limit).await?,
-            self.input.schema(),
-            None,
-        )?))
+        let stream = self.input.execute(0).await?;
+        Ok(Box::pin(LimitStream::new(stream, self.limit)))
     }
 }
 
 /// Truncate a RecordBatch to maximum of n rows
-pub fn truncate_batch(batch: &RecordBatch, n: usize) -> Result<RecordBatch> {
-    let limited_columns: Result<Vec<ArrayRef>> = (0..batch.num_columns())
-        .map(|i| limit(batch.column(i), n).map_err(|error| DataFusionError::from(error)))
+pub fn truncate_batch(batch: &RecordBatch, n: usize) -> RecordBatch {
+    let limited_columns: Vec<ArrayRef> = (0..batch.num_columns())
+        .map(|i| limit(batch.column(i), n))
         .collect();
 
-    Ok(RecordBatch::try_new(
-        batch.schema().clone(),
-        limited_columns?,
-    )?)
+    RecordBatch::try_new(batch.schema().clone(), limited_columns).unwrap()
 }
 
-/// Create a vector of record batches from an iterator
-async fn collect_with_limit(
-    reader: &mut SendableRecordBatchStream,
+/// A Limit stream limits the stream to up to `limit` rows.
+struct LimitStream {
     limit: usize,
-) -> Result<Vec<RecordBatch>> {
-    let mut count = 0;
-    let mut results: Vec<RecordBatch> = vec![];
-    loop {
-        match reader.as_mut().next().await {
-            Some(Ok(batch)) => {
-                let capacity = limit - count;
-                if batch.num_rows() <= capacity {
-                    count += batch.num_rows();
-                    results.push(batch);
-                } else {
-                    let batch = truncate_batch(&batch, capacity)?;
-                    count += batch.num_rows();
-                    results.push(batch);
-                }
-                if count == limit {
-                    return Ok(results);
-                }
-            }
-            None => {
-                return Ok(results);
-            }
-            Some(Err(e)) => return Err(DataFusionError::from(e)),
+    input: SendableRecordBatchStream,
+    // the current count
+    current_len: usize,
+}
+
+impl LimitStream {
+    fn new(input: SendableRecordBatchStream, limit: usize) -> Self {
+        Self {
+            limit,
+            input,
+            current_len: 0,
+        }
+    }
+
+    fn stream_limit(&mut self, batch: RecordBatch) -> Option<RecordBatch> {
+        if self.current_len == self.limit {
+            return None;
+        } else if self.current_len + batch.num_rows() <= self.limit {
+            self.current_len += batch.num_rows();
+            return Some(batch);
+        } else {
+            let batch_rows = self.limit - self.current_len;
+            self.current_len = self.limit;
+            Some(truncate_batch(&batch, batch_rows))
         }
     }
 }
 
+impl Stream for LimitStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        self.input.poll_next_unpin(cx).map(|x| match x {
+            Some(Ok(batch)) => Ok(self.stream_limit(batch)).transpose(),
+            other => other,
+        })
+    }
+}
+
+impl RecordBatchStream for LimitStream {
+    /// Get the schema
+    fn schema(&self) -> SchemaRef {
+        self.input.schema().clone()
+    }
+}
+
 #[cfg(test)]
 mod tests {