You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by xu...@apache.org on 2022/04/12 11:48:02 UTC
[arrow-datafusion] branch master updated: Remove tokio::spawn from WindowAggExec (#2201) (#2203)
This is an automated email from the ASF dual-hosted git repository.
xudong963 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new f3360d30b Remove tokio::spawn from WindowAggExec (#2201) (#2203)
f3360d30b is described below
commit f3360d30bf6346af8e966b5ba8b9b1aaf46f2a50
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue Apr 12 12:47:57 2022 +0100
Remove tokio::spawn from WindowAggExec (#2201) (#2203)
---
.../src/physical_plan/windows/window_agg_exec.rs | 93 ++++++++--------------
1 file changed, 35 insertions(+), 58 deletions(-)
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index 553b6f26b..8b545a12b 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -19,7 +19,6 @@
use crate::error::Result;
use crate::execution::context::TaskContext;
-use crate::physical_plan::common::AbortOnDropSingle;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
@@ -36,8 +35,7 @@ use arrow::{
};
use async_trait::async_trait;
use futures::stream::Stream;
-use futures::FutureExt;
-use pin_project_lite::pin_project;
+use futures::{ready, StreamExt};
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
@@ -223,7 +221,7 @@ fn create_schema(
/// Compute the window aggregate columns
fn compute_window_aggregates(
- window_expr: Vec<Arc<dyn WindowExpr>>,
+ window_expr: &[Arc<dyn WindowExpr>],
batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
window_expr
@@ -232,16 +230,14 @@ fn compute_window_aggregates(
.collect()
}
-pin_project! {
- /// stream for window aggregation plan
- pub struct WindowAggStream {
- schema: SchemaRef,
- drop_helper: AbortOnDropSingle<()>,
- #[pin]
- output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
- finished: bool,
- baseline_metrics: BaselineMetrics,
- }
+/// stream for window aggregation plan
+pub struct WindowAggStream {
+ schema: SchemaRef,
+ input: SendableRecordBatchStream,
+ batches: Vec<RecordBatch>,
+ finished: bool,
+ window_expr: Vec<Arc<dyn WindowExpr>>,
+ baseline_metrics: BaselineMetrics,
}
impl WindowAggStream {
@@ -252,51 +248,33 @@ impl WindowAggStream {
input: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
) -> Self {
- let (tx, rx) = futures::channel::oneshot::channel();
- let schema_clone = schema.clone();
- let elapsed_compute = baseline_metrics.elapsed_compute().clone();
- let join_handle = tokio::spawn(async move {
- let schema = schema_clone.clone();
- let result =
- WindowAggStream::process(input, window_expr, schema, elapsed_compute)
- .await;
-
- // failing here is OK, the receiver is gone and does not care about the result
- tx.send(result).ok();
- });
-
Self {
schema,
- drop_helper: AbortOnDropSingle::new(join_handle),
- output: rx,
+ input,
+ batches: vec![],
finished: false,
+ window_expr,
baseline_metrics,
}
}
- async fn process(
- input: SendableRecordBatchStream,
- window_expr: Vec<Arc<dyn WindowExpr>>,
- schema: SchemaRef,
- elapsed_compute: crate::physical_plan::metrics::Time,
- ) -> ArrowResult<RecordBatch> {
- let input_schema = input.schema();
- let batches = common::collect(input).await?;
-
+ fn compute_aggregates(&self) -> ArrowResult<RecordBatch> {
// record compute time on drop
- let _timer = elapsed_compute.timer();
+ let _timer = self.baseline_metrics.elapsed_compute().timer();
- let batch = common::combine_batches(&batches, input_schema.clone())?;
+ let batch = common::combine_batches(&self.batches, self.input.schema())?;
if let Some(batch) = batch {
// calculate window cols
- let mut columns = compute_window_aggregates(window_expr, &batch)?;
+ let mut columns = compute_window_aggregates(&self.window_expr, &batch)
+ .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+
// combine with the original cols
// note that the setup of window aggregates is such that the newly calculated
// window expressions are always prepended to the columns
columns.extend_from_slice(batch.columns());
- RecordBatch::try_new(schema, columns)
+ RecordBatch::try_new(self.schema.clone(), columns)
} else {
- Ok(RecordBatch::new_empty(schema))
+ Ok(RecordBatch::new_empty(self.schema.clone()))
}
}
}
@@ -316,27 +294,26 @@ impl Stream for WindowAggStream {
impl WindowAggStream {
#[inline]
fn poll_next_inner(
- self: &mut Pin<&mut Self>,
+ &mut self,
cx: &mut Context<'_>,
) -> Poll<Option<ArrowResult<RecordBatch>>> {
if self.finished {
return Poll::Ready(None);
}
- // is the output ready?
- let output_poll = self.output.poll_unpin(cx);
-
- match output_poll {
- Poll::Ready(result) => {
- self.finished = true;
- // check for error in receiving channel and unwrap actual result
- let result = match result {
- Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving
- Ok(result) => Some(result),
- };
- Poll::Ready(result)
- }
- Poll::Pending => Poll::Pending,
+ loop {
+ let result = match ready!(self.input.poll_next_unpin(cx)) {
+ Some(Ok(batch)) => {
+ self.batches.push(batch);
+ continue;
+ }
+ Some(Err(e)) => Err(e),
+ None => self.compute_aggregates(),
+ };
+
+ self.finished = true;
+
+ return Poll::Ready(Some(result));
}
}
}