Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/12 12:33:33 UTC

[GitHub] [arrow] alamb commented on a change in pull request #7936: ARROW-9679: [Rust] [DataFusion] More efficient creation of final batch from HashAggregateExec [DRAFT]

alamb commented on a change in pull request #7936:
URL: https://github.com/apache/arrow/pull/7936#discussion_r469206575



##########
File path: rust/datafusion/src/execution/physical_plan/expressions.rs
##########
@@ -413,20 +413,7 @@ impl Max {
 
 impl AggregateExpr for Max {
     fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
-        match self.expr.data_type(input_schema)? {

Review comment:
       I agree that the type of `Max` and `Min` should be the same as the input, as those functions are never going to overflow the value type of their input the way `Sum` or `Avg` could.
   
   I actually think the change results in cleaner code too, which is a nice bonus
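   
   To make the overflow argument concrete, here is a small standalone sketch. It uses plain `i8` slices rather than Arrow arrays, and the helper names are made up for illustration. The maximum is always one of the inputs, so it fits in the input type, while a sum can exceed it and needs a wider type:
   
   ```rust
   /// Largest value in the slice. The result is always one of the inputs,
   /// so it always fits in the input type `i8`.
   fn max_i8(values: &[i8]) -> Option<i8> {
       values.iter().copied().max()
   }
   
   /// Sum kept in `i8`. Returns `None` as soon as the running total overflows.
   fn checked_sum_i8(values: &[i8]) -> Option<i8> {
       values.iter().try_fold(0i8, |acc, &v| acc.checked_add(v))
   }
   
   /// Sum widened to `i64`, which is why a wider output type makes sense for sums.
   fn widened_sum_i8(values: &[i8]) -> i64 {
       values.iter().map(|&v| i64::from(v)).sum()
   }
   ```
   
   For the input `[100, 27, 100]` the maximum is representable as `i8`, but the total (227) is not, so only the widened sum succeeds.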

##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -677,6 +532,152 @@ impl RecordBatchReader for HashAggregateIterator {
     }
 }
 
+/// Append a grouping expression value to a builder
+macro_rules! append_group_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr) => {{
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast group value builder to expected type");
+        builder.append_value($VALUE)?;
+    }};
+}
+
+/// Append an aggregate expression value to a builder
+macro_rules! append_aggr_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr, $SCALAR_TY:ident) => {{

Review comment:
       I think the names `$BUILDER_TY` and `$SCALAR_TY` are easier to understand than the previous `$TY:ident, $TY2:ty` 👍 

##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -677,6 +532,152 @@ impl RecordBatchReader for HashAggregateIterator {
     }
 }
 
+/// Append a grouping expression value to a builder
+macro_rules! append_group_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr) => {{
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast group value builder to expected type");
+        builder.append_value($VALUE)?;
+    }};
+}
+
+/// Append an aggregate expression value to a builder
+macro_rules! append_aggr_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr, $SCALAR_TY:ident) => {{
+        println!("downcast {:#?} to {:#?}", $BUILDER, $VALUE);

Review comment:
       ```suggestion
   ```

##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -677,6 +532,152 @@ impl RecordBatchReader for HashAggregateIterator {
     }
 }
 
+/// Append a grouping expression value to a builder
+macro_rules! append_group_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr) => {{
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast group value builder to expected type");
+        builder.append_value($VALUE)?;
+    }};
+}
+
+/// Append an aggregate expression value to a builder
+macro_rules! append_aggr_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr, $SCALAR_TY:ident) => {{
+        println!("downcast {:#?} to {:#?}", $BUILDER, $VALUE);
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast aggregate value builder to expected type");
+        match $VALUE {
+            Some(ScalarValue::$SCALAR_TY(n)) => builder.append_value(n)?,
+            None => builder.append_null()?,
+            Some(_) => panic!(),
+        }
+    }};
+}
+
+/// Create a RecordBatch representing the accumulated results in a map
+fn create_batch_from_map(
+    map: &FnvHashMap<Vec<GroupByScalar>, Rc<AccumulatorSet>>,
+    num_group_expr: usize,
+    num_aggr_expr: usize,
+    output_schema: &Schema,
+) -> Result<RecordBatch> {
+    let mut builders: Vec<Box<dyn ArrayBuilder>> = vec![];
+    for i in 0..num_group_expr + num_aggr_expr {

Review comment:
       ```suggestion
       for i in 0..output_schema.fields().len() {
   ```
   
   This is just a style / defensive-coding suggestion: since `output_schema.field(i)` is matched below, looping over the schema's own fields keeps the index in bounds.
   
   If you wanted to get all `rust`y / functional, you could also think about rewriting this as a map over fields. Something like this (untested):
   
   ```
   let builders = output_schema
     .fields()
     .iter()
     .map(|f| { match f.data_type() ... /* the match statement below */ })
     .collect::<Result<Vec<_>>>()?;
   ```
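   
   The pattern in that sketch can be tried outside DataFusion. The version below is only an illustration: `DataType` and `Builder` are simplified stand-ins (not the Arrow types), `make_builders` is a hypothetical name, and the error is a plain `String` instead of `ExecutionError`. Collecting an iterator of `Result`s into `Result<Vec<_>, _>` stops at the first `Err`:
   
   ```rust
   // Stand-ins for the Arrow/DataFusion types; these are not the real definitions.
   #[derive(Debug, Clone, Copy, PartialEq)]
   enum DataType {
       Int32,
       Float64,
       Utf8,
       Boolean,
   }
   
   #[derive(Debug, PartialEq)]
   enum Builder {
       Int32(Vec<i32>),
       Float64(Vec<f64>),
       Utf8(Vec<String>),
   }
   
   /// Build one builder per field, stopping at the first unsupported type.
   fn make_builders(fields: &[DataType], capacity: usize) -> Result<Vec<Builder>, String> {
       fields
           .iter()
           .map(|data_type| match data_type {
               DataType::Int32 => Ok(Builder::Int32(Vec::with_capacity(capacity))),
               DataType::Float64 => Ok(Builder::Float64(Vec::with_capacity(capacity))),
               DataType::Utf8 => Ok(Builder::Utf8(Vec::with_capacity(capacity))),
               other => Err(format!("Unsupported group data type: {:?}", other)),
           })
           // `Result<Vec<_>, _>` implements `FromIterator`, so the first `Err` wins.
           .collect()
   }
   ```
   
   This also removes the need for a mutable `builders` vector and an explicit `push`.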
   

##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -677,6 +532,152 @@ impl RecordBatchReader for HashAggregateIterator {
     }
 }
 
+/// Append a grouping expression value to a builder
+macro_rules! append_group_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr) => {{
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast group value builder to expected type");
+        builder.append_value($VALUE)?;
+    }};
+}
+
+/// Append an aggregate expression value to a builder
+macro_rules! append_aggr_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr, $SCALAR_TY:ident) => {{
+        println!("downcast {:#?} to {:#?}", $BUILDER, $VALUE);
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast aggregate value builder to expected type");
+        match $VALUE {
+            Some(ScalarValue::$SCALAR_TY(n)) => builder.append_value(n)?,
+            None => builder.append_null()?,
+            Some(_) => panic!(),
+        }
+    }};
+}
+
+/// Create a RecordBatch representing the accumulated results in a map
+fn create_batch_from_map(
+    map: &FnvHashMap<Vec<GroupByScalar>, Rc<AccumulatorSet>>,
+    num_group_expr: usize,
+    num_aggr_expr: usize,
+    output_schema: &Schema,
+) -> Result<RecordBatch> {
+    let mut builders: Vec<Box<dyn ArrayBuilder>> = vec![];
+    for i in 0..num_group_expr + num_aggr_expr {
+        let builder: Box<dyn ArrayBuilder> = match output_schema.field(i).data_type() {
+            DataType::Int8 => Box::new(Int8Builder::new(map.len())),
+            DataType::Int16 => Box::new(Int16Builder::new(map.len())),
+            DataType::Int32 => Box::new(Int32Builder::new(map.len())),
+            DataType::Int64 => Box::new(Int64Builder::new(map.len())),
+            DataType::UInt8 => Box::new(UInt8Builder::new(map.len())),
+            DataType::UInt16 => Box::new(UInt16Builder::new(map.len())),
+            DataType::UInt32 => Box::new(UInt32Builder::new(map.len())),
+            DataType::UInt64 => Box::new(UInt64Builder::new(map.len())),
+            DataType::Float32 => Box::new(Float32Builder::new(map.len())),
+            DataType::Float64 => Box::new(Float64Builder::new(map.len())),
+            DataType::Utf8 => Box::new(StringBuilder::new(map.len())),
+            _ => {
+                return Err(ExecutionError::ExecutionError(
+                    "Unsupported group data type".to_string(),
+                ))
+            }
+        };
+        builders.push(builder);
+    }
+
+    // iterate over the map
+    for (k, v) in map.iter() {
+        // add group values to builders
+        for i in 0..num_group_expr {
+            let builder = builders[i].as_any_mut();
+            match &k[i] {
+                GroupByScalar::Int8(n) => append_group_value!(builder, Int8Builder, *n),
+                GroupByScalar::Int16(n) => append_group_value!(builder, Int16Builder, *n),
+                GroupByScalar::Int32(n) => append_group_value!(builder, Int32Builder, *n),
+                GroupByScalar::Int64(n) => append_group_value!(builder, Int64Builder, *n),
+                GroupByScalar::UInt8(n) => append_group_value!(builder, UInt8Builder, *n),
+                GroupByScalar::UInt16(n) => {
+                    append_group_value!(builder, UInt16Builder, *n)
+                }
+                GroupByScalar::UInt32(n) => {
+                    append_group_value!(builder, UInt32Builder, *n)
+                }
+                GroupByScalar::UInt64(n) => {
+                    append_group_value!(builder, UInt64Builder, *n)
+                }
+                GroupByScalar::Utf8(str) => {
+                    append_group_value!(builder, StringBuilder, str)
+                }
+            }
+        }
+
+        // add agggregate values to builders
+        for i in 0..num_aggr_expr {
+            let value = v[i].borrow().get_value()?;
+            let index = num_group_expr + i;
+            let builder = builders[index].as_any_mut();
+            match output_schema.field(i).data_type() {
+                DataType::Int8 => append_aggr_value!(builder, Int8Builder, value, Int8),
+                DataType::Int16 => {
+                    append_aggr_value!(builder, Int16Builder, value, Int16)
+                }
+                DataType::Int32 => {
+                    append_aggr_value!(builder, Int32Builder, value, Int32)
+                }
+                DataType::Int64 => {
+                    append_aggr_value!(builder, Int64Builder, value, Int64)
+                }
+                DataType::UInt8 => {
+                    append_aggr_value!(builder, UInt8Builder, value, UInt8)
+                }
+                DataType::UInt16 => {
+                    append_aggr_value!(builder, UInt16Builder, value, UInt16)
+                }
+                DataType::UInt32 => {
+                    append_aggr_value!(builder, UInt32Builder, value, UInt32)
+                }
+                DataType::UInt64 => {
+                    append_aggr_value!(builder, UInt64Builder, value, UInt64)
+                }
+                DataType::Float32 => {
+                    append_aggr_value!(builder, Float32Builder, value, Float32)
+                }
+                DataType::Float64 => {
+                    append_aggr_value!(builder, Float64Builder, value, Float64)
+                }
+                DataType::Utf8 => {

Review comment:
       I am sure you have a good reason, but I didn't quite see why this match arm can't use `append_aggr_value!` as well.

##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -677,6 +532,152 @@ impl RecordBatchReader for HashAggregateIterator {
     }
 }
 
+/// Append a grouping expression value to a builder
+macro_rules! append_group_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr) => {{
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast group value builder to expected type");
+        builder.append_value($VALUE)?;
+    }};
+}
+
+/// Append an aggregate expression value to a builder
+macro_rules! append_aggr_value {
+    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr, $SCALAR_TY:ident) => {{
+        println!("downcast {:#?} to {:#?}", $BUILDER, $VALUE);
+        let builder = $BUILDER
+            .downcast_mut::<$BUILDER_TY>()
+            .expect("failed to downcast aggregate value builder to expected type");
+        match $VALUE {
+            Some(ScalarValue::$SCALAR_TY(n)) => builder.append_value(n)?,
+            None => builder.append_null()?,
+            Some(_) => panic!(),

Review comment:
       ```suggestion
               Some(_) => return Err(ExecutionError::ExecutionError("unexpected type when creating aggregate value".to_string())),
   ```
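   
   For reference, a standalone sketch of returning an error from inside a macro instead of panicking. Everything here is a simplified assumption for illustration: `ScalarValue` is a two-variant stand-in, the output is a plain `Vec` rather than an Arrow builder, the error is a `String`, and `append_value_or_err!` / `collect_int64` are made-up names. The point is that a `return Err(...)` inside the macro body returns from the enclosing function, so the other match arms can keep evaluating to `()`:
   
   ```rust
   // Stand-in for DataFusion's `ScalarValue`; only two variants for illustration.
   #[derive(Debug)]
   enum ScalarValue {
       Int64(i64),
       Utf8(String),
   }
   
   /// Append an optional scalar to a `Vec`, returning an error from the
   /// enclosing function when the variant is not the expected one.
   macro_rules! append_value_or_err {
       ($OUT:expr, $VALUE:expr, $SCALAR_TY:ident) => {{
           match $VALUE {
               Some(ScalarValue::$SCALAR_TY(n)) => $OUT.push(Some(n)),
               None => $OUT.push(None),
               Some(other) => {
                   return Err(format!(
                       "unexpected type when creating aggregate value: {:?}",
                       other
                   ))
               }
           }
       }};
   }
   
   /// Collect `Int64` scalars, failing (not panicking) on any other variant.
   fn collect_int64(values: Vec<Option<ScalarValue>>) -> Result<Vec<Option<i64>>, String> {
       let mut out = Vec::with_capacity(values.len());
       for value in values {
           append_value_or_err!(out, value, Int64);
       }
       Ok(out)
   }
   ```
   
   A caller passing a `Utf8` value where `Int64` is expected gets an `Err` it can handle, rather than an aborted query.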

##########
File path: rust/datafusion/src/execution/physical_plan/merge.rs
##########
@@ -81,11 +81,13 @@ impl Partition for MergePartition {
         // combine the results from each thread
         let mut combined_results: Vec<Arc<RecordBatch>> = vec![];
         for thread in threads {
-            let join = thread.join().expect("Failed to join thread");
-            let result = join?;
-            result
-                .iter()
-                .for_each(|batch| combined_results.push(Arc::new(batch.clone())));
+            match thread.join() {
+                Ok(join) => {
+                    join?.iter()
+                        .for_each(|batch| combined_results.push(Arc::new(batch.clone())));
+                }
+                Err(e) => return Err(ExecutionError::General(format!("{:?}", e)))

Review comment:
       This is another good change (don't panic if there is a thread error while merging) -- maybe worth a mention in the PR title (or maybe even its own PR).
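   
   The same idea in a self-contained form, using only `std::thread`. This is a sketch under stated assumptions, not the code from `merge.rs`: the `Job` alias, the example jobs, `run_and_merge`, and the `String` error type are all invented for illustration. `join()` returns `Err` when the thread panicked, and that can be mapped to an ordinary error:
   
   ```rust
   use std::thread;
   
   /// Hypothetical job type: a function producing some values or an error.
   type Job = fn() -> Result<Vec<i32>, String>;
   
   fn ok_job() -> Result<Vec<i32>, String> {
       Ok(vec![1, 2])
   }
   
   fn failing_job() -> Result<Vec<i32>, String> {
       Err("job failed".to_string())
   }
   
   fn panicking_job() -> Result<Vec<i32>, String> {
       panic!("boom")
   }
   
   /// Run each job on its own thread and merge the results, turning both
   /// job errors and thread panics into an `Err` instead of panicking.
   fn run_and_merge(jobs: Vec<Job>) -> Result<Vec<i32>, String> {
       let handles: Vec<_> = jobs.into_iter().map(|job| thread::spawn(job)).collect();
   
       let mut combined = Vec::new();
       for handle in handles {
           match handle.join() {
               // The thread finished; `?` propagates an error returned by the job.
               Ok(result) => combined.extend(result?),
               // The thread panicked; report the payload rather than panicking here.
               Err(e) => return Err(format!("thread panicked: {:?}", e)),
           }
       }
       Ok(combined)
   }
   ```
   
   With `expect("Failed to join thread")`, a panic in any worker would also take down the thread doing the merge; with the `match`, the caller just sees an error.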



