Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/06/14 07:30:07 UTC

[GitHub] [arrow-datafusion] andygrove opened a new pull request #541: WIP: ShuffleReaderExec now supports multiple locations per partition

andygrove opened a new pull request #541:
URL: https://github.com/apache/arrow-datafusion/pull/541


   # Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123.
   -->
   
   Closes #540.
   
    # Rationale for this change
   <!--
    Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.  
   -->
   
   # What changes are included in this PR?
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   # Are there any user-facing changes?
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!--
   If there are any breaking changes to public APIs, please add the `api change` label.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] andygrove commented on a change in pull request #541: WIP: ShuffleReaderExec now supports multiple locations per partition

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #541:
URL: https://github.com/apache/arrow-datafusion/pull/541#discussion_r650138952



##########
File path: ballista/rust/client/src/context.rs
##########
@@ -74,32 +71,6 @@ impl BallistaContextState {
     }
 }
 
-struct WrappedStream {
-    stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send + Sync>>,
-    schema: SchemaRef,
-}
-
-impl RecordBatchStream for WrappedStream {
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-}
-
-impl Stream for WrappedStream {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(
-        mut self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Option<Self::Item>> {
-        self.stream.poll_next_unpin(cx)
-    }
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        self.stream.size_hint()
-    }
-}
-

Review comment:
       This is moved to ballista-core utils
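    For reference, a minimal sketch of the relocated helper, assuming the definition moved to `ballista/rust/core/src/utils.rs` keeps the fields from the deleted code above and gains the `new` constructor used by `ShuffleReaderExec::execute` (exact import paths are assumptions):

    ```rust
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use arrow::datatypes::SchemaRef;
    use arrow::error::Result as ArrowResult;
    use arrow::record_batch::RecordBatch;
    use datafusion::physical_plan::RecordBatchStream;
    use futures::{Stream, StreamExt};

    /// Pairs a type-erased stream of record batches with its schema so it can
    /// be returned as a `RecordBatchStream` from an `ExecutionPlan`.
    pub struct WrappedStream {
        stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send + Sync>>,
        schema: SchemaRef,
    }

    impl WrappedStream {
        pub fn new(
            stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send + Sync>>,
            schema: SchemaRef,
        ) -> Self {
            Self { stream, schema }
        }
    }

    impl RecordBatchStream for WrappedStream {
        fn schema(&self) -> SchemaRef {
            self.schema.clone()
        }
    }

    impl Stream for WrappedStream {
        type Item = ArrowResult<RecordBatch>;

        fn poll_next(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            // Delegate to the inner stream; batches pass through unchanged.
            self.stream.poll_next_unpin(cx)
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            self.stream.size_hint()
        }
    }
    ```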

##########
File path: ballista/rust/core/src/execution_plans/shuffle_reader.rs
##########
@@ -86,23 +87,18 @@ impl ExecutionPlan for ShuffleReaderExec {
         partition: usize,
     ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
         info!("ShuffleReaderExec::execute({})", partition);
-        let partition_location = &self.partition_location[partition];
-
-        let mut client = BallistaClient::try_new(
-            &partition_location.executor_meta.host,
-            partition_location.executor_meta.port,
-        )
-        .await
-        .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))?;
 
-        client
-            .fetch_partition(
-                &partition_location.partition_id.job_id,
-                partition_location.partition_id.stage_id,
-                partition,
-            )
+        let x = self.partition[partition].clone();
+        let result = future::join_all(x.into_iter().map(fetch_partition))
             .await
-            .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))
+            .into_iter()
+            .collect::<Result<Vec<_>>>()?;
+
+        let result = WrappedStream::new(
+            Box::pin(futures::stream::iter(result).flatten()),
+            Arc::new(self.schema.as_ref().clone()),
+        );
+        Ok(Box::pin(result))
     }

Review comment:
       This is the main change
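    For reference, a hypothetical sketch of the `fetch_partition` helper mapped over the locations above (the helper itself is not part of this hunk). It mirrors the deleted per-location code, assumes the imports already in scope in `shuffle_reader.rs`, and the `partition_id.partition_id` field used for the partition index is an assumption:

    ```rust
    // Hypothetical helper: fetch one partition's stream from the executor that
    // holds it. Field and type names follow the deleted code above where
    // possible; the partition index field is an assumption.
    async fn fetch_partition(
        location: PartitionLocation,
    ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
        let metadata = &location.executor_meta;
        let partition_id = &location.partition_id;
        let mut client = BallistaClient::try_new(&metadata.host, metadata.port)
            .await
            .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))?;
        client
            .fetch_partition(
                &partition_id.job_id,
                partition_id.stage_id,
                partition_id.partition_id as usize,
            )
            .await
            .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))
    }
    ```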

##########
File path: ballista/rust/core/src/execution_plans/shuffle_reader.rs
##########
@@ -86,23 +87,18 @@ impl ExecutionPlan for ShuffleReaderExec {
         partition: usize,
     ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
         info!("ShuffleReaderExec::execute({})", partition);
-        let partition_location = &self.partition_location[partition];
-
-        let mut client = BallistaClient::try_new(
-            &partition_location.executor_meta.host,
-            partition_location.executor_meta.port,
-        )
-        .await
-        .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))?;
 
-        client
-            .fetch_partition(
-                &partition_location.partition_id.job_id,
-                partition_location.partition_id.stage_id,
-                partition,
-            )
+        let x = self.partition[partition].clone();
+        let result = future::join_all(x.into_iter().map(fetch_partition))
             .await
-            .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))
+            .into_iter()
+            .collect::<Result<Vec<_>>>()?;
+
+        let result = WrappedStream::new(
+            Box::pin(futures::stream::iter(result).flatten()),

Review comment:
       Once the basic shuffle mechanism is implemented, there will be a lot of optimization work to follow







[GitHub] [arrow-datafusion] andygrove commented on pull request #541: ShuffleReaderExec now supports multiple locations per partition

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #541:
URL: https://github.com/apache/arrow-datafusion/pull/541#issuecomment-859719722


   @edrevo fyi





[GitHub] [arrow-datafusion] edrevo commented on a change in pull request #541: ShuffleReaderExec now supports multiple locations per partition

Posted by GitBox <gi...@apache.org>.
edrevo commented on a change in pull request #541:
URL: https://github.com/apache/arrow-datafusion/pull/541#discussion_r650183206



##########
File path: ballista/rust/core/src/execution_plans/shuffle_reader.rs
##########
@@ -86,23 +87,18 @@ impl ExecutionPlan for ShuffleReaderExec {
         partition: usize,
     ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
         info!("ShuffleReaderExec::execute({})", partition);
-        let partition_location = &self.partition_location[partition];
-
-        let mut client = BallistaClient::try_new(
-            &partition_location.executor_meta.host,
-            partition_location.executor_meta.port,
-        )
-        .await
-        .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))?;
 
-        client
-            .fetch_partition(
-                &partition_location.partition_id.job_id,
-                partition_location.partition_id.stage_id,
-                partition,
-            )
+        let x = self.partition[partition].clone();
+        let result = future::join_all(x.into_iter().map(fetch_partition))

Review comment:
       nit: if you change `fetch_partition` to take a reference, you can avoid the `.clone`:
   
   ```suggestion
           let partition_locations = &self.partition[partition];
           let result = future::join_all(partition_locations.iter().map(fetch_partition))
   ```

##########
File path: ballista/rust/core/src/execution_plans/shuffle_reader.rs
##########
@@ -86,23 +87,18 @@ impl ExecutionPlan for ShuffleReaderExec {
         partition: usize,
     ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
         info!("ShuffleReaderExec::execute({})", partition);
-        let partition_location = &self.partition_location[partition];
-
-        let mut client = BallistaClient::try_new(
-            &partition_location.executor_meta.host,
-            partition_location.executor_meta.port,
-        )
-        .await
-        .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))?;
 
-        client
-            .fetch_partition(
-                &partition_location.partition_id.job_id,
-                partition_location.partition_id.stage_id,
-                partition,
-            )
+        let x = self.partition[partition].clone();
+        let result = future::join_all(x.into_iter().map(fetch_partition))
             .await
-            .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))
+            .into_iter()
+            .collect::<Result<Vec<_>>>()?;
+
+        let result = WrappedStream::new(
+            Box::pin(futures::stream::iter(result).flatten()),

Review comment:
       > I wonder if it will serialize them all (aka not start reading from the second until the first is entirely consumed)?
   
   Yes, that's exactly what it does.
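    For anyone following along, a small self-contained sketch of the difference, using plain integer streams in place of record batch streams (the `select_all` variant is only a possible follow-up optimization, not something this PR does):

    ```rust
    use futures::executor::block_on;
    use futures::stream::{self, StreamExt};

    fn main() {
        block_on(async {
            // Two stand-ins for the per-location record batch streams.
            let make_streams =
                || vec![stream::iter(vec![1, 2, 3]), stream::iter(vec![10, 20, 30])];

            // What the PR does: `flatten` fully drains the first stream before
            // it ever polls the second, so partitions are read sequentially.
            let sequential: Vec<i32> =
                stream::iter(make_streams()).flatten().collect().await;
            assert_eq!(sequential, vec![1, 2, 3, 10, 20, 30]);

            // A possible later optimization: `select_all` polls every stream
            // and yields items as they become ready, interleaving partitions.
            let interleaved: Vec<i32> =
                stream::select_all(make_streams()).collect().await;
            assert_eq!(interleaved.len(), 6);
        });
    }
    ```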







[GitHub] [arrow-datafusion] codecov-commenter commented on pull request #541: ShuffleReaderExec now supports multiple locations per partition

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #541:
URL: https://github.com/apache/arrow-datafusion/pull/541#issuecomment-859788675


   # [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/541?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#541](https://codecov.io/gh/apache/arrow-datafusion/pull/541?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (1551d32) into [master](https://codecov.io/gh/apache/arrow-datafusion/commit/63e3045c9e0dd0579ec2be92bb174401f898833f?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (63e3045) will **increase** coverage by `0.00%`.
   > The diff coverage is `2.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-datafusion/pull/541/graphs/tree.svg?width=650&height=150&src=pr&token=JXwWBKD3D9&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-datafusion/pull/541?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@           Coverage Diff           @@
   ##           master     #541   +/-   ##
   =======================================
     Coverage   76.08%   76.09%           
   =======================================
     Files         156      156           
     Lines       27035    27048   +13     
   =======================================
   + Hits        20570    20581   +11     
   - Misses       6465     6467    +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-datafusion/pull/541?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [ballista/rust/client/src/context.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/541/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YmFsbGlzdGEvcnVzdC9jbGllbnQvc3JjL2NvbnRleHQucnM=) | `0.00% <0.00%> (ø)` | |
   | [...ta/rust/core/src/execution\_plans/shuffle\_reader.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/541/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YmFsbGlzdGEvcnVzdC9jb3JlL3NyYy9leGVjdXRpb25fcGxhbnMvc2h1ZmZsZV9yZWFkZXIucnM=) | `0.00% <0.00%> (ø)` | |
   | [...ista/rust/core/src/serde/physical\_plan/to\_proto.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/541/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YmFsbGlzdGEvcnVzdC9jb3JlL3NyYy9zZXJkZS9waHlzaWNhbF9wbGFuL3RvX3Byb3RvLnJz) | `49.38% <0.00%> (-0.93%)` | :arrow_down: |
   | [ballista/rust/core/src/utils.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/541/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YmFsbGlzdGEvcnVzdC9jb3JlL3NyYy91dGlscy5ycw==) | `25.53% <0.00%> (-2.06%)` | :arrow_down: |
   | [ballista/rust/scheduler/src/planner.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/541/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YmFsbGlzdGEvcnVzdC9zY2hlZHVsZXIvc3JjL3BsYW5uZXIucnM=) | `66.91% <ø> (ø)` | |
   | [ballista/rust/scheduler/src/state/mod.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/541/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YmFsbGlzdGEvcnVzdC9zY2hlZHVsZXIvc3JjL3N0YXRlL21vZC5ycw==) | `70.49% <0.00%> (ø)` | |
   | [...ta/rust/core/src/serde/physical\_plan/from\_proto.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/541/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YmFsbGlzdGEvcnVzdC9jb3JlL3NyYy9zZXJkZS9waHlzaWNhbF9wbGFuL2Zyb21fcHJvdG8ucnM=) | `39.16% <100.00%> (+1.45%)` | :arrow_up: |
   | [datafusion/src/physical\_plan/planner.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/541/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9wbGFubmVyLnJz) | `77.77% <0.00%> (-2.42%)` | :arrow_down: |
   | ... and [2 more](https://codecov.io/gh/apache/arrow-datafusion/pull/541/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/541?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/541?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [63e3045...1551d32](https://codecov.io/gh/apache/arrow-datafusion/pull/541?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #541: ShuffleReaderExec now supports multiple locations per partition

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #541:
URL: https://github.com/apache/arrow-datafusion/pull/541#discussion_r650205163



##########
File path: ballista/rust/core/src/execution_plans/shuffle_reader.rs
##########
@@ -86,23 +87,18 @@ impl ExecutionPlan for ShuffleReaderExec {
         partition: usize,
     ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
         info!("ShuffleReaderExec::execute({})", partition);
-        let partition_location = &self.partition_location[partition];
-
-        let mut client = BallistaClient::try_new(
-            &partition_location.executor_meta.host,
-            partition_location.executor_meta.port,
-        )
-        .await
-        .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))?;
 
-        client
-            .fetch_partition(
-                &partition_location.partition_id.job_id,
-                partition_location.partition_id.stage_id,
-                partition,
-            )
+        let x = self.partition[partition].clone();
+        let result = future::join_all(x.into_iter().map(fetch_partition))
             .await
-            .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))
+            .into_iter()
+            .collect::<Result<Vec<_>>>()?;
+
+        let result = WrappedStream::new(
+            Box::pin(futures::stream::iter(result).flatten()),

Review comment:
       this is a clever way of flattening the streams, though I wonder if it will serialize them all (aka not start reading from the second until the first is entirely consumed)?

##########
File path: ballista/rust/client/src/context.rs
##########
@@ -74,32 +71,6 @@ impl BallistaContextState {
     }
 }
 
-struct WrappedStream {
-    stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send + Sync>>,
-    schema: SchemaRef,
-}
-
-impl RecordBatchStream for WrappedStream {
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-}
-
-impl Stream for WrappedStream {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(
-        mut self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Option<Self::Item>> {
-        self.stream.poll_next_unpin(cx)
-    }
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        self.stream.size_hint()
-    }
-}
-

Review comment:
       this is a cool abstraction -- I have had need of something similar elsewhere -- perhaps it would be good to move it to datafusion itself eventually
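    As a hypothetical usage sketch outside Ballista, e.g. adapting an in-memory vector of batches into a `RecordBatchStream` (the schema, column name, and import paths here are illustrative only):

    ```rust
    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::error::Result as ArrowResult;
    use arrow::record_batch::RecordBatch;

    // Builds a WrappedStream (as sketched earlier in this thread) over a
    // single in-memory batch.
    fn example_stream() -> WrappedStream {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .expect("valid batch");
        let batches: Vec<ArrowResult<RecordBatch>> = vec![Ok(batch)];
        WrappedStream::new(Box::pin(futures::stream::iter(batches)), schema)
    }
    ```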

##########
File path: ballista/rust/core/src/execution_plans/shuffle_reader.rs
##########
@@ -86,23 +87,18 @@ impl ExecutionPlan for ShuffleReaderExec {
         partition: usize,
     ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
         info!("ShuffleReaderExec::execute({})", partition);
-        let partition_location = &self.partition_location[partition];
-
-        let mut client = BallistaClient::try_new(
-            &partition_location.executor_meta.host,
-            partition_location.executor_meta.port,
-        )
-        .await
-        .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))?;
 
-        client
-            .fetch_partition(
-                &partition_location.partition_id.job_id,
-                partition_location.partition_id.stage_id,
-                partition,
-            )
+        let x = self.partition[partition].clone();
+        let result = future::join_all(x.into_iter().map(fetch_partition))
             .await
-            .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))
+            .into_iter()
+            .collect::<Result<Vec<_>>>()?;
+
+        let result = WrappedStream::new(
+            Box::pin(futures::stream::iter(result).flatten()),

Review comment:
       👍 
   I guess I figured I would point it out (that the different partitions wouldn't be producing in parallel)







[GitHub] [arrow-datafusion] alamb merged pull request #541: ShuffleReaderExec now supports multiple locations per partition

Posted by GitBox <gi...@apache.org>.
alamb merged pull request #541:
URL: https://github.com/apache/arrow-datafusion/pull/541


   

