You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/12/06 16:25:56 UTC

[GitHub] [arrow-datafusion] ic4y commented on issue #956: Make aggregate accumulators storage column-based

ic4y commented on issue #956:
URL: https://github.com/apache/arrow-datafusion/issues/956#issuecomment-986936253


   @Dandandan 
   I am currently looking for ways to solve the performance problem of high-cardinality aggregation. I followed your method and tested it. I found that there is some performance improvement, but it is not good enough: only about 10% under high-cardinality aggregation (I think it needs a several-fold improvement, like Doris's and Trino's performance under high-cardinality aggregation, see #1246). Do you have any better advice or other optimizations for this code?
   The relevant code segments are as follows.
   Accumulators:
   ```rust
   /// Aggregation state for all groups seen so far, stored column-wise:
   /// every field is indexed by the same group index.
   struct Accumulators {
       /// Logically maps group values to a group index (the position of the
       /// group in `group_by_values` and `group_indices`).
       ///
       /// Uses the raw API of hashbrown to avoid actually storing the
       /// keys in the table
       ///
       /// keys: u64 hashes of the group values
       /// values: (hash, group index)
       map: RawTable<(u64, usize)>,
   
       /// Accumulator state. Each item is handed a group index on every call,
       /// so a single item carries the state of all groups.
       accumulator_items: Vec<AccumulatorItem>,
   
       // NOTE(review): `group_by_values` appears to replace the former
       // `group_states: Vec<GroupState>` field — confirm before deleting this note.
       /// The group-by key of each group: one `ScalarValue` per grouping column.
       group_by_values: Vec<Vec<ScalarValue>>,
   
       /// Per group, the row numbers of the batch currently being processed
       /// that belong to that group.
       group_indices : Vec<Vec<u32>>,
   }
   ```
   group_aggregate_batch:
   ```rust
   /// Folds one `RecordBatch` into the running per-group aggregation state.
   ///
   /// Steps:
   /// 1. Hash the group-by values of every row and look the row's group up in
   ///    `accumulators.map`, creating the group (and a state slot in **every**
   ///    accumulator) when it is seen for the first time.
   /// 2. Gather the aggregate input rows group by group with a single `take`
   ///    per input array, then feed each group's contiguous slice to every
   ///    accumulator via `update_batch` (`Partial` mode) or `merge_batch`
   ///    (`Final` / `FinalPartitioned` mode).
   ///
   /// `aggr_expr` is part of the signature but is not read by this function.
   ///
   /// # Errors
   /// Returns the first error produced while evaluating expressions, hashing,
   /// copying group values, taking rows, or updating/merging an accumulator.
   fn group_aggregate_batch(
       mode: &AggregateMode,
       random_state: &RandomState,
       group_expr: &[Arc<dyn PhysicalExpr>],
       aggr_expr: &[Arc<dyn AggregateExpr>],
       batch: RecordBatch,
       mut accumulators: Accumulators,
       aggregate_expressions: &[Vec<Arc<dyn PhysicalExpr>>],
   ) -> Result<Accumulators> {
       // evaluate the grouping expressions
       let group_values = evaluate(group_expr, &batch)?;

       // evaluate the aggregation expressions.
       // We could evaluate them after the `take`, but since we need to evaluate all
       // of them anyways, it is more performant to do it while they are together.
       let aggr_input_values = evaluate_many(aggregate_expressions, &batch)?;

       // 1.1 construct the key from the group values
       // 1.2 construct the mapping key if it does not exist
       // 1.3 add the row's index to `indices`

       // track which entries in `accumulators` have rows in this batch to aggregate
       let mut groups_with_rows = vec![];

       // 1.1 Calculate the group keys for the group values
       let mut batch_hashes = vec![0; batch.num_rows()];
       create_hashes(&group_values, random_state, &mut batch_hashes)?;

       for (row, hash) in batch_hashes.into_iter().enumerate() {
           let Accumulators {
               map,
               accumulator_items,
               group_by_values,
               group_indices,
           } = &mut accumulators;

           // Different group values can share a hash, so confirm a candidate
           // by comparing the stored key with this row column by column.
           let entry = map.get_mut(hash, |(_hash, group_idx)| {
               let stored_key = &group_by_values[*group_idx];
               group_values
                   .iter()
                   .zip(stored_key.iter())
                   .all(|(array, scalar)| scalar.eq_array(array, row))
           });

           match entry {
               // Existing entry for this group value
               Some((_hash, group_idx)) => {
                   let indices = &mut group_indices[*group_idx];
                   // 1.3 the first row of this batch for the group marks it as touched
                   if indices.is_empty() {
                       groups_with_rows.push(*group_idx);
                   }
                   // NOTE(review): assumes a batch never holds more than
                   // u32::MAX rows; the cast would truncate otherwise.
                   indices.push(row as u32); // remember this row
               }
               // 1.2 Need to create new entry
               None => {
                   // Copy group values out of arrays into `ScalarValue`s
                   let col_group_by_values = group_values
                       .iter()
                       .map(|col| ScalarValue::try_from_array(col, row))
                       .collect::<Result<Vec<_>>>()?;

                   let group_idx = group_by_values.len();
                   group_by_values.push(col_group_by_values);

                   // Every aggregate needs a state slot for the new group,
                   // not only the first one; this also avoids indexing `[0]`
                   // when there are no aggregates at all.
                   for accumulator in accumulator_items.iter_mut() {
                       accumulator.init_state(group_idx);
                   }
                   groups_with_rows.push(group_idx);
                   group_indices.push(vec![row as u32]);

                   // for hasher function, use precomputed hash value
                   map.insert(hash, (hash, group_idx), |(hash, _group_idx)| *hash);
               }
           };
       }

       // Collect all indices + offsets based on keys in this vec
       let mut batch_indices_cc: UInt32Builder = UInt32Builder::new(0);
       let mut offsets = vec![0];
       let mut offset_so_far = 0;
       for group_idx in groups_with_rows.iter() {
           let indices = &accumulators.group_indices[*group_idx];
           batch_indices_cc.append_slice(indices)?;
           offset_so_far += indices.len();
           offsets.push(offset_so_far);
       }
       let batch_indices = batch_indices_cc.finish();

       // `Take` all values based on indices into Arrays, so that the rows of
       // each group end up contiguous. Failures are propagated, not unwrapped.
       let mut values: Vec<Vec<Arc<dyn Array>>> = Vec::new();
       for arrays in aggr_input_values.iter() {
           let mut taken = Vec::new();
           for array in arrays.iter() {
               taken.push(compute::take(
                   array.as_ref(),
                   &batch_indices,
                   None, // None: no index check
               )?);
           }
           values.push(taken);
       }

       // 2.1 for each key in this batch
       // 2.2 for each aggregation
       // 2.3 `slice` from each of its arrays the keys' values
       // 2.4 update / merge the accumulator with the values
       // 2.5 clear indices
       groups_with_rows
           .iter()
           .zip(offsets.windows(2))
           .try_for_each(|(group_idx, window)| {
               // 2.5 leave the per-group row list empty for the next batch
               accumulators.group_indices[*group_idx].clear();
               accumulators
                   .accumulator_items
                   .iter_mut()
                   .zip(values.iter())
                   .try_for_each(|(accumulator, aggr_array)| {
                       // 2.3
                       let values = aggr_array
                           .iter()
                           .map(|array| array.slice(window[0], window[1] - window[0]))
                           .collect::<Vec<ArrayRef>>();
                       // 2.4
                       match mode {
                           AggregateMode::Partial => {
                               accumulator.update_batch(*group_idx, &values)
                           }
                           AggregateMode::FinalPartitioned | AggregateMode::Final => {
                               accumulator.merge_batch(*group_idx, &values)
                           }
                       }
                   })
           })?; // propagate accumulator errors instead of discarding them
       Ok(accumulators)
   }
   ```
   Count:
   ```rust
   /// COUNT state for many groups at once.
   pub struct CountAccumulatorFly {
       /// One running total per group; the position in the vector is the
       /// group index.
       count: Vec<u64>,
   }

   impl CountAccumulatorFly {
       /// Builds an accumulator that does not track any group yet.
       pub fn new() -> Self {
           let count = Vec::new();
           Self { count }
       }
   }
   
   impl CountAccumulatorFly {
       /// Returns the counter used by the single-group `Accumulator` methods
       /// (slot 0), creating it with value 0 when no slot exists yet.
       ///
       /// `new()` starts with an empty vector, so indexing `count[0]`
       /// directly would panic on a freshly built accumulator.
       fn single_group_count(&mut self) -> &mut u64 {
           if self.count.is_empty() {
               self.count.push(0);
           }
           &mut self.count[0]
       }
   }

   /// Single-group COUNT: all methods operate on slot 0 of `count`.
   ///
   /// A missing slot 0 is treated as a count of 0 rather than a panic; when
   /// slot 0 already exists the behaviour is the same as plain indexing.
   impl Accumulator for CountAccumulatorFly {
       /// Adds the number of non-null entries of `values[0]` to the count.
       fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
           let array = &values[0];
           let non_null = (array.len() - array.data().null_count()) as u64;
           *self.single_group_count() += non_null;
           Ok(())
       }

       /// Adds 1 to the count unless `values[0]` is null.
       fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
           let value = &values[0];
           if !value.is_null() {
               *self.single_group_count() += 1;
           }
           Ok(())
       }

       /// Adds a partial count taken from `states[0]`.
       ///
       /// # Panics
       /// Panics (`unreachable!`) if `states[0]` is not `UInt64(Some(_))`.
       fn merge(&mut self, states: &[ScalarValue]) -> Result<()> {
           let count = &states[0];
           if let ScalarValue::UInt64(Some(delta)) = count {
               *self.single_group_count() += *delta;
           } else {
               unreachable!()
           }
           Ok(())
       }

       /// Adds the sum of the partial counts in `states[0]`.
       ///
       /// # Panics
       /// Panics if `states[0]` is not a `UInt64Array`.
       fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
           let counts = states[0].as_any().downcast_ref::<UInt64Array>().unwrap();
           // `sum` yields `None` when there is nothing to add.
           if let Some(d) = compute::sum(counts) {
               *self.single_group_count() += d;
           }
           Ok(())
       }

       /// Returns the count as a one-element state vector.
       fn state(&self) -> Result<Vec<ScalarValue>> {
           let current = self.count.first().copied().unwrap_or(0);
           Ok(vec![ScalarValue::UInt64(Some(current))])
       }

       /// Returns the count as a `UInt64` scalar.
       fn evaluate(&self) -> Result<ScalarValue> {
           let current = self.count.first().copied().unwrap_or(0);
           Ok(ScalarValue::UInt64(Some(current)))
       }
   }
   
   
   /// Group-indexed COUNT: every method takes the index of the group whose
   /// counter it reads or updates.
   ///
   /// # Panics
   /// The indexed methods panic if `index` has no slot yet, i.e. `init_state`
   /// has not been called for that group.
   impl AccumulatorFly for CountAccumulatorFly {
       /// Appends a zeroed counter for group `index`.
       ///
       /// # Panics
       /// Panics if `index` is not the next unused slot (`self.count.len()`).
       fn init_state(&mut self, index: usize) {
           assert_eq!(self.count.len(), index);
           self.count.push(0);
       }

       /// Adds the number of non-null entries of `values[0]` to group `index`.
       fn update_batch(&mut self, index: usize, values: &[ArrayRef]) -> Result<()> {
           let array = &values[0];
           self.count[index] += (array.len() - array.data().null_count()) as u64;
           Ok(())
       }

       /// Adds 1 to group `index` unless `values[0]` is null.
       fn update(&mut self, index: usize, values: &[ScalarValue]) -> Result<()> {
           let value = &values[0];
           if !value.is_null() {
               self.count[index] += 1;
           }
           Ok(())
       }

       /// Adds a partial count taken from `states[0]` to group `index`.
       ///
       /// # Panics
       /// Panics (`unreachable!`) if `states[0]` is not `UInt64(Some(_))`.
       fn merge(&mut self, index: usize, states: &[ScalarValue]) -> Result<()> {
           let count = &states[0];
           if let ScalarValue::UInt64(Some(delta)) = count {
               self.count[index] += *delta;
           } else {
               unreachable!()
           }
           Ok(())
       }

       /// Adds the sum of the partial counts in `states[0]` to group `index`.
       ///
       /// # Panics
       /// Panics if `states[0]` is not a `UInt64Array`.
       fn merge_batch(&mut self, index: usize, states: &[ArrayRef]) -> Result<()> {
           let counts = states[0].as_any().downcast_ref::<UInt64Array>().unwrap();
           // `sum` yields `None` when there is nothing to add.
           if let Some(d) = compute::sum(counts) {
               self.count[index] += d;
           }
           Ok(())
       }

       /// Returns the count of group `index` as a one-element state vector.
       fn state(&self, index: usize) -> Result<Vec<ScalarValue>> {
           Ok(vec![ScalarValue::UInt64(Some(self.count[index]))])
       }

       /// Returns the count of group `index` as a `UInt64` scalar.
       fn evaluate(&self, index: usize) -> Result<ScalarValue> {
           Ok(ScalarValue::UInt64(Some(self.count[index])))
       }

       /// Returns the counts of all groups as one array, in group-index order.
       fn evaluate_all(&self) -> Result<ArrayRef> {
           ScalarValue::iter_to_array(
               self.count.iter().map(|x| ScalarValue::UInt64(Some(*x))),
           )
       }

       /// Returns the state of all groups: a single state column holding one
       /// `UInt64` scalar per group, in group-index order.
       fn state_all(&self) -> Result<Vec<Vec<ScalarValue>>> {
           // The wall-clock timing and `println!` that used to wrap this were
           // debug instrumentation; library code must not write to stdout.
           Ok(vec![self
               .count
               .iter()
               .map(|x| ScalarValue::UInt64(Some(*x)))
               .collect()])
       }
   }
   ```
   more code:
   https://github.com/ic4y/arrow-datafusion/blob/9d26b797f2e1565f7f65048ef1fc2ad2940d8f95/datafusion/src/physical_plan/hash_aggregate_fly.rs#L578-L596


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org