You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text copy.)
Posted to commits@arrow.apache.org by xu...@apache.org on 2022/04/12 11:48:02 UTC

[arrow-datafusion] branch master updated: Remove tokio::spawn from WindowAggExec (#2201) (#2203)

This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new f3360d30b Remove tokio::spawn from WindowAggExec (#2201) (#2203)
f3360d30b is described below

commit f3360d30bf6346af8e966b5ba8b9b1aaf46f2a50
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue Apr 12 12:47:57 2022 +0100

    Remove tokio::spawn from WindowAggExec (#2201) (#2203)
---
 .../src/physical_plan/windows/window_agg_exec.rs   | 93 ++++++++--------------
 1 file changed, 35 insertions(+), 58 deletions(-)

diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index 553b6f26b..8b545a12b 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -19,7 +19,6 @@
 
 use crate::error::Result;
 use crate::execution::context::TaskContext;
-use crate::physical_plan::common::AbortOnDropSingle;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
     BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
@@ -36,8 +35,7 @@ use arrow::{
 };
 use async_trait::async_trait;
 use futures::stream::Stream;
-use futures::FutureExt;
-use pin_project_lite::pin_project;
+use futures::{ready, StreamExt};
 use std::any::Any;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -223,7 +221,7 @@ fn create_schema(
 
 /// Compute the window aggregate columns
 fn compute_window_aggregates(
-    window_expr: Vec<Arc<dyn WindowExpr>>,
+    window_expr: &[Arc<dyn WindowExpr>],
     batch: &RecordBatch,
 ) -> Result<Vec<ArrayRef>> {
     window_expr
@@ -232,16 +230,14 @@ fn compute_window_aggregates(
         .collect()
 }
 
-pin_project! {
-    /// stream for window aggregation plan
-    pub struct WindowAggStream {
-        schema: SchemaRef,
-        drop_helper: AbortOnDropSingle<()>,
-        #[pin]
-        output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
-        finished: bool,
-        baseline_metrics: BaselineMetrics,
-    }
+/// stream for window aggregation plan
+pub struct WindowAggStream {
+    schema: SchemaRef,
+    input: SendableRecordBatchStream,
+    batches: Vec<RecordBatch>,
+    finished: bool,
+    window_expr: Vec<Arc<dyn WindowExpr>>,
+    baseline_metrics: BaselineMetrics,
 }
 
 impl WindowAggStream {
@@ -252,51 +248,33 @@ impl WindowAggStream {
         input: SendableRecordBatchStream,
         baseline_metrics: BaselineMetrics,
     ) -> Self {
-        let (tx, rx) = futures::channel::oneshot::channel();
-        let schema_clone = schema.clone();
-        let elapsed_compute = baseline_metrics.elapsed_compute().clone();
-        let join_handle = tokio::spawn(async move {
-            let schema = schema_clone.clone();
-            let result =
-                WindowAggStream::process(input, window_expr, schema, elapsed_compute)
-                    .await;
-
-            // failing here is OK, the receiver is gone and does not care about the result
-            tx.send(result).ok();
-        });
-
         Self {
             schema,
-            drop_helper: AbortOnDropSingle::new(join_handle),
-            output: rx,
+            input,
+            batches: vec![],
             finished: false,
+            window_expr,
             baseline_metrics,
         }
     }
 
-    async fn process(
-        input: SendableRecordBatchStream,
-        window_expr: Vec<Arc<dyn WindowExpr>>,
-        schema: SchemaRef,
-        elapsed_compute: crate::physical_plan::metrics::Time,
-    ) -> ArrowResult<RecordBatch> {
-        let input_schema = input.schema();
-        let batches = common::collect(input).await?;
-
+    fn compute_aggregates(&self) -> ArrowResult<RecordBatch> {
         // record compute time on drop
-        let _timer = elapsed_compute.timer();
+        let _timer = self.baseline_metrics.elapsed_compute().timer();
 
-        let batch = common::combine_batches(&batches, input_schema.clone())?;
+        let batch = common::combine_batches(&self.batches, self.input.schema())?;
         if let Some(batch) = batch {
             // calculate window cols
-            let mut columns = compute_window_aggregates(window_expr, &batch)?;
+            let mut columns = compute_window_aggregates(&self.window_expr, &batch)
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+
             // combine with the original cols
             // note the setup of window aggregates is that they newly calculated window
             // expressions are always prepended to the columns
             columns.extend_from_slice(batch.columns());
-            RecordBatch::try_new(schema, columns)
+            RecordBatch::try_new(self.schema.clone(), columns)
         } else {
-            Ok(RecordBatch::new_empty(schema))
+            Ok(RecordBatch::new_empty(self.schema.clone()))
         }
     }
 }
@@ -316,27 +294,26 @@ impl Stream for WindowAggStream {
 impl WindowAggStream {
     #[inline]
     fn poll_next_inner(
-        self: &mut Pin<&mut Self>,
+        &mut self,
         cx: &mut Context<'_>,
     ) -> Poll<Option<ArrowResult<RecordBatch>>> {
         if self.finished {
             return Poll::Ready(None);
         }
 
-        // is the output ready?
-        let output_poll = self.output.poll_unpin(cx);
-
-        match output_poll {
-            Poll::Ready(result) => {
-                self.finished = true;
-                // check for error in receiving channel and unwrap actual result
-                let result = match result {
-                    Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving
-                    Ok(result) => Some(result),
-                };
-                Poll::Ready(result)
-            }
-            Poll::Pending => Poll::Pending,
+        loop {
+            let result = match ready!(self.input.poll_next_unpin(cx)) {
+                Some(Ok(batch)) => {
+                    self.batches.push(batch);
+                    continue;
+                }
+                Some(Err(e)) => Err(e),
+                None => self.compute_aggregates(),
+            };
+
+            self.finished = true;
+
+            return Poll::Ready(Some(result));
         }
     }
 }