Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/04/25 13:54:27 UTC

[GitHub] [arrow-datafusion] andygrove opened a new issue #63: Implement scalable distributed joins

andygrove opened a new issue #63:
URL: https://github.com/apache/arrow-datafusion/issues/63


   **Is your feature request related to a problem or challenge? Please describe what you are trying to do.**
   
   The main issue limiting scalability in Ballista today is that joins are implemented as hash joins where each partition of the probe side causes the entire left side to be loaded into memory.
   
   
   **Describe the solution you'd like**
   
   To make this scalable, we need to hash-partition the left and right inputs on the join keys so that matching left and right partitions can be joined in parallel.
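   
   As a rough illustration of hash partitioning (a minimal sketch, not the DataFusion or Ballista implementation), rows can be routed to a partition by hashing the join key. The names `partition_for` and `hash_partition`, and the `(i64, T)` row shape, are hypothetical and chosen only for this example.
   
   ```rust
   use std::collections::hash_map::DefaultHasher;
   use std::hash::{Hash, Hasher};
   
   /// Map a join key to one of `num_partitions` buckets (hypothetical helper).
   fn partition_for(key: i64, num_partitions: usize) -> usize {
       let mut hasher = DefaultHasher::new();
       key.hash(&mut hasher);
       (hasher.finish() % num_partitions as u64) as usize
   }
   
   /// Split `(key, payload)` rows into `num_partitions` buckets by key hash.
   /// Rows with equal keys always land in the same bucket index.
   fn hash_partition<T: Clone>(rows: &[(i64, T)], num_partitions: usize) -> Vec<Vec<(i64, T)>> {
       let mut parts = vec![Vec::new(); num_partitions];
       for (key, value) in rows {
           parts[partition_for(*key, num_partitions)].push((*key, value.clone()));
       }
       parts
   }
   ```
   
   If both join inputs are split this way with the same partition count, any two rows with equal join keys end up at the same partition index, so each pair of partitions can be joined without looking at the others.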
   
   There is already work underway in DataFusion to implement this that we can leverage.
   
   **Describe alternatives you've considered**
   None
   
   **Additional context**
   None
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] Dandandan closed issue #63: Implement scalable distributed joins

Posted by GitBox <gi...@apache.org>.
Dandandan closed issue #63:
URL: https://github.com/apache/arrow-datafusion/issues/63


   





[GitHub] [arrow-datafusion] andygrove commented on issue #63: Implement scalable distributed joins

Posted by GitBox <gi...@apache.org>.
andygrove commented on issue #63:
URL: https://github.com/apache/arrow-datafusion/issues/63#issuecomment-857637980


   I created a Google doc to discuss the design and the planned work in more detail.
   
   https://docs.google.com/document/d/1yUnGWsHKYOAxWijDJisEFYU4dIym_GSRSMpwfWjVZq8/edit?usp=sharing





[GitHub] [arrow-datafusion] boazberman commented on issue #63: Implement scalable distributed joins

Posted by GitBox <gi...@apache.org>.
boazberman commented on issue #63:
URL: https://github.com/apache/arrow-datafusion/issues/63#issuecomment-830211472


   I'd love to work on this if someone can provide further reading material and/or point me to the relevant area of the code.





[GitHub] [arrow-datafusion] andygrove commented on issue #63: Implement scalable distributed joins

Posted by GitBox <gi...@apache.org>.
andygrove commented on issue #63:
URL: https://github.com/apache/arrow-datafusion/issues/63#issuecomment-846413793


   Here is some additional information. When I run TPC-H query 5 in the benchmarks against DataFusion, I see that the physical plan uses partitioned joins.
   
   For example, both inputs to the join are partitioned on the join keys, and the join mode is `Partitioned`.
   
   ```
   HashJoinExec: mode=Partitioned, join_type=Inner, on=[("c_custkey", "o_custkey")]
     RepartitionExec: partitioning=Hash([Column { name: "c_custkey" }], 24)
       ParquetExec: batch_size=8192, limit=None, partitions=[...]
     RepartitionExec: partitioning=Hash([Column { name: "o_custkey" }], 24)
       FilterExec: o_orderdate >= CAST(1994-01-01 AS Date32) AND o_orderdate < CAST(1995-01-01 AS Date32)
         ParquetExec: batch_size=8192, limit=None, partitions=[...]
   ```
   
   This means that the join can run in parallel because the inputs are partitioned. So partition 1 of the join reads partition 1 of the left and right inputs, and so on.
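   
   A minimal sketch of that per-partition execution, assuming both inputs have already been hash partitioned on the join key into the same number of partitions. This is not DataFusion's `HashJoinExec`; `Row`, `join_partition`, and `partitioned_join` are hypothetical names used only for illustration.
   
   ```rust
   use std::collections::HashMap;
   use std::thread;
   
   /// A row reduced to its join key and a single payload column (illustrative only).
   type Row = (i64, String);
   
   /// Inner-join one left partition with the right partition at the same index.
   fn join_partition(left: &[Row], right: &[Row]) -> Vec<(i64, String, String)> {
       // Build phase: the hash table covers only this left partition.
       let mut build: HashMap<i64, Vec<&String>> = HashMap::new();
       for (key, value) in left {
           build.entry(*key).or_default().push(value);
       }
       // Probe phase: look up each right row in the build table.
       let mut out = Vec::new();
       for (key, r) in right {
           if let Some(matches) = build.get(key) {
               for l in matches {
                   out.push((*key, (*l).clone(), r.clone()));
               }
           }
       }
       out
   }
   
   /// Join partition i of the left input with partition i of the right input,
   /// one thread per partition.
   fn partitioned_join(
       left_parts: &[Vec<Row>],
       right_parts: &[Vec<Row>],
   ) -> Vec<Vec<(i64, String, String)>> {
       assert_eq!(left_parts.len(), right_parts.len());
       thread::scope(|s| {
           let handles: Vec<_> = left_parts
               .iter()
               .zip(right_parts)
               .map(|(l, r)| s.spawn(move || join_partition(l, r)))
               .collect();
           handles.into_iter().map(|h| h.join().unwrap()).collect()
       })
   }
   ```
   
   Each task builds a hash table over its own left partition only, so no task needs the whole left input in memory.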
   
   When I run the same query against Ballista, I see:
   
   ```
   HashJoinExec: mode=CollectLeft, join_type=Inner, on=[("c_custkey", "o_custkey")]
     ParquetExec: batch_size=8192, limit=None, partitions=[...]
     FilterExec: o_orderdate >= CAST(1994-01-01 AS Date32) AND o_orderdate < CAST(1995-01-01 AS Date32)
       ParquetExec: batch_size=8192, limit=None, partitions=[
   ```
   
   Here, we see join mode `CollectLeft`, which means that each partition being executed will fetch the entire left side of the join into memory. This is very inefficient in terms of both memory and compute, and it gets more expensive as the number of partitions grows, because the whole left side is fetched again for each one.
   
   What we need to do is apply the same "partitioned hash join" pattern to Ballista.

