Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/06/14 07:26:19 UTC

[GitHub] [arrow-datafusion] andygrove opened a new pull request #543: Ballista: Implement map-side shuffle

andygrove opened a new pull request #543:
URL: https://github.com/apache/arrow-datafusion/pull/543


   # Which issue does this PR close?
   
   
   Closes #456
   
   # Rationale for this change
   
   Another step towards implementing full shuffle support.
   
   # What changes are included in this PR?
   
   # Are there any user-facing changes?
   
   The result metadata returned from executing a query stage now includes an additional column containing the partition number.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] andygrove commented on a change in pull request #543: Ballista: Implement map-side shuffle

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #543:
URL: https://github.com/apache/arrow-datafusion/pull/543#discussion_r659184439



##########
File path: ballista/rust/core/src/execution_plans/query_stage.rs
##########
@@ -150,32 +159,150 @@ impl ExecutionPlan for QueryStageExec {
                     stats
                 );
 
-                let schema = Arc::new(Schema::new(vec![
-                    Field::new("path", DataType::Utf8, false),
-                    stats.arrow_struct_repr(),
-                ]));
+                let schema = result_schema();
 
                 // build result set with summary of the partition execution status
-                let mut c0 = StringBuilder::new(1);
-                c0.append_value(&path).unwrap();
-                let path: ArrayRef = Arc::new(c0.finish());
+                let mut part_builder = UInt32Builder::new(1);
+                part_builder.append_value(partition as u32)?;
+                let part: ArrayRef = Arc::new(part_builder.finish());
+
+                let mut path_builder = StringBuilder::new(1);
+                path_builder.append_value(&path)?;
+                let path: ArrayRef = Arc::new(path_builder.finish());
 
                 let stats: ArrayRef = stats
                     .to_arrow_arrayref()
                     .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-                let batch = RecordBatch::try_new(schema.clone(), vec![path, stats])
+                let batch = RecordBatch::try_new(schema.clone(), vec![part, path, stats])
                     .map_err(DataFusionError::ArrowError)?;
 
                 Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?))
             }
 
-            Some(Partitioning::Hash(_, _)) => {
-                //TODO re-use code from RepartitionExec to split each batch into
-                // partitions and write to one IPC file per partition
-                // See https://github.com/apache/arrow-datafusion/issues/456
-                Err(DataFusionError::NotImplemented(
-                    "Shuffle partitioning not implemented yet".to_owned(),
-                ))
+            Some(Partitioning::Hash(exprs, n)) => {

Review comment:
       That makes sense. I filed https://github.com/apache/arrow-datafusion/issues/626 for this. I'd like to get the basic end-to-end shuffle mechanism working before we start optimizing too much.







[GitHub] [arrow-datafusion] edrevo commented on a change in pull request #543: Ballista: Implement map-side shuffle

Posted by GitBox <gi...@apache.org>.
edrevo commented on a change in pull request #543:
URL: https://github.com/apache/arrow-datafusion/pull/543#discussion_r650883972



##########
File path: ballista/rust/core/src/execution_plans/query_stage.rs
##########
@@ -150,32 +159,150 @@ impl ExecutionPlan for QueryStageExec {
                     stats
                 );
 
-                let schema = Arc::new(Schema::new(vec![
-                    Field::new("path", DataType::Utf8, false),
-                    stats.arrow_struct_repr(),
-                ]));
+                let schema = result_schema();
 
                 // build result set with summary of the partition execution status
-                let mut c0 = StringBuilder::new(1);
-                c0.append_value(&path).unwrap();
-                let path: ArrayRef = Arc::new(c0.finish());
+                let mut part_builder = UInt32Builder::new(1);
+                part_builder.append_value(partition as u32)?;
+                let part: ArrayRef = Arc::new(part_builder.finish());
+
+                let mut path_builder = StringBuilder::new(1);
+                path_builder.append_value(&path)?;
+                let path: ArrayRef = Arc::new(path_builder.finish());
 
                 let stats: ArrayRef = stats
                     .to_arrow_arrayref()
                     .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-                let batch = RecordBatch::try_new(schema.clone(), vec![path, stats])
+                let batch = RecordBatch::try_new(schema.clone(), vec![part, path, stats])
                     .map_err(DataFusionError::ArrowError)?;
 
                 Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?))
             }
 
-            Some(Partitioning::Hash(_, _)) => {
-                //TODO re-use code from RepartitionExec to split each batch into
-                // partitions and write to one IPC file per partition
-                // See https://github.com/apache/arrow-datafusion/issues/456
-                Err(DataFusionError::NotImplemented(
-                    "Shuffle partitioning not implemented yet".to_owned(),
-                ))
+            Some(Partitioning::Hash(exprs, n)) => {

Review comment:
       just thinking out loud without any data to back me up, but maybe it is worth special-casing when n==1, so we don't actually perform the hash of everything, since all of the data is going to end up in the same partition anyway.
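
The special case suggested above could look roughly like the following. This is a minimal sketch, not the PR's code: `partition_indices` is a hypothetical helper, and the standard library's `DefaultHasher` stands in for the seeded `ahash::RandomState` used in the diff.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Assign each key to one of `n` output partitions.
/// Hypothetical helper for illustration; not part of Ballista.
fn partition_indices<K: Hash>(keys: &[K], n: usize) -> Vec<usize> {
    assert!(n > 0, "need at least one output partition");
    if n == 1 {
        // Fast path: every row lands in partition 0, so hashing is skipped.
        return vec![0; keys.len()];
    }
    keys.iter()
        .map(|key| {
            // General path: hash the key and reduce it modulo the partition count.
            let mut hasher = DefaultHasher::new();
            key.hash(&mut hasher);
            (hasher.finish() % n as u64) as usize
        })
        .collect()
}
```

Whether the fast path pays off in practice would need measuring, as the comment itself notes.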







[GitHub] [arrow-datafusion] codecov-commenter commented on pull request #543: Ballista: Implement map-side shuffle

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #543:
URL: https://github.com/apache/arrow-datafusion/pull/543#issuecomment-859918299


   # [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/543?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#543](https://codecov.io/gh/apache/arrow-datafusion/pull/543?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (12f2f5e) into [master](https://codecov.io/gh/apache/arrow-datafusion/commit/63e3045c9e0dd0579ec2be92bb174401f898833f?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (63e3045) will **increase** coverage by `0.13%`.
   > The diff coverage is `91.66%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-datafusion/pull/543/graphs/tree.svg?width=650&height=150&src=pr&token=JXwWBKD3D9&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-datafusion/pull/543?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #543      +/-   ##
   ==========================================
   + Coverage   76.08%   76.22%   +0.13%     
   ==========================================
     Files         156      156              
     Lines       27035    27160     +125     
   ==========================================
   + Hits        20570    20702     +132     
   + Misses       6465     6458       -7     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-datafusion/pull/543?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...lista/rust/core/src/execution\_plans/query\_stage.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/543/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YmFsbGlzdGEvcnVzdC9jb3JlL3NyYy9leGVjdXRpb25fcGxhbnMvcXVlcnlfc3RhZ2UucnM=) | `85.65% <91.54%> (+9.86%)` | :arrow_up: |
   | [ballista/rust/core/src/serde/scheduler/mod.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/543/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YmFsbGlzdGEvcnVzdC9jb3JlL3NyYy9zZXJkZS9zY2hlZHVsZXIvbW9kLnJz) | `60.71% <100.00%> (+1.78%)` | :arrow_up: |
   | [datafusion/src/physical\_plan/mod.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/543/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9tb2QucnM=) | `78.70% <100.00%> (ø)` | |
   | [datafusion/src/physical\_plan/planner.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/543/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9wbGFubmVyLnJz) | `77.77% <0.00%> (-2.42%)` | :arrow_down: |
   | [datafusion/src/physical\_plan/windows.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/543/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi93aW5kb3dzLnJz) | `86.46% <0.00%> (+0.24%)` | :arrow_up: |
   | [datafusion/src/logical\_plan/window\_frames.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/543/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvbG9naWNhbF9wbGFuL3dpbmRvd19mcmFtZXMucnM=) | `89.39% <0.00%> (+1.51%)` | :arrow_up: |
   | [...ta/rust/core/src/serde/physical\_plan/from\_proto.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/543/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YmFsbGlzdGEvcnVzdC9jb3JlL3NyYy9zZXJkZS9waHlzaWNhbF9wbGFuL2Zyb21fcHJvdG8ucnM=) | `39.83% <0.00%> (+2.11%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/543?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/543?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [63e3045...12f2f5e](https://codecov.io/gh/apache/arrow-datafusion/pull/543?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [arrow-datafusion] edrevo commented on a change in pull request #543: Ballista: Implement map-side shuffle

Posted by GitBox <gi...@apache.org>.
edrevo commented on a change in pull request #543:
URL: https://github.com/apache/arrow-datafusion/pull/543#discussion_r650881738



##########
File path: ballista/rust/core/src/execution_plans/query_stage.rs
##########
@@ -150,32 +159,150 @@ impl ExecutionPlan for QueryStageExec {
                     stats
                 );
 
-                let schema = Arc::new(Schema::new(vec![
-                    Field::new("path", DataType::Utf8, false),
-                    stats.arrow_struct_repr(),
-                ]));
+                let schema = result_schema();
 
                 // build result set with summary of the partition execution status
-                let mut c0 = StringBuilder::new(1);
-                c0.append_value(&path).unwrap();
-                let path: ArrayRef = Arc::new(c0.finish());
+                let mut part_builder = UInt32Builder::new(1);
+                part_builder.append_value(partition as u32)?;
+                let part: ArrayRef = Arc::new(part_builder.finish());
+
+                let mut path_builder = StringBuilder::new(1);
+                path_builder.append_value(&path)?;
+                let path: ArrayRef = Arc::new(path_builder.finish());
 
                 let stats: ArrayRef = stats
                     .to_arrow_arrayref()
                     .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-                let batch = RecordBatch::try_new(schema.clone(), vec![path, stats])
+                let batch = RecordBatch::try_new(schema.clone(), vec![part, path, stats])
                     .map_err(DataFusionError::ArrowError)?;
 
                 Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?))
             }
 
-            Some(Partitioning::Hash(_, _)) => {
-                //TODO re-use code from RepartitionExec to split each batch into
-                // partitions and write to one IPC file per partition
-                // See https://github.com/apache/arrow-datafusion/issues/456
-                Err(DataFusionError::NotImplemented(
-                    "Shuffle partitioning not implemented yet".to_owned(),
-                ))
+            Some(Partitioning::Hash(exprs, n)) => {
+                let num_output_partitions = *n;
+
+                // we won't necessarily produce output for every possible partition, so we
+                // create writers on demand
+                let mut writers: Vec<Option<Arc<Mutex<ShuffleWriter>>>> = vec![];
+                for _ in 0..num_output_partitions {
+                    writers.push(None);
+                }
+
+                let hashes_buf = &mut vec![];
+                let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0);
+                while let Some(result) = stream.next().await {
+                    let input_batch = result?;
+                    let arrays = exprs
+                        .iter()
+                        .map(|expr| {
+                            Ok(expr
+                                .evaluate(&input_batch)?
+                                .into_array(input_batch.num_rows()))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+                    hashes_buf.clear();
+                    hashes_buf.resize(arrays[0].len(), 0);

Review comment:
       noob question: is there a guarantee that all recordbatches have at least one element?
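
If that guarantee did not hold, `arrays[0]` would panic on an empty vector. A defensive variant might use `first()` and return an error instead. The sketch below is only an illustration: `hash_buffer_len` is a hypothetical helper, and plain `Vec<i64>` columns stand in for Arrow arrays.

```rust
/// Size of the hash buffer for one batch: the length of the first
/// evaluated column, or an error when no columns were produced.
/// Hypothetical helper; `Vec<i64>` stands in for an Arrow array.
fn hash_buffer_len(arrays: &[Vec<i64>]) -> Result<usize, String> {
    arrays
        .first()
        .map(|column| column.len())
        .ok_or_else(|| "no partitioning columns were evaluated".to_string())
}
```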

##########
File path: ballista/rust/core/src/execution_plans/query_stage.rs
##########
@@ -150,32 +159,150 @@ impl ExecutionPlan for QueryStageExec {
                     stats
                 );
 
-                let schema = Arc::new(Schema::new(vec![
-                    Field::new("path", DataType::Utf8, false),
-                    stats.arrow_struct_repr(),
-                ]));
+                let schema = result_schema();
 
                 // build result set with summary of the partition execution status
-                let mut c0 = StringBuilder::new(1);
-                c0.append_value(&path).unwrap();
-                let path: ArrayRef = Arc::new(c0.finish());
+                let mut part_builder = UInt32Builder::new(1);
+                part_builder.append_value(partition as u32)?;
+                let part: ArrayRef = Arc::new(part_builder.finish());
+
+                let mut path_builder = StringBuilder::new(1);
+                path_builder.append_value(&path)?;
+                let path: ArrayRef = Arc::new(path_builder.finish());
 
                 let stats: ArrayRef = stats
                     .to_arrow_arrayref()
                     .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-                let batch = RecordBatch::try_new(schema.clone(), vec![path, stats])
+                let batch = RecordBatch::try_new(schema.clone(), vec![part, path, stats])
                     .map_err(DataFusionError::ArrowError)?;
 
                 Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?))
             }
 
-            Some(Partitioning::Hash(_, _)) => {
-                //TODO re-use code from RepartitionExec to split each batch into
-                // partitions and write to one IPC file per partition
-                // See https://github.com/apache/arrow-datafusion/issues/456
-                Err(DataFusionError::NotImplemented(
-                    "Shuffle partitioning not implemented yet".to_owned(),
-                ))
+            Some(Partitioning::Hash(exprs, n)) => {
+                let num_output_partitions = *n;
+
+                // we won't necessarily produce output for every possible partition, so we
+                // create writers on demand
+                let mut writers: Vec<Option<Arc<Mutex<ShuffleWriter>>>> = vec![];

Review comment:
       Looks like Arc + Mutex is unnecessary if you use `.iter_mut()` when necessary
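
A rough sketch of what that could look like, assuming the writers are only touched from the one loop that owns the vector. `RowCounter` is a hypothetical stand-in for the PR's `ShuffleWriter` and only counts rows; `route` and `finish_all` are likewise made up for illustration.

```rust
/// Hypothetical stand-in for `ShuffleWriter`; it only counts rows.
#[derive(Debug, Default)]
struct RowCounter {
    rows: usize,
    finished: bool,
}

impl RowCounter {
    fn write(&mut self, rows: usize) {
        self.rows += rows;
    }

    fn finish(&mut self) -> usize {
        self.finished = true;
        self.rows
    }
}

/// Route `(partition, row_count)` pairs to lazily created writers.
fn route(num_partitions: usize, batches: &[(usize, usize)]) -> Vec<Option<RowCounter>> {
    // One slot per output partition; a writer is created on first use.
    let mut writers: Vec<Option<RowCounter>> = (0..num_partitions).map(|_| None).collect();
    for &(partition, rows) in batches {
        // `get_or_insert_with` returns `&mut RowCounter`, so no lock is needed.
        writers[partition]
            .get_or_insert_with(RowCounter::default)
            .write(rows);
    }
    writers
}

/// Finish every writer that was actually created and sum the rows written.
fn finish_all(writers: &mut [Option<RowCounter>]) -> usize {
    // `iter_mut().flatten()` visits only the `Some` slots, with mutable access.
    writers.iter_mut().flatten().map(|w| w.finish()).sum()
}
```

Plain ownership is enough here because nothing else holds a reference to the writers; sharing them across tasks would be a different situation.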

##########
File path: ballista/rust/core/src/execution_plans/query_stage.rs
##########
@@ -150,32 +159,150 @@ impl ExecutionPlan for QueryStageExec {
                     stats
                 );
 
-                let schema = Arc::new(Schema::new(vec![
-                    Field::new("path", DataType::Utf8, false),
-                    stats.arrow_struct_repr(),
-                ]));
+                let schema = result_schema();
 
                 // build result set with summary of the partition execution status
-                let mut c0 = StringBuilder::new(1);
-                c0.append_value(&path).unwrap();
-                let path: ArrayRef = Arc::new(c0.finish());
+                let mut part_builder = UInt32Builder::new(1);
+                part_builder.append_value(partition as u32)?;
+                let part: ArrayRef = Arc::new(part_builder.finish());
+
+                let mut path_builder = StringBuilder::new(1);
+                path_builder.append_value(&path)?;
+                let path: ArrayRef = Arc::new(path_builder.finish());
 
                 let stats: ArrayRef = stats
                     .to_arrow_arrayref()
                     .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-                let batch = RecordBatch::try_new(schema.clone(), vec![path, stats])
+                let batch = RecordBatch::try_new(schema.clone(), vec![part, path, stats])
                     .map_err(DataFusionError::ArrowError)?;
 
                 Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?))
             }
 
-            Some(Partitioning::Hash(_, _)) => {
-                //TODO re-use code from RepartitionExec to split each batch into
-                // partitions and write to one IPC file per partition
-                // See https://github.com/apache/arrow-datafusion/issues/456
-                Err(DataFusionError::NotImplemented(
-                    "Shuffle partitioning not implemented yet".to_owned(),
-                ))
+            Some(Partitioning::Hash(exprs, n)) => {

Review comment:
       just thinking out loud without any data to back me up, but maybe it is worth special-casing when n==1, so we don't actually perform the hash of everything, since all of the data is going to end up in the same partition anyway.







[GitHub] [arrow-datafusion] andygrove commented on pull request #543: Ballista: Implement map-side shuffle

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #543:
URL: https://github.com/apache/arrow-datafusion/pull/543#issuecomment-859939472


   @edrevo fyi





[GitHub] [arrow-datafusion] houqp commented on a change in pull request #543: Ballista: Implement map-side shuffle

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #543:
URL: https://github.com/apache/arrow-datafusion/pull/543#discussion_r650554371



##########
File path: ballista/rust/core/src/execution_plans/query_stage.rs
##########
@@ -150,32 +159,150 @@ impl ExecutionPlan for QueryStageExec {
                     stats
                 );
 
-                let schema = Arc::new(Schema::new(vec![
-                    Field::new("path", DataType::Utf8, false),
-                    stats.arrow_struct_repr(),
-                ]));
+                let schema = result_schema();
 
                 // build result set with summary of the partition execution status
-                let mut c0 = StringBuilder::new(1);
-                c0.append_value(&path).unwrap();
-                let path: ArrayRef = Arc::new(c0.finish());
+                let mut part_builder = UInt32Builder::new(1);
+                part_builder.append_value(partition as u32)?;
+                let part: ArrayRef = Arc::new(part_builder.finish());
+
+                let mut path_builder = StringBuilder::new(1);
+                path_builder.append_value(&path).unwrap();

Review comment:
       The unwrap here can be replaced with `?` right?
   
   ```suggestion
                   path_builder.append_value(&path)?;
   ```
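
For context, `?` can replace `unwrap()` whenever the enclosing function returns a `Result` whose error type can be built from the callee's error through a `From` conversion; presumably that is what makes the suggestion compile here. A standalone sketch with hypothetical names (`StageError`, `parse_partition`), using only the standard library:

```rust
use std::num::ParseIntError;

/// Hypothetical error type for illustration; not DataFusion's.
#[derive(Debug)]
enum StageError {
    Parse(ParseIntError),
}

// This conversion is what lets `?` turn a `ParseIntError` into a `StageError`.
impl From<ParseIntError> for StageError {
    fn from(e: ParseIntError) -> Self {
        StageError::Parse(e)
    }
}

/// Parse a partition number, propagating failures instead of panicking.
fn parse_partition(text: &str) -> Result<u32, StageError> {
    // With `unwrap()` a bad input would panic; with `?` it becomes an `Err`.
    let partition = text.trim().parse::<u32>()?;
    Ok(partition)
}
```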







[GitHub] [arrow-datafusion] andygrove merged pull request #543: Ballista: Implement map-side shuffle

Posted by GitBox <gi...@apache.org>.
andygrove merged pull request #543:
URL: https://github.com/apache/arrow-datafusion/pull/543


   





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #543: Ballista: Implement map-side shuffle

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #543:
URL: https://github.com/apache/arrow-datafusion/pull/543#discussion_r651085152



##########
File path: ballista/rust/core/src/execution_plans/query_stage.rs
##########
@@ -150,32 +159,150 @@ impl ExecutionPlan for QueryStageExec {
                     stats
                 );
 
-                let schema = Arc::new(Schema::new(vec![
-                    Field::new("path", DataType::Utf8, false),
-                    stats.arrow_struct_repr(),
-                ]));
+                let schema = result_schema();
 
                 // build result set with summary of the partition execution status
-                let mut c0 = StringBuilder::new(1);
-                c0.append_value(&path).unwrap();
-                let path: ArrayRef = Arc::new(c0.finish());
+                let mut part_builder = UInt32Builder::new(1);
+                part_builder.append_value(partition as u32)?;
+                let part: ArrayRef = Arc::new(part_builder.finish());
+
+                let mut path_builder = StringBuilder::new(1);
+                path_builder.append_value(&path)?;
+                let path: ArrayRef = Arc::new(path_builder.finish());
 
                 let stats: ArrayRef = stats
                     .to_arrow_arrayref()
                     .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-                let batch = RecordBatch::try_new(schema.clone(), vec![path, stats])
+                let batch = RecordBatch::try_new(schema.clone(), vec![part, path, stats])
                     .map_err(DataFusionError::ArrowError)?;
 
                 Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?))
             }
 
-            Some(Partitioning::Hash(_, _)) => {
-                //TODO re-use code from RepartitionExec to split each batch into
-                // partitions and write to one IPC file per partition
-                // See https://github.com/apache/arrow-datafusion/issues/456
-                Err(DataFusionError::NotImplemented(
-                    "Shuffle partitioning not implemented yet".to_owned(),
-                ))
+            Some(Partitioning::Hash(exprs, n)) => {
+                let num_output_partitions = *n;
+
+                // we won't necessarily produce output for every possible partition, so we
+                // create writers on demand
+                let mut writers: Vec<Option<Arc<Mutex<ShuffleWriter>>>> = vec![];
+                for _ in 0..num_output_partitions {
+                    writers.push(None);
+                }
+
+                let hashes_buf = &mut vec![];
+                let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0);
+                while let Some(result) = stream.next().await {
+                    let input_batch = result?;
+                    let arrays = exprs
+                        .iter()
+                        .map(|expr| {
+                            Ok(expr
+                                .evaluate(&input_batch)?
+                                .into_array(input_batch.num_rows()))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+                    hashes_buf.clear();
+                    hashes_buf.resize(arrays[0].len(), 0);

Review comment:
       Yes, I believe so: https://github.com/apache/arrow-rs/blob/master/arrow/src/record_batch.rs#L114-L118







[GitHub] [arrow-datafusion] codecov-commenter edited a comment on pull request #543: Ballista: Implement map-side shuffle

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #543:
URL: https://github.com/apache/arrow-datafusion/pull/543#issuecomment-859918299


   # [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/543?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#543](https://codecov.io/gh/apache/arrow-datafusion/pull/543?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (eb2d673) into [master](https://codecov.io/gh/apache/arrow-datafusion/commit/63e3045c9e0dd0579ec2be92bb174401f898833f?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (63e3045) will **increase** coverage by `0.08%`.
   > The diff coverage is `90.90%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-datafusion/pull/543/graphs/tree.svg?width=650&height=150&src=pr&token=JXwWBKD3D9&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-datafusion/pull/543?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #543      +/-   ##
   ==========================================
   + Coverage   76.08%   76.17%   +0.08%     
   ==========================================
     Files         156      156              
     Lines       27035    27174     +139     
   ==========================================
   + Hits        20570    20699     +129     
   - Misses       6465     6475      +10     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-datafusion/pull/543?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...lista/rust/core/src/execution\_plans/query\_stage.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/543/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YmFsbGlzdGEvcnVzdC9jb3JlL3NyYy9leGVjdXRpb25fcGxhbnMvcXVlcnlfc3RhZ2UucnM=) | `85.13% <90.78%> (+9.34%)` | :arrow_up: |
   | [ballista/rust/core/src/serde/scheduler/mod.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/543/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YmFsbGlzdGEvcnVzdC9jb3JlL3NyYy9zZXJkZS9zY2hlZHVsZXIvbW9kLnJz) | `60.71% <100.00%> (+1.78%)` | :arrow_up: |
   | [datafusion/src/physical\_plan/mod.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/543/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9tb2QucnM=) | `79.09% <100.00%> (+0.38%)` | :arrow_up: |
   | [datafusion/src/physical\_plan/planner.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/543/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9wbGFubmVyLnJz) | `77.53% <0.00%> (-2.66%)` | :arrow_down: |
   | [ballista/rust/core/src/utils.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/543/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YmFsbGlzdGEvcnVzdC9jb3JlL3NyYy91dGlscy5ycw==) | `25.53% <0.00%> (-2.06%)` | :arrow_down: |
   | [...ista/rust/core/src/serde/physical\_plan/to\_proto.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/543/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YmFsbGlzdGEvcnVzdC9jb3JlL3NyYy9zZXJkZS9waHlzaWNhbF9wbGFuL3RvX3Byb3RvLnJz) | `49.38% <0.00%> (-0.93%)` | :arrow_down: |
   | [datafusion/src/physical\_plan/hash\_join.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/543/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9oYXNoX2pvaW4ucnM=) | `84.89% <0.00%> (-0.63%)` | :arrow_down: |
   | [datafusion/src/physical\_plan/expressions/case.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/543/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9leHByZXNzaW9ucy9jYXNlLnJz) | `75.00% <0.00%> (-0.57%)` | :arrow_down: |
   | [datafusion/src/execution/context.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/543/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvZXhlY3V0aW9uL2NvbnRleHQucnM=) | `92.00% <0.00%> (-0.09%)` | :arrow_down: |
   | [ballista/rust/client/src/context.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/543/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YmFsbGlzdGEvcnVzdC9jbGllbnQvc3JjL2NvbnRleHQucnM=) | `0.00% <0.00%> (ø)` | |
   | ... and [9 more](https://codecov.io/gh/apache/arrow-datafusion/pull/543/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   





[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #543: Ballista: Implement map-side shuffle

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #543:
URL: https://github.com/apache/arrow-datafusion/pull/543#discussion_r651076922



##########
File path: ballista/rust/core/src/execution_plans/query_stage.rs
##########
@@ -150,32 +159,150 @@ impl ExecutionPlan for QueryStageExec {
                     stats
                 );
 
-                let schema = Arc::new(Schema::new(vec![
-                    Field::new("path", DataType::Utf8, false),
-                    stats.arrow_struct_repr(),
-                ]));
+                let schema = result_schema();
 
                 // build result set with summary of the partition execution status
-                let mut c0 = StringBuilder::new(1);
-                c0.append_value(&path).unwrap();
-                let path: ArrayRef = Arc::new(c0.finish());
+                let mut part_builder = UInt32Builder::new(1);
+                part_builder.append_value(partition as u32)?;
+                let part: ArrayRef = Arc::new(part_builder.finish());
+
+                let mut path_builder = StringBuilder::new(1);
+                path_builder.append_value(&path)?;
+                let path: ArrayRef = Arc::new(path_builder.finish());
 
                 let stats: ArrayRef = stats
                     .to_arrow_arrayref()
                     .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-                let batch = RecordBatch::try_new(schema.clone(), vec![path, stats])
+                let batch = RecordBatch::try_new(schema.clone(), vec![part, path, stats])
                     .map_err(DataFusionError::ArrowError)?;
 
                 Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?))
             }
 
-            Some(Partitioning::Hash(_, _)) => {
-                //TODO re-use code from RepartitionExec to split each batch into
-                // partitions and write to one IPC file per partition
-                // See https://github.com/apache/arrow-datafusion/issues/456
-                Err(DataFusionError::NotImplemented(
-                    "Shuffle partitioning not implemented yet".to_owned(),
-                ))
+            Some(Partitioning::Hash(exprs, n)) => {
+                let num_output_partitions = *n;
+
+                // we won't necessarily produce output for every possible partition, so we
+                // create writers on demand
+                let mut writers: Vec<Option<Arc<Mutex<ShuffleWriter>>>> = vec![];
+                for _ in 0..num_output_partitions {
+                    writers.push(None);
+                }
+
+                let hashes_buf = &mut vec![];
+                let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0);
+                while let Some(result) = stream.next().await {
+                    let input_batch = result?;
+                    let arrays = exprs
+                        .iter()
+                        .map(|expr| {
+                            Ok(expr
+                                .evaluate(&input_batch)?
+                                .into_array(input_batch.num_rows()))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+                    hashes_buf.clear();
+                    hashes_buf.resize(arrays[0].len(), 0);

Review comment:
       `arrays[0]` assumes the hash expressions produce at least one *column*. That should be a prerequisite for hash repartitioning, but I am not sure whether DataFusion checks it explicitly when the partitioning is constructed.
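
A minimal sketch of the kind of guard this comment is asking about, assuming the evaluated hash columns can be modelled as plain vectors. The helper name `hash_input_len` and the error text are hypothetical and are not part of DataFusion or Ballista; the point is only that `first()` turns an empty expression list into an error instead of a panic on `arrays[0]`.

```rust
/// Hypothetical helper (not a DataFusion API): returns the row count of the
/// first evaluated hash column, or an error when no hash expression was given.
fn hash_input_len(columns: &[Vec<i64>]) -> Result<usize, String> {
    match columns.first() {
        // At least one column exists, so its length sizes the hash buffer.
        Some(first) => Ok(first.len()),
        // Indexing `columns[0]` here would panic; report an error instead.
        None => Err("hash partitioning requires at least one expression".to_string()),
    }
}
```

With one column of three rows the helper returns `Ok(3)`; with an empty slice it returns an `Err`, which a caller could map to its own error type.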







[GitHub] [arrow-datafusion] andygrove commented on a change in pull request #543: Ballista: Implement map-side shuffle

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #543:
URL: https://github.com/apache/arrow-datafusion/pull/543#discussion_r659184184



##########
File path: ballista/rust/core/src/execution_plans/query_stage.rs
##########
@@ -150,32 +159,150 @@ impl ExecutionPlan for QueryStageExec {
                     stats
                 );
 
-                let schema = Arc::new(Schema::new(vec![
-                    Field::new("path", DataType::Utf8, false),
-                    stats.arrow_struct_repr(),
-                ]));
+                let schema = result_schema();
 
                 // build result set with summary of the partition execution status
-                let mut c0 = StringBuilder::new(1);
-                c0.append_value(&path).unwrap();
-                let path: ArrayRef = Arc::new(c0.finish());
+                let mut part_builder = UInt32Builder::new(1);
+                part_builder.append_value(partition as u32)?;
+                let part: ArrayRef = Arc::new(part_builder.finish());
+
+                let mut path_builder = StringBuilder::new(1);
+                path_builder.append_value(&path)?;
+                let path: ArrayRef = Arc::new(path_builder.finish());
 
                 let stats: ArrayRef = stats
                     .to_arrow_arrayref()
                     .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-                let batch = RecordBatch::try_new(schema.clone(), vec![path, stats])
+                let batch = RecordBatch::try_new(schema.clone(), vec![part, path, stats])
                     .map_err(DataFusionError::ArrowError)?;
 
                 Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?))
             }
 
-            Some(Partitioning::Hash(_, _)) => {
-                //TODO re-use code from RepartitionExec to split each batch into
-                // partitions and write to one IPC file per partition
-                // See https://github.com/apache/arrow-datafusion/issues/456
-                Err(DataFusionError::NotImplemented(
-                    "Shuffle partitioning not implemented yet".to_owned(),
-                ))
+            Some(Partitioning::Hash(exprs, n)) => {
+                let num_output_partitions = *n;
+
+                // we won't necessarily produce output for every possible partition, so we
+                // create writers on demand
+                let mut writers: Vec<Option<Arc<Mutex<ShuffleWriter>>>> = vec![];

Review comment:
       I tried changing this but ran into ownership issues. I'll go ahead and merge, and perhaps someone can help me fix this in a follow-up PR.
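
For whoever picks up that follow-up, here is one possible shape for on-demand writers without `Arc<Mutex<...>>`. It is a sketch under assumptions: `CountingWriter` is a stand-in for the PR's `ShuffleWriter`, `write_partitions` is a hypothetical function, and the code is synchronous, so it does not reproduce whatever ownership problem appeared in the real async stream.

```rust
/// Stand-in for the PR's `ShuffleWriter`; it only counts the rows it receives.
#[derive(Debug, Default)]
struct CountingWriter {
    rows: usize,
}

impl CountingWriter {
    fn write(&mut self, num_rows: usize) {
        self.rows += num_rows;
    }
}

/// Routes each `(partition, row count)` pair to a writer created on first use.
/// Panics if a partition index is not below `num_output_partitions`.
fn write_partitions(
    num_output_partitions: usize,
    batches: &[(usize, usize)],
) -> Vec<Option<CountingWriter>> {
    // One empty slot per output partition. A single owner holds the vector,
    // so neither Arc nor Mutex is needed in this simplified setting.
    let mut writers: Vec<Option<CountingWriter>> =
        (0..num_output_partitions).map(|_| None).collect();

    for &(partition, num_rows) in batches {
        // Create the writer lazily and borrow it mutably in one step.
        let writer = writers[partition].get_or_insert_with(CountingWriter::default);
        writer.write(num_rows);
    }
    writers
}
```

Partitions that never receive rows stay `None`, which matches the intent of creating writers only when there is output for them.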







[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #543: Ballista: Implement map-side shuffle

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #543:
URL: https://github.com/apache/arrow-datafusion/pull/543#discussion_r650512654



##########
File path: ballista/rust/core/src/execution_plans/query_stage.rs
##########
@@ -150,32 +159,150 @@ impl ExecutionPlan for QueryStageExec {
                     stats
                 );
 
-                let schema = Arc::new(Schema::new(vec![
-                    Field::new("path", DataType::Utf8, false),
-                    stats.arrow_struct_repr(),
-                ]));
+                let schema = result_schema();
 
                 // build result set with summary of the partition execution status
-                let mut c0 = StringBuilder::new(1);
-                c0.append_value(&path).unwrap();
-                let path: ArrayRef = Arc::new(c0.finish());
+                let mut part_builder = UInt32Builder::new(1);
+                part_builder.append_value(partition as u32)?;
+                let part: ArrayRef = Arc::new(part_builder.finish());
+
+                let mut path_builder = StringBuilder::new(1);
+                path_builder.append_value(&path).unwrap();
+                let path: ArrayRef = Arc::new(path_builder.finish());
 
                 let stats: ArrayRef = stats
                     .to_arrow_arrayref()
                     .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-                let batch = RecordBatch::try_new(schema.clone(), vec![path, stats])
+                let batch = RecordBatch::try_new(schema.clone(), vec![part, path, stats])
                     .map_err(DataFusionError::ArrowError)?;
 
                 Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?))
             }
 
-            Some(Partitioning::Hash(_, _)) => {
-                //TODO re-use code from RepartitionExec to split each batch into
-                // partitions and write to one IPC file per partition
-                // See https://github.com/apache/arrow-datafusion/issues/456
-                Err(DataFusionError::NotImplemented(
-                    "Shuffle partitioning not implemented yet".to_owned(),
-                ))
+            Some(Partitioning::Hash(exprs, n)) => {
+                let num_output_partitions = *n;
+
+                // we won't necessarily produce output for every possible partition, so we
+                // create writers on demand
+                let mut writers: Vec<Option<Arc<Mutex<ShuffleWriter>>>> = vec![];
+                for _ in 0..num_output_partitions {
+                    writers.push(None);
+                }
+
+                let hashes_buf = &mut vec![];
+                let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0);
+                while let Some(result) = stream.next().await {
+                    let input_batch = result?;
+                    let arrays = exprs
+                        .iter()
+                        .map(|expr| {
+                            Ok(expr
+                                .evaluate(&input_batch)?
+                                .into_array(input_batch.num_rows()))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+                    hashes_buf.clear();

Review comment:
       Maybe we could reuse this code better at some point?
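
One way such reuse might look is a small shared helper that maps rows to output partitions, which both a repartitioning operator and a shuffle writer could call. The sketch below is an assumption-laden illustration: `partition_row_indices` is a hypothetical name, and it uses the standard library's `DefaultHasher` rather than the seeded `ahash::RandomState` and Arrow arrays used in the PR.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical shared helper: groups row indices by
/// `hash(key) % num_partitions`, so equal keys share a partition.
fn partition_row_indices<K: Hash>(keys: &[K], num_partitions: usize) -> Vec<Vec<usize>> {
    assert!(num_partitions > 0, "need at least one output partition");
    // One (possibly empty) list of row indices per output partition.
    let mut indices = vec![Vec::new(); num_partitions];
    for (row, key) in keys.iter().enumerate() {
        // A fresh hasher per row keeps each key's hash independent.
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let partition = (hasher.finish() % num_partitions as u64) as usize;
        indices[partition].push(row);
    }
    indices
}
```

A caller would then take the rows listed for each partition and hand them to that partition's writer or output channel; which partition a given key lands in depends on the hasher, so only the grouping property should be relied on.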



