You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/12 10:37:53 UTC

[GitHub] [arrow-datafusion] alamb commented on a change in pull request #320: Implement hash partitioned aggregation

alamb commented on a change in pull request #320:
URL: https://github.com/apache/arrow-datafusion/pull/320#discussion_r630919098



##########
File path: datafusion/src/execution/context.rs
##########
@@ -1305,7 +1314,6 @@ mod tests {
     #[tokio::test]
     async fn aggregate_grouped() -> Result<()> {
         let results = execute("SELECT c1, SUM(c2) FROM test GROUP BY c1", 4).await?;
-        assert_eq!(results.len(), 1);

Review comment:
       this is a good change (to remove the redundant assertion)

##########
File path: datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -76,6 +76,8 @@ pub enum AggregateMode {
     Partial,
     /// Final aggregate that produces a single partition of output
     Final,
+    /// Final aggregate that works on pre-partitioned data

Review comment:
       ```suggestion
       /// Final aggregate that works on pre-partitioned data.
       ///
       /// This requires the invariant that all rows with a particular
       /// grouping key are in the same partitions, such as is the case
       /// with Hash repartitioning on the group keys. If a group key is
       /// duplicated, duplicate groups would be produced 
   ```

##########
File path: datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -201,7 +203,9 @@ impl ExecutionPlan for HashAggregateExec {
 
     fn required_child_distribution(&self) -> Distribution {
         match &self.mode {
-            AggregateMode::Partial => Distribution::UnspecifiedDistribution,
+            AggregateMode::Partial | AggregateMode::FinalPartitioned => {
+                Distribution::UnspecifiedDistribution

Review comment:
       It seems like the required  required distribution for `FinalPartitioned` is really "hash partitioned" or some other partitioning that guarantees the same group key doesn't appear in multiple partitions. Leaving it as `UnspecifiedDistribution` might cause subtle bugs in the future.

##########
File path: datafusion/src/physical_plan/planner.rs
##########
@@ -184,19 +184,54 @@ impl DefaultPhysicalPlanner {
                 let final_group: Vec<Arc<dyn PhysicalExpr>> =
                     (0..groups.len()).map(|i| col(&groups[i].1)).collect();
 
-                // construct a second aggregation, keeping the final column name equal to the first aggregation
-                // and the expressions corresponding to the respective aggregate
-                Ok(Arc::new(HashAggregateExec::try_new(
-                    AggregateMode::Final,
-                    final_group
-                        .iter()
-                        .enumerate()
-                        .map(|(i, expr)| (expr.clone(), groups[i].1.clone()))
-                        .collect(),
-                    aggregates,
-                    initial_aggr,
-                    input_schema,
-                )?))
+                // TODO: dictionary type not yet supported in Hash Repartition
+                let contains_dict = groups

Review comment:
       thank you

##########
File path: datafusion/src/physical_plan/hash_join.rs
##########
@@ -954,6 +1028,26 @@ pub fn create_hashes<'a>(
                     multi_col
                 );
             }
+            DataType::Date32 => {

Review comment:
       Did you have to add these match arms because tests failed without doing so?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org