Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/12 00:25:07 UTC

[GitHub] [arrow] andygrove opened a new pull request #7936: ARROW-9679: [Rust] [DataFusion] More efficient creation of final batch from HashAggregateExec [DRAFT]

andygrove opened a new pull request #7936:
URL: https://github.com/apache/arrow/pull/7936


   This isn't quite there yet. I hope to wrap this up tomorrow.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #7936: ARROW-9679: [Rust] [DataFusion] More efficient creation of final batch from HashAggregateExec [DRAFT]

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #7936:
URL: https://github.com/apache/arrow/pull/7936#issuecomment-672408880


   https://issues.apache.org/jira/browse/ARROW-9679





[GitHub] [arrow] jorgecarleitao commented on pull request #7936: ARROW-9679: [Rust] [DataFusion] More efficient creation of final batch from HashAggregateExec

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #7936:
URL: https://github.com/apache/arrow/pull/7936#issuecomment-673072087


   LGTM! Thanks a lot for this, nice cleanup! I closed #7687 in favor of this one, as the overlap is too high to salvage it.





[GitHub] [arrow] andygrove closed pull request #7936: ARROW-9679: [Rust] [DataFusion] More efficient creation of final batch from HashAggregateExec

Posted by GitBox <gi...@apache.org>.
andygrove closed pull request #7936:
URL: https://github.com/apache/arrow/pull/7936


   





[GitHub] [arrow] andygrove commented on a change in pull request #7936: ARROW-9679: [Rust] [DataFusion] More efficient creation of final batch from HashAggregateExec

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #7936:
URL: https://github.com/apache/arrow/pull/7936#discussion_r469292982



##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -677,6 +532,152 @@ impl RecordBatchReader for HashAggregateIterator {
     }
 }
 
+/// Append a grouping expression value to a builder
+macro_rules! append_group_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr) => {{
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast group value builder to expected type");
+        builder.append_value($VALUE)?;
+    }};
+}
+
+/// Append an aggregate expression value to a builder
+macro_rules! append_aggr_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr, $SCALAR_TY:ident) => {{
+        println!("downcast {:#?} to {:#?}", $BUILDER, $VALUE);
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast aggregate value builder to expected type");
+        match $VALUE {
+            Some(ScalarValue::$SCALAR_TY(n)) => builder.append_value(n)?,
+            None => builder.append_null()?,
+            Some(_) => panic!(),
+        }
+    }};
+}
+
+/// Create a RecordBatch representing the accumulated results in a map
+fn create_batch_from_map(
+    map: &FnvHashMap<Vec<GroupByScalar>, Rc<AccumulatorSet>>,
+    num_group_expr: usize,
+    num_aggr_expr: usize,
+    output_schema: &Schema,
+) -> Result<RecordBatch> {
+    let mut builders: Vec<Box<dyn ArrayBuilder>> = vec![];
+    for i in 0..num_group_expr + num_aggr_expr {
+        let builder: Box<dyn ArrayBuilder> = match output_schema.field(i).data_type() {
+            DataType::Int8 => Box::new(Int8Builder::new(map.len())),
+            DataType::Int16 => Box::new(Int16Builder::new(map.len())),
+            DataType::Int32 => Box::new(Int32Builder::new(map.len())),
+            DataType::Int64 => Box::new(Int64Builder::new(map.len())),
+            DataType::UInt8 => Box::new(UInt8Builder::new(map.len())),
+            DataType::UInt16 => Box::new(UInt16Builder::new(map.len())),
+            DataType::UInt32 => Box::new(UInt32Builder::new(map.len())),
+            DataType::UInt64 => Box::new(UInt64Builder::new(map.len())),
+            DataType::Float32 => Box::new(Float32Builder::new(map.len())),
+            DataType::Float64 => Box::new(Float64Builder::new(map.len())),
+            DataType::Utf8 => Box::new(StringBuilder::new(map.len())),
+            _ => {
+                return Err(ExecutionError::ExecutionError(
+                    "Unsupported group data type".to_string(),
+                ))
+            }
+        };
+        builders.push(builder);
+    }
+
+    // iterate over the map
+    for (k, v) in map.iter() {
+        // add group values to builders
+        for i in 0..num_group_expr {
+            let builder = builders[i].as_any_mut();
+            match &k[i] {
+                GroupByScalar::Int8(n) => append_group_value!(builder, Int8Builder, *n),
+                GroupByScalar::Int16(n) => append_group_value!(builder, Int16Builder, *n),
+                GroupByScalar::Int32(n) => append_group_value!(builder, Int32Builder, *n),
+                GroupByScalar::Int64(n) => append_group_value!(builder, Int64Builder, *n),
+                GroupByScalar::UInt8(n) => append_group_value!(builder, UInt8Builder, *n),
+                GroupByScalar::UInt16(n) => {
+                    append_group_value!(builder, UInt16Builder, *n)
+                }
+                GroupByScalar::UInt32(n) => {
+                    append_group_value!(builder, UInt32Builder, *n)
+                }
+                GroupByScalar::UInt64(n) => {
+                    append_group_value!(builder, UInt64Builder, *n)
+                }
+                GroupByScalar::Utf8(str) => {
+                    append_group_value!(builder, StringBuilder, str)
+                }
+            }
+        }
+
+        // add aggregate values to builders
+        for i in 0..num_aggr_expr {
+            let value = v[i].borrow().get_value()?;
+            let index = num_group_expr + i;
+            let builder = builders[index].as_any_mut();
+            match output_schema.field(i).data_type() {
+                DataType::Int8 => append_aggr_value!(builder, Int8Builder, value, Int8),
+                DataType::Int16 => {
+                    append_aggr_value!(builder, Int16Builder, value, Int16)
+                }
+                DataType::Int32 => {
+                    append_aggr_value!(builder, Int32Builder, value, Int32)
+                }
+                DataType::Int64 => {
+                    append_aggr_value!(builder, Int64Builder, value, Int64)
+                }
+                DataType::UInt8 => {
+                    append_aggr_value!(builder, UInt8Builder, value, UInt8)
+                }
+                DataType::UInt16 => {
+                    append_aggr_value!(builder, UInt16Builder, value, UInt16)
+                }
+                DataType::UInt32 => {
+                    append_aggr_value!(builder, UInt32Builder, value, UInt32)
+                }
+                DataType::UInt64 => {
+                    append_aggr_value!(builder, UInt64Builder, value, UInt64)
+                }
+                DataType::Float32 => {
+                    append_aggr_value!(builder, Float32Builder, value, Float32)
+                }
+                DataType::Float64 => {
+                    append_aggr_value!(builder, Float64Builder, value, Float64)
+                }
+                DataType::Utf8 => {

Review comment:
       The issue here was that `ScalarValue::Utf8` contains `String` and the builder wants `&str`. In all other cases the scalar and builder types are the same.
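
To illustrate the mismatch described above, here is a minimal sketch. The `ScalarValue`, `Int32Builder`, and `StringBuilder` types below are simplified stand-ins written for this example, not the real Arrow or DataFusion definitions; the only property they are meant to mirror is that the numeric builder takes its value by copy while the string builder takes a borrowed `&str`.

```rust
// Stand-in types for illustration only; NOT the real Arrow/DataFusion definitions.
#[derive(Debug, Clone, PartialEq)]
enum ScalarValue {
    Int32(i32),
    Utf8(String),
}

/// Hypothetical numeric builder: `append_value` takes the value by copy.
#[derive(Default)]
struct Int32Builder {
    values: Vec<Option<i32>>,
}

impl Int32Builder {
    fn append_value(&mut self, v: i32) {
        self.values.push(Some(v));
    }
    fn append_null(&mut self) {
        self.values.push(None);
    }
}

/// Hypothetical string builder: `append_value` takes a borrowed `&str`.
#[derive(Default)]
struct StringBuilder {
    values: Vec<Option<String>>,
}

impl StringBuilder {
    fn append_value(&mut self, v: &str) {
        self.values.push(Some(v.to_string()));
    }
    fn append_null(&mut self) {
        self.values.push(None);
    }
}

/// Numeric case: the scalar payload and the builder argument have the same type.
fn append_int32(builder: &mut Int32Builder, value: Option<ScalarValue>) -> Result<(), String> {
    match value {
        Some(ScalarValue::Int32(n)) => builder.append_value(n),
        None => builder.append_null(),
        Some(other) => return Err(format!("unexpected scalar {:?}", other)),
    }
    Ok(())
}

/// String case: the scalar owns a `String`, so it must be borrowed as `&str`.
fn append_utf8(builder: &mut StringBuilder, value: Option<ScalarValue>) -> Result<(), String> {
    match value {
        // `&s` is a `&String`, which deref-coerces to the `&str` the builder expects.
        Some(ScalarValue::Utf8(s)) => builder.append_value(&s),
        None => builder.append_null(),
        Some(other) => return Err(format!("unexpected scalar {:?}", other)),
    }
    Ok(())
}
```

A generic macro arm of the form `builder.append_value(n)` compiles for the numeric case but not for the string case, which is why the `Utf8` arm needs its own handling (or an extra borrow).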







[GitHub] [arrow] andygrove commented on a change in pull request #7936: ARROW-9679: [Rust] [DataFusion] More efficient creation of final batch from HashAggregateExec

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #7936:
URL: https://github.com/apache/arrow/pull/7936#discussion_r469309756



##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -677,6 +532,152 @@ impl RecordBatchReader for HashAggregateIterator {
     }
 }
 
+/// Append a grouping expression value to a builder
+macro_rules! append_group_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr) => {{
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast group value builder to expected type");
+        builder.append_value($VALUE)?;
+    }};
+}
+
+/// Append an aggregate expression value to a builder
+macro_rules! append_aggr_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr, $SCALAR_TY:ident) => {{
+        println!("downcast {:#?} to {:#?}", $BUILDER, $VALUE);
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast aggregate value builder to expected type");
+        match $VALUE {
+            Some(ScalarValue::$SCALAR_TY(n)) => builder.append_value(n)?,
+            None => builder.append_null()?,
+            Some(_) => panic!(),
+        }
+    }};
+}
+
+/// Create a RecordBatch representing the accumulated results in a map
+fn create_batch_from_map(
+    map: &FnvHashMap<Vec<GroupByScalar>, Rc<AccumulatorSet>>,
+    num_group_expr: usize,
+    num_aggr_expr: usize,
+    output_schema: &Schema,
+) -> Result<RecordBatch> {
+    let mut builders: Vec<Box<dyn ArrayBuilder>> = vec![];
+    for i in 0..num_group_expr + num_aggr_expr {
+        let builder: Box<dyn ArrayBuilder> = match output_schema.field(i).data_type() {
+            DataType::Int8 => Box::new(Int8Builder::new(map.len())),
+            DataType::Int16 => Box::new(Int16Builder::new(map.len())),
+            DataType::Int32 => Box::new(Int32Builder::new(map.len())),
+            DataType::Int64 => Box::new(Int64Builder::new(map.len())),
+            DataType::UInt8 => Box::new(UInt8Builder::new(map.len())),
+            DataType::UInt16 => Box::new(UInt16Builder::new(map.len())),
+            DataType::UInt32 => Box::new(UInt32Builder::new(map.len())),
+            DataType::UInt64 => Box::new(UInt64Builder::new(map.len())),
+            DataType::Float32 => Box::new(Float32Builder::new(map.len())),
+            DataType::Float64 => Box::new(Float64Builder::new(map.len())),
+            DataType::Utf8 => Box::new(StringBuilder::new(map.len())),
+            _ => {
+                return Err(ExecutionError::ExecutionError(
+                    "Unsupported group data type".to_string(),
+                ))
+            }
+        };
+        builders.push(builder);
+    }
+
+    // iterate over the map
+    for (k, v) in map.iter() {
+        // add group values to builders
+        for i in 0..num_group_expr {
+            let builder = builders[i].as_any_mut();
+            match &k[i] {
+                GroupByScalar::Int8(n) => append_group_value!(builder, Int8Builder, *n),
+                GroupByScalar::Int16(n) => append_group_value!(builder, Int16Builder, *n),
+                GroupByScalar::Int32(n) => append_group_value!(builder, Int32Builder, *n),
+                GroupByScalar::Int64(n) => append_group_value!(builder, Int64Builder, *n),
+                GroupByScalar::UInt8(n) => append_group_value!(builder, UInt8Builder, *n),
+                GroupByScalar::UInt16(n) => {
+                    append_group_value!(builder, UInt16Builder, *n)
+                }
+                GroupByScalar::UInt32(n) => {
+                    append_group_value!(builder, UInt32Builder, *n)
+                }
+                GroupByScalar::UInt64(n) => {
+                    append_group_value!(builder, UInt64Builder, *n)
+                }
+                GroupByScalar::Utf8(str) => {
+                    append_group_value!(builder, StringBuilder, str)
+                }
+            }
+        }
+
+        // add aggregate values to builders
+        for i in 0..num_aggr_expr {
+            let value = v[i].borrow().get_value()?;
+            let index = num_group_expr + i;
+            let builder = builders[index].as_any_mut();
+            match output_schema.field(i).data_type() {
+                DataType::Int8 => append_aggr_value!(builder, Int8Builder, value, Int8),
+                DataType::Int16 => {
+                    append_aggr_value!(builder, Int16Builder, value, Int16)
+                }
+                DataType::Int32 => {
+                    append_aggr_value!(builder, Int32Builder, value, Int32)
+                }
+                DataType::Int64 => {
+                    append_aggr_value!(builder, Int64Builder, value, Int64)
+                }
+                DataType::UInt8 => {
+                    append_aggr_value!(builder, UInt8Builder, value, UInt8)
+                }
+                DataType::UInt16 => {
+                    append_aggr_value!(builder, UInt16Builder, value, UInt16)
+                }
+                DataType::UInt32 => {
+                    append_aggr_value!(builder, UInt32Builder, value, UInt32)
+                }
+                DataType::UInt64 => {
+                    append_aggr_value!(builder, UInt64Builder, value, UInt64)
+                }
+                DataType::Float32 => {
+                    append_aggr_value!(builder, Float32Builder, value, Float32)
+                }
+                DataType::Float64 => {
+                    append_aggr_value!(builder, Float64Builder, value, Float64)
+                }
+                DataType::Utf8 => {

Review comment:
       Done.







[GitHub] [arrow] alamb commented on a change in pull request #7936: ARROW-9679: [Rust] [DataFusion] More efficient creation of final batch from HashAggregateExec [DRAFT]

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #7936:
URL: https://github.com/apache/arrow/pull/7936#discussion_r469206575



##########
File path: rust/datafusion/src/execution/physical_plan/expressions.rs
##########
@@ -413,20 +413,7 @@ impl Max {
 
 impl AggregateExpr for Max {
     fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
-        match self.expr.data_type(input_schema)? {

Review comment:
       I agree that the type of `Max` and `Min` should be the same as the input (as those functions are never going to overflow the value type of their input, the way `Sum` or `Avg` could).
   
   I actually think the change results in cleaner code too, which is a nice bonus.
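
A small sketch of the reasoning above, using plain Rust integers rather than Arrow arrays (the function names are made up for this example). The maximum of a set of `i8` values is always one of the inputs, so it fits in `i8`; a sum of the same values may not.

```rust
/// Maximum of the inputs: the result is always one of the input values,
/// so the input type is wide enough for the output.
fn max_i8(values: &[i8]) -> Option<i8> {
    values.iter().copied().max()
}

/// Sum computed in the input type; returns `None` as soon as it overflows.
fn sum_i8_checked(values: &[i8]) -> Option<i8> {
    values.iter().try_fold(0i8, |acc, &v| acc.checked_add(v))
}

/// Sum computed in a wider type, which is why an aggregate such as a sum
/// may want an output type different from its input type.
fn sum_widened(values: &[i8]) -> i64 {
    values.iter().map(|&v| i64::from(v)).sum()
}
```

For the input `[100, 100, 27]`, the maximum is `100` and fits in `i8`, while the sum is `227`, which exceeds `i8::MAX` (127) and only fits in the widened type.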

##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -677,6 +532,152 @@ impl RecordBatchReader for HashAggregateIterator {
     }
 }
 
+/// Append a grouping expression value to a builder
+macro_rules! append_group_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr) => {{
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast group value builder to expected type");
+        builder.append_value($VALUE)?;
+    }};
+}
+
+/// Append an aggregate expression value to a builder
+macro_rules! append_aggr_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr, $SCALAR_TY:ident) => {{

Review comment:
       I think the names `$BUILDER_TY` and `$SCALAR_TY` are easier to understand than the previous `$TY:ident, $TY2:ty` 👍 

##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -677,6 +532,152 @@ impl RecordBatchReader for HashAggregateIterator {
     }
 }
 
+/// Append a grouping expression value to a builder
+macro_rules! append_group_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr) => {{
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast group value builder to expected type");
+        builder.append_value($VALUE)?;
+    }};
+}
+
+/// Append an aggregate expression value to a builder
+macro_rules! append_aggr_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr, $SCALAR_TY:ident) => {{
+        println!("downcast {:#?} to {:#?}", $BUILDER, $VALUE);

Review comment:
       ```suggestion
   ```

##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -677,6 +532,152 @@ impl RecordBatchReader for HashAggregateIterator {
     }
 }
 
+/// Append a grouping expression value to a builder
+macro_rules! append_group_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr) => {{
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast group value builder to expected type");
+        builder.append_value($VALUE)?;
+    }};
+}
+
+/// Append an aggregate expression value to a builder
+macro_rules! append_aggr_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr, $SCALAR_TY:ident) => {{
+        println!("downcast {:#?} to {:#?}", $BUILDER, $VALUE);
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast aggregate value builder to expected type");
+        match $VALUE {
+            Some(ScalarValue::$SCALAR_TY(n)) => builder.append_value(n)?,
+            None => builder.append_null()?,
+            Some(_) => panic!(),
+        }
+    }};
+}
+
+/// Create a RecordBatch representing the accumulated results in a map
+fn create_batch_from_map(
+    map: &FnvHashMap<Vec<GroupByScalar>, Rc<AccumulatorSet>>,
+    num_group_expr: usize,
+    num_aggr_expr: usize,
+    output_schema: &Schema,
+) -> Result<RecordBatch> {
+    let mut builders: Vec<Box<dyn ArrayBuilder>> = vec![];
+    for i in 0..num_group_expr + num_aggr_expr {

Review comment:
       ```suggestion
       for i in 0..output_schema.fields().len() {
   ```
   
   This is just a style suggestion / defensive coding suggestion (as `output_schema.field(i)` is matched below).
   
   If you wanted to get all `rust`y / functional, you could also think about rewriting this as a map over fields. Something like this (untested):
   
   ```
   let builders = output_schema
     .fields()
     .iter()
     .map(|f| match f.data_type() { /* arms from the match statement below */ })
     .collect::<Result<Vec<_>>>()?;
   ```
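
For readers unfamiliar with the pattern, here is a self-contained sketch of collecting an iterator of `Result`s into a single `Result<Vec<_>, _>`. The `DataType` and `Builder` enums are simplified stand-ins invented for this example (they are not the Arrow types), and the error type is a plain `String` rather than the crate's error enum.

```rust
// Stand-in for a schema field's data type; illustration only.
#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    Int32,
    Float64,
    Utf8,
    Boolean,
}

// Stand-in for a boxed array builder; illustration only.
#[derive(Debug, PartialEq)]
enum Builder {
    Int32(Vec<i32>),
    Float64(Vec<f64>),
    Utf8(Vec<String>),
}

/// Create one builder per field. `collect` into `Result<Vec<_>, _>` stops at
/// the first `Err` and returns it, so no explicit loop or early return is needed.
fn create_builders(fields: &[DataType], capacity: usize) -> Result<Vec<Builder>, String> {
    fields
        .iter()
        .map(|data_type| match data_type {
            DataType::Int32 => Ok(Builder::Int32(Vec::with_capacity(capacity))),
            DataType::Float64 => Ok(Builder::Float64(Vec::with_capacity(capacity))),
            DataType::Utf8 => Ok(Builder::Utf8(Vec::with_capacity(capacity))),
            other => Err(format!("Unsupported data type {:?}", other)),
        })
        .collect::<Result<Vec<_>, String>>()
}
```

Iterating over the fields directly also removes the need to keep the loop bound in sync with the schema length, which is the defensive-coding point made above.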
   

##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -677,6 +532,152 @@ impl RecordBatchReader for HashAggregateIterator {
     }
 }
 
+/// Append a grouping expression value to a builder
+macro_rules! append_group_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr) => {{
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast group value builder to expected type");
+        builder.append_value($VALUE)?;
+    }};
+}
+
+/// Append an aggregate expression value to a builder
+macro_rules! append_aggr_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr, $SCALAR_TY:ident) => {{
+        println!("downcast {:#?} to {:#?}", $BUILDER, $VALUE);
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast aggregate value builder to expected type");
+        match $VALUE {
+            Some(ScalarValue::$SCALAR_TY(n)) => builder.append_value(n)?,
+            None => builder.append_null()?,
+            Some(_) => panic!(),
+        }
+    }};
+}
+
+/// Create a RecordBatch representing the accumulated results in a map
+fn create_batch_from_map(
+    map: &FnvHashMap<Vec<GroupByScalar>, Rc<AccumulatorSet>>,
+    num_group_expr: usize,
+    num_aggr_expr: usize,
+    output_schema: &Schema,
+) -> Result<RecordBatch> {
+    let mut builders: Vec<Box<dyn ArrayBuilder>> = vec![];
+    for i in 0..num_group_expr + num_aggr_expr {
+        let builder: Box<dyn ArrayBuilder> = match output_schema.field(i).data_type() {
+            DataType::Int8 => Box::new(Int8Builder::new(map.len())),
+            DataType::Int16 => Box::new(Int16Builder::new(map.len())),
+            DataType::Int32 => Box::new(Int32Builder::new(map.len())),
+            DataType::Int64 => Box::new(Int64Builder::new(map.len())),
+            DataType::UInt8 => Box::new(UInt8Builder::new(map.len())),
+            DataType::UInt16 => Box::new(UInt16Builder::new(map.len())),
+            DataType::UInt32 => Box::new(UInt32Builder::new(map.len())),
+            DataType::UInt64 => Box::new(UInt64Builder::new(map.len())),
+            DataType::Float32 => Box::new(Float32Builder::new(map.len())),
+            DataType::Float64 => Box::new(Float64Builder::new(map.len())),
+            DataType::Utf8 => Box::new(StringBuilder::new(map.len())),
+            _ => {
+                return Err(ExecutionError::ExecutionError(
+                    "Unsupported group data type".to_string(),
+                ))
+            }
+        };
+        builders.push(builder);
+    }
+
+    // iterate over the map
+    for (k, v) in map.iter() {
+        // add group values to builders
+        for i in 0..num_group_expr {
+            let builder = builders[i].as_any_mut();
+            match &k[i] {
+                GroupByScalar::Int8(n) => append_group_value!(builder, Int8Builder, *n),
+                GroupByScalar::Int16(n) => append_group_value!(builder, Int16Builder, *n),
+                GroupByScalar::Int32(n) => append_group_value!(builder, Int32Builder, *n),
+                GroupByScalar::Int64(n) => append_group_value!(builder, Int64Builder, *n),
+                GroupByScalar::UInt8(n) => append_group_value!(builder, UInt8Builder, *n),
+                GroupByScalar::UInt16(n) => {
+                    append_group_value!(builder, UInt16Builder, *n)
+                }
+                GroupByScalar::UInt32(n) => {
+                    append_group_value!(builder, UInt32Builder, *n)
+                }
+                GroupByScalar::UInt64(n) => {
+                    append_group_value!(builder, UInt64Builder, *n)
+                }
+                GroupByScalar::Utf8(str) => {
+                    append_group_value!(builder, StringBuilder, str)
+                }
+            }
+        }
+
+        // add aggregate values to builders
+        for i in 0..num_aggr_expr {
+            let value = v[i].borrow().get_value()?;
+            let index = num_group_expr + i;
+            let builder = builders[index].as_any_mut();
+            match output_schema.field(i).data_type() {
+                DataType::Int8 => append_aggr_value!(builder, Int8Builder, value, Int8),
+                DataType::Int16 => {
+                    append_aggr_value!(builder, Int16Builder, value, Int16)
+                }
+                DataType::Int32 => {
+                    append_aggr_value!(builder, Int32Builder, value, Int32)
+                }
+                DataType::Int64 => {
+                    append_aggr_value!(builder, Int64Builder, value, Int64)
+                }
+                DataType::UInt8 => {
+                    append_aggr_value!(builder, UInt8Builder, value, UInt8)
+                }
+                DataType::UInt16 => {
+                    append_aggr_value!(builder, UInt16Builder, value, UInt16)
+                }
+                DataType::UInt32 => {
+                    append_aggr_value!(builder, UInt32Builder, value, UInt32)
+                }
+                DataType::UInt64 => {
+                    append_aggr_value!(builder, UInt64Builder, value, UInt64)
+                }
+                DataType::Float32 => {
+                    append_aggr_value!(builder, Float32Builder, value, Float32)
+                }
+                DataType::Float64 => {
+                    append_aggr_value!(builder, Float64Builder, value, Float64)
+                }
+                DataType::Utf8 => {

Review comment:
       I am sure you have a good reason, but I didn't quite see why this match arm can't use `append_aggr_value!` as well.

##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -677,6 +532,152 @@ impl RecordBatchReader for HashAggregateIterator {
     }
 }
 
+/// Append a grouping expression value to a builder
+macro_rules! append_group_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr) => {{
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast group value builder to expected type");
+        builder.append_value($VALUE)?;
+    }};
+}
+
+/// Append an aggregate expression value to a builder
+macro_rules! append_aggr_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr, $SCALAR_TY:ident) => {{
+        println!("downcast {:#?} to {:#?}", $BUILDER, $VALUE);
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast aggregate value builder to expected type");
+        match $VALUE {
+            Some(ScalarValue::$SCALAR_TY(n)) => builder.append_value(n)?,
+            None => builder.append_null()?,
+            Some(_) => panic!(),

Review comment:
       ```suggestion
               Some(_) => return Err(ExecutionError::ExecutionError("unexpected type when creating aggregate value".to_string())),
   ```
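
A minimal sketch of the idea, assuming a simplified `ScalarValue` enum and a `String` error type (both stand-ins for this example, not the DataFusion definitions). Because the other match arms evaluate to `()`, the error arm uses `return Err(...)` so that the enclosing function exits with the error instead of panicking.

```rust
// Stand-in scalar type; illustration only.
#[derive(Debug)]
enum ScalarValue {
    Int64(i64),
    Float64(f64),
}

/// Append an optional scalar of the expected variant to a Vec, returning an
/// error from the *enclosing function* if the variant does not match.
macro_rules! append_value {
    ($VALUES:expr, $VALUE:expr, $SCALAR_TY:ident) => {{
        match $VALUE {
            Some(ScalarValue::$SCALAR_TY(n)) => $VALUES.push(Some(n)),
            None => $VALUES.push(None),
            Some(other) => {
                return Err(format!(
                    "unexpected type when creating aggregate value: {:?}",
                    other
                ))
            }
        }
    }};
}

/// Collect Int64 scalars; any other variant produces an Err rather than a panic.
fn collect_int64(values: Vec<Option<ScalarValue>>) -> Result<Vec<Option<i64>>, String> {
    let mut out = Vec::with_capacity(values.len());
    for v in values {
        append_value!(out, v, Int64);
    }
    Ok(out)
}
```

Note that `return` inside a macro returns from the function where the macro is expanded, so the macro can only be used in functions whose return type is a compatible `Result`.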

##########
File path: rust/datafusion/src/execution/physical_plan/merge.rs
##########
@@ -81,11 +81,13 @@ impl Partition for MergePartition {
         // combine the results from each thread
         let mut combined_results: Vec<Arc<RecordBatch>> = vec![];
         for thread in threads {
-            let join = thread.join().expect("Failed to join thread");
-            let result = join?;
-            result
-                .iter()
-                .for_each(|batch| combined_results.push(Arc::new(batch.clone())));
+            match thread.join() {
+                Ok(join) => {
+                    join?.iter()
+                        .for_each(|batch| combined_results.push(Arc::new(batch.clone())));
+                }
+                Err(e) => return Err(ExecutionError::General(format!("{:?}", e)))

Review comment:
       This is another good change (don't panic if there is a thread error while merging) -- maybe worth a mention in the PR title (or maybe even its own PR).
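
As a standalone illustration of the same approach using only the standard library: `JoinHandle::join` returns `Err` when the thread panicked, and that can be turned into an ordinary error value instead of calling `expect`. The `Task` alias, the `run_and_merge` function, and the `String` error type are hypothetical names chosen for this sketch.

```rust
use std::thread;

/// Hypothetical unit of work: produces a partition's values or an error.
type Task = fn() -> Result<Vec<i32>, String>;

/// Run each task on its own thread and combine the results in task order.
/// A panic in a worker thread is reported as an `Err`, not propagated as a panic.
fn run_and_merge(tasks: Vec<Task>) -> Result<Vec<i32>, String> {
    let threads: Vec<thread::JoinHandle<Result<Vec<i32>, String>>> = tasks
        .into_iter()
        .map(|task| thread::spawn(task))
        .collect();

    let mut combined = vec![];
    for handle in threads {
        match handle.join() {
            // The thread finished; `?` forwards an error returned by the task itself.
            Ok(result) => combined.extend(result?),
            // The thread panicked; the payload is a `Box<dyn Any + Send>`.
            Err(e) => return Err(format!("{:?}", e)),
        }
    }
    Ok(combined)
}
```

The `Debug` output of the panic payload is not very descriptive, so a real implementation might try to downcast the payload to `&str` or `String` to recover the panic message.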







[GitHub] [arrow] andygrove commented on a change in pull request #7936: ARROW-9679: [Rust] [DataFusion] More efficient creation of final batch from HashAggregateExec

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #7936:
URL: https://github.com/apache/arrow/pull/7936#discussion_r469297203



##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -677,6 +532,152 @@ impl RecordBatchReader for HashAggregateIterator {
     }
 }
 
+/// Append a grouping expression value to a builder
+macro_rules! append_group_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr) => {{
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast group value builder to expected type");
+        builder.append_value($VALUE)?;
+    }};
+}
+
+/// Append an aggregate expression value to a builder
+macro_rules! append_aggr_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr, $SCALAR_TY:ident) => {{
+        println!("downcast {:#?} to {:#?}", $BUILDER, $VALUE);
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast aggregate value builder to expected type");
+        match $VALUE {
+            Some(ScalarValue::$SCALAR_TY(n)) => builder.append_value(n)?,
+            None => builder.append_null()?,
+            Some(_) => panic!(),

Review comment:
       Thanks. Fixed.







[GitHub] [arrow] andygrove commented on pull request #7936: ARROW-9679: [Rust] [DataFusion] More efficient creation of final batch from HashAggregateExec

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7936:
URL: https://github.com/apache/arrow/pull/7936#issuecomment-672899284


   Thanks for the review @alamb. I've addressed the formatting, removed debug printlns, and tests are now passing.





[GitHub] [arrow] alamb commented on a change in pull request #7936: ARROW-9679: [Rust] [DataFusion] More efficient creation of final batch from HashAggregateExec

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #7936:
URL: https://github.com/apache/arrow/pull/7936#discussion_r469294746



##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -677,6 +532,152 @@ impl RecordBatchReader for HashAggregateIterator {
     }
 }
 
+/// Append a grouping expression value to a builder
+macro_rules! append_group_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr) => {{
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast group value builder to expected type");
+        builder.append_value($VALUE)?;
+    }};
+}
+
+/// Append an aggregate expression value to a builder
+macro_rules! append_aggr_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr, $SCALAR_TY:ident) => {{
+        println!("downcast {:#?} to {:#?}", $BUILDER, $VALUE);
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast aggregate value builder to expected type");
+        match $VALUE {
+            Some(ScalarValue::$SCALAR_TY(n)) => builder.append_value(n)?,
+            None => builder.append_null()?,
+            Some(_) => panic!(),
+        }
+    }};
+}
+
+/// Create a RecordBatch representing the accumulated results in a map
+fn create_batch_from_map(
+    map: &FnvHashMap<Vec<GroupByScalar>, Rc<AccumulatorSet>>,
+    num_group_expr: usize,
+    num_aggr_expr: usize,
+    output_schema: &Schema,
+) -> Result<RecordBatch> {
+    let mut builders: Vec<Box<dyn ArrayBuilder>> = vec![];
+    for i in 0..num_group_expr + num_aggr_expr {
+        let builder: Box<dyn ArrayBuilder> = match output_schema.field(i).data_type() {
+            DataType::Int8 => Box::new(Int8Builder::new(map.len())),
+            DataType::Int16 => Box::new(Int16Builder::new(map.len())),
+            DataType::Int32 => Box::new(Int32Builder::new(map.len())),
+            DataType::Int64 => Box::new(Int64Builder::new(map.len())),
+            DataType::UInt8 => Box::new(UInt8Builder::new(map.len())),
+            DataType::UInt16 => Box::new(UInt16Builder::new(map.len())),
+            DataType::UInt32 => Box::new(UInt32Builder::new(map.len())),
+            DataType::UInt64 => Box::new(UInt64Builder::new(map.len())),
+            DataType::Float32 => Box::new(Float32Builder::new(map.len())),
+            DataType::Float64 => Box::new(Float64Builder::new(map.len())),
+            DataType::Utf8 => Box::new(StringBuilder::new(map.len())),
+            _ => {
+                return Err(ExecutionError::ExecutionError(
+                    "Unsupported group data type".to_string(),
+                ))
+            }
+        };
+        builders.push(builder);
+    }
+
+    // iterate over the map
+    for (k, v) in map.iter() {
+        // add group values to builders
+        for i in 0..num_group_expr {
+            let builder = builders[i].as_any_mut();
+            match &k[i] {
+                GroupByScalar::Int8(n) => append_group_value!(builder, Int8Builder, *n),
+                GroupByScalar::Int16(n) => append_group_value!(builder, Int16Builder, *n),
+                GroupByScalar::Int32(n) => append_group_value!(builder, Int32Builder, *n),
+                GroupByScalar::Int64(n) => append_group_value!(builder, Int64Builder, *n),
+                GroupByScalar::UInt8(n) => append_group_value!(builder, UInt8Builder, *n),
+                GroupByScalar::UInt16(n) => {
+                    append_group_value!(builder, UInt16Builder, *n)
+                }
+                GroupByScalar::UInt32(n) => {
+                    append_group_value!(builder, UInt32Builder, *n)
+                }
+                GroupByScalar::UInt64(n) => {
+                    append_group_value!(builder, UInt64Builder, *n)
+                }
+                GroupByScalar::Utf8(str) => {
+                    append_group_value!(builder, StringBuilder, str)
+                }
+            }
+        }
+
+        // add agggregate values to builders
+        for i in 0..num_aggr_expr {
+            let value = v[i].borrow().get_value()?;
+            let index = num_group_expr + i;
+            let builder = builders[index].as_any_mut();
+            match output_schema.field(i).data_type() {
+                DataType::Int8 => append_aggr_value!(builder, Int8Builder, value, Int8),
+                DataType::Int16 => {
+                    append_aggr_value!(builder, Int16Builder, value, Int16)
+                }
+                DataType::Int32 => {
+                    append_aggr_value!(builder, Int32Builder, value, Int32)
+                }
+                DataType::Int64 => {
+                    append_aggr_value!(builder, Int64Builder, value, Int64)
+                }
+                DataType::UInt8 => {
+                    append_aggr_value!(builder, UInt8Builder, value, UInt8)
+                }
+                DataType::UInt16 => {
+                    append_aggr_value!(builder, UInt16Builder, value, UInt16)
+                }
+                DataType::UInt32 => {
+                    append_aggr_value!(builder, UInt32Builder, value, UInt32)
+                }
+                DataType::UInt64 => {
+                    append_aggr_value!(builder, UInt64Builder, value, UInt64)
+                }
+                DataType::Float32 => {
+                    append_aggr_value!(builder, Float32Builder, value, Float32)
+                }
+                DataType::Float64 => {
+                    append_aggr_value!(builder, Float64Builder, value, Float64)
+                }
+                DataType::Utf8 => {

Review comment:
       That might be worth adding as a comment so future readers don't have to wonder.







[GitHub] [arrow] andygrove commented on pull request #7936: ARROW-9679: [Rust] [DataFusion] More efficient creation of final batch from HashAggregateExec

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7936:
URL: https://github.com/apache/arrow/pull/7936#issuecomment-672899448


   @nevi-me @paddyhoran PTAL if you have time.





[GitHub] [arrow] andygrove commented on pull request #7936: ARROW-9679: [Rust] [DataFusion] More efficient creation of final batch from HashAggregateExec [DRAFT]

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7936:
URL: https://github.com/apache/arrow/pull/7936#issuecomment-672393908


   @alamb @jorgecarleitao @houqp FYI, since you've all been contributing to DataFusion lately.





[GitHub] [arrow] andygrove commented on a change in pull request #7936: ARROW-9679: [Rust] [DataFusion] More efficient creation of final batch from HashAggregateExec

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #7936:
URL: https://github.com/apache/arrow/pull/7936#discussion_r469332693



##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -677,6 +532,152 @@ impl RecordBatchReader for HashAggregateIterator {
     }
 }
 
+/// Append a grouping expression value to a builder
+macro_rules! append_group_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr) => {{
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast group value builder to expected type");
+        builder.append_value($VALUE)?;
+    }};
+}
+
+/// Append an aggregate expression value to a builder
+macro_rules! append_aggr_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr, $SCALAR_TY:ident) => {{
+        println!("downcast {:#?} to {:#?}", $BUILDER, $VALUE);
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast aggregate value builder to expected type");
+        match $VALUE {
+            Some(ScalarValue::$SCALAR_TY(n)) => builder.append_value(n)?,
+            None => builder.append_null()?,
+            Some(_) => panic!(),
+        }
+    }};
+}
+
+/// Create a RecordBatch representing the accumulated results in a map
+fn create_batch_from_map(
+    map: &FnvHashMap<Vec<GroupByScalar>, Rc<AccumulatorSet>>,
+    num_group_expr: usize,
+    num_aggr_expr: usize,
+    output_schema: &Schema,
+) -> Result<RecordBatch> {
+    let mut builders: Vec<Box<dyn ArrayBuilder>> = vec![];
+    for i in 0..num_group_expr + num_aggr_expr {

Review comment:
       I took a quick look at doing this, but I still needed an explicit assignment; otherwise the compiler complained about the match arms returning different types, so it ended up not being much cleaner. However, I did change the loop to `for data_type in &output_types`, which is a little cleaner.
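
       For readers unfamiliar with the "match arms returning different types" issue mentioned above, here is a minimal sketch using only the standard library. `Builder`, `IntBuilder`, `StrBuilder`, `Kind` and `make_builders` are hypothetical stand-ins invented for illustration; they are not the Arrow `ArrayBuilder` types and this is not the code from the PR. It only shows why an explicitly typed binding can be needed when each arm boxes a different concrete type.

```rust
// Hypothetical stand-in for a builder trait (not arrow's `ArrayBuilder`).
trait Builder {
    fn name(&self) -> &'static str;
}

struct IntBuilder;
struct StrBuilder;

impl Builder for IntBuilder {
    fn name(&self) -> &'static str {
        "int"
    }
}

impl Builder for StrBuilder {
    fn name(&self) -> &'static str {
        "str"
    }
}

// Hypothetical stand-in for a data type tag.
#[derive(Clone, Copy)]
enum Kind {
    Int,
    Str,
}

// Build one boxed builder per requested kind.
fn make_builders(kinds: &[Kind]) -> Vec<Box<dyn Builder>> {
    let mut builders: Vec<Box<dyn Builder>> = Vec::with_capacity(kinds.len());
    for kind in kinds {
        // The annotation on this binding makes each arm coerce to
        // `Box<dyn Builder>`. Writing `let builder = match kind { ... }`
        // with no annotation fails to compile, because the arms would have
        // the distinct types `Box<IntBuilder>` and `Box<StrBuilder>`.
        let builder: Box<dyn Builder> = match kind {
            Kind::Int => Box::new(IntBuilder),
            Kind::Str => Box::new(StrBuilder),
        };
        builders.push(builder);
    }
    builders
}
```

       Iterating directly over the kinds (rather than over an index range) mirrors the `for data_type in &output_types` form described in the comment, while the typed `let` binding stays in place.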



