Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/28 18:52:14 UTC

[GitHub] [arrow-datafusion] alamb opened a new issue #437: RepartitionExec produces no output if the input stream does not yield results immediately

alamb opened a new issue #437:
URL: https://github.com/apache/arrow-datafusion/issues/437


   **Describe the bug**
   Given a `RepartitionExec` where its input stream does not have a record batch for immediate consumption, it will terminate early. 
   
   **To Reproduce**
   Set up a plan like this:
   
   ```
   ┌───────────────────┐        ┌───────────────────────┐
   │                   │        │                       │
   │    InputStream    │───────▶│   RepartitionStream   │
   │                   │        │                       │
   └───────────────────┘        └───────────────────────┘
   ```
   
   Here the input stream does not produce its record batch immediately. A full reproducer is below.
   
   I expect the repartition stream to produce the same record batch that the input stream (eventually) provides. However, I get nothing!
   ```
   expected:
   
   [
       "+------------------+",
       "| my_awesome_field |",
       "+------------------+",
       "| foo              |",
       "| bar              |",
       "+------------------+",
   ]
   actual:
   
   [
       "++",
       "++",
   ]
   ```
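   
   To make "does not produce the record batch immediately" concrete, here is a minimal std-only sketch of a source that reports `Poll::Pending` on its first poll and only yields its value on the second. `Delayed`, `NoopWaker` and `poll_until_ready` are hypothetical names invented for this illustration; they are not part of DataFusion or tokio, and the busy-poll loop stands in for a real executor. A correct consumer keeps polling after `Pending` rather than treating it as end of input.
   
   ```rust
   use std::future::Future;
   use std::pin::Pin;
   use std::sync::Arc;
   use std::task::{Context, Poll, Wake, Waker};
   
   /// Hypothetical source that is not ready the first time it is polled.
   struct Delayed {
       polled_once: bool,
       value: Option<&'static str>,
   }
   
   impl Future for Delayed {
       type Output = &'static str;
   
       fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
           if !self.polled_once {
               // First poll: nothing available yet. Ask to be polled again.
               self.polled_once = true;
               cx.waker().wake_by_ref();
               return Poll::Pending;
           }
           Poll::Ready(self.value.take().expect("polled after completion"))
       }
   }
   
   /// Waker that does nothing; sufficient for the busy-poll loop below.
   struct NoopWaker;
   
   impl Wake for NoopWaker {
       fn wake(self: Arc<Self>) {}
   }
   
   /// Polls `fut` until it is ready and returns the value plus the poll count.
   fn poll_until_ready<F: Future + Unpin>(mut fut: F) -> (F::Output, usize) {
       let waker = Waker::from(Arc::new(NoopWaker));
       let mut cx = Context::from_waker(&waker);
       let mut polls = 0;
       loop {
           polls += 1;
           if let Poll::Ready(value) = Pin::new(&mut fut).poll(&mut cx) {
               return (value, polls);
           }
       }
   }
   
   fn main() {
       let delayed = Delayed { polled_once: false, value: Some("foo") };
       let (value, polls) = poll_until_ready(delayed);
       println!("got {} after {} polls", value, polls);
   }
   ```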
   
   
   **Full Reproducer (run in repartition.rs)**
   
   ```rust
   
       #[tokio::test]
       async fn repartition_with_delayed_stream() {
           let input = DelayedExec::new();
           let partitioning = input.output_partitioning();
           let expected_batches = vec![input.batch.clone()];
           let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap();
   
           let expected = vec![
               "+------------------+",
               "| my_awesome_field |",
               "+------------------+",
               "| foo              |",
               "| bar              |",
               "+------------------+",
           ];
   
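           // Sanity check: the input batch itself has the expected contents
           assert_batches_eq!(&expected, &expected_batches);
   
           // The repartitioned output should contain the same batch
           let output_stream = exec.execute(0).await.unwrap();
           let batches = crate::physical_plan::common::collect(output_stream).await.unwrap();
   
           assert_batches_eq!(&expected, &batches);
       }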
   
       #[derive(Debug)]
       struct DelayedExec {
           batch: RecordBatch
       }
   
       impl DelayedExec {
           fn new() -> Self {
               let batch = RecordBatch::try_from_iter(vec![
                   ("my_awesome_field", Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef)
               ]).unwrap();
   
               Self {
                   batch
               }
           }
       }
   
   
       #[async_trait]
       impl ExecutionPlan for DelayedExec {
           fn as_any(&self) -> &dyn Any {
               self
           }
   
           fn schema(&self) -> SchemaRef {
               self.batch.schema()
           }
   
           fn output_partitioning(&self) -> Partitioning {
               Partitioning::UnknownPartitioning(1)
           }
   
           fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
               unimplemented!()
           }
   
           fn with_new_children(
               &self,
               _children: Vec<Arc<dyn ExecutionPlan>>,
           ) -> Result<Arc<dyn ExecutionPlan>> {
               unimplemented!()
           }
   
           /// Returns a stream which does not have data immediately, but
           /// needs to yield (to allow another task to run) to get its
           /// input.
           async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
               assert_eq!(partition, 0);
   
               let batch = self.batch.clone();
               let schema = batch.schema();
   
               let (tx, rx) = tokio::sync::mpsc::channel(2);
   
               // Background task sends the batch; the batch is moved into
               // the task, so no extra clone is needed
               tokio::task::spawn(async move {
                   println!("Sending batch via delayed stream");
                   if let Err(e) = tx.send(Ok(batch)).await {
                       println!("ERROR batch via delayed stream: {}", e);
                   }
               });
   
               // returned stream simply reads off the rx stream
               let stream = ParquetStream {
                   schema,
                   inner: ReceiverStream::new(rx),
               };
               Ok(Box::pin(stream))
           }
       }
   
   
       #[derive(Debug)]
       pub struct ParquetStream {
           schema: SchemaRef,
           inner: ReceiverStream<ArrowResult<RecordBatch>>,
       }
   
       impl Stream for ParquetStream {
           type Item = ArrowResult<RecordBatch>;
   
           fn poll_next(
               mut self: std::pin::Pin<&mut Self>,
               cx: &mut Context<'_>,
           ) -> Poll<Option<Self::Item>> {
               println!("ParquetStream::poll_next");
               let res = self.inner.poll_next_unpin(cx);
               println!("ParquetStream::poll_next() done");
               res
           }
       }
   
       impl RecordBatchStream for ParquetStream {
           fn schema(&self) -> SchemaRef {
               Arc::clone(&self.schema)
           }
       }
   
       impl Drop for ParquetStream {
           fn drop(&mut self) {
               println!("ParquetStream::drop()");
           }
       }
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb edited a comment on issue #437: RepartitionExec produces no output if the input stream does not yield results immediately

Posted by GitBox <gi...@apache.org>.
alamb edited a comment on issue #437:
URL: https://github.com/apache/arrow-datafusion/issues/437#issuecomment-850638941


   Aha! The root cause of my (actual) problem is that there is an underlying error that is silently ignored by the `RepartitionExec`!
   
   So for this ticket I will claim success if errors are reported correctly (rather than being silently ignored while empty results are returned).





[GitHub] [arrow-datafusion] alamb commented on issue #437: RepartitionExec produces no output if the input stream does not yield results immediately

Posted by GitBox <gi...@apache.org>.
alamb commented on issue #437:
URL: https://github.com/apache/arrow-datafusion/issues/437#issuecomment-850633732


   So the bug appears to be at https://github.com/apache/arrow-datafusion/blob/aeed776986da6813a4e1c54d20e8bf0eb363d706/datafusion/src/physical_plan/repartition.rs#L240-L247
   
   Namely, there is a (silently ignored) error due to an unsupported partitioning scheme.
   
   So there are at least two problems:
   1. The error is silently ignored
   2. `Partitioning::UnknownPartitioning` is not supported
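   
   As a rough illustration of the first problem, here is a std-only sketch (threads and `std::sync::mpsc` rather than tokio, and not the actual `RepartitionExec` code). The names `partition`, `run_ignoring_errors`, `run_forwarding_errors` and the scheme strings are all hypothetical. When a background worker drops its error and simply exits, the consumer sees a closed, empty channel, which is indistinguishable from "no rows". Sending the `Err` through the same channel lets the consumer report it.
   
   ```rust
   use std::sync::mpsc;
   use std::thread;
   
   type BatchResult = Result<Vec<&'static str>, String>;
   
   /// Hypothetical stand-in for a partitioning step that rejects some schemes.
   fn partition(scheme: &str, batch: Vec<&'static str>) -> BatchResult {
       match scheme {
           "round_robin" => Ok(batch),
           other => Err(format!("unsupported partitioning scheme: {}", other)),
       }
   }
   
   /// Problematic pattern: on error the worker sends nothing and exits,
   /// so the consumer just sees an empty result set.
   fn run_ignoring_errors(scheme: &'static str) -> Vec<BatchResult> {
       let (tx, rx) = mpsc::channel::<BatchResult>();
       let worker = thread::spawn(move || {
           if let Ok(out) = partition(scheme, vec!["foo", "bar"]) {
               let _ = tx.send(Ok(out));
           }
           // Err case: the error is dropped here and `tx` is closed.
       });
       worker.join().unwrap();
       rx.iter().collect()
   }
   
   /// Alternative: forward the error through the channel so it reaches the consumer.
   fn run_forwarding_errors(scheme: &'static str) -> Vec<BatchResult> {
       let (tx, rx) = mpsc::channel::<BatchResult>();
       let worker = thread::spawn(move || {
           let _ = tx.send(partition(scheme, vec!["foo", "bar"]));
       });
       worker.join().unwrap();
       rx.iter().collect()
   }
   
   fn main() {
       // The error vanishes: the output is simply empty.
       println!("ignoring:   {:?}", run_ignoring_errors("unknown"));
       // The error is delivered to the consumer as an `Err` item.
       println!("forwarding: {:?}", run_forwarding_errors("unknown"));
   }
   ```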
   





[GitHub] [arrow-datafusion] alamb commented on issue #437: RepartitionExec produces no output if the input stream does not yield results immediately

Posted by GitBox <gi...@apache.org>.
alamb commented on issue #437:
URL: https://github.com/apache/arrow-datafusion/issues/437#issuecomment-850638941


   Aha! The root cause of my (actual) problem is that there is an underlying error that is silently ignored by the `RepartitionExec`!
   
   So for this ticket I will claim success if errors are reported correctly.





[GitHub] [arrow-datafusion] alamb closed issue #437: RepartitionExec produces no output if the input stream errors

Posted by GitBox <gi...@apache.org>.
alamb closed issue #437:
URL: https://github.com/apache/arrow-datafusion/issues/437


   

