Posted to github@arrow.apache.org by "metesynnada (via GitHub)" <gi...@apache.org> on 2023/04/10 06:59:56 UTC

[GitHub] [arrow-datafusion] metesynnada opened a new pull request, #5937: Streaming Memory Reservation in SHJ

metesynnada opened a new pull request, #5937:
URL: https://github.com/apache/arrow-datafusion/pull/5937

   # Which issue does this PR close?
   
   Closes #5636.
   
   # Rationale for this change
   
   `SymmetricHashJoin` lacks memory manager support.
   
   # What changes are included in this PR?
   
   - SHJ memory reservation implementation.
   - Shrinking of the hash map when necessary.
   
   # Are these changes tested?
   
   Yes
   
   # Are there any user-facing changes?
   
   No
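
A minimal sketch of the "hashmap shrink if necessary" item from the PR description, using the standard library `HashMap` rather than the join's actual hash table. The helper name `shrink_if_necessary` and the `factor` threshold are hypothetical; the PR's real shrink condition may differ.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical helper: shrink `map` when fewer than `1 / factor` of its
/// slots are in use. Returns `true` if a shrink was requested.
fn shrink_if_necessary<K: Hash + Eq, V>(map: &mut HashMap<K, V>, factor: usize) -> bool {
    let capacity = map.capacity();
    if capacity > 0 && map.len() * factor < capacity {
        // Leave some headroom so the next few inserts do not reallocate.
        map.shrink_to(map.len() * 2);
        true
    } else {
        false
    }
}

fn main() {
    let mut map: HashMap<u64, u64> = (0..10_000).map(|i| (i, i)).collect();
    let before = map.capacity();
    // Simulate pruning: most buffered rows are no longer needed.
    map.retain(|k, _| *k < 10);
    let shrunk = shrink_if_necessary(&mut map, 4);
    println!("shrunk={shrunk}, capacity {before} -> {}", map.capacity());
}
```

Shrinking only below a threshold avoids reallocating on every pruning step while still letting the reported memory usage fall after a large prune.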


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] korowa commented on pull request #5937: Streaming Memory Reservation in SHJ

Posted by "korowa (via GitHub)" <gi...@apache.org>.
korowa commented on PR #5937:
URL: https://github.com/apache/arrow-datafusion/pull/5937#issuecomment-1506461751

   > Currently, testing of memory limit SHJ is blocked as I need to implement a PartitionMode selection and enable with_repartition_joins to remove RepartitionExec's from the planner based on the configuration.
   
   Wouldn't it help to enable the pipeline-related physical optimizer rules and use a single target partition in `memory_limit.rs` over two unbounded sources to plan SHJ?
   
   > I can build ExecutionPlan without a query, which is different than other tests.
   
   And if it's blocked at the moment, I guess this is a fine option -- adding an operator-level test for the overallocation case (perhaps like the other tests in `symmetric_hash_join.rs`).




[GitHub] [arrow-datafusion] metesynnada commented on a diff in pull request #5937: Streaming Memory Reservation in SHJ

Posted by "metesynnada (via GitHub)" <gi...@apache.org>.
metesynnada commented on code in PR #5937:
URL: https://github.com/apache/arrow-datafusion/pull/5937#discussion_r1164022590


##########
datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs:
##########
@@ -1339,6 +1370,11 @@ fn combine_two_batches(
 }
 
 impl SymmetricHashJoinStream {
+    fn size(&self) -> usize {

Review Comment:
   Since `HashJoinExec` did not include those features, I have tried to replicate the same behavior in previous commits. However, I have now added the required memory measures. If this implementation is acceptable, we can proceed with it.



##########
datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs:
##########
@@ -1442,6 +1478,9 @@ impl SymmetricHashJoinStream {
                     // Combine results:
                     let result =
                         combine_two_batches(&self.schema, equal_result, anti_result)?;
+                    let capacity = self.size();
+                    self.metrics.stream_memory_usage.set(capacity);
+                    self.reservation.lock().resize(capacity);

Review Comment:
   I added a function called `try_resize`, which calls `try_grow` internally.
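
A minimal sketch of how a `try_resize` built on `try_grow` could behave, assuming a toy single-consumer pool. The `Reservation` struct, its `pool_limit` field, and the `String` error type are stand-ins for illustration, not DataFusion's actual types.

```rust
/// Toy stand-in for a memory reservation; not DataFusion's real type.
#[derive(Debug)]
struct Reservation {
    size: usize,       // bytes currently reserved
    pool_limit: usize, // hypothetical limit of a single-consumer pool
}

impl Reservation {
    fn new(pool_limit: usize) -> Self {
        Self { size: 0, pool_limit }
    }

    /// Fallible growth: returns an error instead of exceeding the limit.
    fn try_grow(&mut self, additional: usize) -> Result<(), String> {
        let new_size = self
            .size
            .checked_add(additional)
            .ok_or_else(|| "reservation size overflow".to_string())?;
        if new_size > self.pool_limit {
            return Err(format!(
                "cannot reserve {additional} bytes: {} of {} already used",
                self.size, self.pool_limit
            ));
        }
        self.size = new_size;
        Ok(())
    }

    /// Shrinking never fails.
    fn shrink(&mut self, amount: usize) {
        self.size = self.size.saturating_sub(amount);
    }

    /// Grow fallibly or shrink infallibly, depending on the target size.
    fn try_resize(&mut self, new_size: usize) -> Result<(), String> {
        if new_size > self.size {
            self.try_grow(new_size - self.size)
        } else {
            self.shrink(self.size - new_size);
            Ok(())
        }
    }
}

fn main() {
    let mut reservation = Reservation::new(100);
    println!("{:?}", reservation.try_resize(80));
    println!("{:?}", reservation.try_resize(120));
    println!("{:?}", reservation);
}
```

With this shape, a failed grow leaves the existing reservation untouched, so the caller can surface the error to the query rather than silently over-allocating.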





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5937: Streaming Memory Reservation in SHJ

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #5937:
URL: https://github.com/apache/arrow-datafusion/pull/5937#discussion_r1163177098


##########
datafusion/physical-expr/src/intervals/cp_solver.rs:
##########
@@ -122,6 +122,18 @@ pub struct ExprIntervalGraph {
     root: NodeIndex,
 }
 
+impl ExprIntervalGraph {
+    pub fn size(&self) -> usize {

Review Comment:
   I suggest following the same convention as `ScalarValue` and including the size of `Self` in this calculation:
   
   https://github.com/apache/arrow-datafusion/blob/dafe99733e0f97bfb5ef750f02d02abcb641682d/datafusion/common/src/scalar.rs#L3152-L3154
   
   ```suggestion
       /// Estimate size in bytes including `Self`.
       pub fn size(&self) -> usize {
   ```
   
   I think that would also require adding the size of `Self` to the calculation.
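
A minimal sketch of the "include the size of `Self`" convention, assuming a simplified graph that stores its nodes in a `Vec`. The `Node` and `IntervalGraph` types here are hypothetical and much simpler than the real `ExprIntervalGraph`.

```rust
use std::mem::{size_of, size_of_val};

/// Hypothetical node payload; the real graph stores richer data.
#[allow(dead_code)]
struct Node {
    lower: i64,
    upper: i64,
}

/// Hypothetical, simplified stand-in for an interval graph.
#[allow(dead_code)]
struct IntervalGraph {
    nodes: Vec<Node>,
    root: usize,
}

impl IntervalGraph {
    /// Estimate size in bytes including `Self`.
    fn size(&self) -> usize {
        // Inline part of the struct (pointer, length, capacity, root) ...
        size_of_val(self)
            // ... plus the heap allocation owned by the vector.
            + self.nodes.capacity() * size_of::<Node>()
    }
}

fn main() {
    let graph = IntervalGraph {
        nodes: Vec::with_capacity(8),
        root: 0,
    };
    println!("estimated size: {} bytes", graph.size());
}
```

Counting `capacity()` rather than `len()` reflects what is actually allocated, which is the figure a memory reservation cares about.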
   



##########
datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs:
##########
@@ -1339,6 +1370,11 @@ fn combine_two_batches(
 }
 
 impl SymmetricHashJoinStream {
+    fn size(&self) -> usize {

Review Comment:
   Should we also include the other fields, like the `input_stream`, column_indexes, etc.? Maybe those are effectively constant, so it is OK to leave them out of the calculation.





[GitHub] [arrow-datafusion] metesynnada commented on pull request #5937: Streaming Memory Reservation in SHJ

Posted by "metesynnada (via GitHub)" <gi...@apache.org>.
metesynnada commented on PR #5937:
URL: https://github.com/apache/arrow-datafusion/pull/5937#issuecomment-1505072179

   > Makes sense to me -- thank you @metesynnada
   > 
   > I suggest an "integration" test that shows that this code is all hooked up correctly -- perhaps in https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/memory_limit.rs
   
   Currently, testing of memory limit SHJ is blocked as I need to implement a `PartitionMode` selection and enable `with_repartition_joins` to remove `RepartitionExec`'s from the planner based on the configuration. Once this is done, I will add the required test in a future PR. As a result, I am unable to test `SHJ` within a query at the moment. Would that be acceptable?
   
   Or, I can build `ExecutionPlan` without a query, which is different than other tests.




[GitHub] [arrow-datafusion] ozankabak commented on pull request #5937: Streaming Memory Reservation in SHJ

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #5937:
URL: https://github.com/apache/arrow-datafusion/pull/5937#issuecomment-1503646980

   @korowa, @alamb, can you take a quick look? Thanks




[GitHub] [arrow-datafusion] mustafasrepo merged pull request #5937: Streaming Memory Reservation in SHJ

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo merged PR #5937:
URL: https://github.com/apache/arrow-datafusion/pull/5937




[GitHub] [arrow-datafusion] alamb commented on pull request #5937: Streaming Memory Reservation in SHJ

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5937:
URL: https://github.com/apache/arrow-datafusion/pull/5937#issuecomment-1507504622

   @mustafasrepo perhaps you can merge this when you're ready




[GitHub] [arrow-datafusion] metesynnada commented on pull request #5937: Streaming Memory Reservation in SHJ

Posted by "metesynnada (via GitHub)" <gi...@apache.org>.
metesynnada commented on PR #5937:
URL: https://github.com/apache/arrow-datafusion/pull/5937#issuecomment-1506501173

   > Wouldn't it help to enable the pipeline-related physical optimizer rules and use a single target partition in `memory_limit.rs` over two unbounded sources to plan SHJ?
   
   That is a good idea 😄, and it worked. I pushed a new commit; can you check it?
   
   
   




[GitHub] [arrow-datafusion] ozankabak commented on pull request #5937: Streaming Memory Reservation in SHJ

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on PR #5937:
URL: https://github.com/apache/arrow-datafusion/pull/5937#issuecomment-1507392603

   Thanks for the reviews, @korowa and @alamb. This is good to go from our perspective.




[GitHub] [arrow-datafusion] korowa commented on pull request #5937: Streaming Memory Reservation in SHJ

Posted by "korowa (via GitHub)" <gi...@apache.org>.
korowa commented on PR #5937:
URL: https://github.com/apache/arrow-datafusion/pull/5937#issuecomment-1504909980

   Thank you @metesynnada!
   
   LGTM overall, I have only one question regarding how SHJ should behave in case of overallocations.




[GitHub] [arrow-datafusion] korowa commented on a diff in pull request #5937: Streaming Memory Reservation in SHJ

Posted by "korowa (via GitHub)" <gi...@apache.org>.
korowa commented on code in PR #5937:
URL: https://github.com/apache/arrow-datafusion/pull/5937#discussion_r1163829779


##########
datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs:
##########
@@ -1442,6 +1478,9 @@ impl SymmetricHashJoinStream {
                     // Combine results:
                     let result =
                         combine_two_batches(&self.schema, equal_result, anti_result)?;
+                    let capacity = self.size();
+                    self.metrics.stream_memory_usage.set(capacity);
+                    self.reservation.lock().resize(capacity);

Review Comment:
   Wouldn't it be more appropriate to use the fallible version of the memory reservation function (like `try_grow`) instead of `resize` when the reservation size increases? I guess that, when a memory pool is shared across multiple operators, execution will still fail, but using `try_grow` seems more precise.


