You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/07/29 07:24:05 UTC

[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7687: ARROW-9382: [Rust][DataFusion] Simplified hash aggregations and added Boolean type

jorgecarleitao commented on a change in pull request #7687:
URL: https://github.com/apache/arrow/pull/7687#discussion_r461825412



##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -327,120 +278,49 @@ impl RecordBatchReader for GroupedHashAggregateIterator {
                 })
                 .collect::<ArrowResult<Vec<_>>>()?;
 
-            // create vector large enough to hold the grouping key
+            // create vector to hold the grouping key
             let mut key = Vec::with_capacity(group_values.len());
             for _ in 0..group_values.len() {
-                key.push(GroupByScalar::UInt32(0));
+                key.push(KeyScalar::UInt32(0));
             }
 
             // iterate over each row in the batch and create the accumulators for each grouping key
-            let mut accumulators: Vec<Rc<AccumulatorSet>> =
-                Vec::with_capacity(batch.num_rows());
-
             for row in 0..batch.num_rows() {
-                // create grouping key for this row
-                create_key(&group_values, row, &mut key)
-                    .map_err(ExecutionError::into_arrow_external_error)?;
-
-                if let Some(accumulator_set) = map.get(&key) {
-                    accumulators.push(accumulator_set.clone());
-                } else {
-                    let accumulator_set: AccumulatorSet = self
-                        .aggr_expr
-                        .iter()
-                        .map(|expr| expr.create_accumulator())
-                        .collect();
-
-                    let accumulator_set = Rc::new(accumulator_set);
-
-                    map.insert(key.clone(), accumulator_set.clone());
-                    accumulators.push(accumulator_set);
+                // create and assign the grouping key of this row
+                for i in 0..group_values.len() {
+                    key[i] = create_key(&group_values[i], row)
+                        .map_err(ExecutionError::into_arrow_external_error)?;
                 }
-            }
 
-            // iterate over each non-grouping column in the batch and update the accumulator
-            // for each row
-            for col in 0..aggr_input_values.len() {
-                let array = &aggr_input_values[col];
-
-                match array.data_type() {
-                    DataType::Int8 => update_accumulators!(
-                        array,
-                        Int8Array,
-                        ScalarValue::Int8,
-                        col,
-                        accumulators
-                    ),
-                    DataType::Int16 => update_accumulators!(
-                        array,
-                        Int16Array,
-                        ScalarValue::Int16,
-                        col,
-                        accumulators
-                    ),
-                    DataType::Int32 => update_accumulators!(
-                        array,
-                        Int32Array,
-                        ScalarValue::Int32,
-                        col,
-                        accumulators
-                    ),
-                    DataType::Int64 => update_accumulators!(
-                        array,
-                        Int64Array,
-                        ScalarValue::Int64,
-                        col,
-                        accumulators
-                    ),
-                    DataType::UInt8 => update_accumulators!(
-                        array,
-                        UInt8Array,
-                        ScalarValue::UInt8,
-                        col,
-                        accumulators
-                    ),
-                    DataType::UInt16 => update_accumulators!(
-                        array,
-                        UInt16Array,
-                        ScalarValue::UInt16,
-                        col,
-                        accumulators
-                    ),
-                    DataType::UInt32 => update_accumulators!(
-                        array,
-                        UInt32Array,
-                        ScalarValue::UInt32,
-                        col,
-                        accumulators
-                    ),
-                    DataType::UInt64 => update_accumulators!(
-                        array,
-                        UInt64Array,
-                        ScalarValue::UInt64,
-                        col,
-                        accumulators
-                    ),
-                    DataType::Float32 => update_accumulators!(
-                        array,
-                        Float32Array,
-                        ScalarValue::Float32,
-                        col,
-                        accumulators
-                    ),
-                    DataType::Float64 => update_accumulators!(
-                        array,
-                        Float64Array,
-                        ScalarValue::Float64,
-                        col,
-                        accumulators
-                    ),
-                    other => {
-                        return Err(ExecutionError::ExecutionError(format!(
-                            "Unsupported data type {:?} for result of aggregate expression",
-                            other
-                        )).into_arrow_external_error());
+                // for each new key on the map, add an accumulatorSet to the map
+                match map.get(&key) {
+                    None => {
+                        let accumulator_set: AccumulatorSet = self
+                            .aggr_expr
+                            .iter()
+                            .map(|expr| expr.create_accumulator())
+                            .collect();
+                        map.insert(key.clone(), Rc::new(accumulator_set));
                     }
+                    _ => (),
                 };
+
+                // iterate over each non-grouping column in the batch and update the accumulator
+                // for each row
+                for col in 0..aggr_input_values.len() {
+                    let value = get_scalar_value(&aggr_input_values[col], row)
+                        .map_err(ExecutionError::into_arrow_external_error)?;
+
+                    match map.get(&key) {

Review comment:
       I fixed both issues for now.

##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -327,120 +278,47 @@ impl RecordBatchReader for GroupedHashAggregateIterator {
                 })
                 .collect::<ArrowResult<Vec<_>>>()?;
 
-            // create vector large enough to hold the grouping key
+            // create vector to hold the grouping key
             let mut key = Vec::with_capacity(group_values.len());
             for _ in 0..group_values.len() {
-                key.push(GroupByScalar::UInt32(0));
+                key.push(KeyScalar::UInt32(0));
             }
 
             // iterate over each row in the batch and create the accumulators for each grouping key
-            let mut accumulators: Vec<Rc<AccumulatorSet>> =

Review comment:
       Mind blowing. Still much to learn.
   
   I struggle to run the benchmarks on my computer. Is there any design reason to not run the benchmarks as part of the pipeline? Are they too unstable against changes in hardware?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org