Posted to github@arrow.apache.org by "metesynnada (via GitHub)" <gi...@apache.org> on 2023/02/14 11:34:02 UTC

[GitHub] [arrow-datafusion] metesynnada opened a new issue, #5278: Strange Behaviour on RepartitionExec with CoalescePartitionsExec.

metesynnada opened a new issue, #5278:
URL: https://github.com/apache/arrow-datafusion/issues/5278

   **Describe the bug**
   I am working with the following physical plan:
   
   ```sql
   === Physical plan ===
   CoalescePartitionsExec
     RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3)
       UnboundableExec: unbounded=true
   ```
   
   Using **`CoalescePartitionsExec`** together with **`RepartitionExec`** causes strange behavior when the input stream contains only one unique value.
   
   The strange behavior:
   
   - When the input stream contains only one unique value, no data is read from **`RepartitionExec`** until the stream is exhausted. Calling **`wake_receivers()`** does not wake up the **`DistributionReceiver`**. This behavior is not observed without **`CoalescePartitionsExec`**. I suspect that the problem is caused by the spawning of new threads inside **`CoalescePartitionsExec`**, and that the waker needs to be updated (see https://rust-lang.github.io/async-book/02_execution/03_wakeups.html).
   
   - Plans with blocking repartition:
       
       ```
       === Physical plan ===
       CoalescePartitionsExec
         ProjectionExec: expr=[a2@0 as a5]
           RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3), input_partitions=1
             UnboundableExec: unbounded=false
       ```
       
       ```
       === Physical plan ===
       CoalescePartitionsExec
         RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3), input_partitions=1
           UnboundableExec: unbounded=false
       ```
       
   - Plan without blocking (executed with `plan.execute(2, task)`; the partition that receives the data can change according to the hash value):
       
       ```
       === Physical plan ===
       RepartitionExec: partitioning=Hash([Column { name: "a2", index: 0 }], 3), input_partitions=1
         UnboundableExec: unbounded=false
       ```
       
   
   **To Reproduce**
   
   - Create a file `datafusion/core/tests/repartition_exec_blocks.rs`
   - Put this code in it:
   
   ```rust
   use arrow::array::UInt32Array;
   use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
   use arrow::record_batch::RecordBatch;
   use arrow::util::pretty::print_batches;
   use datafusion::execution::context::TaskContext;
   use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
   use datafusion::physical_plan::projection::ProjectionExec;
   use datafusion::physical_plan::repartition::RepartitionExec;
   use datafusion::physical_plan::{
       displayable, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
       SendableRecordBatchStream,
   };
   use datafusion::prelude::{SessionConfig, SessionContext};
   use datafusion_common::from_slice::FromSlice;
   use datafusion_common::{Result, Statistics};
   use datafusion_physical_expr::expressions::{col, Column};
   use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
   use futures::{Stream, StreamExt};
   use std::any::Any;
   use std::pin::Pin;
   use std::sync::Arc;
   use std::task::{Context, Poll};
   
   /// A mock execution plan whose stream yields either a fixed number of batches
   /// (`Some(n)`) or an unbounded number of batches (`None`).
   #[derive(Debug, Clone)]
   pub struct MyUnboundedExec {
       batch_produce: Option<usize>,
       schema: Arc<Schema>,
       /// Ref-counting helper to check if the plan and the produced stream are still in memory.
       refs: Arc<()>,
   }
   impl MyUnboundedExec {
       pub fn new(batch_produce: Option<usize>, schema: Schema) -> Self {
           Self {
               batch_produce,
               schema: Arc::new(schema),
               refs: Default::default(),
           }
       }
   }
   impl ExecutionPlan for MyUnboundedExec {
       fn as_any(&self) -> &dyn Any {
           self
       }
   
       fn schema(&self) -> SchemaRef {
           Arc::clone(&self.schema)
       }
   
       fn output_partitioning(&self) -> Partitioning {
           Partitioning::UnknownPartitioning(1)
       }
   
       fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
           None
       }
   
       fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
           vec![]
       }
   
       fn with_new_children(
           self: Arc<Self>,
           _: Vec<Arc<dyn ExecutionPlan>>,
       ) -> Result<Arc<dyn ExecutionPlan>> {
           Ok(self)
       }
   
       fn execute(
           &self,
           _partition: usize,
           _context: Arc<TaskContext>,
       ) -> Result<SendableRecordBatchStream> {
           Ok(Box::pin(UnboundedStream {
               batch_produce: self.batch_produce,
               count: 0,
               schema: Arc::clone(&self.schema),
               _refs: Arc::clone(&self.refs),
           }))
       }
   
       fn fmt_as(
           &self,
           t: DisplayFormatType,
           f: &mut std::fmt::Formatter,
       ) -> std::fmt::Result {
           match t {
               DisplayFormatType::Default => {
                   write!(
                       f,
                       "UnboundableExec: unbounded={}",
                       self.batch_produce.is_none(),
                   )
               }
           }
       }
   
       fn statistics(&self) -> Statistics {
           Statistics::default()
       }
   }
   
   #[derive(Debug)]
   pub struct UnboundedStream {
       batch_produce: Option<usize>,
       count: usize,
       /// Schema mocked by this stream.
       schema: SchemaRef,
   
       /// Ref-counting helper to check if the stream is still in memory.
       _refs: Arc<()>,
   }
   
   impl Stream for UnboundedStream {
       type Item = Result<RecordBatch>;
   
       fn poll_next(
           mut self: Pin<&mut Self>,
           _cx: &mut Context<'_>,
       ) -> Poll<Option<Self::Item>> {
           if let Some(val) = self.batch_produce {
               if val <= self.count {
                   println!("Stream Finished");
                   return Poll::Ready(None);
               }
           }
           let batch = RecordBatch::try_new(
               self.schema.clone(),
               vec![Arc::new(UInt32Array::from_slice([1]))],
           )?;
           self.count += 1;
           std::thread::sleep(std::time::Duration::from_millis(100));
           Poll::Ready(Some(Ok(batch)))
       }
   }
   
   impl RecordBatchStream for UnboundedStream {
       fn schema(&self) -> SchemaRef {
           Arc::clone(&self.schema)
       }
   }
   
   #[tokio::test(flavor = "multi_thread")]
   async fn unbounded_repartition_sa() -> Result<()> {
       let config = SessionConfig::new();
       let ctx = SessionContext::with_config(config);
       let task = ctx.task_ctx();
       let schema = Schema::new(vec![Field::new("a2", DataType::UInt32, false)]);
       let input = Arc::new(MyUnboundedExec::new(Some(20), schema.clone())); // If you put None, it will be an unbounded source.
       let on: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("a2", 0))];
       let plan = Arc::new(RepartitionExec::try_new(input, Partitioning::Hash(on, 3))?);
       let plan = Arc::new(ProjectionExec::try_new(
           vec![(col("a2", &schema)?, "a5".to_string())],
           plan.clone(),
       )?);
       let plan = Arc::new(CoalescePartitionsExec::new(plan.clone()));
       println!(
           "=== Physical plan ===\n{}\n",
           displayable(plan.as_ref()).indent()
       );
       let mut stream = plan.execute(0, task)?;
       while let Some(result) = stream.next().await {
           print_batches(&[result?.clone()])?;
       }
       Ok(())
   }
   ```
   
   **Expected behavior**
   
   ```rust
   //! There are `N` virtual MPSC (multi-producer, single consumer) channels with unbounded capacity. However, if all
   //! buffers/channels are non-empty, than a global gate will be closed preventing new data from being written (the
   //! sender futures will be [pending](Poll::Pending)) until at least one channel is empty (and not closed).
   ```
   
   - Since the gate does not block the senders in this case, I would expect the waker on the receiver to wake up after the `wake_receivers()` call.
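   
   The gating rule quoted above can be modelled in a few lines. This is only a toy sketch, not DataFusion's implementation: `GatedChannels`, `gate_open`, `try_send` and `recv` are hypothetical names, a `false` return stands in for `Poll::Pending`, and the "not closed" condition is ignored. It illustrates why a single hot partition should never close the gate while the other channels stay empty.
   
   ```rust
   use std::collections::VecDeque;
   
   /// Toy model of N virtual channels behind one global gate.
   /// Hypothetical type for illustration; not DataFusion's actual code.
   struct GatedChannels {
       queues: Vec<VecDeque<u32>>,
   }
   
   impl GatedChannels {
       fn new(n: usize) -> Self {
           Self { queues: vec![VecDeque::new(); n] }
       }
   
       /// The gate is open while at least one channel is empty.
       fn gate_open(&self) -> bool {
           self.queues.iter().any(|q| q.is_empty())
       }
   
       /// Returns `false` (a stand-in for `Poll::Pending`) when the gate is closed.
       fn try_send(&mut self, channel: usize, item: u32) -> bool {
           if !self.gate_open() {
               return false;
           }
           self.queues[channel].push_back(item);
           true
       }
   
       /// Takes the oldest item from a channel, which may reopen the gate.
       fn recv(&mut self, channel: usize) -> Option<u32> {
           self.queues[channel].pop_front()
       }
   }
   ```
   
   With three channels and every row hashed to channel 1, channels 0 and 2 stay empty, so `try_send(1, ..)` keeps succeeding no matter how many rows are buffered.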
   
   
   cc @crepererum @alamb @tustvold 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on issue #5278: Strange Behaviour on RepartitionExec with CoalescePartitionsExec.

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on issue #5278:
URL: https://github.com/apache/arrow-datafusion/issues/5278#issuecomment-1432530997

   I will take a look.



[GitHub] [arrow-datafusion] crepererum commented on issue #5278: Strange Behaviour on RepartitionExec with CoalescePartitionsExec.

Posted by "crepererum (via GitHub)" <gi...@apache.org>.
crepererum commented on issue #5278:
URL: https://github.com/apache/arrow-datafusion/issues/5278#issuecomment-1432923225

   I think we could make this issue less likely by inserting a yield point into the distributor channels. Let me draft a PR...



[GitHub] [arrow-datafusion] metesynnada commented on issue #5278: Strange Behaviour on RepartitionExec with CoalescePartitionsExec.

Posted by "metesynnada (via GitHub)" <gi...@apache.org>.
metesynnada commented on issue #5278:
URL: https://github.com/apache/arrow-datafusion/issues/5278#issuecomment-1429993477

   I thought so. If you need assistance, I could provide it.



[GitHub] [arrow-datafusion] alamb commented on issue #5278: Strange Behaviour on RepartitionExec with CoalescePartitionsExec.

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on issue #5278:
URL: https://github.com/apache/arrow-datafusion/issues/5278#issuecomment-1431963851

   >         std::thread::sleep(std::time::Duration::from_millis(100));
   
   Does it make any difference if this is `tokio::time::sleep(..).await?`?  (rather than blocking the task?)



[GitHub] [arrow-datafusion] crepererum commented on issue #5278: Strange Behaviour on RepartitionExec with CoalescePartitionsExec.

Posted by "crepererum (via GitHub)" <gi...@apache.org>.
crepererum commented on issue #5278:
URL: https://github.com/apache/arrow-datafusion/issues/5278#issuecomment-1432902611

   This, BTW, will be an issue with ALL CPU-bound tasks in tokio/DataFusion. You MUST yield to tokio from time to time to keep the system stable.



[GitHub] [arrow-datafusion] crepererum commented on issue #5278: Strange Behaviour on RepartitionExec with CoalescePartitionsExec.

Posted by "crepererum (via GitHub)" <gi...@apache.org>.
crepererum commented on issue #5278:
URL: https://github.com/apache/arrow-datafusion/issues/5278#issuecomment-1432901387

   I think the issue is your unbounded stream: it never yields back to tokio and hence the tasks that receive the data will never be executed (even though they are scheduled), see <https://tokio.rs/blog/2020-04-preemption> and <https://docs.rs/tokio/latest/tokio/task/fn.yield_now.html>.
   
   I'm pretty sure the repartition channels are correct. I've added a bunch of printlns and they wake the receivers but the receiver never is executed by tokio.
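   
   The starvation described here can be reproduced without tokio. The following is a minimal sketch under stated assumptions: a toy round-robin executor that models a single worker thread, with hypothetical `Producer`, `Consumer` and `run` names that are not part of DataFusion or tokio. A producer that never returns `Poll::Pending` finishes all its work before the consumer is polled even once; one that wakes itself and returns `Poll::Pending` after each item lets the consumer interleave.
   
   ```rust
   use std::cell::RefCell;
   use std::collections::VecDeque;
   use std::future::Future;
   use std::pin::Pin;
   use std::rc::Rc;
   use std::sync::Arc;
   use std::task::{Context, Poll, Wake, Waker};
   
   /// Waker that does nothing: the toy executor re-polls every task in turn.
   struct NoopWaker;
   impl Wake for NoopWaker {
       fn wake(self: Arc<Self>) {}
   }
   
   type Log = Rc<RefCell<Vec<String>>>;
   type Queue = Rc<RefCell<VecDeque<u32>>>;
   type Task = Pin<Box<dyn Future<Output = ()>>>;
   
   /// Pushes `total` items. When `cooperative` is set, it returns
   /// `Poll::Pending` after each item to hand control back to the executor.
   struct Producer { next: u32, total: u32, cooperative: bool, queue: Queue, log: Log }
   
   impl Future for Producer {
       type Output = ();
       fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
           while self.next < self.total {
               let item = self.next;
               self.queue.borrow_mut().push_back(item);
               self.log.borrow_mut().push(format!("send {item}"));
               self.next += 1;
               if self.cooperative {
                   cx.waker().wake_by_ref(); // ask to be polled again
                   return Poll::Pending; // the yield point
               }
           }
           Poll::Ready(())
       }
   }
   
   /// Drains whatever is queued each time it is polled.
   struct Consumer { received: u32, total: u32, queue: Queue, log: Log }
   
   impl Future for Consumer {
       type Output = ();
       fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
           loop {
               let item = self.queue.borrow_mut().pop_front();
               match item {
                   Some(item) => {
                       self.log.borrow_mut().push(format!("recv {item}"));
                       self.received += 1;
                   }
                   None => break,
               }
           }
           if self.received == self.total { Poll::Ready(()) } else { Poll::Pending }
       }
   }
   
   /// Polls producer and consumer round-robin on one thread and returns the event log.
   fn run(cooperative: bool, total: u32) -> Vec<String> {
       let log: Log = Rc::new(RefCell::new(Vec::new()));
       let queue: Queue = Rc::new(RefCell::new(VecDeque::new()));
       let producer: Task = Box::pin(Producer {
           next: 0, total, cooperative, queue: queue.clone(), log: log.clone(),
       });
       let consumer: Task = Box::pin(Consumer { received: 0, total, queue, log: log.clone() });
       let mut tasks = vec![producer, consumer];
       let waker = Waker::from(Arc::new(NoopWaker));
       let mut cx = Context::from_waker(&waker);
       while !tasks.is_empty() {
           // Keep only the tasks that are still pending.
           tasks.retain_mut(|task| task.as_mut().poll(&mut cx).is_pending());
       }
       let events = log.borrow().clone();
       events
   }
   ```
   
   `run(false, 3)` logs all three sends before the first receive, while `run(true, 3)` alternates send and receive. In real tokio code the equivalent yield point would be something like `tokio::task::yield_now().await`, as linked above.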



[GitHub] [arrow-datafusion] metesynnada commented on issue #5278: Strange Behaviour on RepartitionExec with CoalescePartitionsExec.

Posted by "metesynnada (via GitHub)" <gi...@apache.org>.
metesynnada commented on issue #5278:
URL: https://github.com/apache/arrow-datafusion/issues/5278#issuecomment-1432613406

   > Does it make any difference if this is tokio::time::sleep(..).await?? (rather than blocking the task?)
   
   It does. I did not use `tokio::time::sleep(..).await?` since `poll_next` is a sync method. However, when I removed the thread sleep, it worked. What could be the reason for this behavior?
   



[GitHub] [arrow-datafusion] alamb commented on issue #5278: Strange Behaviour on RepartitionExec with CoalescePartitionsExec.

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on issue #5278:
URL: https://github.com/apache/arrow-datafusion/issues/5278#issuecomment-1432914444

   > What could be the reason for this behavior? It seems that only CoalescePartitionsExec is affected by thread sleep.
   
   Your test (implicitly) uses a single-threaded tokio executor (so there is only a single thread). With only a single thread, as @crepererum mentions, you need to yield control back to the scheduler (which is what `.await` does under the covers).
   
   I think the need for yielding is true for any cooperative scheduler system, but using `await` definitely may make that too magical



[GitHub] [arrow-datafusion] alamb closed issue #5278: Strange Behaviour on RepartitionExec with CoalescePartitionsExec.

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb closed issue #5278: Strange Behaviour on RepartitionExec with CoalescePartitionsExec.
URL: https://github.com/apache/arrow-datafusion/issues/5278



[GitHub] [arrow-datafusion] crepererum commented on issue #5278: Strange Behaviour on RepartitionExec with CoalescePartitionsExec.

Posted by "crepererum (via GitHub)" <gi...@apache.org>.
crepererum commented on issue #5278:
URL: https://github.com/apache/arrow-datafusion/issues/5278#issuecomment-1429904075

   Having a quick look at the plan: the repartition will partition into a single partition (because you only have a single unique key), which is likely not the first partition. The stream for the first partition will only advance when `RepartitionExec` either gets an element that is hashed into it (never in your case) or when the input terminates. However, I don't understand why this is an issue here, because `CoalescePartitionsExec` polls all streams/partitions in parallel. So there's indeed some bug here.
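   
   A small sketch of why a single unique key ends up in exactly one output partition. This is illustrative only: `partition_for` and `partitions_hit` are hypothetical helpers, and `DefaultHasher` is a stand-in, since DataFusion uses its own hash function and may therefore pick a different partition index.
   
   ```rust
   use std::collections::hash_map::DefaultHasher;
   use std::collections::BTreeSet;
   use std::hash::{Hash, Hasher};
   
   /// Maps a key to one of `num_partitions` outputs via hash modulo.
   /// `DefaultHasher` is an assumption for illustration, not DataFusion's hash.
   fn partition_for(key: u32, num_partitions: u64) -> u64 {
       let mut hasher = DefaultHasher::new();
       key.hash(&mut hasher);
       hasher.finish() % num_partitions
   }
   
   /// Returns the set of distinct partitions that receive at least one row.
   fn partitions_hit(keys: &[u32], num_partitions: u64) -> BTreeSet<u64> {
       keys.iter().map(|&k| partition_for(k, num_partitions)).collect()
   }
   ```
   
   For the reproduction above, `partitions_hit(&[1; 20], 3)` contains a single partition index: all 20 rows land in the same output, and the other two outputs see no data until the input ends.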

