You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2020/11/21 23:15:32 UTC
[arrow] branch master updated: ARROW-10672: [Rust] [DataFusion]
Made Limit be computed on the stream.
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 78e1214 ARROW-10672: [Rust] [DataFusion] Made Limit be computed on the stream.
78e1214 is described below
commit 78e1214510d264087cd1e7c93c112d1289be3a57
Author: Jorge C. Leitao <jo...@gmail.com>
AuthorDate: Sat Nov 21 18:15:01 2020 -0500
ARROW-10672: [Rust] [DataFusion] Made Limit be computed on the stream.
Currently, the `Limit` operator in DataFusion collects all batches into memory inside `ExecutionPlan::execute` and returns a `MemoryStream` that replays them as a stream. This means a potentially expensive computation runs inside `ExecutionPlan::execute` itself.
This change makes DataFusion's limit operator defer the collection of RecordBatches to the stream. The potentially expensive work then happens while the stream is polled, rather than inside `ExecutionPlan::execute`, as other operators already do.
This also changes the `limit` kernel (`arrow::compute::limit`) to no longer return a `Result`, since the operation is infallible.
Closes #8729 from jorgecarleitao/improve_limit
Authored-by: Jorge C. Leitao <jo...@gmail.com>
Signed-off-by: Andrew Lamb <an...@nerdnetworks.org>
---
rust/arrow/benches/arithmetic_kernels.rs | 2 +-
rust/arrow/src/compute/kernels/limit.rs | 17 +++--
rust/datafusion/src/physical_plan/limit.rs | 110 ++++++++++++++++-------------
3 files changed, 71 insertions(+), 58 deletions(-)
diff --git a/rust/arrow/benches/arithmetic_kernels.rs b/rust/arrow/benches/arithmetic_kernels.rs
index b3272d1..40d5bbf 100644
--- a/rust/arrow/benches/arithmetic_kernels.rs
+++ b/rust/arrow/benches/arithmetic_kernels.rs
@@ -69,7 +69,7 @@ fn bench_divide(arr_a: &ArrayRef, arr_b: &ArrayRef) {
}
fn bench_limit(arr_a: &ArrayRef, max: usize) {
- criterion::black_box(limit(arr_a, max).unwrap());
+ criterion::black_box(limit(arr_a, max));
}
fn add_benchmark(c: &mut Criterion) {
diff --git a/rust/arrow/src/compute/kernels/limit.rs b/rust/arrow/src/compute/kernels/limit.rs
index 65f66bc..911dbf2 100644
--- a/rust/arrow/src/compute/kernels/limit.rs
+++ b/rust/arrow/src/compute/kernels/limit.rs
@@ -18,7 +18,6 @@
//! Defines miscellaneous array kernels.
use crate::array::ArrayRef;
-use crate::error::Result;
/// Returns the array, taking only the number of elements specified
///
@@ -26,9 +25,9 @@ use crate::error::Result;
/// where:
/// * it performs a bounds-check on the array
/// * it slices from offset 0
-pub fn limit(array: &ArrayRef, num_elements: usize) -> Result<ArrayRef> {
+pub fn limit(array: &ArrayRef, num_elements: usize) -> ArrayRef {
let lim = num_elements.min(array.len());
- Ok(array.slice(0, lim))
+ array.slice(0, lim)
}
#[cfg(test)]
@@ -44,7 +43,7 @@ mod tests {
#[test]
fn test_limit_array() {
let a: ArrayRef = Arc::new(Int32Array::from(vec![5, 6, 7, 8, 9]));
- let b = limit(&a, 3).unwrap();
+ let b = limit(&a, 3);
let c = b.as_ref().as_any().downcast_ref::<Int32Array>().unwrap();
assert_eq!(3, c.len());
assert_eq!(5, c.value(0));
@@ -55,7 +54,7 @@ mod tests {
#[test]
fn test_limit_string_array() {
let a: ArrayRef = Arc::new(StringArray::from(vec!["hello", " ", "world", "!"]));
- let b = limit(&a, 2).unwrap();
+ let b = limit(&a, 2);
let c = b.as_ref().as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(2, c.len());
assert_eq!("hello", c.value(0));
@@ -65,7 +64,7 @@ mod tests {
#[test]
fn test_limit_array_with_null() {
let a: ArrayRef = Arc::new(Int32Array::from(vec![None, Some(5)]));
- let b = limit(&a, 1).unwrap();
+ let b = limit(&a, 1);
let c = b.as_ref().as_any().downcast_ref::<Int32Array>().unwrap();
assert_eq!(1, c.len());
assert_eq!(true, c.is_null(0));
@@ -75,7 +74,7 @@ mod tests {
fn test_limit_array_with_limit_too_large() {
let a = Int32Array::from(vec![5, 6, 7, 8, 9]);
let a_ref: ArrayRef = Arc::new(a);
- let b = limit(&a_ref, 6).unwrap();
+ let b = limit(&a_ref, 6);
let c = b.as_ref().as_any().downcast_ref::<Int32Array>().unwrap();
assert_eq!(5, c.len());
@@ -120,7 +119,7 @@ mod tests {
.build();
let list_array: ArrayRef = Arc::new(ListArray::from(list_data));
- let limit_array = limit(&list_array, 6).unwrap();
+ let limit_array = limit(&list_array, 6);
assert_eq!(6, limit_array.len());
assert_eq!(0, limit_array.offset());
assert_eq!(3, limit_array.null_count());
@@ -172,7 +171,7 @@ mod tests {
let array: ArrayRef = Arc::new(struct_array);
- let sliced_array = limit(&array, 3).unwrap();
+ let sliced_array = limit(&array, 3);
let sliced_array = sliced_array.as_any().downcast_ref::<StructArray>().unwrap();
assert_eq!(3, sliced_array.len());
assert_eq!(0, sliced_array.offset());
diff --git a/rust/datafusion/src/physical_plan/limit.rs b/rust/datafusion/src/physical_plan/limit.rs
index b685ca4..c6b32fb 100644
--- a/rust/datafusion/src/physical_plan/limit.rs
+++ b/rust/datafusion/src/physical_plan/limit.rs
@@ -18,18 +18,22 @@
//! Defines the LIMIT plan
use std::any::Any;
+use std::pin::Pin;
use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use futures::stream::Stream;
+use futures::stream::StreamExt;
use crate::error::{DataFusionError, Result};
-use crate::physical_plan::memory::MemoryStream;
use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning};
use arrow::array::ArrayRef;
use arrow::compute::limit;
use arrow::datatypes::SchemaRef;
+use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
-use futures::StreamExt;
-use super::SendableRecordBatchStream;
+use super::{RecordBatchStream, SendableRecordBatchStream};
use async_trait::async_trait;
@@ -111,12 +115,8 @@ impl ExecutionPlan for GlobalLimitExec {
));
}
- let mut it = self.input.execute(0).await?;
- Ok(Box::pin(MemoryStream::try_new(
- collect_with_limit(&mut it, self.limit).await?,
- self.input.schema(),
- None,
- )?))
+ let stream = self.input.execute(0).await?;
+ Ok(Box::pin(LimitStream::new(stream, self.limit)))
}
}
@@ -169,58 +169,72 @@ impl ExecutionPlan for LocalLimitExec {
}
async fn execute(&self, _: usize) -> Result<SendableRecordBatchStream> {
- let mut it = self.input.execute(0).await?;
- Ok(Box::pin(MemoryStream::try_new(
- collect_with_limit(&mut it, self.limit).await?,
- self.input.schema(),
- None,
- )?))
+ let stream = self.input.execute(0).await?;
+ Ok(Box::pin(LimitStream::new(stream, self.limit)))
}
}
/// Truncate a RecordBatch to maximum of n rows
-pub fn truncate_batch(batch: &RecordBatch, n: usize) -> Result<RecordBatch> {
- let limited_columns: Result<Vec<ArrayRef>> = (0..batch.num_columns())
- .map(|i| limit(batch.column(i), n).map_err(|error| DataFusionError::from(error)))
+pub fn truncate_batch(batch: &RecordBatch, n: usize) -> RecordBatch {
+ let limited_columns: Vec<ArrayRef> = (0..batch.num_columns())
+ .map(|i| limit(batch.column(i), n))
.collect();
- Ok(RecordBatch::try_new(
- batch.schema().clone(),
- limited_columns?,
- )?)
+ RecordBatch::try_new(batch.schema().clone(), limited_columns).unwrap()
}
-/// Create a vector of record batches from an iterator
-async fn collect_with_limit(
- reader: &mut SendableRecordBatchStream,
+/// A Limit stream limits the stream to up to `limit` rows.
+struct LimitStream {
limit: usize,
-) -> Result<Vec<RecordBatch>> {
- let mut count = 0;
- let mut results: Vec<RecordBatch> = vec![];
- loop {
- match reader.as_mut().next().await {
- Some(Ok(batch)) => {
- let capacity = limit - count;
- if batch.num_rows() <= capacity {
- count += batch.num_rows();
- results.push(batch);
- } else {
- let batch = truncate_batch(&batch, capacity)?;
- count += batch.num_rows();
- results.push(batch);
- }
- if count == limit {
- return Ok(results);
- }
- }
- None => {
- return Ok(results);
- }
- Some(Err(e)) => return Err(DataFusionError::from(e)),
+ input: SendableRecordBatchStream,
+ // the current count
+ current_len: usize,
+}
+
+impl LimitStream {
+ fn new(input: SendableRecordBatchStream, limit: usize) -> Self {
+ Self {
+ limit,
+ input,
+ current_len: 0,
+ }
+ }
+
+ fn stream_limit(&mut self, batch: RecordBatch) -> Option<RecordBatch> {
+ if self.current_len == self.limit {
+ return None;
+ } else if self.current_len + batch.num_rows() <= self.limit {
+ self.current_len += batch.num_rows();
+ return Some(batch);
+ } else {
+ let batch_rows = self.limit - self.current_len;
+ self.current_len = self.limit;
+ Some(truncate_batch(&batch, batch_rows))
}
}
}
+impl Stream for LimitStream {
+ type Item = ArrowResult<RecordBatch>;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ self.input.poll_next_unpin(cx).map(|x| match x {
+ Some(Ok(batch)) => Ok(self.stream_limit(batch)).transpose(),
+ other => other,
+ })
+ }
+}
+
+impl RecordBatchStream for LimitStream {
+ /// Get the schema
+ fn schema(&self) -> SchemaRef {
+ self.input.schema().clone()
+ }
+}
+
#[cfg(test)]
mod tests {