Posted to github@arrow.apache.org by "yahoNanJing (via GitHub)" <gi...@apache.org> on 2023/04/07 03:26:26 UTC

[GitHub] [arrow-datafusion] yahoNanJing opened a new pull request, #5904: Refine the size() calculation of accumulator

yahoNanJing opened a new pull request, #5904:
URL: https://github.com/apache/arrow-datafusion/pull/5904

   # Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123.
   -->
   
   Closes #5903.
   
   # Rationale for this change
   
   <!--
    Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.  
   -->
   
   # What changes are included in this PR?
   
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   # Are these changes tested?
   
   <!--
   We typically require tests for all PRs in order to:
   1. Prevent the code from being accidentally broken by subsequent changes
   2. Serve as another way to document the expected behavior of the code
   
   If tests are not included in your PR, please explain why (for example, are they covered by existing tests)?
   -->
   
   # Are there any user-facing changes?
   
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!--
   If there are any breaking changes to public APIs, please add the `api change` label.
   -->


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5904: Refine the size() calculation of accumulator

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5904:
URL: https://github.com/apache/arrow-datafusion/pull/5904#discussion_r1160414255


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -514,6 +580,19 @@ impl GroupedHashAggregateStream {
     }
 }
 
+fn get_accumulator_set_size(
+    groups_with_rows: &[usize],
+    row_group_states: &[RowGroupState],
+) -> usize {
+    groups_with_rows.iter().fold(0usize, |acc, group_idx| {
+        let group_state = &row_group_states[*group_idx];
+        group_state
+            .accumulator_set
+            .iter()
+            .fold(acc, |acc, accumulator| acc + accumulator.size())
+    })
+}
+
 /// The state that is built for each output group.

Review Comment:
   I think the logic for collecting the memory size of the accumulators is quite complex; the size computation may cost more than the actual aggregation work.





[GitHub] [arrow-datafusion] yahoNanJing commented on pull request #5904: Refine the size() calculation of accumulator

Posted by "yahoNanJing (via GitHub)" <gi...@apache.org>.
yahoNanJing commented on PR #5904:
URL: https://github.com/apache/arrow-datafusion/pull/5904#issuecomment-1504536066

   Thanks @alamb for your comments. I just refactored the code on top of the latest main branch.




[GitHub] [arrow-datafusion] alamb commented on pull request #5904: Refine the size() calculation of accumulator

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5904:
URL: https://github.com/apache/arrow-datafusion/pull/5904#issuecomment-1568843599

   Marking as Draft until we come to a consensus on what to do with this PR (so it is not on the review list)




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5904: Refine the size() calculation of accumulator

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5904:
URL: https://github.com/apache/arrow-datafusion/pull/5904#discussion_r1160411422


##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -266,7 +266,7 @@ impl Accumulator for SumAccumulator {
     }
 
     fn size(&self) -> usize {
-        std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size()
+        std::mem::size_of_val(self)
     }

Review Comment:
   I would suggest renaming the `size` method to `size_in_bytes`.





[GitHub] [arrow-datafusion] alamb commented on pull request #5904: Refine the size() calculation of accumulator

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5904:
URL: https://github.com/apache/arrow-datafusion/pull/5904#issuecomment-1503862391

   I will try and review this carefully later today




Re: [PR] Refine the size() calculation of accumulator [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5904:
URL: https://github.com/apache/arrow-datafusion/pull/5904#issuecomment-2043644864

   Since this has been open for more than a year, closing it down. Feel free to reopen if/when you keep working on it. 




[GitHub] [arrow-datafusion] yahoNanJing commented on a diff in pull request #5904: Refine the size() calculation of accumulator

Posted by "yahoNanJing (via GitHub)" <gi...@apache.org>.
yahoNanJing commented on code in PR #5904:
URL: https://github.com/apache/arrow-datafusion/pull/5904#discussion_r1160417848


##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -266,7 +266,7 @@ impl Accumulator for SumAccumulator {
     }
 
     fn size(&self) -> usize {
-        std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size()
+        std::mem::size_of_val(self)
     }

Review Comment:
   It may change, for example to deal with null.





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5904: Refine the size() calculation of accumulator

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #5904:
URL: https://github.com/apache/arrow-datafusion/pull/5904#discussion_r1163316574


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -419,92 +423,154 @@ impl GroupedHashAggregateStream {
             let row_values = get_at_indices(&row_aggr_input_values, &batch_indices);
             let normal_values = get_at_indices(&normal_aggr_input_values, &batch_indices);
 
+            let accumulator_set_pre =
+                get_accumulator_set_size(&groups_with_rows, row_group_states);
             // 2.1 for each key in this batch
             // 2.2 for each aggregation
             // 2.3 `slice` from each of its arrays the keys' values
             // 2.4 update / merge the accumulator with the values
             // 2.5 clear indices
-            groups_with_rows
-                .iter()
-                .zip(offsets.windows(2))
-                .try_for_each(|(group_idx, offsets)| {
-                    let group_state = &mut row_group_states[*group_idx];
-                    // 2.2
-                    self.row_accumulators
-                        .iter_mut()
-                        .zip(row_values.iter())
-                        .map(|(accumulator, aggr_array)| {
-                            (
-                                accumulator,
-                                aggr_array
-                                    .iter()
-                                    .map(|array| {
-                                        // 2.3
-                                        array.slice(offsets[0], offsets[1] - offsets[0])
-                                    })
-                                    .collect::<Vec<ArrayRef>>(),
-                            )
-                        })
-                        .try_for_each(|(accumulator, values)| {
-                            let mut state_accessor = RowAccessor::new_from_layout(
-                                self.row_aggr_layout.clone(),
-                            );
-                            state_accessor.point_to(
-                                0,
-                                group_state.aggregation_buffer.as_mut_slice(),
-                            );
-                            match self.mode {
-                                AggregateMode::Partial => {
+            match self.mode {
+                AggregateMode::Partial => {
+                    groups_with_rows
+                        .iter()
+                        .zip(offsets.windows(2))
+                        .try_for_each(|(group_idx, offsets)| {
+                            let group_state = &mut row_group_states[*group_idx];
+                            // 2.2
+                            self.row_accumulators
+                                .iter_mut()
+                                .zip(row_values.iter())
+                                .map(|(accumulator, aggr_array)| {
+                                    (
+                                        accumulator,
+                                        aggr_array
+                                            .iter()
+                                            .map(|array| {
+                                                // 2.3
+                                                array.slice(
+                                                    offsets[0],
+                                                    offsets[1] - offsets[0],
+                                                )
+                                            })
+                                            .collect::<Vec<ArrayRef>>(),
+                                    )
+                                })
+                                .try_for_each(|(accumulator, values)| {
+                                    let mut state_accessor = RowAccessor::new_from_layout(
+                                        self.row_aggr_layout.clone(),
+                                    );
+                                    state_accessor.point_to(
+                                        0,
+                                        group_state.aggregation_buffer.as_mut_slice(),
+                                    );
                                     accumulator.update_batch(&values, &mut state_accessor)
-                                }
-                                AggregateMode::FinalPartitioned
-                                | AggregateMode::Final => {
-                                    // note: the aggregation here is over states, not values, thus the merge
-                                    accumulator.merge_batch(&values, &mut state_accessor)
-                                }
-                            }
-                        })
-                        // 2.5
-                        .and(Ok(()))?;
-                    // normal accumulators
-                    group_state
-                        .accumulator_set
-                        .iter_mut()
-                        .zip(normal_values.iter())
-                        .map(|(accumulator, aggr_array)| {
-                            (
-                                accumulator,
-                                aggr_array
-                                    .iter()
-                                    .map(|array| {
-                                        // 2.3
-                                        array.slice(offsets[0], offsets[1] - offsets[0])
-                                    })
-                                    .collect::<Vec<ArrayRef>>(),
-                            )
-                        })
-                        .try_for_each(|(accumulator, values)| {
-                            let size_pre = accumulator.size();
-                            let res = match self.mode {
-                                AggregateMode::Partial => {
+                                })
+                                // 2.5
+                                .and(Ok(()))?;
+                            // normal accumulators
+                            group_state
+                                .accumulator_set
+                                .iter_mut()
+                                .zip(normal_values.iter())
+                                .map(|(accumulator, aggr_array)| {
+                                    (
+                                        accumulator,
+                                        aggr_array
+                                            .iter()
+                                            .map(|array| {
+                                                // 2.3
+                                                array.slice(
+                                                    offsets[0],
+                                                    offsets[1] - offsets[0],
+                                                )
+                                            })
+                                            .collect::<Vec<ArrayRef>>(),
+                                    )
+                                })
+                                .try_for_each(|(accumulator, values)| {
                                     accumulator.update_batch(&values)
-                                }
-                                AggregateMode::FinalPartitioned
-                                | AggregateMode::Final => {
+                                })
+                                // 2.5
+                                .and({
+                                    group_state.indices.clear();
+                                    Ok(())
+                                })
+                        })?;
+                }
+                AggregateMode::FinalPartitioned | AggregateMode::Final => {

Review Comment:
   Maybe GitHub is rendering the diff confusingly, but this seems like a significant amount of new code.



##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -266,7 +266,7 @@ impl Accumulator for SumAccumulator {
     }
 
     fn size(&self) -> usize {
-        std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size()
+        std::mem::size_of_val(self)
     }

Review Comment:
   According to the docs 
   
   https://github.com/yahoNanJing/arrow-datafusion/blob/issue-5903/datafusion/expr/src/accumulator.rs#L88
   
   
   ```rust
       /// Allocated size required for this accumulator, in bytes, including `Self`.
       /// Allocated means that for internal containers such as `Vec`, the `capacity` should be used
       /// not the `len`
       fn size(&self) -> usize;
   ```
   
   The change in this PR seems to stop counting extra allocations inside `ScalarValue` (such as `ScalarValue::Utf8`, which holds a heap-allocated string).
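
To illustrate the concern, here is a minimal sketch in plain Rust (standard library only). `Scalar` and `SumLike` are hypothetical stand-ins, not DataFusion types; they only mimic an enum whose string variant owns heap memory. The sketch contrasts the shallow `size_of_val(self)` from the added line of the diff with the deep calculation from the removed line.

```rust
use std::mem::size_of_val;

/// Hypothetical stand-in for a scalar value that may own heap memory.
enum Scalar {
    Int64(Option<i64>),
    Utf8(Option<String>),
}

impl Scalar {
    /// Deep size: inline bytes plus any heap capacity owned by the value.
    fn size(&self) -> usize {
        let heap = match self {
            Scalar::Utf8(Some(s)) => s.capacity(),
            Scalar::Int64(_) | Scalar::Utf8(None) => 0,
        };
        size_of_val(self) + heap
    }
}

/// Hypothetical accumulator holding its running state in a `Scalar`.
struct SumLike {
    sum: Scalar,
}

impl SumLike {
    /// Shape of the added line in the diff: inline bytes only.
    fn shallow_size(&self) -> usize {
        size_of_val(self)
    }

    /// Shape of the removed line in the diff: replace the inline size of
    /// `sum` with its deep size so heap allocations are counted once.
    fn deep_size(&self) -> usize {
        size_of_val(self) - size_of_val(&self.sum) + self.sum.size()
    }
}
```

In this sketch the two methods agree when the state is an integer. When the state is a string with 64 bytes of capacity, `deep_size` exceeds `shallow_size` by that capacity, which is the part a shallow calculation would not report.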



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -514,6 +580,19 @@ impl GroupedHashAggregateStream {
     }
 }
 
+fn get_accumulator_set_size(
+    groups_with_rows: &[usize],
+    row_group_states: &[RowGroupState],
+) -> usize {
+    groups_with_rows.iter().fold(0usize, |acc, group_idx| {
+        let group_state = &row_group_states[*group_idx];
+        group_state
+            .accumulator_set
+            .iter()
+            .fold(acc, |acc, accumulator| acc + accumulator.size())
+    })
+}
+
 /// The state that is built for each output group.

Review Comment:
   Yes, I agree the calculation for the memory size is complicated. 
   
   I think @tustvold has been thinking about how to improve performance in this area, but I am not sure how far he has gotten
   
   In general, managing individual allocations (and then accounting for their sizes) for each group is a significant additional overhead for grouping.
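
The accounting pattern under discussion (measure the accumulators, update them, measure again, charge the difference) can be sketched as follows. This is an illustrative reconstruction under stated assumptions, not the PR's code: `SizedAccumulator`, `CollectAcc`, `total_size`, and `update_and_measure` are hypothetical names, and the real accumulators operate on Arrow arrays rather than `i64` slices.

```rust
use std::mem::{size_of, size_of_val};

/// Hypothetical minimal interface; only `size` mirrors the method discussed here.
trait SizedAccumulator {
    fn update(&mut self, values: &[i64]);
    /// Allocated size in bytes, including `Self` (capacity, not length).
    fn size(&self) -> usize;
}

/// Hypothetical accumulator whose state grows with its input.
struct CollectAcc {
    values: Vec<i64>,
}

impl SizedAccumulator for CollectAcc {
    fn update(&mut self, values: &[i64]) {
        self.values.extend_from_slice(values);
    }

    fn size(&self) -> usize {
        size_of_val(self) + self.values.capacity() * size_of::<i64>()
    }
}

/// Sum of the sizes of all accumulators: one virtual call per accumulator.
fn total_size(accs: &[Box<dyn SizedAccumulator>]) -> usize {
    accs.iter().map(|a| a.size()).sum()
}

/// Updates every accumulator and returns the net growth in bytes,
/// which a caller could then charge against a memory budget.
fn update_and_measure(accs: &mut [Box<dyn SizedAccumulator>], values: &[i64]) -> usize {
    let pre = total_size(accs);
    for acc in accs.iter_mut() {
        acc.update(values);
    }
    let post = total_size(accs);
    post.saturating_sub(pre)
}
```

Each batch costs two full passes over the touched accumulators just for bookkeeping, on top of the updates themselves, which is the overhead the review comments point at.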





[GitHub] [arrow-datafusion] yahoNanJing commented on pull request #5904: Refine the size() calculation of accumulator

Posted by "yahoNanJing (via GitHub)" <gi...@apache.org>.
yahoNanJing commented on PR #5904:
URL: https://github.com/apache/arrow-datafusion/pull/5904#issuecomment-1502618865

   Hi @Dandandan, @alamb, could you help review this PR?




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5904: Refine the size() calculation of accumulator

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5904:
URL: https://github.com/apache/arrow-datafusion/pull/5904#discussion_r1160411246


##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -266,7 +266,7 @@ impl Accumulator for SumAccumulator {
     }
 
     fn size(&self) -> usize {
-        std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size()
+        std::mem::size_of_val(self)
     }

Review Comment:
   Will this size change after the `AvgAccumulator`/`SumAccumulator` struct is initialized?






[GitHub] [arrow-datafusion] alamb commented on pull request #5904: Refine the size() calculation of accumulator

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5904:
URL: https://github.com/apache/arrow-datafusion/pull/5904#issuecomment-1545664721

   I am trying to clean up outstanding PRs and I came across this one. What shall we do with it @yahoNanJing  -- should we pursue getting it merged?




[GitHub] [arrow-datafusion] mingmwang commented on a diff in pull request #5904: Refine the size() calculation of accumulator

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on code in PR #5904:
URL: https://github.com/apache/arrow-datafusion/pull/5904#discussion_r1160443563


##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -266,7 +266,7 @@ impl Accumulator for SumAccumulator {
     }
 
     fn size(&self) -> usize {
-        std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size()
+        std::mem::size_of_val(self)
     }

Review Comment:
   I think not. `Option` is an enum type.
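
A small sketch of the point about `Option`: `std::mem::size_of_val` reports the inline size of a value, and for an enum that size is the same whichever variant is stored. `NullableSum` below is a hypothetical accumulator written for illustration, assuming a fixed-width `i64` state; it is not the `SumAccumulator` from the diff.

```rust
use std::mem::size_of_val;

/// Hypothetical accumulator with a fixed-width, nullable running sum.
struct NullableSum {
    sum: Option<i64>,
}

impl NullableSum {
    fn new() -> Self {
        Self { sum: None }
    }

    /// Adds every non-null input to the running sum; nulls are skipped.
    fn update(&mut self, values: &[Option<i64>]) {
        for v in values.iter().flatten() {
            self.sum = Some(self.sum.unwrap_or(0) + *v);
        }
    }

    /// Inline size only, which is sufficient here because the state
    /// owns no heap memory.
    fn size(&self) -> usize {
        size_of_val(self)
    }
}
```

Under these assumptions `size()` returns the same value before and after `update`, whether `sum` is `None` or `Some(_)`. The reasoning does not carry over to a state that owns heap memory, such as a string.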





[GitHub] [arrow-datafusion] yahoNanJing commented on a diff in pull request #5904: Refine the size() calculation of accumulator

Posted by "yahoNanJing (via GitHub)" <gi...@apache.org>.
yahoNanJing commented on code in PR #5904:
URL: https://github.com/apache/arrow-datafusion/pull/5904#discussion_r1161499033


##########
datafusion/physical-expr/src/aggregate/sum.rs:
##########
@@ -266,7 +266,7 @@ impl Accumulator for SumAccumulator {
     }
 
     fn size(&self) -> usize {
-        std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + self.sum.size()
+        std::mem::size_of_val(self)
     }

Review Comment:
   From the flamegraph, it seems it's not a bottleneck now.





Re: [PR] Refine the size() calculation of accumulator [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb closed pull request #5904: Refine the size() calculation of accumulator
URL: https://github.com/apache/arrow-datafusion/pull/5904

