You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "crepererum (via GitHub)" <gi...@apache.org> on 2023/06/01 12:56:11 UTC

[GitHub] [arrow-datafusion] crepererum commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

crepererum commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1213112618


##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -17,43 +17,178 @@
 
 //! Stream wrappers for physical operators
 
+use std::sync::Arc;
+
 use crate::error::Result;
+use crate::physical_plan::displayable;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{Stream, StreamExt};
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use futures::stream::BoxStream;
+use futures::{Future, Stream, StreamExt};
+use log::debug;
 use pin_project_lite::pin_project;
-use tokio::task::JoinHandle;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinSet;
 use tokio_stream::wrappers::ReceiverStream;
 
-use super::common::AbortOnDropSingle;
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::metrics::BaselineMetrics;
+use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
 
-/// Adapter for a tokio [`ReceiverStream`] that implements the
-/// [`SendableRecordBatchStream`]
-/// interface
-pub struct RecordBatchReceiverStream {
+/// Builder for [`RecordBatchReceiverStream`] that propagates errors
+/// and panics correctly.
+///
+/// [`RecordBatchReceiverStream`] can be used when there are one or
+/// more tasks spawned which produce RecordBatches and send them to a
+/// single `Receiver`.
+pub struct RecordBatchReceiverStreamBuilder {
+    tx: Sender<Result<RecordBatch>>,
+    rx: Receiver<Result<RecordBatch>>,
     schema: SchemaRef,
+    join_set: JoinSet<()>,
+}
+
+impl RecordBatchReceiverStreamBuilder {
+    /// create new channels with the specified buffer size
+    pub fn new(schema: SchemaRef, capacity: usize) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
 
-    inner: ReceiverStream<Result<RecordBatch>>,
+        Self {
+            tx,
+            rx,
+            schema,
+            join_set: JoinSet::new(),
+        }
+    }
+
+    /// Get a handle for sending [`RecordBatch`]es to the output
+    pub fn tx(&self) -> Sender<Result<RecordBatch>> {
+        self.tx.clone()
+    }
+
+    /// Spawn a task that will be aborted if this builder (or the stream
+    /// built from it) is dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn<F>(&mut self, task: F)
+    where
+        F: Future<Output = ()>,
+        F: Send + 'static,
+    {
+        self.join_set.spawn(task);
+    }
+
+    /// Spawn a blocking task that will be aborted if this builder (or the stream
+    /// built from it) is dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn_blocking<F>(&mut self, f: F)
+    where
+        F: FnOnce(),
+        F: Send + 'static,
+    {
+        self.join_set.spawn_blocking(f);
+    }
+
+    /// runs the input_partition of the `input` ExecutionPlan on the
+    /// tokio threadpool and writes its outputs to this stream
+    pub(crate) fn run_input(
+        &mut self,
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) {
+        let output = self.tx();
+
+        self.spawn(async move {
+            let mut stream = match input.execute(partition, context) {
+                Err(e) => {
+                    // If send fails, plan being torn down,
+                    // there is no place to send the error.
+                    output.send(Err(e)).await.ok();
+                    debug!(
+                        "Stopping execution: error executing input: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+                Ok(stream) => stream,
+            };
+
+            while let Some(item) = stream.next().await {
+                // If send fails, plan being torn down,
+                // there is no place to send the error.
+                if output.send(item).await.is_err() {
+                    debug!(
+                        "Stopping execution: output is gone, plan cancelling: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+            }
+        });
+    }
 
-    #[allow(dead_code)]
-    drop_helper: AbortOnDropSingle<()>,
+    /// Create a stream of all `RecordBatch`es written to `tx`
+    pub fn build(self) -> SendableRecordBatchStream {
+        let Self {
+            tx,
+            rx,
+            schema,
+            mut join_set,
+        } = self;
+
+        // don't need tx
+        drop(tx);
+
+        // future that checks the result of the join set
+        let check = async move {
+            while let Some(result) = join_set.join_next().await {
+                match result {
+                    Ok(()) => continue, // nothing to report
+                    // This means a tokio task error, likely a panic
+                    Err(e) => {
+                        if e.is_panic() {
+                            // resume on the main thread
+                            std::panic::resume_unwind(e.into_panic());
+                        } else {
+                            return Some(Err(DataFusionError::Execution(format!(
+                                "Task error: {e}"
+                            ))));
+                        }
+                    }
+                }
+            }
+            None
+        };
+
+        let check_stream = futures::stream::once(check)
+            // unwrap Option / only return the error
+            .filter_map(|item| async move { item });
+
+        let inner = ReceiverStream::new(rx).chain(check_stream).boxed();

Review Comment:
   > With this implementation, the panics will be propagated only after all input
   
   This is my worry as well. I think you could move the check future into another task (one that holds the join set and is also aborted on drop, like a two-level join set) which then sends the error to `tx`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org