Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/05/31 19:49:20 UTC

[GitHub] [arrow-datafusion] alamb opened a new pull request, #6507: Alamb/propagate error

alamb opened a new pull request, #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507

   # Which issue does this PR close?
   
   Based on https://github.com/apache/arrow-datafusion/pull/6449 from @nvartolomei
   
   Closes https://github.com/apache/arrow-datafusion/issues/3104
   
   # Rationale for this change
   
   I wanted to fix panic propagation: previously, panics in tasks spawned by operators such as `CoalescePartitionsExec` were ignored rather than propagated to the caller.
   
   # What changes are included in this PR?
   1. Add a `RecordBatchReceiverStreamBuilder` which handles the abort-on-drop dance using [tokio::task::JoinSet](https://docs.rs/tokio/latest/tokio/task/struct.JoinSet.html) as shown by @nvartolomei (a sketch of the pattern follows below)
   2. Port `CoalescePartitionsExec` and `AnalyzeExec` to use this builder
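
   For context, a minimal, self-contained sketch of the abort-on-drop pattern this builder relies on (illustrative only; the channel payload and task bodies are placeholders, not the PR's types):

   ```rust
   use tokio::task::JoinSet;

   #[tokio::main]
   async fn main() {
       let (tx, mut rx) = tokio::sync::mpsc::channel::<u64>(2);

       // Tasks spawned on a JoinSet are aborted when the JoinSet is
       // dropped, so dropping the stream that owns it cancels all
       // producers automatically.
       let mut join_set = JoinSet::new();
       for i in 0..3u64 {
           let tx = tx.clone();
           join_set.spawn(async move {
               // If send fails the receiver is gone; stop quietly.
               let _ = tx.send(i).await;
           });
       }
       drop(tx); // the channel closes once all producers finish

       while let Some(v) = rx.recv().await {
           println!("got {v}");
       }

       // Surface task outcomes; a panic inside a task reappears here.
       while let Some(result) = join_set.join_next().await {
           if let Err(e) = result {
               if e.is_panic() {
                   std::panic::resume_unwind(e.into_panic());
               }
           }
       }
   }
   ```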
   
   # Are these changes tested?
   Covered by existing tests
   
   
   # Are there any user-facing changes?
   Yes, panics are no longer ignored




[GitHub] [arrow-datafusion] alamb commented on pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#issuecomment-1579439184

   > great work @alamb! 👏
   
   
   
   Thank you @nvartolomei  for starting the process (and providing the tests!)




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1214265191


##########
datafusion/core/src/test/exec.rs:
##########
@@ -643,3 +637,139 @@ pub async fn assert_strong_count_converges_to_zero<T>(refs: Weak<T>) {
     .await
     .unwrap();
 }
+
+/// Execution plan that emits streams that panic.
+///
+/// This is useful to test panic handling of certain execution plans.
+#[derive(Debug)]
+pub struct PanicingExec {

Review Comment:
   That is why I plan to rename it `PanicStream` to avoid such English peculiarities 😅 





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1214852934


##########
datafusion/core/src/test/exec.rs:
##########
@@ -643,3 +637,139 @@ pub async fn assert_strong_count_converges_to_zero<T>(refs: Weak<T>) {
     .await
     .unwrap();
 }
+
+/// Execution plan that emits streams that panic.
+///
+/// This is useful to test panic handling of certain execution plans.
+#[derive(Debug)]
+pub struct PanicingExec {

Review Comment:
   in 3f80690f7





[GitHub] [arrow-datafusion] nvartolomei commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "nvartolomei (via GitHub)" <gi...@apache.org>.
nvartolomei commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1212964402


##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -17,43 +17,178 @@
 
 //! Stream wrappers for physical operators
 
+use std::sync::Arc;
+
 use crate::error::Result;
+use crate::physical_plan::displayable;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{Stream, StreamExt};
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use futures::stream::BoxStream;
+use futures::{Future, Stream, StreamExt};
+use log::debug;
 use pin_project_lite::pin_project;
-use tokio::task::JoinHandle;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinSet;
 use tokio_stream::wrappers::ReceiverStream;
 
-use super::common::AbortOnDropSingle;
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::metrics::BaselineMetrics;
+use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
 
-/// Adapter for a tokio [`ReceiverStream`] that implements the
-/// [`SendableRecordBatchStream`]
-/// interface
-pub struct RecordBatchReceiverStream {
+/// Builder for [`RecordBatchReceiverStream`] that propagates errors
+/// and panics correctly.
+///
+/// [`RecordBatchReceiverStream`] can be used when there are one or
+/// more tasks spawned which produce RecordBatches and send them to a
+/// single `Receiver`.
+pub struct RecordBatchReceiverStreamBuilder {
+    tx: Sender<Result<RecordBatch>>,
+    rx: Receiver<Result<RecordBatch>>,
     schema: SchemaRef,
+    join_set: JoinSet<()>,
+}
+
+impl RecordBatchReceiverStreamBuilder {
+    /// create new channels with the specified buffer size
+    pub fn new(schema: SchemaRef, capacity: usize) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
 
-    inner: ReceiverStream<Result<RecordBatch>>,
+        Self {
+            tx,
+            rx,
+            schema,
+            join_set: JoinSet::new(),
+        }
+    }
+
+    /// Get a handle for sending [`RecordBatch`]es to the output
+    pub fn tx(&self) -> Sender<Result<RecordBatch>> {
+        self.tx.clone()
+    }
+
+    /// Spawn task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn<F>(&mut self, task: F)
+    where
+        F: Future<Output = ()>,
+        F: Send + 'static,
+    {
+        self.join_set.spawn(task);
+    }
+
+    /// Spawn a blocking task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn_blocking<F>(&mut self, f: F)
+    where
+        F: FnOnce(),
+        F: Send + 'static,
+    {
+        self.join_set.spawn_blocking(f);
+    }
+
+    /// runs the input_partition of the `input` ExecutionPlan on the
+    /// tokio threadpool and writes its outputs to this stream
+    pub(crate) fn run_input(
+        &mut self,
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) {
+        let output = self.tx();
+
+        self.spawn(async move {
+            let mut stream = match input.execute(partition, context) {
+                Err(e) => {
+                    // If send fails, plan being torn down,
+                    // there is no place to send the error.
+                    output.send(Err(e)).await.ok();
+                    debug!(
+                        "Stopping execution: error executing input: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+                Ok(stream) => stream,
+            };
+
+            while let Some(item) = stream.next().await {
+                // If send fails, plan being torn down,
+                // there is no place to send the error.
+                if output.send(item).await.is_err() {
+                    debug!(
+                        "Stopping execution: output is gone, plan cancelling: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+            }
+        });
+    }
 
-    #[allow(dead_code)]
-    drop_helper: AbortOnDropSingle<()>,
+    /// Create a stream of all `RecordBatch`es written to `tx`
+    pub fn build(self) -> SendableRecordBatchStream {
+        let Self {
+            tx,
+            rx,
+            schema,
+            mut join_set,
+        } = self;
+
+        // don't need tx
+        drop(tx);
+
+        // future that checks the result of the join set
+        let check = async move {
+            while let Some(result) = join_set.join_next().await {
+                match result {
+                    Ok(()) => continue, // nothing to report
+                    // This means a tokio task error, likely a panic
+                    Err(e) => {
+                        if e.is_panic() {
+                            // resume on the main thread
+                            std::panic::resume_unwind(e.into_panic());
+                        } else {
+                            return Some(Err(DataFusionError::Execution(format!(
+                                "Task error: {e}"
+                            ))));
+                        }
+                    }
+                }
+            }
+            None
+        };
+
+        let check_stream = futures::stream::once(check)
+            // unwrap Option / only return the error
+            .filter_map(|item| async move { item });
+
+        let inner = ReceiverStream::new(rx).chain(check_stream).boxed();

Review Comment:
   This change looks much better than mine. Wondering if it can be improved further and if it makes sense at all.
   
   With this implementation, the panics will be propagated only after all input (from other partitions) is consumed. Probably fine, as this shouldn't happen during normal operation and is more of a correctness check. Also, the `check` future will not make any progress until all the inputs are exhausted. It shouldn't be much work, so it's fine for it to be sequential.
   
   As an alternative, what if we build a "supervisor" task (`tokio::spawn`) which is launched to do all that work, and then in the `check_stream` we just check the `JoinHandle` of the "supervisor" task? This way the supervisor task will be able to make progress concurrently and panic/report errors early.
   
   Thought about this after looking at `RepartitionStream` which would need something similar (supervisor) to get task failures and then multiplex them to all the output partitions. Then, all "output streams" would only have to ensure that the supervisor didn't die. Currently, in `RepartitionStream` there are `|output partitions|` "supervisors" (`wait_for_task`) which aren't checked for success either. Wondering if it could fail at all though (https://github.com/tokio-rs/tokio/discussions/5744).
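
   A minimal sketch of that supervisor idea, under stated assumptions (hypothetical helper, not code from this PR; the error type is simplified to `String`):

   ```rust
   use tokio::task::{JoinHandle, JoinSet};

   // Hypothetical supervisor: drains the JoinSet as tasks finish, so a
   // panic surfaces as soon as the failing task ends rather than only
   // after the receiver stream is exhausted.
   fn spawn_supervisor(mut join_set: JoinSet<()>) -> JoinHandle<Result<(), String>> {
       tokio::spawn(async move {
           while let Some(result) = join_set.join_next().await {
               if let Err(e) = result {
                   if e.is_panic() {
                       // Re-panic here; the caller observes a panicked
                       // JoinHandle when it polls the supervisor.
                       std::panic::resume_unwind(e.into_panic());
                   }
                   return Err(format!("task failed: {e}"));
               }
           }
           Ok(())
       })
   }
   ```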





[GitHub] [arrow-datafusion] alamb merged pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb merged PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1219958272


##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -17,43 +17,206 @@
 
 //! Stream wrappers for physical operators
 
+use std::sync::Arc;
+
 use crate::error::Result;
+use crate::physical_plan::displayable;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{Stream, StreamExt};
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use futures::stream::BoxStream;
+use futures::{Future, Stream, StreamExt};
+use log::debug;
 use pin_project_lite::pin_project;
-use tokio::task::JoinHandle;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinSet;
 use tokio_stream::wrappers::ReceiverStream;
 
-use super::common::AbortOnDropSingle;
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::metrics::BaselineMetrics;
+use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
 
-/// Adapter for a tokio [`ReceiverStream`] that implements the
-/// [`SendableRecordBatchStream`]
-/// interface
-pub struct RecordBatchReceiverStream {
+/// Builder for [`RecordBatchReceiverStream`] that propagates errors
+/// and panics correctly.
+///
+/// [`RecordBatchReceiverStream`] is used to spawn one or more tasks
+/// that produce `RecordBatch`es and send them to a single
+/// `Receiver` which can improve parallelism.
+///
+/// This also handles propagating panics and canceling the tasks.
+pub struct RecordBatchReceiverStreamBuilder {
+    tx: Sender<Result<RecordBatch>>,
+    rx: Receiver<Result<RecordBatch>>,
     schema: SchemaRef,
+    join_set: JoinSet<()>,
+}
+
+impl RecordBatchReceiverStreamBuilder {
+    /// create new channels with the specified buffer size
+    pub fn new(schema: SchemaRef, capacity: usize) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
 
-    inner: ReceiverStream<Result<RecordBatch>>,
+        Self {
+            tx,
+            rx,
+            schema,
+            join_set: JoinSet::new(),
+        }
+    }
+
+    /// Get a handle for sending [`RecordBatch`]es to the output
+    pub fn tx(&self) -> Sender<Result<RecordBatch>> {
+        self.tx.clone()
+    }
+
+    /// Spawn task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn<F>(&mut self, task: F)
+    where
+        F: Future<Output = ()>,
+        F: Send + 'static,
+    {
+        self.join_set.spawn(task);
+    }
+
+    /// Spawn a blocking task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn_blocking<F>(&mut self, f: F)
+    where
+        F: FnOnce(),
+        F: Send + 'static,
+    {
+        self.join_set.spawn_blocking(f);
+    }
+
+    /// runs the input_partition of the `input` ExecutionPlan on the
+    /// tokio threadpool and writes its outputs to this stream
+    ///
+    /// If the input partition produces an error, the error will be
+    /// sent to the output stream and no further results are sent.
+    pub(crate) fn run_input(
+        &mut self,
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) {
+        let output = self.tx();
+
+        self.spawn(async move {
+            let mut stream = match input.execute(partition, context) {
+                Err(e) => {
+                    // If send fails, the plan being torn down, there
+                    // is no place to send the error and no reason to continue.
+                    output.send(Err(e)).await.ok();
+                    debug!(
+                        "Stopping execution: error executing input: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+                Ok(stream) => stream,
+            };
+
+            // Transfer batches from inner stream to the output tx
+            // immediately.
+            while let Some(item) = stream.next().await {
+                let is_err = item.is_err();
+
+                // If send fails, plan being torn down, there is no
+                // place to send the error and no reason to continue.
+                if output.send(item).await.is_err() {
+                    debug!(
+                        "Stopping execution: output is gone, plan cancelling: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+
+                // stop after the first error is encountered (don't
+                // drive all streams to completion)
+                if is_err {
+                    debug!(
+                        "Stopping execution: plan returned error: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+            }
+        });
+    }
+
+    /// Create a stream of all `RecordBatch`es written to `tx`
+    pub fn build(self) -> SendableRecordBatchStream {
+        let Self {
+            tx,
+            rx,
+            schema,
+            mut join_set,
+        } = self;
 
-    #[allow(dead_code)]
-    drop_helper: AbortOnDropSingle<()>,
+        // don't need tx
+        drop(tx);
+
+        // future that checks the result of the join set, and propagates panic if seen
+        let check = async move {
+            while let Some(result) = join_set.join_next().await {
+                match result {
+                    Ok(()) => continue, // nothing to report
+                    // This means a tokio task error, likely a panic
+                    Err(e) => {
+                        if e.is_panic() {
+                            // resume on the main thread
+                            std::panic::resume_unwind(e.into_panic());
+                        } else {
+                            // This should only occur if the task is
+                            // cancelled, which would only occur if
+                            // the JoinSet were aborted, which in turn
+                            // would imply that the receiver has been
+                            // dropped and this code is not running
+                            return Some(Err(DataFusionError::Internal(format!(
+                                "Non Panic Task error: {e}"
+                            ))));
+                        }
+                    }
+                }
+            }
+            None
+        };
+
+        let check_stream = futures::stream::once(check)
+            // unwrap Option / only return the error
+            .filter_map(|item| async move { item });
+
+        // Merge the streams together so whichever is ready first
+        // produces the batch (since futures::stream::StreamExt is
+        // already in scope, need to call it explicitly)

Review Comment:
   (it also inspired me to do https://github.com/apache/arrow-datafusion/pull/6565)





[GitHub] [arrow-datafusion] alamb commented on pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#issuecomment-1573597064

   I plan to work on improving the panic checks to be more eager later today




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1213508199


##########
datafusion/core/src/test/exec.rs:
##########
@@ -643,3 +637,139 @@ pub async fn assert_strong_count_converges_to_zero<T>(refs: Weak<T>) {
     .await
     .unwrap();
 }
+
+/// Execution plan that emits streams that panic.
+///
+/// This is useful to test panic handling of certain execution plans.
+#[derive(Debug)]
+pub struct PanicingExec {

Review Comment:
   I was trying to mirror the spelling of the rust `panic!` macro that doesn't have the `k` -- Maybe I will rename this just to `PanicExec` 🤔 





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1212986683


##########
datafusion/core/src/test/exec.rs:
##########
@@ -643,3 +637,139 @@ pub async fn assert_strong_count_converges_to_zero<T>(refs: Weak<T>) {
     .await
     .unwrap();
 }
+
+/// Execution plan that emits streams that panic.
+///
+/// This is useful to test panic handling of certain execution plans.
+#[derive(Debug)]
+pub struct PanicingExec {
+    /// Schema that is mocked by this plan.
+    schema: SchemaRef,
+
+    /// Number of output partitions. Each partition will produce this
+    /// many empty output record batches prior to panicking
+    batches_until_panics: Vec<usize>,
+}
+
+impl PanicingExec {
+    /// Create new [`PanickingExec`] with a given schema and number of
+    /// partitions, which will each panic immediately.
+    pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
+        Self {
+            schema,
+            batches_until_panics: vec![0; n_partitions],
+        }
+    }
+
+    /// Set the number of batches prior to panic for a partition
+    pub fn with_partition_panic(mut self, partition: usize, count: usize) -> Self {
+        self.batches_until_panics[partition] = count;
+        self
+    }
+}
+
+impl ExecutionPlan for PanicingExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        // this is a leaf node and has no children
+        vec![]
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        let num_partitions = self.batches_until_panics.len();
+        Partitioning::UnknownPartitioning(num_partitions)
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Err(DataFusionError::Internal(format!(
+            "Children cannot be replaced in {:?}",
+            self
+        )))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        Ok(Box::pin(PanicingStream {
+            partition,
+            batches_until_panic: self.batches_until_panics[partition],
+            schema: Arc::clone(&self.schema),
+            ready: false,
+        }))
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "PanickingExec",)
+            }
+        }
+    }
+
+    fn statistics(&self) -> Statistics {
+        unimplemented!()
+    }
+}
+
+/// A [`RecordBatchStream`] that yields every other batch and panics after `batches_until_panic` batches have been produced
+#[derive(Debug)]
+struct PanicingStream {

Review Comment:
   ```suggestion
   struct PanickingStream {
   ```



##########
datafusion/core/src/test/exec.rs:
##########
@@ -643,3 +637,139 @@ pub async fn assert_strong_count_converges_to_zero<T>(refs: Weak<T>) {
     .await
     .unwrap();
 }
+
+/// Execution plan that emits streams that panic.
+///
+/// This is useful to test panic handling of certain execution plans.
+#[derive(Debug)]
+pub struct PanicingExec {

Review Comment:
   ```suggestion
   pub struct PanickingExec {
   ```





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1219701155


##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -17,43 +17,206 @@
 
 //! Stream wrappers for physical operators
 
+use std::sync::Arc;
+
 use crate::error::Result;
+use crate::physical_plan::displayable;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{Stream, StreamExt};
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use futures::stream::BoxStream;
+use futures::{Future, Stream, StreamExt};
+use log::debug;
 use pin_project_lite::pin_project;
-use tokio::task::JoinHandle;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinSet;
 use tokio_stream::wrappers::ReceiverStream;
 
-use super::common::AbortOnDropSingle;
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::metrics::BaselineMetrics;
+use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
 
-/// Adapter for a tokio [`ReceiverStream`] that implements the
-/// [`SendableRecordBatchStream`]
-/// interface
-pub struct RecordBatchReceiverStream {
+/// Builder for [`RecordBatchReceiverStream`] that propagates errors
+/// and panics correctly.
+///
+/// [`RecordBatchReceiverStream`] is used to spawn one or more tasks
+/// that produce `RecordBatch`es and send them to a single
+/// `Receiver` which can improve parallelism.
+///
+/// This also handles propagating panics and canceling the tasks.
+pub struct RecordBatchReceiverStreamBuilder {
+    tx: Sender<Result<RecordBatch>>,
+    rx: Receiver<Result<RecordBatch>>,
     schema: SchemaRef,
+    join_set: JoinSet<()>,
+}
+
+impl RecordBatchReceiverStreamBuilder {
+    /// create new channels with the specified buffer size
+    pub fn new(schema: SchemaRef, capacity: usize) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
 
-    inner: ReceiverStream<Result<RecordBatch>>,
+        Self {
+            tx,
+            rx,
+            schema,
+            join_set: JoinSet::new(),
+        }
+    }
+
+    /// Get a handle for sending [`RecordBatch`]es to the output
+    pub fn tx(&self) -> Sender<Result<RecordBatch>> {
+        self.tx.clone()
+    }
+
+    /// Spawn task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn<F>(&mut self, task: F)
+    where
+        F: Future<Output = ()>,
+        F: Send + 'static,
+    {
+        self.join_set.spawn(task);
+    }
+
+    /// Spawn a blocking task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn_blocking<F>(&mut self, f: F)
+    where
+        F: FnOnce(),
+        F: Send + 'static,
+    {
+        self.join_set.spawn_blocking(f);
+    }
+
+    /// runs the input_partition of the `input` ExecutionPlan on the
+    /// tokio threadpool and writes its outputs to this stream
+    ///
+    /// If the input partition produces an error, the error will be
+    /// sent to the output stream and no further results are sent.
+    pub(crate) fn run_input(
+        &mut self,
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) {
+        let output = self.tx();
+
+        self.spawn(async move {
+            let mut stream = match input.execute(partition, context) {
+                Err(e) => {
+                    // If send fails, the plan being torn down, there
+                    // is no place to send the error and no reason to continue.
+                    output.send(Err(e)).await.ok();
+                    debug!(
+                        "Stopping execution: error executing input: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+                Ok(stream) => stream,
+            };
+
+            // Transfer batches from inner stream to the output tx
+            // immediately.
+            while let Some(item) = stream.next().await {
+                let is_err = item.is_err();
+
+                // If send fails, plan being torn down, there is no
+                // place to send the error and no reason to continue.
+                if output.send(item).await.is_err() {
+                    debug!(
+                        "Stopping execution: output is gone, plan cancelling: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+
+                // stop after the first error is encountered (don't
+                // drive all streams to completion)
+                if is_err {
+                    debug!(
+                        "Stopping execution: plan returned error: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+            }
+        });
+    }
+
+    /// Create a stream of all `RecordBatch`es written to `tx`
+    pub fn build(self) -> SendableRecordBatchStream {
+        let Self {
+            tx,
+            rx,
+            schema,
+            mut join_set,
+        } = self;
 
-    #[allow(dead_code)]
-    drop_helper: AbortOnDropSingle<()>,
+        // don't need tx
+        drop(tx);
+
+        // future that checks the result of the join set, and propagates panic if seen
+        let check = async move {
+            while let Some(result) = join_set.join_next().await {
+                match result {
+                    Ok(()) => continue, // nothing to report
+                    // This means a tokio task error, likely a panic
+                    Err(e) => {
+                        if e.is_panic() {
+                            // resume on the main thread
+                            std::panic::resume_unwind(e.into_panic());
+                        } else {
+                            // This should only occur if the task is
+                            // cancelled, which would only occur if
+                            // the JoinSet were aborted, which in turn
+                            // would imply that the receiver has been
+                            // dropped and this code is not running
+                            return Some(Err(DataFusionError::Internal(format!(

Review Comment:
   If this is unreachable (which I'm fairly certain it is) I'm not sure why we don't just panic here, making this future infallible and therefore an ideal candidate for https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.take_until
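
   For reference, a small standalone example of `take_until` (illustrative only; unrelated to DataFusion's types): the item stream ends as soon as the watchdog future completes.

   ```rust
   use std::time::Duration;

   use futures::{stream, StreamExt};

   #[tokio::main]
   async fn main() {
       // An infallible future; once it resolves, the stream below stops.
       let watchdog = tokio::time::sleep(Duration::from_millis(10));

       let items: Vec<u64> = stream::iter(0u64..)
           .then(|i| async move {
               tokio::time::sleep(Duration::from_millis(3)).await;
               i
           })
           .take_until(watchdog)
           .collect()
           .await;

       // Only items produced before the watchdog fired are kept.
       println!("{items:?}");
   }
   ```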



##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -17,43 +17,206 @@
 
 //! Stream wrappers for physical operators
 
+use std::sync::Arc;
+
 use crate::error::Result;
+use crate::physical_plan::displayable;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{Stream, StreamExt};
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use futures::stream::BoxStream;
+use futures::{Future, Stream, StreamExt};
+use log::debug;
 use pin_project_lite::pin_project;
-use tokio::task::JoinHandle;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinSet;
 use tokio_stream::wrappers::ReceiverStream;
 
-use super::common::AbortOnDropSingle;
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::metrics::BaselineMetrics;
+use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
 
-/// Adapter for a tokio [`ReceiverStream`] that implements the
-/// [`SendableRecordBatchStream`]
-/// interface
-pub struct RecordBatchReceiverStream {
+/// Builder for [`RecordBatchReceiverStream`] that propagates errors
+/// and panics correctly.
+///
+/// [`RecordBatchReceiverStream`] is used to spawn one or more tasks
+/// that produce `RecordBatch`es and send them to a single
+/// `Receiver` which can improve parallelism.
+///
+/// This also handles propagating panics and canceling the tasks.
+pub struct RecordBatchReceiverStreamBuilder {
+    tx: Sender<Result<RecordBatch>>,
+    rx: Receiver<Result<RecordBatch>>,
     schema: SchemaRef,
+    join_set: JoinSet<()>,
+}
+
+impl RecordBatchReceiverStreamBuilder {
+    /// create new channels with the specified buffer size
+    pub fn new(schema: SchemaRef, capacity: usize) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
 
-    inner: ReceiverStream<Result<RecordBatch>>,
+        Self {
+            tx,
+            rx,
+            schema,
+            join_set: JoinSet::new(),
+        }
+    }
+
+    /// Get a handle for sending [`RecordBatch`]es to the output
+    pub fn tx(&self) -> Sender<Result<RecordBatch>> {
+        self.tx.clone()
+    }
+
+    /// Spawn task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn<F>(&mut self, task: F)
+    where
+        F: Future<Output = ()>,
+        F: Send + 'static,
+    {
+        self.join_set.spawn(task);
+    }
+
+    /// Spawn a blocking task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn_blocking<F>(&mut self, f: F)
+    where
+        F: FnOnce(),
+        F: Send + 'static,
+    {
+        self.join_set.spawn_blocking(f);
+    }
+
+    /// runs the input_partition of the `input` ExecutionPlan on the
+    /// tokio threadpool and writes its outputs to this stream
+    ///
+    /// If the input partition produces an error, the error will be
+    /// sent to the output stream and no further results are sent.
+    pub(crate) fn run_input(
+        &mut self,
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) {
+        let output = self.tx();
+
+        self.spawn(async move {
+            let mut stream = match input.execute(partition, context) {
+                Err(e) => {
+                    // If send fails, the plan being torn down, there
+                    // is no place to send the error and no reason to continue.
+                    output.send(Err(e)).await.ok();
+                    debug!(
+                        "Stopping execution: error executing input: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+                Ok(stream) => stream,
+            };
+
+            // Transfer batches from inner stream to the output tx
+            // immediately.
+            while let Some(item) = stream.next().await {
+                let is_err = item.is_err();
+
+                // If send fails, plan being torn down, there is no
+                // place to send the error and no reason to continue.
+                if output.send(item).await.is_err() {
+                    debug!(
+                        "Stopping execution: output is gone, plan cancelling: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+
+                // stop after the first error is encountered (don't
+                // drive all streams to completion)
+                if is_err {
+                    debug!(
+                        "Stopping execution: plan returned error: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+            }
+        });
+    }
+
+    /// Create a stream of all `RecordBatch`es written to `tx`
+    pub fn build(self) -> SendableRecordBatchStream {
+        let Self {
+            tx,
+            rx,
+            schema,
+            mut join_set,
+        } = self;
 
-    #[allow(dead_code)]
-    drop_helper: AbortOnDropSingle<()>,
+        // don't need tx
+        drop(tx);
+
+        // future that checks the result of the join set, and propagates panic if seen
+        let check = async move {
+            while let Some(result) = join_set.join_next().await {
+                match result {
+                    Ok(()) => continue, // nothing to report
+                    // This means a tokio task error, likely a panic
+                    Err(e) => {
+                        if e.is_panic() {
+                            // resume on the main thread
+                            std::panic::resume_unwind(e.into_panic());
+                        } else {
+                            // This should only occur if the task is
+                            // cancelled, which would only occur if
+                            // the JoinSet were aborted, which in turn
+                            // would imply that the receiver has been
+                            // dropped and this code is not running
+                            return Some(Err(DataFusionError::Internal(format!(
+                                "Non Panic Task error: {e}"
+                            ))));
+                        }
+                    }
+                }
+            }
+            None
+        };
+
+        let check_stream = futures::stream::once(check)
+            // unwrap Option / only return the error
+            .filter_map(|item| async move { item });
+
+        // Merge the streams together so whichever is ready first
+        // produces the batch (since futures::stream::StreamExt is
+        // already in scope, need to call it explicitly)

Review Comment:
   FWIW https://docs.rs/futures/latest/futures/stream/fn.select.html is the futures crate version of this, not sure if there is a material difference between the two impls
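
   A tiny standalone illustration of `futures::stream::select` (not the PR's code): it polls both sides fairly and yields whichever item is ready first, finishing only when both inputs finish.

   ```rust
   use futures::{stream, StreamExt};

   #[tokio::main]
   async fn main() {
       let batches = stream::iter(vec!["batch-1", "batch-2"]);
       let checks = stream::iter(vec!["check-ok"]);

       // Items from either stream are yielded as soon as they are ready.
       let merged: Vec<&str> = stream::select(batches, checks).collect().await;
       println!("{merged:?}");
   }
   ```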





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6507: Consolidate panic propagation into RecordBatchReceiverStream

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1212356765


##########
datafusion/core/src/physical_plan/coalesce_partitions.rs:
##########
@@ -137,27 +131,17 @@ impl ExecutionPlan for CoalescePartitionsExec {
                 // use a stream that allows each sender to put in at
                 // least one result in an attempt to maximize
                 // parallelism.
-                let (sender, receiver) =
-                    mpsc::channel::<Result<RecordBatch>>(input_partitions);
+                let mut builder =

Review Comment:
   I am quite pleased that this is now all encapsulated into `RecordBatchReceiverStream`
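
   For context, the usage shape this PR introduces, paraphrased as an in-crate sketch (`run_input` is `pub(crate)`; `ExecutionPlan`, `TaskContext`, and `SendableRecordBatchStream` are the surrounding `physical_plan` types and their imports are omitted here):

   ```rust
   // Paraphrased from the CoalescePartitionsExec change: one builder, one
   // spawned copy task per input partition, one merged output stream.
   fn coalesce_stream(
       input: Arc<dyn ExecutionPlan>,
       input_partitions: usize,
       context: Arc<TaskContext>,
   ) -> SendableRecordBatchStream {
       let mut builder =
           RecordBatchReceiverStreamBuilder::new(input.schema(), input_partitions);

       // Each task executes one input partition and forwards its batches
       // (or its first error) to the shared channel; all tasks are aborted
       // if the returned stream is dropped.
       for part_i in 0..input_partitions {
           builder.run_input(input.clone(), part_i, context.clone());
       }

       builder.build()
   }
   ```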



##########
datafusion/core/src/physical_plan/union.rs:
##########
@@ -560,40 +561,6 @@ impl Stream for CombinedRecordBatchStream {
     }
 }
 
-/// Stream wrapper that records `BaselineMetrics` for a particular

Review Comment:
   moved 





[GitHub] [arrow-datafusion] alamb commented on pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#issuecomment-1578757979

   I believe I have resolved all outstanding comments in this PR. Please take another look if you have time




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1214857925


##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -17,43 +17,178 @@
 
 //! Stream wrappers for physical operators
 
+use std::sync::Arc;
+
 use crate::error::Result;
+use crate::physical_plan::displayable;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{Stream, StreamExt};
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use futures::stream::BoxStream;
+use futures::{Future, Stream, StreamExt};
+use log::debug;
 use pin_project_lite::pin_project;
-use tokio::task::JoinHandle;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinSet;
 use tokio_stream::wrappers::ReceiverStream;
 
-use super::common::AbortOnDropSingle;
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::metrics::BaselineMetrics;
+use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
 
-/// Adapter for a tokio [`ReceiverStream`] that implements the
-/// [`SendableRecordBatchStream`]
-/// interface
-pub struct RecordBatchReceiverStream {
+/// Builder for [`RecordBatchReceiverStream`] that propagates errors
+/// and panics correctly.
+///
+/// [`RecordBatchReceiverStream`] can be used when there are one or
+/// more tasks spawned which produce RecordBatches and send them to a
+/// single `Receiver`.
+pub struct RecordBatchReceiverStreamBuilder {
+    tx: Sender<Result<RecordBatch>>,
+    rx: Receiver<Result<RecordBatch>>,
     schema: SchemaRef,
+    join_set: JoinSet<()>,
+}
+
+impl RecordBatchReceiverStreamBuilder {
+    /// create new channels with the specified buffer size
+    pub fn new(schema: SchemaRef, capacity: usize) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
 
-    inner: ReceiverStream<Result<RecordBatch>>,
+        Self {
+            tx,
+            rx,
+            schema,
+            join_set: JoinSet::new(),
+        }
+    }
+
+    /// Get a handle for sending [`RecordBatch`]es to the output
+    pub fn tx(&self) -> Sender<Result<RecordBatch>> {
+        self.tx.clone()
+    }
+
+    /// Spawn task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn<F>(&mut self, task: F)
+    where
+        F: Future<Output = ()>,
+        F: Send + 'static,
+    {
+        self.join_set.spawn(task);
+    }
+
+    /// Spawn a blocking task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn_blocking<F>(&mut self, f: F)
+    where
+        F: FnOnce(),
+        F: Send + 'static,
+    {
+        self.join_set.spawn_blocking(f);
+    }
+
+    /// runs the input_partition of the `input` ExecutionPlan on the
+    /// tokio threadpool and writes its outputs to this stream
+    pub(crate) fn run_input(
+        &mut self,
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) {
+        let output = self.tx();
+
+        self.spawn(async move {
+            let mut stream = match input.execute(partition, context) {
+                Err(e) => {
+                    // If send fails, plan being torn down,
+                    // there is no place to send the error.
+                    output.send(Err(e)).await.ok();
+                    debug!(
+                        "Stopping execution: error executing input: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+                Ok(stream) => stream,
+            };
+
+            while let Some(item) = stream.next().await {
+                // If send fails, plan being torn down,
+                // there is no place to send the error.
+                if output.send(item).await.is_err() {
+                    debug!(
+                        "Stopping execution: output is gone, plan cancelling: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+            }
+        });
+    }
 
-    #[allow(dead_code)]
-    drop_helper: AbortOnDropSingle<()>,
+    /// Create a stream of all `RecordBatch`es written to `tx`
+    pub fn build(self) -> SendableRecordBatchStream {
+        let Self {
+            tx,
+            rx,
+            schema,
+            mut join_set,
+        } = self;
+
+        // don't need tx
+        drop(tx);
+
+        // future that checks the result of the join set
+        let check = async move {
+            while let Some(result) = join_set.join_next().await {
+                match result {
+                    Ok(()) => continue, // nothing to report
+                    // This means a tokio task error, likely a panic
+                    Err(e) => {
+                        if e.is_panic() {
+                            // resume on the main thread
+                            std::panic::resume_unwind(e.into_panic());
+                        } else {
+                            return Some(Err(DataFusionError::Execution(format!(

Review Comment:
   I agree I don't think it is possible -- I will update to be an Internal Error





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1212982877


##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -17,43 +17,178 @@
 
 //! Stream wrappers for physical operators
 
+use std::sync::Arc;
+
 use crate::error::Result;
+use crate::physical_plan::displayable;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{Stream, StreamExt};
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use futures::stream::BoxStream;
+use futures::{Future, Stream, StreamExt};
+use log::debug;
 use pin_project_lite::pin_project;
-use tokio::task::JoinHandle;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinSet;
 use tokio_stream::wrappers::ReceiverStream;
 
-use super::common::AbortOnDropSingle;
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::metrics::BaselineMetrics;
+use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
 
-/// Adapter for a tokio [`ReceiverStream`] that implements the
-/// [`SendableRecordBatchStream`]
-/// interface
-pub struct RecordBatchReceiverStream {
+/// Builder for [`RecordBatchReceiverStream`] that propagates errors
+/// and panics correctly.
+///
+/// [`RecordBatchReceiverStream`] can be used when there are one or
+/// more tasks spawned which produce RecordBatches and send them to a
+/// single `Receiver`.
+pub struct RecordBatchReceiverStreamBuilder {
+    tx: Sender<Result<RecordBatch>>,
+    rx: Receiver<Result<RecordBatch>>,
     schema: SchemaRef,
+    join_set: JoinSet<()>,
+}
+
+impl RecordBatchReceiverStreamBuilder {
+    /// create new channels with the specified buffer size
+    pub fn new(schema: SchemaRef, capacity: usize) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
 
-    inner: ReceiverStream<Result<RecordBatch>>,
+        Self {
+            tx,
+            rx,
+            schema,
+            join_set: JoinSet::new(),
+        }
+    }
+
+    /// Get a handle for sending [`RecordBatch`]es to the output
+    pub fn tx(&self) -> Sender<Result<RecordBatch>> {
+        self.tx.clone()
+    }
+
+    /// Spawn task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn<F>(&mut self, task: F)
+    where
+        F: Future<Output = ()>,
+        F: Send + 'static,
+    {
+        self.join_set.spawn(task);
+    }
+
+    /// Spawn a blocking task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn_blocking<F>(&mut self, f: F)
+    where
+        F: FnOnce(),
+        F: Send + 'static,
+    {
+        self.join_set.spawn_blocking(f);
+    }
+
+    /// runs the input_partition of the `input` ExecutionPlan on the
+    /// tokio threadpool and writes its outputs to this stream
+    pub(crate) fn run_input(
+        &mut self,
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) {
+        let output = self.tx();
+
+        self.spawn(async move {
+            let mut stream = match input.execute(partition, context) {
+                Err(e) => {
+                    // If send fails, plan being torn down,
+                    // there is no place to send the error.
+                    output.send(Err(e)).await.ok();
+                    debug!(
+                        "Stopping execution: error executing input: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+                Ok(stream) => stream,
+            };
+
+            while let Some(item) = stream.next().await {
+                // If send fails, plan being torn down,
+                // there is no place to send the error.

Review Comment:
   I think this should short-circuit on error; as written, I think it will currently drive execution to completion
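
   (The later revision quoted earlier in this thread adopts exactly this short circuit; the relevant shape of the copy loop is:)

   ```rust
   // From the updated run_input: record whether the item is an error
   // before sending, then stop after forwarding the first error instead
   // of driving the input stream to completion.
   while let Some(item) = stream.next().await {
       let is_err = item.is_err();

       if output.send(item).await.is_err() {
           return; // receiver is gone; the plan is being torn down
       }

       if is_err {
           return; // short-circuit after the first error
       }
   }
   ```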





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1214853131


##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -17,43 +17,178 @@
 
 //! Stream wrappers for physical operators
 
+use std::sync::Arc;
+
 use crate::error::Result;
+use crate::physical_plan::displayable;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{Stream, StreamExt};
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use futures::stream::BoxStream;
+use futures::{Future, Stream, StreamExt};
+use log::debug;
 use pin_project_lite::pin_project;
-use tokio::task::JoinHandle;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinSet;
 use tokio_stream::wrappers::ReceiverStream;
 
-use super::common::AbortOnDropSingle;
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::metrics::BaselineMetrics;
+use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
 
-/// Adapter for a tokio [`ReceiverStream`] that implements the
-/// [`SendableRecordBatchStream`]
-/// interface
-pub struct RecordBatchReceiverStream {
+/// Builder for [`RecordBatchReceiverStream`] that propagates errors
+/// and panics correctly.
+///
+/// [`RecordBatchReceiverStream`] can be used when there are one or
+/// more tasks spawned which produce RecordBatches and send them to a
+/// single `Receiver`.
+pub struct RecordBatchReceiverStreamBuilder {
+    tx: Sender<Result<RecordBatch>>,
+    rx: Receiver<Result<RecordBatch>>,
     schema: SchemaRef,
+    join_set: JoinSet<()>,
+}
+
+impl RecordBatchReceiverStreamBuilder {
+    /// create new channels with the specified buffer size
+    pub fn new(schema: SchemaRef, capacity: usize) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
 
-    inner: ReceiverStream<Result<RecordBatch>>,
+        Self {
+            tx,
+            rx,
+            schema,
+            join_set: JoinSet::new(),
+        }
+    }
+
+    /// Get a handle for sending [`RecordBatch`]es to the output
+    pub fn tx(&self) -> Sender<Result<RecordBatch>> {
+        self.tx.clone()
+    }
+
+    /// Spawn task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn<F>(&mut self, task: F)
+    where
+        F: Future<Output = ()>,
+        F: Send + 'static,
+    {
+        self.join_set.spawn(task);
+    }
+
+    /// Spawn a blocking task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn_blocking<F>(&mut self, f: F)
+    where
+        F: FnOnce(),
+        F: Send + 'static,
+    {
+        self.join_set.spawn_blocking(f);
+    }
+
+    /// runs the input_partition of the `input` ExecutionPlan on the
+    /// tokio threadpool and writes its outputs to this stream
+    pub(crate) fn run_input(
+        &mut self,
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) {
+        let output = self.tx();
+
+        self.spawn(async move {
+            let mut stream = match input.execute(partition, context) {
+                Err(e) => {
+                    // If send fails, plan being torn down,
+                    // there is no place to send the error.
+                    output.send(Err(e)).await.ok();
+                    debug!(
+                        "Stopping execution: error executing input: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+                Ok(stream) => stream,
+            };
+
+            while let Some(item) = stream.next().await {
+                // If send fails, plan being torn down,
+                // there is no place to send the error.
+                if output.send(item).await.is_err() {
+                    debug!(
+                        "Stopping execution: output is gone, plan cancelling: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+            }
+        });
+    }
 
-    #[allow(dead_code)]
-    drop_helper: AbortOnDropSingle<()>,
+    /// Create a stream of all `RecordBatch`es written to `tx`
+    pub fn build(self) -> SendableRecordBatchStream {
+        let Self {
+            tx,
+            rx,
+            schema,
+            mut join_set,
+        } = self;
+
+        // don't need tx
+        drop(tx);
+
+        // future that checks the result of the join set
+        let check = async move {
+            while let Some(result) = join_set.join_next().await {
+                match result {
+                    Ok(()) => continue, // nothing to report
+                    // This means a tokio task error, likely a panic
+                    Err(e) => {
+                        if e.is_panic() {
+                            // resume on the main thread
+                            std::panic::resume_unwind(e.into_panic());
+                        } else {
+                            return Some(Err(DataFusionError::Execution(format!(
+                                "Task error: {e}"
+                            ))));
+                        }
+                    }
+                }
+            }
+            None
+        };
+
+        let check_stream = futures::stream::once(check)
+            // unwrap Option / only return the error
+            .filter_map(|item| async move { item });
+
+        let inner = ReceiverStream::new(rx).chain(check_stream).boxed();

Review Comment:
   I am working on tests for this behavior
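   For reference, a minimal usage sketch of the builder API in the diff above (the `input`, `schema`, and `context` bindings are assumed to be in scope; this is illustrative only):

   ```rust
   // Sketch: spawn one task per input partition, then read all batches
   // through a single stream that also surfaces task errors.
   let mut builder = RecordBatchReceiverStreamBuilder::new(schema, 2);
   for partition in 0..input.output_partitioning().partition_count() {
       builder.run_input(input.clone(), partition, context.clone());
   }
   let mut stream = builder.build();
   while let Some(batch) = stream.next().await {
       let batch = batch?; // errors from any partition surface here
       // process batch...
   }
   ```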





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1212991034


##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -17,43 +17,178 @@
 
 //! Stream wrappers for physical operators
 
+use std::sync::Arc;
+
 use crate::error::Result;
+use crate::physical_plan::displayable;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{Stream, StreamExt};
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use futures::stream::BoxStream;
+use futures::{Future, Stream, StreamExt};
+use log::debug;
 use pin_project_lite::pin_project;
-use tokio::task::JoinHandle;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinSet;
 use tokio_stream::wrappers::ReceiverStream;
 
-use super::common::AbortOnDropSingle;
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::metrics::BaselineMetrics;
+use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
 
-/// Adapter for a tokio [`ReceiverStream`] that implements the
-/// [`SendableRecordBatchStream`]
-/// interface
-pub struct RecordBatchReceiverStream {
+/// Builder for [`RecordBatchReceiverStream`] that propagates errors
+/// and panics correctly.
+///
+/// [`RecordBatchReceiverStream`] can be used when there are one or
+/// more tasks spawned which produce RecordBatches and send them to a
+/// single `Receiver`.
+pub struct RecordBatchReceiverStreamBuilder {
+    tx: Sender<Result<RecordBatch>>,
+    rx: Receiver<Result<RecordBatch>>,
     schema: SchemaRef,
+    join_set: JoinSet<()>,
+}
+
+impl RecordBatchReceiverStreamBuilder {
+    /// create new channels with the specified buffer size
+    pub fn new(schema: SchemaRef, capacity: usize) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
 
-    inner: ReceiverStream<Result<RecordBatch>>,
+        Self {
+            tx,
+            rx,
+            schema,
+            join_set: JoinSet::new(),
+        }
+    }
+
+    /// Get a handle for sending [`RecordBatch`]es to the output
+    pub fn tx(&self) -> Sender<Result<RecordBatch>> {
+        self.tx.clone()
+    }
+
+    /// Spawn task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn<F>(&mut self, task: F)
+    where
+        F: Future<Output = ()>,
+        F: Send + 'static,
+    {
+        self.join_set.spawn(task);
+    }
+
+    /// Spawn a blocking task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn_blocking<F>(&mut self, f: F)
+    where
+        F: FnOnce(),
+        F: Send + 'static,
+    {
+        self.join_set.spawn_blocking(f);
+    }
+
+    /// runs the input_partition of the `input` ExecutionPlan on the
+    /// tokio threadpool and writes its outputs to this stream
+    pub(crate) fn run_input(
+        &mut self,
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) {
+        let output = self.tx();
+
+        self.spawn(async move {
+            let mut stream = match input.execute(partition, context) {
+                Err(e) => {
+                    // If send fails, plan being torn down,
+                    // there is no place to send the error.
+                    output.send(Err(e)).await.ok();
+                    debug!(
+                        "Stopping execution: error executing input: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+                Ok(stream) => stream,
+            };
+
+            while let Some(item) = stream.next().await {
+                // If send fails, plan being torn down,
+                // there is no place to send the error.
+                if output.send(item).await.is_err() {
+                    debug!(
+                        "Stopping execution: output is gone, plan cancelling: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+            }
+        });
+    }
 
-    #[allow(dead_code)]
-    drop_helper: AbortOnDropSingle<()>,
+    /// Create a stream of all `RecordBatch`es written to `tx`
+    pub fn build(self) -> SendableRecordBatchStream {
+        let Self {
+            tx,
+            rx,
+            schema,
+            mut join_set,
+        } = self;
+
+        // don't need tx
+        drop(tx);
+
+        // future that checks the result of the join set
+        let check = async move {
+            while let Some(result) = join_set.join_next().await {
+                match result {
+                    Ok(()) => continue, // nothing to report
+                    // This means a tokio task error, likely a panic
+                    Err(e) => {
+                        if e.is_panic() {
+                            // resume on the main thread
+                            std::panic::resume_unwind(e.into_panic());
+                        } else {
+                            return Some(Err(DataFusionError::Execution(format!(

Review Comment:
   Is task cancellation actually an error?
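   One way to make the distinction explicit is to branch on tokio's `JoinError` accessors (a sketch; the handling shown is illustrative):

   ```rust
   // Sketch: panics are resumed, cancellations are treated as benign.
   while let Some(result) = join_set.join_next().await {
       if let Err(e) = result {
           if e.is_panic() {
               // Propagate the panic on the polling thread.
               std::panic::resume_unwind(e.into_panic());
           } else if e.is_cancelled() {
               // The task was aborted (e.g. the JoinSet was dropped);
               // arguably expected during shutdown, not an execution error.
           }
       }
   }
   ```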





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1219634874


##########
datafusion/core/src/test/exec.rs:
##########
@@ -643,3 +637,139 @@ pub async fn assert_strong_count_converges_to_zero<T>(refs: Weak<T>) {
     .await
     .unwrap();
 }
+
+/// Execution plan that emits streams that panics.
+///
+/// This is useful to test panic handling of certain execution plans.
+#[derive(Debug)]
+pub struct PanicingExec {
+    /// Schema that is mocked by this plan.
+    schema: SchemaRef,
+
+    /// Number of output partitions. Each partition will produce this
+    /// many empty output record batches prior to panicing
+    batches_until_panics: Vec<usize>,
+}
+
+impl PanicingExec {
+    /// Create new [`PanickingExec`] with a given schema and number of
+    /// partitions, which will each panic immediately.
+    pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
+        Self {
+            schema,
+            batches_until_panics: vec![0; n_partitions],
+        }
+    }
+
+    /// Set the number of batches prior to panic for a partition
+    pub fn with_partition_panic(mut self, partition: usize, count: usize) -> Self {
+        self.batches_until_panics[partition] = count;
+        self
+    }
+}
+
+impl ExecutionPlan for PanicingExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        // this is a leaf node and has no children
+        vec![]
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        let num_partitions = self.batches_until_panics.len();
+        Partitioning::UnknownPartitioning(num_partitions)
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Err(DataFusionError::Internal(format!(
+            "Children cannot be replaced in {:?}",
+            self
+        )))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        Ok(Box::pin(PanicingStream {
+            partition,
+            batches_until_panic: self.batches_until_panics[partition],
+            schema: Arc::clone(&self.schema),
+            ready: false,
+        }))
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "PanickingExec",)
+            }
+        }
+    }
+
+    fn statistics(&self) -> Statistics {
+        unimplemented!()
+    }
+}
+
+/// A [`RecordBatchStream`] that yields every other batch and panics after `batches_until_panic` batches have been produced
+#[derive(Debug)]
+struct PanicingStream {

Review Comment:
   Changed to `PanicStream`





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1212989981


##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -17,43 +17,178 @@
 
 //! Stream wrappers for physical operators
 
+use std::sync::Arc;
+
 use crate::error::Result;
+use crate::physical_plan::displayable;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{Stream, StreamExt};
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use futures::stream::BoxStream;
+use futures::{Future, Stream, StreamExt};
+use log::debug;
 use pin_project_lite::pin_project;
-use tokio::task::JoinHandle;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinSet;
 use tokio_stream::wrappers::ReceiverStream;
 
-use super::common::AbortOnDropSingle;
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::metrics::BaselineMetrics;
+use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
 
-/// Adapter for a tokio [`ReceiverStream`] that implements the
-/// [`SendableRecordBatchStream`]
-/// interface
-pub struct RecordBatchReceiverStream {
+/// Builder for [`RecordBatchReceiverStream`] that propagates errors
+/// and panics correctly.
+///
+/// [`RecordBatchReceiverStream`] can be used when there are one or
+/// more tasks spawned which produce RecordBatches and send them to a
+/// single `Receiver`.
+pub struct RecordBatchReceiverStreamBuilder {
+    tx: Sender<Result<RecordBatch>>,
+    rx: Receiver<Result<RecordBatch>>,
     schema: SchemaRef,
+    join_set: JoinSet<()>,
+}
+
+impl RecordBatchReceiverStreamBuilder {
+    /// create new channels with the specified buffer size
+    pub fn new(schema: SchemaRef, capacity: usize) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
 
-    inner: ReceiverStream<Result<RecordBatch>>,
+        Self {
+            tx,
+            rx,
+            schema,
+            join_set: JoinSet::new(),
+        }
+    }
+
+    /// Get a handle for sending [`RecordBatch`]es to the output
+    pub fn tx(&self) -> Sender<Result<RecordBatch>> {
+        self.tx.clone()
+    }
+
+    /// Spawn task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn<F>(&mut self, task: F)
+    where
+        F: Future<Output = ()>,
+        F: Send + 'static,
+    {
+        self.join_set.spawn(task);
+    }
+
+    /// Spawn a blocking task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn_blocking<F>(&mut self, f: F)
+    where
+        F: FnOnce(),
+        F: Send + 'static,
+    {
+        self.join_set.spawn_blocking(f);
+    }
+
+    /// runs the input_partition of the `input` ExecutionPlan on the
+    /// tokio threadpool and writes its outputs to this stream
+    pub(crate) fn run_input(
+        &mut self,
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) {
+        let output = self.tx();
+
+        self.spawn(async move {
+            let mut stream = match input.execute(partition, context) {
+                Err(e) => {
+                    // If send fails, plan being torn down,
+                    // there is no place to send the error.
+                    output.send(Err(e)).await.ok();
+                    debug!(
+                        "Stopping execution: error executing input: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+                Ok(stream) => stream,
+            };
+
+            while let Some(item) = stream.next().await {
+                // If send fails, plan being torn down,
+                // there is no place to send the error.
+                if output.send(item).await.is_err() {
+                    debug!(
+                        "Stopping execution: output is gone, plan cancelling: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+            }
+        });
+    }
 
-    #[allow(dead_code)]
-    drop_helper: AbortOnDropSingle<()>,
+    /// Create a stream of all `RecordBatch`es written to `tx`
+    pub fn build(self) -> SendableRecordBatchStream {
+        let Self {
+            tx,
+            rx,
+            schema,
+            mut join_set,
+        } = self;
+
+        // don't need tx
+        drop(tx);
+
+        // future that checks the result of the join set
+        let check = async move {
+            while let Some(result) = join_set.join_next().await {
+                match result {
+                    Ok(()) => continue, // nothing to report
+                    // This means a tokio task error, likely a panic
+                    Err(e) => {
+                        if e.is_panic() {
+                            // resume on the main thread
+                            std::panic::resume_unwind(e.into_panic());
+                        } else {
+                            return Some(Err(DataFusionError::Execution(format!(
+                                "Task error: {e}"
+                            ))));
+                        }
+                    }
+                }
+            }
+            None
+        };
+
+        let check_stream = futures::stream::once(check)
+            // unwrap Option / only return the error
+            .filter_map(|item| async move { item });
+
+        let inner = ReceiverStream::new(rx).chain(check_stream).boxed();

Review Comment:
   https://docs.rs/futures/latest/futures/prelude/stream/trait.StreamExt.html#method.take_while is possibly a better way to formulate this
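   For example, something along these lines (a sketch; the one-shot flag keeps the first `Err` in the output before the stream ends):

   ```rust
   use futures::{future, StreamExt};

   // Sketch: yield items up to and including the first error, then stop,
   // instead of chaining a separate check stream.
   let mut seen_err = false;
   let stream = stream.take_while(move |item: &Result<RecordBatch>| {
       let keep = !seen_err;
       seen_err |= item.is_err();
       future::ready(keep)
   });
   ```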





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6507: Consolidate panic propagation into RecordBatchReceiverStream

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1212355846


##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -126,3 +256,97 @@ where
         self.schema.clone()
     }
 }
+
+/// Stream wrapper that records `BaselineMetrics` for a particular
+/// partition
+pub(crate) struct ObservedStream {

Review Comment:
   Moved from Union so it can be reused
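   The gist of such a wrapper, as a generic sketch (the real `ObservedStream` records `BaselineMetrics`; here a plain row counter stands in, and the `Result`/`RecordBatch` types are assumed in scope):

   ```rust
   use std::pin::Pin;
   use std::task::{Context, Poll};
   use futures::Stream;

   // Sketch: observe every poll result before passing it through,
   // e.g. to count output rows per partition.
   struct Observed<S> {
       inner: S,
       rows: usize,
   }

   impl<S> Stream for Observed<S>
   where
       S: Stream<Item = Result<RecordBatch>> + Unpin,
   {
       type Item = Result<RecordBatch>;

       fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
           let poll = Pin::new(&mut self.inner).poll_next(cx);
           if let Poll::Ready(Some(Ok(batch))) = &poll {
               self.rows += batch.num_rows(); // metrics would be recorded here
           }
           poll
       }
   }
   ```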





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1212982877


##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -17,43 +17,178 @@
 
 //! Stream wrappers for physical operators
 
+use std::sync::Arc;
+
 use crate::error::Result;
+use crate::physical_plan::displayable;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{Stream, StreamExt};
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use futures::stream::BoxStream;
+use futures::{Future, Stream, StreamExt};
+use log::debug;
 use pin_project_lite::pin_project;
-use tokio::task::JoinHandle;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinSet;
 use tokio_stream::wrappers::ReceiverStream;
 
-use super::common::AbortOnDropSingle;
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::metrics::BaselineMetrics;
+use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
 
-/// Adapter for a tokio [`ReceiverStream`] that implements the
-/// [`SendableRecordBatchStream`]
-/// interface
-pub struct RecordBatchReceiverStream {
+/// Builder for [`RecordBatchReceiverStream`] that propagates errors
+/// and panics correctly.
+///
+/// [`RecordBatchReceiverStream`] can be used when there are one or
+/// more tasks spawned which produce RecordBatches and send them to a
+/// single `Receiver`.
+pub struct RecordBatchReceiverStreamBuilder {
+    tx: Sender<Result<RecordBatch>>,
+    rx: Receiver<Result<RecordBatch>>,
     schema: SchemaRef,
+    join_set: JoinSet<()>,
+}
+
+impl RecordBatchReceiverStreamBuilder {
+    /// create new channels with the specified buffer size
+    pub fn new(schema: SchemaRef, capacity: usize) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
 
-    inner: ReceiverStream<Result<RecordBatch>>,
+        Self {
+            tx,
+            rx,
+            schema,
+            join_set: JoinSet::new(),
+        }
+    }
+
+    /// Get a handle for sending [`RecordBatch`]es to the output
+    pub fn tx(&self) -> Sender<Result<RecordBatch>> {
+        self.tx.clone()
+    }
+
+    /// Spawn task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn<F>(&mut self, task: F)
+    where
+        F: Future<Output = ()>,
+        F: Send + 'static,
+    {
+        self.join_set.spawn(task);
+    }
+
+    /// Spawn a blocking task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn_blocking<F>(&mut self, f: F)
+    where
+        F: FnOnce(),
+        F: Send + 'static,
+    {
+        self.join_set.spawn_blocking(f);
+    }
+
+    /// runs the input_partition of the `input` ExecutionPlan on the
+    /// tokio threadpool and writes its outputs to this stream
+    pub(crate) fn run_input(
+        &mut self,
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) {
+        let output = self.tx();
+
+        self.spawn(async move {
+            let mut stream = match input.execute(partition, context) {
+                Err(e) => {
+                    // If send fails, plan being torn down,
+                    // there is no place to send the error.
+                    output.send(Err(e)).await.ok();
+                    debug!(
+                        "Stopping execution: error executing input: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+                Ok(stream) => stream,
+            };
+
+            while let Some(item) = stream.next().await {
+                // If send fails, plan being torn down,
+                // there is no place to send the error.

Review Comment:
   I think this should short-circuit if the item is an error; as written, I believe it will drive execution to completion





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1212993937


##########
datafusion/core/src/physical_plan/analyze.rs:
##########
@@ -146,18 +137,12 @@ impl ExecutionPlan for AnalyzeExec {
         // future that gathers the results from all the tasks in the
         // JoinSet that computes the overall row count and final
         // record batch
+        let mut input_stream = builder.build();
         let output = async move {
             let mut total_rows = 0;
-            while let Some(res) = set.join_next().await {
-                // translate join errors (aka task panic's) into ExecutionErrors
-                match res {
-                    Ok(row_count) => total_rows += row_count?,
-                    Err(e) => {
-                        return Err(DataFusionError::Execution(format!(
-                            "Join error in AnalyzeExec: {e}"
-                        )))
-                    }
-                }
+            while let Some(batch) = input_stream.next().await {
+                let batch = batch?;

Review Comment:
   ```suggestion
               while let Some(batch) = input_stream.next().await.transpose()? {
   ```
   FWIW, this pattern comes up a lot
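   `transpose` turns `Option<Result<T>>` into `Result<Option<T>>`, so `?` propagates the error while `while let` still terminates on `None`. A standalone sketch:

   ```rust
   // Sketch: Option<Result<T>>::transpose lets `?` and `while let` compose.
   fn total_len(items: Vec<Result<String, std::io::Error>>) -> Result<usize, std::io::Error> {
       let mut iter = items.into_iter();
       let mut total = 0;
       while let Some(s) = iter.next().transpose()? {
           total += s.len();
       }
       Ok(total)
   }
   ```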





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1219640551


##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -17,43 +17,178 @@
 
 //! Stream wrappers for physical operators
 
+use std::sync::Arc;
+
 use crate::error::Result;
+use crate::physical_plan::displayable;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{Stream, StreamExt};
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use futures::stream::BoxStream;
+use futures::{Future, Stream, StreamExt};
+use log::debug;
 use pin_project_lite::pin_project;
-use tokio::task::JoinHandle;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinSet;
 use tokio_stream::wrappers::ReceiverStream;
 
-use super::common::AbortOnDropSingle;
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::metrics::BaselineMetrics;
+use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
 
-/// Adapter for a tokio [`ReceiverStream`] that implements the
-/// [`SendableRecordBatchStream`]
-/// interface
-pub struct RecordBatchReceiverStream {
+/// Builder for [`RecordBatchReceiverStream`] that propagates errors
+/// and panics correctly.
+///
+/// [`RecordBatchReceiverStream`] can be used when there are one or
+/// more tasks spawned which produce RecordBatches and send them to a
+/// single `Receiver`.
+pub struct RecordBatchReceiverStreamBuilder {
+    tx: Sender<Result<RecordBatch>>,
+    rx: Receiver<Result<RecordBatch>>,
     schema: SchemaRef,
+    join_set: JoinSet<()>,
+}
+
+impl RecordBatchReceiverStreamBuilder {
+    /// create new channels with the specified buffer size
+    pub fn new(schema: SchemaRef, capacity: usize) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
 
-    inner: ReceiverStream<Result<RecordBatch>>,
+        Self {
+            tx,
+            rx,
+            schema,
+            join_set: JoinSet::new(),
+        }
+    }
+
+    /// Get a handle for sending [`RecordBatch`]es to the output
+    pub fn tx(&self) -> Sender<Result<RecordBatch>> {
+        self.tx.clone()
+    }
+
+    /// Spawn task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn<F>(&mut self, task: F)
+    where
+        F: Future<Output = ()>,
+        F: Send + 'static,
+    {
+        self.join_set.spawn(task);
+    }
+
+    /// Spawn a blocking task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn_blocking<F>(&mut self, f: F)
+    where
+        F: FnOnce(),
+        F: Send + 'static,
+    {
+        self.join_set.spawn_blocking(f);
+    }
+
+    /// runs the input_partition of the `input` ExecutionPlan on the
+    /// tokio threadpool and writes its outputs to this stream
+    pub(crate) fn run_input(
+        &mut self,
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) {
+        let output = self.tx();
+
+        self.spawn(async move {
+            let mut stream = match input.execute(partition, context) {
+                Err(e) => {
+                    // If send fails, plan being torn down,
+                    // there is no place to send the error.
+                    output.send(Err(e)).await.ok();
+                    debug!(
+                        "Stopping execution: error executing input: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+                Ok(stream) => stream,
+            };
+
+            while let Some(item) = stream.next().await {
+                // If send fails, plan being torn down,
+                // there is no place to send the error.

Review Comment:
   Fixed in https://github.com/apache/arrow-datafusion/pull/6507/commits/56a26eb6b4036db57bebba2a1e237a1f6beba544
   
   Tested in `record_batch_receiver_stream_error_does_not_drive_completion`
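   Roughly the shape of that test (a sketch assuming a single input partition that yields an error; not the actual test body):

   ```rust
   // Sketch: once an Err is yielded, the stream should end instead of
   // continuing to drive the input.
   let mut stream = builder.build();
   let first = stream.next().await.expect("at least one item");
   assert!(first.is_err());
   assert!(stream.next().await.is_none(), "execution was driven past the error");
   ```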





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1219773260


##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -17,43 +17,206 @@
 
 //! Stream wrappers for physical operators
 
+use std::sync::Arc;
+
 use crate::error::Result;
+use crate::physical_plan::displayable;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{Stream, StreamExt};
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use futures::stream::BoxStream;
+use futures::{Future, Stream, StreamExt};
+use log::debug;
 use pin_project_lite::pin_project;
-use tokio::task::JoinHandle;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinSet;
 use tokio_stream::wrappers::ReceiverStream;
 
-use super::common::AbortOnDropSingle;
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::metrics::BaselineMetrics;
+use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
 
-/// Adapter for a tokio [`ReceiverStream`] that implements the
-/// [`SendableRecordBatchStream`]
-/// interface
-pub struct RecordBatchReceiverStream {
+/// Builder for [`RecordBatchReceiverStream`] that propagates errors
+/// and panics correctly.
+///
+/// [`RecordBatchReceiverStream`] is used to spawn one or more tasks
+/// that produce `RecordBatch`es and send them to a single
+/// `Receiver` which can improve parallelism.
+///
+/// This also handles propagating panics and canceling the tasks.
+pub struct RecordBatchReceiverStreamBuilder {
+    tx: Sender<Result<RecordBatch>>,
+    rx: Receiver<Result<RecordBatch>>,
     schema: SchemaRef,
+    join_set: JoinSet<()>,
+}
+
+impl RecordBatchReceiverStreamBuilder {
+    /// create new channels with the specified buffer size
+    pub fn new(schema: SchemaRef, capacity: usize) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
 
-    inner: ReceiverStream<Result<RecordBatch>>,
+        Self {
+            tx,
+            rx,
+            schema,
+            join_set: JoinSet::new(),
+        }
+    }
+
+    /// Get a handle for sending [`RecordBatch`]es to the output
+    pub fn tx(&self) -> Sender<Result<RecordBatch>> {
+        self.tx.clone()
+    }
+
+    /// Spawn task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn<F>(&mut self, task: F)
+    where
+        F: Future<Output = ()>,
+        F: Send + 'static,
+    {
+        self.join_set.spawn(task);
+    }
+
+    /// Spawn a blocking task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn_blocking<F>(&mut self, f: F)
+    where
+        F: FnOnce(),
+        F: Send + 'static,
+    {
+        self.join_set.spawn_blocking(f);
+    }
+
+    /// runs the input_partition of the `input` ExecutionPlan on the
+    /// tokio threadpool and writes its outputs to this stream
+    ///
+    /// If the input partition produces an error, the error will be
+    /// sent to the output stream and no further results are sent.
+    pub(crate) fn run_input(
+        &mut self,
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) {
+        let output = self.tx();
+
+        self.spawn(async move {
+            let mut stream = match input.execute(partition, context) {
+                Err(e) => {
+                    // If send fails, the plan being torn down, there
+                    // is no place to send the error and no reason to continue.
+                    output.send(Err(e)).await.ok();
+                    debug!(
+                        "Stopping execution: error executing input: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+                Ok(stream) => stream,
+            };
+
+            // Transfer batches from inner stream to the output tx
+            // immediately.
+            while let Some(item) = stream.next().await {
+                let is_err = item.is_err();
+
+                // If send fails, plan being torn down, there is no
+                // place to send the error and no reason to continue.
+                if output.send(item).await.is_err() {
+                    debug!(
+                        "Stopping execution: output is gone, plan cancelling: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+
+                // stop after the first error is encountered (don't
+                // drive all streams to completion)
+                if is_err {
+                    debug!(
+                        "Stopping execution: plan returned error: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+            }
+        });
+    }
+
+    /// Create a stream of all `RecordBatch`es written to `tx`
+    pub fn build(self) -> SendableRecordBatchStream {
+        let Self {
+            tx,
+            rx,
+            schema,
+            mut join_set,
+        } = self;
 
-    #[allow(dead_code)]
-    drop_helper: AbortOnDropSingle<()>,
+        // don't need tx
+        drop(tx);
+
+        // future that checks the result of the join set, and propagates panic if seen
+        let check = async move {
+            while let Some(result) = join_set.join_next().await {
+                match result {
+                    Ok(()) => continue, // nothing to report
+                    // This means a tokio task error, likely a panic
+                    Err(e) => {
+                        if e.is_panic() {
+                            // resume on the main thread
+                            std::panic::resume_unwind(e.into_panic());
+                        } else {
+                            // This should only occur if the task is
+                            // cancelled, which would only occur if
+                            // the JoinSet were aborted, which in turn
+                            // would imply that the receiver has been
+                            // dropped and this code is not running
+                            return Some(Err(DataFusionError::Internal(format!(
+                                "Non Panic Task error: {e}"
+                            ))));
+                        }
+                    }
+                }
+            }
+            None
+        };
+
+        let check_stream = futures::stream::once(check)
+            // unwrap Option / only return the error
+            .filter_map(|item| async move { item });
+
+        // Merge the streams together so whichever is ready first
+        // produces the batch (since futures::stream:StreamExt is
+        // already in scope, need to call it explicitly)

Review Comment:
   Changed in fb17af868 -- I didn't see `select`. TIL!





[GitHub] [arrow-datafusion] nvartolomei commented on pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "nvartolomei (via GitHub)" <gi...@apache.org>.
nvartolomei commented on PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#issuecomment-1579375349

   great work @alamb 👏 




[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1212991034


##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -17,43 +17,178 @@
 
 //! Stream wrappers for physical operators
 
+use std::sync::Arc;
+
 use crate::error::Result;
+use crate::physical_plan::displayable;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{Stream, StreamExt};
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use futures::stream::BoxStream;
+use futures::{Future, Stream, StreamExt};
+use log::debug;
 use pin_project_lite::pin_project;
-use tokio::task::JoinHandle;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinSet;
 use tokio_stream::wrappers::ReceiverStream;
 
-use super::common::AbortOnDropSingle;
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::metrics::BaselineMetrics;
+use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
 
-/// Adapter for a tokio [`ReceiverStream`] that implements the
-/// [`SendableRecordBatchStream`]
-/// interface
-pub struct RecordBatchReceiverStream {
+/// Builder for [`RecordBatchReceiverStream`] that propagates errors
+/// and panics correctly.
+///
+/// [`RecordBatchReceiverStream`] can be used when there are one or
+/// more tasks spawned which produce RecordBatches and send them to a
+/// single `Receiver`.
+pub struct RecordBatchReceiverStreamBuilder {
+    tx: Sender<Result<RecordBatch>>,
+    rx: Receiver<Result<RecordBatch>>,
     schema: SchemaRef,
+    join_set: JoinSet<()>,
+}
+
+impl RecordBatchReceiverStreamBuilder {
+    /// create new channels with the specified buffer size
+    pub fn new(schema: SchemaRef, capacity: usize) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
 
-    inner: ReceiverStream<Result<RecordBatch>>,
+        Self {
+            tx,
+            rx,
+            schema,
+            join_set: JoinSet::new(),
+        }
+    }
+
+    /// Get a handle for sending [`RecordBatch`]es to the output
+    pub fn tx(&self) -> Sender<Result<RecordBatch>> {
+        self.tx.clone()
+    }
+
+    /// Spawn task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn<F>(&mut self, task: F)
+    where
+        F: Future<Output = ()>,
+        F: Send + 'static,
+    {
+        self.join_set.spawn(task);
+    }
+
+    /// Spawn a blocking task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn_blocking<F>(&mut self, f: F)
+    where
+        F: FnOnce(),
+        F: Send + 'static,
+    {
+        self.join_set.spawn_blocking(f);
+    }
+
+    /// runs the input_partition of the `input` ExecutionPlan on the
+    /// tokio threadpool and writes its outputs to this stream
+    pub(crate) fn run_input(
+        &mut self,
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) {
+        let output = self.tx();
+
+        self.spawn(async move {
+            let mut stream = match input.execute(partition, context) {
+                Err(e) => {
+                    // If send fails, plan being torn down,
+                    // there is no place to send the error.
+                    output.send(Err(e)).await.ok();
+                    debug!(
+                        "Stopping execution: error executing input: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+                Ok(stream) => stream,
+            };
+
+            while let Some(item) = stream.next().await {
+                // If send fails, plan being torn down,
+                // there is no place to send the error.
+                if output.send(item).await.is_err() {
+                    debug!(
+                        "Stopping execution: output is gone, plan cancelling: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+            }
+        });
+    }
 
-    #[allow(dead_code)]
-    drop_helper: AbortOnDropSingle<()>,
+    /// Create a stream of all `RecordBatch`es written to `tx`
+    pub fn build(self) -> SendableRecordBatchStream {
+        let Self {
+            tx,
+            rx,
+            schema,
+            mut join_set,
+        } = self;
+
+        // don't need tx
+        drop(tx);
+
+        // future that checks the result of the join set
+        let check = async move {
+            while let Some(result) = join_set.join_next().await {
+                match result {
+                    Ok(()) => continue, // nothing to report
+                    // This means a tokio task error, likely a panic
+                    Err(e) => {
+                        if e.is_panic() {
+                            // resume on the main thread
+                            std::panic::resume_unwind(e.into_panic());
+                        } else {
+                            return Some(Err(DataFusionError::Execution(format!(

Review Comment:
   Is this branch even reachable? I think it can only occur if the task is cancelled, which would only happen if the JoinSet were aborted, which in turn would imply that the receiver has been dropped





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1213517967


##########
datafusion/core/src/test/exec.rs:
##########
@@ -643,3 +637,139 @@ pub async fn assert_strong_count_converges_to_zero<T>(refs: Weak<T>) {
     .await
     .unwrap();
 }
+
+/// Execution plan that emits streams that panics.
+///
+/// This is useful to test panic handling of certain execution plans.
+#[derive(Debug)]
+pub struct PanicingExec {

Review Comment:
   Panic is the noun, panicking the present participle - https://dictionary.cambridge.org/dictionary/english/panic. The Rust spelling is correct; "panicing" is not 😅





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1219638661


##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -17,43 +17,178 @@
 
 //! Stream wrappers for physical operators
 
+use std::sync::Arc;
+
 use crate::error::Result;
+use crate::physical_plan::displayable;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{Stream, StreamExt};
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use futures::stream::BoxStream;
+use futures::{Future, Stream, StreamExt};
+use log::debug;
 use pin_project_lite::pin_project;
-use tokio::task::JoinHandle;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinSet;
 use tokio_stream::wrappers::ReceiverStream;
 
-use super::common::AbortOnDropSingle;
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::metrics::BaselineMetrics;
+use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
 
-/// Adapter for a tokio [`ReceiverStream`] that implements the
-/// [`SendableRecordBatchStream`]
-/// interface
-pub struct RecordBatchReceiverStream {
+/// Builder for [`RecordBatchReceiverStream`] that propagates errors
+/// and panics correctly.
+///
+/// [`RecordBatchReceiverStream`] can be used when there are one or
+/// more tasks spawned which produce RecordBatches and send them to a
+/// single `Receiver`.
+pub struct RecordBatchReceiverStreamBuilder {
+    tx: Sender<Result<RecordBatch>>,
+    rx: Receiver<Result<RecordBatch>>,
     schema: SchemaRef,
+    join_set: JoinSet<()>,
+}
+
+impl RecordBatchReceiverStreamBuilder {
+    /// create new channels with the specified buffer size
+    pub fn new(schema: SchemaRef, capacity: usize) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
 
-    inner: ReceiverStream<Result<RecordBatch>>,
+        Self {
+            tx,
+            rx,
+            schema,
+            join_set: JoinSet::new(),
+        }
+    }
+
+    /// Get a handle for sending [`RecordBatch`]es to the output
+    pub fn tx(&self) -> Sender<Result<RecordBatch>> {
+        self.tx.clone()
+    }
+
+    /// Spawn task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn<F>(&mut self, task: F)
+    where
+        F: Future<Output = ()>,
+        F: Send + 'static,
+    {
+        self.join_set.spawn(task);
+    }
+
+    /// Spawn a blocking task that will be aborted if this builder (or the stream
+    /// built from it) are dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn_blocking<F>(&mut self, f: F)
+    where
+        F: FnOnce(),
+        F: Send + 'static,
+    {
+        self.join_set.spawn_blocking(f);
+    }
+
+    /// runs the input_partition of the `input` ExecutionPlan on the
+    /// tokio threadpool and writes its outputs to this stream
+    pub(crate) fn run_input(
+        &mut self,
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) {
+        let output = self.tx();
+
+        self.spawn(async move {
+            let mut stream = match input.execute(partition, context) {
+                Err(e) => {
+                    // If send fails, plan being torn down,
+                    // there is no place to send the error.
+                    output.send(Err(e)).await.ok();
+                    debug!(
+                        "Stopping execution: error executing input: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+                Ok(stream) => stream,
+            };
+
+            while let Some(item) = stream.next().await {
+                // If send fails, plan being torn down,
+                // there is no place to send the error.
+                if output.send(item).await.is_err() {
+                    debug!(
+                        "Stopping execution: output is gone, plan cancelling: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+            }
+        });
+    }
 
-    #[allow(dead_code)]
-    drop_helper: AbortOnDropSingle<()>,
+    /// Create a stream of all `RecordBatch`es written to `tx`
+    pub fn build(self) -> SendableRecordBatchStream {
+        let Self {
+            tx,
+            rx,
+            schema,
+            mut join_set,
+        } = self;
+
+        // don't need tx
+        drop(tx);
+
+        // future that checks the result of the join set
+        let check = async move {
+            while let Some(result) = join_set.join_next().await {
+                match result {
+                    Ok(()) => continue, // nothing to report
+                    // This means a tokio task error, likely a panic
+                    Err(e) => {
+                        if e.is_panic() {
+                            // resume on the main thread
+                            std::panic::resume_unwind(e.into_panic());
+                        } else {
+                            return Some(Err(DataFusionError::Execution(format!(
+                                "Task error: {e}"
+                            ))));
+                        }
+                    }
+                }
+            }
+            None
+        };
+
+        let check_stream = futures::stream::once(check)
+            // unwrap Option / only return the error
+            .filter_map(|item| async move { item });
+
+        let inner = ReceiverStream::new(rx).chain(check_stream).boxed();

Review Comment:
   I believe I fixed this in  https://github.com/apache/arrow-datafusion/pull/6507/commits/b1a817ce8e6c4eb210b40030f345ffb246184ac3
   
   After trying several other approaches, I found  https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.merge which did exactly what I wanted 💯 
   
   It is tested in `record_batch_receiver_stream_propagates_panics_early_shutdown`
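   For reference, `tokio_stream::StreamExt::merge` polls both streams and yields from whichever is ready, unlike `chain`, which only polls the second stream after the first finishes. A minimal sketch with the names from the diff above:

   ```rust
   use tokio_stream::StreamExt;

   // Sketch: merge the batch stream with the panic/error check stream so
   // a panic surfaces even while batches are still (or no longer) flowing.
   let batches = tokio_stream::wrappers::ReceiverStream::new(rx);
   let inner = batches.merge(check_stream);
   ```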
   
   





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1219727470


##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -17,43 +17,206 @@
 
 //! Stream wrappers for physical operators
 
+use std::sync::Arc;
+
 use crate::error::Result;
+use crate::physical_plan::displayable;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{Stream, StreamExt};
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use futures::stream::BoxStream;
+use futures::{Future, Stream, StreamExt};
+use log::debug;
 use pin_project_lite::pin_project;
-use tokio::task::JoinHandle;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinSet;
 use tokio_stream::wrappers::ReceiverStream;
 
-use super::common::AbortOnDropSingle;
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::metrics::BaselineMetrics;
+use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
 
-/// Adapter for a tokio [`ReceiverStream`] that implements the
-/// [`SendableRecordBatchStream`]
-/// interface
-pub struct RecordBatchReceiverStream {
+/// Builder for [`RecordBatchReceiverStream`] that propagates errors
+/// and panics correctly.
+///
+/// [`RecordBatchReceiverStream`] is used to spawn one or more tasks
+/// that produce `RecordBatch`es and send them to a single
+/// `Receiver`, which can improve parallelism.
+///
+/// This also handles propagating panics and canceling the tasks.
+pub struct RecordBatchReceiverStreamBuilder {
+    tx: Sender<Result<RecordBatch>>,
+    rx: Receiver<Result<RecordBatch>>,
     schema: SchemaRef,
+    join_set: JoinSet<()>,
+}
+
+impl RecordBatchReceiverStreamBuilder {
+    /// create new channels with the specified buffer size
+    pub fn new(schema: SchemaRef, capacity: usize) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
 
-    inner: ReceiverStream<Result<RecordBatch>>,
+        Self {
+            tx,
+            rx,
+            schema,
+            join_set: JoinSet::new(),
+        }
+    }
+
+    /// Get a handle for sending [`RecordBatch`]es to the output
+    pub fn tx(&self) -> Sender<Result<RecordBatch>> {
+        self.tx.clone()
+    }
+
+    /// Spawn a task that will be aborted if this builder (or the stream
+    /// built from it) is dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn<F>(&mut self, task: F)
+    where
+        F: Future<Output = ()>,
+        F: Send + 'static,
+    {
+        self.join_set.spawn(task);
+    }
+
+    /// Spawn a blocking task that will be aborted if this builder (or the stream
+    /// built from it) is dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn_blocking<F>(&mut self, f: F)
+    where
+        F: FnOnce(),
+        F: Send + 'static,
+    {
+        self.join_set.spawn_blocking(f);
+    }
+
+    /// runs the input_partition of the `input` ExecutionPlan on the
+    /// tokio threadpool and writes its outputs to this stream
+    ///
+    /// If the input partition produces an error, the error will be
+    /// sent to the output stream and no further results are sent.
+    pub(crate) fn run_input(
+        &mut self,
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) {
+        let output = self.tx();
+
+        self.spawn(async move {
+            let mut stream = match input.execute(partition, context) {
+                Err(e) => {
+                    // If send fails, the plan is being torn down; there
+                    // is no place to send the error and no reason to continue.
+                    output.send(Err(e)).await.ok();
+                    debug!(
+                        "Stopping execution: error executing input: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+                Ok(stream) => stream,
+            };
+
+            // Transfer batches from inner stream to the output tx
+            // immediately.
+            while let Some(item) = stream.next().await {
+                let is_err = item.is_err();
+
+                // If send fails, the plan is being torn down; there is no
+                // place to send the error and no reason to continue.
+                if output.send(item).await.is_err() {
+                    debug!(
+                        "Stopping execution: output is gone, plan cancelling: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+
+                // stop after the first error is encountered (don't
+                // drive all streams to completion)
+                if is_err {
+                    debug!(
+                        "Stopping execution: plan returned error: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+            }
+        });
+    }
+
+    /// Create a stream of all `RecordBatch`es written to `tx`
+    pub fn build(self) -> SendableRecordBatchStream {
+        let Self {
+            tx,
+            rx,
+            schema,
+            mut join_set,
+        } = self;
 
-    #[allow(dead_code)]
-    drop_helper: AbortOnDropSingle<()>,
+        // don't need tx
+        drop(tx);
+
+        // future that checks the result of the join set and propagates panics if seen
+        let check = async move {
+            while let Some(result) = join_set.join_next().await {
+                match result {
+                    Ok(()) => continue, // nothing to report
+                    // This means a tokio task error, likely a panic
+                    Err(e) => {
+                        if e.is_panic() {
+                            // resume on the main thread
+                            std::panic::resume_unwind(e.into_panic());
+                        } else {
+                            // This should only occur if the task is
+                            // cancelled, which would only occur if
+                            // the JoinSet were aborted, which in turn
+                            // would imply that the receiver has been
+                            // dropped and this code is not running
+                            return Some(Err(DataFusionError::Internal(format!(

Review Comment:
   I haven't studied the tokio JoinHandle code closely enough to know under what conditions it currently returns an error, or might in the future (for example, whether cancelling a task in some way surfaces as an error).
   
   Given that the API can return an error, I think handling and propagating it is the most future-proof thing to do.
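   
   As a concrete illustration (my own sketch, not code from this PR): a `JoinError` today reports either a panic or a cancellation, and the catch-all arm is what guards against anything tokio might add later:
   
   ```rust
   use tokio::task::JoinSet;
   
   #[tokio::main]
   async fn main() {
       let mut join_set: JoinSet<()> = JoinSet::new();
   
       // One task that panics...
       join_set.spawn(async { panic!("boom") });
   
       // ...and one that is aborted while parked, so it is cancelled.
       let handle = join_set.spawn(async { std::future::pending::<()>().await });
       handle.abort();
   
       while let Some(result) = join_set.join_next().await {
           match result {
               Ok(()) => {} // nothing to report
               Err(e) if e.is_panic() => {
                   // This is where the PR calls `std::panic::resume_unwind`;
                   // here we just report the payload.
                   println!("task panicked: {e}");
               }
               Err(e) => {
                   // Today this can only be a cancellation, but keeping the
                   // arm makes the handling robust to future JoinError kinds.
                   println!("task failed (cancelled: {}): {e}", e.is_cancelled());
               }
           }
       }
   }
   ```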





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1212991034


##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -17,43 +17,178 @@
 
 //! Stream wrappers for physical operators
 
+use std::sync::Arc;
+
 use crate::error::Result;
+use crate::physical_plan::displayable;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{Stream, StreamExt};
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use futures::stream::BoxStream;
+use futures::{Future, Stream, StreamExt};
+use log::debug;
 use pin_project_lite::pin_project;
-use tokio::task::JoinHandle;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinSet;
 use tokio_stream::wrappers::ReceiverStream;
 
-use super::common::AbortOnDropSingle;
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::metrics::BaselineMetrics;
+use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
 
-/// Adapter for a tokio [`ReceiverStream`] that implements the
-/// [`SendableRecordBatchStream`]
-/// interface
-pub struct RecordBatchReceiverStream {
+/// Builder for [`RecordBatchReceiverStream`] that propagates errors
+/// and panics correctly.
+///
+/// [`RecordBatchReceiverStream`] can be used when there are one or
+/// more tasks spawned which produce RecordBatches and send them to a
+/// single `Receiver`.
+pub struct RecordBatchReceiverStreamBuilder {
+    tx: Sender<Result<RecordBatch>>,
+    rx: Receiver<Result<RecordBatch>>,
     schema: SchemaRef,
+    join_set: JoinSet<()>,
+}
+
+impl RecordBatchReceiverStreamBuilder {
+    /// create new channels with the specified buffer size
+    pub fn new(schema: SchemaRef, capacity: usize) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
 
-    inner: ReceiverStream<Result<RecordBatch>>,
+        Self {
+            tx,
+            rx,
+            schema,
+            join_set: JoinSet::new(),
+        }
+    }
+
+    /// Get a handle for sending [`RecordBatch`]es to the output
+    pub fn tx(&self) -> Sender<Result<RecordBatch>> {
+        self.tx.clone()
+    }
+
+    /// Spawn a task that will be aborted if this builder (or the stream
+    /// built from it) is dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn<F>(&mut self, task: F)
+    where
+        F: Future<Output = ()>,
+        F: Send + 'static,
+    {
+        self.join_set.spawn(task);
+    }
+
+    /// Spawn a blocking task that will be aborted if this builder (or the stream
+    /// built from it) is dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn_blocking<F>(&mut self, f: F)
+    where
+        F: FnOnce(),
+        F: Send + 'static,
+    {
+        self.join_set.spawn_blocking(f);
+    }
+
+    /// runs the input_partition of the `input` ExecutionPlan on the
+    /// tokio threadpool and writes its outputs to this stream
+    pub(crate) fn run_input(
+        &mut self,
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) {
+        let output = self.tx();
+
+        self.spawn(async move {
+            let mut stream = match input.execute(partition, context) {
+                Err(e) => {
+                    // If send fails, plan being torn down,
+                    // there is no place to send the error.
+                    output.send(Err(e)).await.ok();
+                    debug!(
+                        "Stopping execution: error executing input: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+                Ok(stream) => stream,
+            };
+
+            while let Some(item) = stream.next().await {
+                // If send fails, plan being torn down,
+                // there is no place to send the error.
+                if output.send(item).await.is_err() {
+                    debug!(
+                        "Stopping execution: output is gone, plan cancelling: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+            }
+        });
+    }
 
-    #[allow(dead_code)]
-    drop_helper: AbortOnDropSingle<()>,
+    /// Create a stream of all `RecordBatch`es written to `tx`
+    pub fn build(self) -> SendableRecordBatchStream {
+        let Self {
+            tx,
+            rx,
+            schema,
+            mut join_set,
+        } = self;
+
+        // don't need tx
+        drop(tx);
+
+        // future that checks the result of the join set
+        let check = async move {
+            while let Some(result) = join_set.join_next().await {
+                match result {
+                    Ok(()) => continue, // nothing to report
+                    // This means a tokio task error, likely a panic
+                    Err(e) => {
+                        if e.is_panic() {
+                            // resume on the main thread
+                            std::panic::resume_unwind(e.into_panic());
+                        } else {
+                            return Some(Err(DataFusionError::Execution(format!(

Review Comment:
   Is this branch even reachable? I think it can only occur if the task is cancelled, which would only happen if the JoinSet were aborted.





[GitHub] [arrow-datafusion] crepererum commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "crepererum (via GitHub)" <gi...@apache.org>.
crepererum commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1213112618


##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -17,43 +17,178 @@
 
 //! Stream wrappers for physical operators
 
+use std::sync::Arc;
+
 use crate::error::Result;
+use crate::physical_plan::displayable;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{Stream, StreamExt};
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use futures::stream::BoxStream;
+use futures::{Future, Stream, StreamExt};
+use log::debug;
 use pin_project_lite::pin_project;
-use tokio::task::JoinHandle;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinSet;
 use tokio_stream::wrappers::ReceiverStream;
 
-use super::common::AbortOnDropSingle;
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::metrics::BaselineMetrics;
+use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
 
-/// Adapter for a tokio [`ReceiverStream`] that implements the
-/// [`SendableRecordBatchStream`]
-/// interface
-pub struct RecordBatchReceiverStream {
+/// Builder for [`RecordBatchReceiverStream`] that propagates errors
+/// and panics correctly.
+///
+/// [`RecordBatchReceiverStream`] can be used when there are one or
+/// more tasks spawned which produce RecordBatches and send them to a
+/// single `Receiver`.
+pub struct RecordBatchReceiverStreamBuilder {
+    tx: Sender<Result<RecordBatch>>,
+    rx: Receiver<Result<RecordBatch>>,
     schema: SchemaRef,
+    join_set: JoinSet<()>,
+}
+
+impl RecordBatchReceiverStreamBuilder {
+    /// create new channels with the specified buffer size
+    pub fn new(schema: SchemaRef, capacity: usize) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
 
-    inner: ReceiverStream<Result<RecordBatch>>,
+        Self {
+            tx,
+            rx,
+            schema,
+            join_set: JoinSet::new(),
+        }
+    }
+
+    /// Get a handle for sending [`RecordBatch`]es to the output
+    pub fn tx(&self) -> Sender<Result<RecordBatch>> {
+        self.tx.clone()
+    }
+
+    /// Spawn a task that will be aborted if this builder (or the stream
+    /// built from it) is dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn<F>(&mut self, task: F)
+    where
+        F: Future<Output = ()>,
+        F: Send + 'static,
+    {
+        self.join_set.spawn(task);
+    }
+
+    /// Spawn a blocking task that will be aborted if this builder (or the stream
+    /// built from it) is dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn_blocking<F>(&mut self, f: F)
+    where
+        F: FnOnce(),
+        F: Send + 'static,
+    {
+        self.join_set.spawn_blocking(f);
+    }
+
+    /// runs the input_partition of the `input` ExecutionPlan on the
+    /// tokio threadpool and writes its outputs to this stream
+    pub(crate) fn run_input(
+        &mut self,
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) {
+        let output = self.tx();
+
+        self.spawn(async move {
+            let mut stream = match input.execute(partition, context) {
+                Err(e) => {
+                    // If send fails, plan being torn down,
+                    // there is no place to send the error.
+                    output.send(Err(e)).await.ok();
+                    debug!(
+                        "Stopping execution: error executing input: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+                Ok(stream) => stream,
+            };
+
+            while let Some(item) = stream.next().await {
+                // If send fails, plan being torn down,
+                // there is no place to send the error.
+                if output.send(item).await.is_err() {
+                    debug!(
+                        "Stopping execution: output is gone, plan cancelling: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+            }
+        });
+    }
 
-    #[allow(dead_code)]
-    drop_helper: AbortOnDropSingle<()>,
+    /// Create a stream of all `RecordBatch`es written to `tx`
+    pub fn build(self) -> SendableRecordBatchStream {
+        let Self {
+            tx,
+            rx,
+            schema,
+            mut join_set,
+        } = self;
+
+        // don't need tx
+        drop(tx);
+
+        // future that checks the result of the join set
+        let check = async move {
+            while let Some(result) = join_set.join_next().await {
+                match result {
+                    Ok(()) => continue, // nothing to report
+                    // This means a tokio task error, likely a panic
+                    Err(e) => {
+                        if e.is_panic() {
+                            // resume on the main thread
+                            std::panic::resume_unwind(e.into_panic());
+                        } else {
+                            return Some(Err(DataFusionError::Execution(format!(
+                                "Task error: {e}"
+                            ))));
+                        }
+                    }
+                }
+            }
+            None
+        };
+
+        let check_stream = futures::stream::once(check)
+            // unwrap Option / only return the error
+            .filter_map(|item| async move { item });
+
+        let inner = ReceiverStream::new(rx).chain(check_stream).boxed();

Review Comment:
   > With this implementation, the panics will be propagated only after all input
   
   This is my worry as well. I think you could move the check future into another task that holds the join set, is itself aborted on drop (a two-level join set), and sends any error to `tx`.
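   
   A rough sketch of that two-level shape (my own simplification: plain `String` results and invented names like `supervisor`, not DataFusion code):
   
   ```rust
   use tokio::{sync::mpsc, task::JoinSet};
   
   #[tokio::main]
   async fn main() {
       let (tx, mut rx) = mpsc::channel::<Result<String, String>>(2);
   
       // Inner level: the worker tasks.
       let mut workers: JoinSet<()> = JoinSet::new();
       let worker_tx = tx.clone();
       workers.spawn(async move {
           worker_tx.send(Ok("batch 0".into())).await.ok();
           panic!("worker panicked");
       });
   
       // Outer level: a supervisor that owns the workers. Dropping
       // `supervisor` aborts this task; that drops `workers`, and a
       // JoinSet aborts all of its tasks on drop.
       let mut supervisor: JoinSet<()> = JoinSet::new();
       supervisor.spawn(async move {
           while let Some(result) = workers.join_next().await {
               if let Err(e) = result {
                   // Forward the failure on the data channel instead of
                   // making the consumer poll a separate check stream.
                   tx.send(Err(format!("task failed: {e}"))).await.ok();
                   return;
               }
           }
       });
   
       while let Some(item) = rx.recv().await {
           println!("{item:?}"); // Ok("batch 0"), then Err("task failed: ...")
       }
   }
   ```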





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1212982877


##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -17,43 +17,178 @@
 
 //! Stream wrappers for physical operators
 
+use std::sync::Arc;
+
 use crate::error::Result;
+use crate::physical_plan::displayable;
 use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{Stream, StreamExt};
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use futures::stream::BoxStream;
+use futures::{Future, Stream, StreamExt};
+use log::debug;
 use pin_project_lite::pin_project;
-use tokio::task::JoinHandle;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinSet;
 use tokio_stream::wrappers::ReceiverStream;
 
-use super::common::AbortOnDropSingle;
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::metrics::BaselineMetrics;
+use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
 
-/// Adapter for a tokio [`ReceiverStream`] that implements the
-/// [`SendableRecordBatchStream`]
-/// interface
-pub struct RecordBatchReceiverStream {
+/// Builder for [`RecordBatchReceiverStream`] that propagates errors
+/// and panics correctly.
+///
+/// [`RecordBatchReceiverStream`] can be used when there are one or
+/// more tasks spawned which produce RecordBatches and send them to a
+/// single `Receiver`.
+pub struct RecordBatchReceiverStreamBuilder {
+    tx: Sender<Result<RecordBatch>>,
+    rx: Receiver<Result<RecordBatch>>,
     schema: SchemaRef,
+    join_set: JoinSet<()>,
+}
+
+impl RecordBatchReceiverStreamBuilder {
+    /// create new channels with the specified buffer size
+    pub fn new(schema: SchemaRef, capacity: usize) -> Self {
+        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
 
-    inner: ReceiverStream<Result<RecordBatch>>,
+        Self {
+            tx,
+            rx,
+            schema,
+            join_set: JoinSet::new(),
+        }
+    }
+
+    /// Get a handle for sending [`RecordBatch`]es to the output
+    pub fn tx(&self) -> Sender<Result<RecordBatch>> {
+        self.tx.clone()
+    }
+
+    /// Spawn a task that will be aborted if this builder (or the stream
+    /// built from it) is dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn<F>(&mut self, task: F)
+    where
+        F: Future<Output = ()>,
+        F: Send + 'static,
+    {
+        self.join_set.spawn(task);
+    }
+
+    /// Spawn a blocking task that will be aborted if this builder (or the stream
+    /// built from it) is dropped
+    ///
+    /// this is often used to spawn tasks that write to the sender
+    /// retrieved from `Self::tx`
+    pub fn spawn_blocking<F>(&mut self, f: F)
+    where
+        F: FnOnce(),
+        F: Send + 'static,
+    {
+        self.join_set.spawn_blocking(f);
+    }
+
+    /// runs the input_partition of the `input` ExecutionPlan on the
+    /// tokio threadpool and writes its outputs to this stream
+    pub(crate) fn run_input(
+        &mut self,
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) {
+        let output = self.tx();
+
+        self.spawn(async move {
+            let mut stream = match input.execute(partition, context) {
+                Err(e) => {
+                    // If send fails, plan being torn down,
+                    // there is no place to send the error.
+                    output.send(Err(e)).await.ok();
+                    debug!(
+                        "Stopping execution: error executing input: {}",
+                        displayable(input.as_ref()).one_line()
+                    );
+                    return;
+                }
+                Ok(stream) => stream,
+            };
+
+            while let Some(item) = stream.next().await {
+                // If send fails, plan being torn down,
+                // there is no place to send the error.

Review Comment:
   I think this should also short-circuit if `item` is an error; as written, it will drive execution to completion.
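   
   To make the concern concrete, a toy version of the forwarding loop with the short-circuit added (plain `i32`/`String` results, not DataFusion types):
   
   ```rust
   use futures::StreamExt;
   
   #[tokio::main]
   async fn main() {
       let (tx, mut rx) = tokio::sync::mpsc::channel::<Result<i32, String>>(4);
   
       tokio::spawn(async move {
           let mut stream =
               futures::stream::iter(vec![Ok(1), Err("boom".to_string()), Ok(2)]);
           while let Some(item) = stream.next().await {
               let is_err = item.is_err();
               if tx.send(item).await.is_err() {
                   return; // receiver gone: stop
               }
               if is_err {
                   return; // first error forwarded: stop pulling the input
               }
           }
       });
   
       while let Some(item) = rx.recv().await {
           println!("{item:?}"); // Ok(1), then Err("boom"); Ok(2) is never pulled
       }
   }
   ```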





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6507: Consolidate panic propagation into RecordBatchReceiverStream

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1212356119


##########
datafusion/core/src/physical_plan/stream.rs:
##########
@@ -126,3 +256,97 @@ where
         self.schema.clone()
     }
 }
+
+/// Stream wrapper that records `BaselineMetrics` for a particular
+/// partition
+pub(crate) struct ObservedStream {
+    inner: SendableRecordBatchStream,
+    baseline_metrics: BaselineMetrics,
+}
+
+impl ObservedStream {
+    pub fn new(
+        inner: SendableRecordBatchStream,
+        baseline_metrics: BaselineMetrics,
+    ) -> Self {
+        Self {
+            inner,
+            baseline_metrics,
+        }
+    }
+}
+
+impl RecordBatchStream for ObservedStream {
+    fn schema(&self) -> arrow::datatypes::SchemaRef {
+        self.inner.schema()
+    }
+}
+
+impl futures::Stream for ObservedStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        let poll = self.inner.poll_next_unpin(cx);
+        self.baseline_metrics.record_poll(poll)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use arrow_schema::{DataType, Field, Schema};
+
+    use crate::{execution::context::SessionContext, test::exec::PanicingExec};
+
+    #[tokio::test]
+    #[should_panic(expected = "PanickingStream did panic")]
+    async fn record_batch_receiver_stream_propagates_panics() {
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
+
+        let num_partitions = 10;
+        let input = PanicingExec::new(schema.clone(), num_partitions);
+        consume(input).await
+    }
+
+    #[tokio::test]
+    #[should_panic(expected = "PanickingStream did panic: 1")]
+    async fn record_batch_receiver_stream_propagates_panics_one() {
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
+
+        // make 2 partitions, second panics before the first
+        let num_partitions = 2;
+        let input = PanicingExec::new(schema.clone(), num_partitions)
+            .with_partition_panic(0, 10)
+            .with_partition_panic(1, 3); // partition 1 should panic first (after 3 )

Review Comment:
   Here is a test showing that when the second partition panics, the panic is properly reported





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6507: Consolidate panic propagation into RecordBatchReceiverStream

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1212238341


##########
datafusion/core/src/physical_plan/coalesce_partitions.rs:
##########
@@ -270,4 +231,19 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]

Review Comment:
   This is the new test from @nvartolomei 



##########
datafusion/core/src/physical_plan/coalesce_partitions.rs:
##########
@@ -183,32 +168,6 @@ impl ExecutionPlan for CoalescePartitionsExec {
     }
 }
 
-struct MergeStream {

Review Comment:
   I basically taught `RecordBatchReceiverStream` how to propagate panics and then updated `CoalescePartitionsExec` to use it
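   
   For the overall shape, here is a hedged sketch of how an operator like `CoalescePartitionsExec` can drive the builder through its public surface (`tx` + `spawn`); the import paths are assumed from this diff, and the in-crate operators use `run_input`, which wraps roughly this loop:
   
   ```rust
   use std::sync::Arc;
   
   use arrow::datatypes::SchemaRef;
   use datafusion::error::Result;
   use datafusion::execution::context::TaskContext;
   // Assumed export path for the builder added in this PR.
   use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder;
   use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
   use futures::StreamExt;
   
   fn coalesce_like_execute(
       input: Arc<dyn ExecutionPlan>,
       schema: SchemaRef,
       context: Arc<TaskContext>,
   ) -> Result<SendableRecordBatchStream> {
       let input_partitions = input.output_partitioning().partition_count();
       let mut builder = RecordBatchReceiverStreamBuilder::new(schema, input_partitions);
   
       for partition in 0..input_partitions {
           let input = input.clone();
           let context = context.clone();
           let tx = builder.tx();
           // Tasks live in the builder's JoinSet, so they are aborted if
           // the returned stream is dropped, and their panics propagate.
           builder.spawn(async move {
               let mut stream = match input.execute(partition, context) {
                   Ok(stream) => stream,
                   Err(e) => {
                       tx.send(Err(e)).await.ok();
                       return;
                   }
               };
               while let Some(item) = stream.next().await {
                   if tx.send(item).await.is_err() {
                       return; // receiver dropped: plan is shutting down
                   }
               }
           });
       }
   
       Ok(builder.build())
   }
   ```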





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6507: Fix panic propagation in `CoalescePartitions`, consolidates panic propagation into `RecordBatchReceiverStream`

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6507:
URL: https://github.com/apache/arrow-datafusion/pull/6507#discussion_r1213517967


##########
datafusion/core/src/test/exec.rs:
##########
@@ -643,3 +637,139 @@ pub async fn assert_strong_count_converges_to_zero<T>(refs: Weak<T>) {
     .await
     .unwrap();
 }
+
+/// Execution plan that emits streams that panic.
+///
+/// This is useful to test panic handling of certain execution plans.
+#[derive(Debug)]
+pub struct PanicingExec {

Review Comment:
   Panic is the noun, panicking the present participle - https://dictionary.cambridge.org/dictionary/english/panic


