You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/07/15 21:11:50 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6932: Consolidate `BoundedAggregateStream`

alamb commented on code in PR #6932:
URL: https://github.com/apache/arrow-datafusion/pull/6932#discussion_r1264555881


##########
datafusion/core/src/physical_plan/aggregates/order/full.rs:
##########
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   This design tries to keep all the ordering isolated out of the main row_hash.rs logic to manage the complexity



##########
datafusion/core/src/physical_plan/aggregates/order/partial.rs:
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::physical_expr::EmitTo;
+use arrow::row::{OwnedRow, RowConverter, Rows, SortField};
+use arrow_array::ArrayRef;
+use arrow_schema::Schema;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::proxy::VecAllocExt;
+use datafusion_physical_expr::PhysicalSortExpr;
+
+/// Tracks grouping state when the data is ordered by some subset of
+/// the group keys.
+///
+/// Once the next *sort key* value is seen, never see groups with that
+/// sort key again, so we can emit all groups wtih the previous sort
+/// key and earlier.
+///
+/// For example, given `SUM(amt) GROUP BY id, state` if the input is
+/// sorted by `state, when a new value of `state` is seen, all groups
+/// with prior values of `state` can be emitted.
+///
+/// The state is tracked like this:
+///
+/// ```text
+///                                            ┏━━━━━━━━━━━━━━━━━┓ ┏━━━━━━━┓
+///     ┌─────┐    ┌───────────────────┐ ┌─────┃        9        ┃ ┃ "MD"  ┃
+///     │┌───┐│    │ ┌──────────────┐  │ │     ┗━━━━━━━━━━━━━━━━━┛ ┗━━━━━━━┛
+///     ││ 0 ││    │ │  123, "MA"   │  │ │        current_sort      sort_key
+///     │└───┘│    │ └──────────────┘  │ │
+///     │ ... │    │    ...            │ │      current_sort tracks the most
+///     │┌───┐│    │ ┌──────────────┐  │ │      recent group index that had
+///     ││12 ││    │ │  765, "MA"   │  │ │      the same sort_key as current
+///     │├───┤│    │ ├──────────────┤  │ │
+///     ││12 ││    │ │  923, "MD"   │◀─┼─┘
+///     │├───┤│    │ ├──────────────┤  │        ┏━━━━━━━━━━━━━━┓
+///     ││13 ││    │ │  345, "MD"   │◀─┼────────┃      12      ┃
+///     │└───┘│    │ └──────────────┘  │        ┗━━━━━━━━━━━━━━┛
+///     └─────┘    └───────────────────┘            current
+///  group indices
+/// (in group value  group_values               current tracks the most
+///      order)                                    recent group index
+///```
+#[derive(Debug)]
+pub(crate) struct GroupOrderingPartial {
+    /// State machine
+    state: State,
+
+    /// The indexes of the group by columns that form the sort key.
+    /// For example if grouping by `id, state` and ordered by `state`
+    /// this would be `[1]`.
+    order_indices: Vec<usize>,
+
+    /// Converter for the sort key (used on the group columns
+    /// specified in `order_indexes`)
+    row_converter: RowConverter,
+
+    /// Hash values for groups in 0..completed
+    hashes: Vec<u64>,
+}
+
+#[derive(Debug, Default)]
+enum State {
+    /// The ordering was temporarily taken.  `Self::Taken` is left
+    /// when state must be temporarily taken to satisfy the borrow
+    /// checker. If an error happens before the state can be restored,
+    /// the ordering information is lost and execution can not
+    /// proceed, but there is no undefined behavior.
+    #[default]
+    Taken,
+
+    /// Seen no input yet
+    Start,
+
+    /// Data is in progress.
+    InProgress {
+        /// first group index with the sort_key
+        current_sort: usize,
+        /// The sort key of group_index `current_sort`
+        sort_key: OwnedRow,

Review Comment:
   I expect this code to be quite a bit faster than the current `BoundedWindowAggregate` as it uses the row format rather than `ScalarValue`, but I don't know of any benchmarks of the streaming group by code



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -320,36 +350,36 @@ impl Stream for GroupedHashAggregateStream {
                         // new batch to aggregate
                         Some(Ok(batch)) => {
                             let timer = elapsed_compute.timer();
-                            let result = self.group_aggregate_batch(batch);
-                            timer.done();
-
-                            // allocate memory AFTER we actually used
-                            // the memory, which simplifies the whole
-                            // accounting and we are OK with
-                            // overshooting a bit.
-                            //
-                            // Also this means we either store the
-                            // whole record batch or not.
-                            let result = result.and_then(|allocated| {
-                                self.reservation.try_grow(allocated)
-                            });
-
-                            if let Err(e) = result {
-                                return Poll::Ready(Some(Err(e)));
+                            // Do the grouping

Review Comment:
   The main state machine is updated to emit data when possible (so it needs to transition back and forth between input and emitting)



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -397,16 +431,15 @@ impl GroupedHashAggregateStream {
     fn update_group_state(
         &mut self,
         group_values: &[ArrayRef],
-        allocated: &mut usize,
+        memory_delta: &mut MemoryDelta,
     ) -> Result<()> {
         // Convert the group keys into the row format
         // Avoid reallocation when https://github.com/apache/arrow-rs/issues/4479 is available
         let group_rows = self.row_converter.convert_columns(group_values)?;
         let n_rows = group_rows.num_rows();
 
         // track memory used
-        let group_values_size_pre = self.group_values.size();
-        let scratch_size_pre = self.scratch_space.size();
+        memory_delta.dec(self.state_size());

Review Comment:
   I needed to update the memory accounting logic because previously the group operator never decreased its memory reservation, but now that state is cleared early, it need to shrink as well



##########
datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs:
##########
@@ -71,7 +71,7 @@ impl AccumulatorState {
     fn size(&self) -> usize {
         self.accumulator.size()
             + std::mem::size_of_val(self)
-            + std::mem::size_of::<u32>() * self.indices.capacity()
+            + self.indices.allocated_size()

Review Comment:
   This adapter needs to be updated to account for the fact that groups can be removed and thus memory freed



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -199,6 +199,14 @@ pub(crate) struct GroupedHashAggregateStream {
 
     /// max rows in output RecordBatches
     batch_size: usize,
+
+    /// Optional ordering information, that might allow groups to be
+    /// emitted from the hash table prior to seeing the end of the
+    /// input
+    group_ordering: GroupOrdering,

Review Comment:
   I am quite pleased that the extra ordering state is tracked in a single struct



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -1116,6 +1104,7 @@ fn create_accumulators(
         .collect::<Result<Vec<_>>>()
 }
 
+#[allow(dead_code)]

Review Comment:
   This will be removed in https://github.com/apache/arrow-datafusion/pull/6968



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -439,47 +474,50 @@ impl GroupedHashAggregateStream {
                     self.map.insert_accounted(
                         (hash, group_idx),
                         |(hash, _group_index)| *hash,
-                        allocated,
+                        &mut allocated,
                     );
                     group_idx
                 }
             };
             group_indices.push(group_idx);
         }
+        memory_delta.inc(allocated);
+
+        // Update ordering information if necessary
+        let total_num_groups = self.group_values.num_rows();
+        if total_num_groups > starting_num_groups {

Review Comment:
   Here is where the ordering information is updated, and it is a few checks per batch overhead when there is no ordering



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org