You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "tustvold (via GitHub)" <gi...@apache.org> on 2023/06/01 11:01:52 UTC

[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

tustvold commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1212982877


##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -17,43 +17,178 @@
 
 //! Stream wrappers for physical operators
 
+use std::sync::Arc;
+
 use crate::error::Result;
+use crate::physical_plan::displayable;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{Stream, StreamExt};
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use futures::stream::BoxStream;
+use futures::{Future, Stream, StreamExt};
+use log::debug;
 use pin_project_lite::pin_project;
-use tokio::task::JoinHandle;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinSet;
 use tokio_stream::wrappers::ReceiverStream;
 
-use super::common::AbortOnDropSingle;
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::metrics::BaselineMetrics;
+use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
 
-/// Adapter for a tokio [`ReceiverStream`] that implements the
-/// [`SendableRecordBatchStream`]
-/// interface
-pub struct RecordBatchReceiverStream {
+/// Builder for [`RecordBatchReceiverStream`] that propagates errors
+/// and panics correctly.
+///
+/// [`RecordBatchReceiverStream`] can be used when there are one or
+/// more tasks spawned which produce RecordBatches and send them to a
+/// single `Receiver`.
+pub struct RecordBatchReceiverStreamBuilder {
+    tx: Sender<Result<RecordBatch>>,
+    rx: Receiver<Result<RecordBatch>>,
     schema: SchemaRef,
+    join_set: JoinSet<()>,
+}
+
+impl RecordBatchReceiverStreamBuilder {
+    /// create new channels with the specified buffer size
+    pub fn new(schema: SchemaRef, capacity: usize) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
 
-    inner: ReceiverStream<Result<RecordBatch>>,
+        Self {
+            tx,
+            rx,
+            schema,
+            join_set: JoinSet::new(),
+        }
+    }
+
+    /// Get a handle for sending [`RecordBatch`]es to the output
+    pub fn tx(&self) -> Sender<Result<RecordBatch>> {
+        self.tx.clone()
+    }
+
+    /// Spawn task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn<F>(&mut self, task: F)
+    where
+        F: Future<Output = ()>,
+        F: Send + 'static,
+    {
+        self.join_set.spawn(task);
+    }
+
+    /// Spawn a blocking task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn_blocking<F>(&mut self, f: F)
+    where
+        F: FnOnce(),
+        F: Send + 'static,
+    {
+        self.join_set.spawn_blocking(f);
+    }
+
+    /// runs the input_partition of the `input` ExecutionPlan on the
+    /// tokio threadpool and writes its outputs to this stream
+    pub(crate) fn run_input(
+        &mut self,
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) {
+        let output = self.tx();
+
+        self.spawn(async move {
+            let mut stream = match input.execute(partition, context) {
+                Err(e) => {
+                    // If send fails, plan being torn down,
+                    // there is no place to send the error.
+                    output.send(Err(e)).await.ok();
+                    debug!(
+                        "Stopping execution: error executing input: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+                Ok(stream) => stream,
+            };
+
+            while let Some(item) = stream.next().await {
+                // If send fails, plan being torn down,
+                // there is no place to send the error.

Review Comment:
   I think this should short-circuit on error; as written, I believe it will keep driving execution to completion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org