Posted to github@arrow.apache.org by "mustafasrepo (via GitHub)" <gi...@apache.org> on 2023/04/10 06:49:15 UTC

[GitHub] [arrow-datafusion] mustafasrepo opened a new pull request, #5936: [Minor]: Refactor row_hash implementation

mustafasrepo opened a new pull request, #5936:
URL: https://github.com/apache/arrow-datafusion/pull/5936

   # Which issue does this PR close?
   
   
   Closes #.
   
   # Rationale for this change
   The `row_hash.rs` file contains large chunks of code inside a single function, which makes the code hard to read. In addition, the scope of some variables is larger than necessary, which is limiting.
   
   # What changes are included in this PR?
   This PR refactors `group_aggregate_batch`. The code is unchanged, except that some sections of `group_aggregate_batch` have been moved into `update_group_state` and `update_accumulators`.
   Also, the `get_at_indices` function now returns a `Result<>`, which removes the `.unwrap` inside it.
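
To illustrate the `get_at_indices` change, here is a minimal sketch of the pattern using only the standard library. It is not the DataFusion implementation: the columns are plain `Vec<i64>` values instead of Arrow arrays, and the `String` error type and the out-of-bounds check are assumptions made for the example.

```rust
/// Simplified stand-in for `get_at_indices` (assumption: plain vectors
/// instead of Arrow arrays, `String` instead of the real error type).
/// Gathers the values at `indices` from every column and reports an
/// error to the caller instead of panicking through `.unwrap()`.
fn get_at_indices(columns: &[Vec<i64>], indices: &[u32]) -> Result<Vec<Vec<i64>>, String> {
    columns
        .iter()
        .map(|column| {
            indices
                .iter()
                .map(|&idx| {
                    // `get` returns `None` when the index is out of bounds;
                    // convert that into an error instead of unwrapping.
                    column.get(idx as usize).copied().ok_or_else(|| {
                        format!(
                            "index {idx} out of bounds for column of length {}",
                            column.len()
                        )
                    })
                })
                .collect::<Result<Vec<i64>, String>>()
        })
        // Collecting into `Result` stops at the first failing column.
        .collect()
}
```

A caller can then propagate the failure with `?`, for example `let row_values = get_at_indices(&columns, &indices)?;`, rather than relying on a panic.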
   
   # Are these changes tested?
   Existing tests should continue to pass.
   
   # Are there any user-facing changes?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5936: [Minor]: Refactor row_hash implementation

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5936:
URL: https://github.com/apache/arrow-datafusion/pull/5936#discussion_r1161482679


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -319,192 +499,31 @@ impl GroupedHashAggregateStream {
 
         let row_converter_size_pre = self.row_converter.size();
         for group_values in &group_by_values {
-            let group_rows = self.row_converter.convert_columns(group_values)?;

Review Comment:
   This chunk is moved to the function `update_group_state`
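
   As a rough illustration of what the extracted function is responsible for, the sketch below assigns each row of a batch to a group and returns the groups that received rows. It is a simplification under stated assumptions, not the code from the PR: group keys are plain strings, a standard `HashMap` replaces the raw hash table with precomputed hashes, memory accounting is omitted, and `GroupState` and `AggregationState` are hypothetical types. It also assumes that each group's `indices` are cleared after a batch has been processed.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified per-group state: the group key and the row
/// indices of the current batch that belong to the group.
struct GroupState {
    key: String,
    indices: Vec<u32>,
}

/// Hypothetical stand-in for the aggregation state held by the stream.
#[derive(Default)]
struct AggregationState {
    map: HashMap<String, usize>,
    group_states: Vec<GroupState>,
}

impl AggregationState {
    /// Assigns every row of the batch to a group and returns the indices
    /// of the groups that received at least one row from this batch.
    fn update_group_state(&mut self, group_keys: &[&str]) -> Vec<usize> {
        let mut groups_with_rows = Vec::new();
        for (row, key) in group_keys.iter().enumerate() {
            match self.map.get(*key).copied() {
                // Existing group: remember this row.
                Some(group_idx) => {
                    let state = &mut self.group_states[group_idx];
                    // First row of this batch for the group (assumes
                    // `indices` was cleared after the previous batch).
                    if state.indices.is_empty() {
                        groups_with_rows.push(group_idx);
                    }
                    state.indices.push(row as u32);
                }
                // New group: create its state and register it in the map.
                None => {
                    let group_idx = self.group_states.len();
                    self.group_states.push(GroupState {
                        key: (*key).to_string(),
                        indices: vec![row as u32],
                    });
                    self.map.insert((*key).to_string(), group_idx);
                    groups_with_rows.push(group_idx);
                }
            }
        }
        groups_with_rows
    }
}
```

   Keeping this logic in its own function means `groups_with_rows` is the only value the caller needs from it, which narrows the scope of the intermediate variables.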





[GitHub] [arrow-datafusion] mustafasrepo commented on a diff in pull request #5936: [Minor]: Refactor row_hash implementation

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo commented on code in PR #5936:
URL: https://github.com/apache/arrow-datafusion/pull/5936#discussion_r1161482858


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -319,192 +499,31 @@ impl GroupedHashAggregateStream {
 
         let row_converter_size_pre = self.row_converter.size();
         for group_values in &group_by_values {
-            let group_rows = self.row_converter.convert_columns(group_values)?;
-
-            // 1.1 construct the key from the group values
-            // 1.2 construct the mapping key if it does not exist
-            // 1.3 add the row' index to `indices`
-
-            // track which entries in `aggr_state` have rows in this batch to aggregate
-            let mut groups_with_rows = vec![];
-
-            // 1.1 Calculate the group keys for the group values
-            let mut batch_hashes = vec![0; batch.num_rows()];
-            create_hashes(group_values, &self.random_state, &mut batch_hashes)?;
-
-            for (row, hash) in batch_hashes.into_iter().enumerate() {
-                let entry = row_map.get_mut(hash, |(_hash, group_idx)| {
-                    // verify that a group that we are inserting with hash is
-                    // actually the same key value as the group in
-                    // existing_idx  (aka group_values @ row)
-                    let group_state = &row_group_states[*group_idx];
-                    group_rows.row(row) == group_state.group_by_values.row()
-                });
-
-                match entry {
-                    // Existing entry for this group value
-                    Some((_hash, group_idx)) => {
-                        let group_state = &mut row_group_states[*group_idx];
-
-                        // 1.3
-                        if group_state.indices.is_empty() {
-                            groups_with_rows.push(*group_idx);
-                        };
-
-                        group_state
-                            .indices
-                            .push_accounted(row as u32, &mut allocated); // remember this row
-                    }
-                    //  1.2 Need to create new entry
-                    None => {
-                        let accumulator_set =
-                            aggregates::create_accumulators(&self.normal_aggr_expr)?;
-                        // Add new entry to group_states and save newly created index
-                        let group_state = RowGroupState {
-                            group_by_values: group_rows.row(row).owned(),
-                            aggregation_buffer: vec![
-                                0;
-                                self.row_aggr_layout
-                                    .fixed_part_width()
-                            ],
-                            accumulator_set,
-                            indices: vec![row as u32], // 1.3
-                        };
-                        let group_idx = row_group_states.len();
-
-                        // NOTE: do NOT include the `RowGroupState` struct size in here because this is captured by
-                        // `group_states` (see allocation down below)
-                        allocated += (std::mem::size_of::<u8>()
-                            * group_state.group_by_values.as_ref().len())
-                            + (std::mem::size_of::<u8>()
-                                * group_state.aggregation_buffer.capacity())
-                            + (std::mem::size_of::<u32>()
-                                * group_state.indices.capacity());
-
-                        // Allocation done by normal accumulators
-                        allocated += (std::mem::size_of::<Box<dyn Accumulator>>()
-                            * group_state.accumulator_set.capacity())
-                            + group_state
-                                .accumulator_set
-                                .iter()
-                                .map(|accu| accu.size())
-                                .sum::<usize>();
-
-                        // for hasher function, use precomputed hash value
-                        row_map.insert_accounted(
-                            (hash, group_idx),
-                            |(hash, _group_index)| *hash,
-                            &mut allocated,
-                        );
-
-                        row_group_states.push_accounted(group_state, &mut allocated);
-
-                        groups_with_rows.push(group_idx);
-                    }
-                };
-            }
+            let groups_with_rows =
+                self.update_group_state(group_values, &mut allocated)?;
 
             // Collect all indices + offsets based on keys in this vec
             let mut batch_indices: UInt32Builder = UInt32Builder::with_capacity(0);
             let mut offsets = vec![0];
             let mut offset_so_far = 0;
             for &group_idx in groups_with_rows.iter() {
-                let indices = &row_group_states[group_idx].indices;
+                let indices = &self.row_aggr_state.group_states[group_idx].indices;
                 batch_indices.append_slice(indices);
                 offset_so_far += indices.len();
                 offsets.push(offset_so_far);
             }
             let batch_indices = batch_indices.finish();
 
-            let row_values = get_at_indices(&row_aggr_input_values, &batch_indices);

Review Comment:
   This chunk is moved to the function `update_accumulators`
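
   The context lines kept in `group_aggregate_batch` build a flat list of row indices plus an `offsets` vector. The sketch below mirrors that loop with plain vectors in place of `UInt32Builder`; the function name `collect_indices_and_offsets` and its parameters are hypothetical. The assumption here is that the rows gathered at `batch_indices` are later sliced per group using consecutive offset pairs.

```rust
/// Flattens the per-group row indices into one list and records where
/// each group's rows start and end (hypothetical helper for illustration).
fn collect_indices_and_offsets(
    group_indices: &[Vec<u32>],
    groups_with_rows: &[usize],
) -> (Vec<u32>, Vec<usize>) {
    let mut batch_indices = Vec::new();
    let mut offsets = vec![0];
    let mut offset_so_far = 0;
    for &group_idx in groups_with_rows {
        let indices = &group_indices[group_idx];
        batch_indices.extend_from_slice(indices);
        offset_so_far += indices.len();
        // Rows of the i-th listed group occupy offsets[i]..offsets[i + 1].
        offsets.push(offset_so_far);
    }
    (batch_indices, offsets)
}
```

   With this layout, a single gather over `batch_indices` yields the input rows ordered by group, and each group's rows form one contiguous slice.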





[GitHub] [arrow-datafusion] jackwener commented on pull request #5936: minor: Refactor row_hash implementation

Posted by "jackwener (via GitHub)" <gi...@apache.org>.
jackwener commented on PR #5936:
URL: https://github.com/apache/arrow-datafusion/pull/5936#issuecomment-1501901241

   Thanks @mustafasrepo 




[GitHub] [arrow-datafusion] mustafasrepo merged pull request #5936: minor: Refactor row_hash implementation

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo merged PR #5936:
URL: https://github.com/apache/arrow-datafusion/pull/5936

