You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/06/01 13:36:44 UTC

[GitHub] [arrow-datafusion] alamb opened a new pull request, #2674: Fix `AggregateStatistics` optimization so it doesn't change output type

alamb opened a new pull request, #2674:
URL: https://github.com/apache/arrow-datafusion/pull/2674

   # Which issue does this PR close?
   
   Closes https://github.com/apache/arrow-datafusion/issues/2673
   
   # Rationale for this change
   See https://github.com/apache/arrow-datafusion/issues/2673 -- the `AggregateStatistics` optimizer pass is changing the output type of `COUNT` (producing `UInt64` instead of `Int64`).
   
   # What changes are included in this PR?
   1. Fix bug
   2. New regression test
   
   
   # Are there any user-facing changes?
   Fewer bugs
   
   # Does this PR break compatibility with Ballista?
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2674: Fix `AggregateStatistics` optimization so it doesn't change output type

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2674:
URL: https://github.com/apache/arrow-datafusion/pull/2674#discussion_r886837655


##########
datafusion/core/src/physical_optimizer/aggregate_statistics.rs:
##########
@@ -37,6 +38,9 @@ use crate::error::Result;
 #[derive(Default)]
 pub struct AggregateStatistics {}
 
+/// The name of the column corresponding to [`COUNT_STAR_EXPANSION`]
+const COUNT_STAR_NAME: &str = "COUNT(UInt8(1))";

Review Comment:
   This constant was hard-coded in a few places, and I think this symbolic name helps explain what it is doing



##########
datafusion/core/src/physical_optimizer/aggregate_statistics.rs:
##########
@@ -148,10 +152,10 @@ fn take_optimizable_table_count(
                 .as_any()
                 .downcast_ref::<expressions::Literal>()
             {
-                if lit_expr.value() == &ScalarValue::UInt8(Some(1)) {
+                if lit_expr.value() == &COUNT_STAR_EXPANSION {

Review Comment:
   There was an implicit coupling between the SQL planner and this file, which I have now made explicit with a named constant



##########
datafusion/core/src/physical_optimizer/aggregate_statistics.rs:
##########
@@ -293,38 +297,80 @@ mod tests {
     /// Checks that the count optimization was applied and we still get the right result
     async fn assert_count_optim_success(plan: AggregateExec, nulls: bool) -> Result<()> {
         let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
         let conf = session_ctx.copied_config();
-        let optimized = AggregateStatistics::new().optimize(Arc::new(plan), &conf)?;
+        let plan = Arc::new(plan) as _;
+        let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &conf)?;
 
         let (col, count) = match nulls {
-            false => (Field::new("COUNT(UInt8(1))", DataType::UInt64, false), 3),
-            true => (Field::new("COUNT(a)", DataType::UInt64, false), 2),
+            false => (Field::new(COUNT_STAR_NAME, DataType::Int64, false), 3),
+            true => (Field::new("COUNT(a)", DataType::Int64, false), 2),
         };
 
         // A ProjectionExec is a sign that the count optimization was applied
         assert!(optimized.as_any().is::<ProjectionExec>());
+        let task_ctx = session_ctx.task_ctx();
         let result = common::collect(optimized.execute(0, task_ctx)?).await?;
         assert_eq!(result[0].schema(), Arc::new(Schema::new(vec![col])));
         assert_eq!(
             result[0]
                 .column(0)
                 .as_any()
-                .downcast_ref::<UInt64Array>()
+                .downcast_ref::<Int64Array>()
                 .unwrap()
                 .values(),
             &[count]
         );
+
+        // Validate that the optimized plan returns the exact same
+        // answer (both schema and data) as the original plan
+        let task_ctx = session_ctx.task_ctx();

Review Comment:
   This test would have caught this issue when it was introduced in https://github.com/apache/arrow-datafusion/pull/2636



##########
datafusion/core/src/physical_optimizer/aggregate_statistics.rs:
##########
@@ -293,38 +297,80 @@ mod tests {
     /// Checks that the count optimization was applied and we still get the right result
     async fn assert_count_optim_success(plan: AggregateExec, nulls: bool) -> Result<()> {
         let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
         let conf = session_ctx.copied_config();
-        let optimized = AggregateStatistics::new().optimize(Arc::new(plan), &conf)?;
+        let plan = Arc::new(plan) as _;
+        let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &conf)?;
 
         let (col, count) = match nulls {
-            false => (Field::new("COUNT(UInt8(1))", DataType::UInt64, false), 3),
-            true => (Field::new("COUNT(a)", DataType::UInt64, false), 2),
+            false => (Field::new(COUNT_STAR_NAME, DataType::Int64, false), 3),
+            true => (Field::new("COUNT(a)", DataType::Int64, false), 2),
         };
 
         // A ProjectionExec is a sign that the count optimization was applied
         assert!(optimized.as_any().is::<ProjectionExec>());
+        let task_ctx = session_ctx.task_ctx();
         let result = common::collect(optimized.execute(0, task_ctx)?).await?;
         assert_eq!(result[0].schema(), Arc::new(Schema::new(vec![col])));
         assert_eq!(
             result[0]
                 .column(0)
                 .as_any()
-                .downcast_ref::<UInt64Array>()
+                .downcast_ref::<Int64Array>()
                 .unwrap()
                 .values(),
             &[count]
         );
+
+        // Validate that the optimized plan returns the exact same
+        // answer (both schema and data) as the original plan
+        let task_ctx = session_ctx.task_ctx();
+        let plan_result = common::collect(plan.execute(0, task_ctx)?).await?;
+        assert_eq!(normalize(result), normalize(plan_result));
         Ok(())
     }
 
+    /// Normalize record batches for comparison:
+    /// 1. Sets nullable to `true`
+    fn normalize(batches: Vec<RecordBatch>) -> Vec<RecordBatch> {
+        let schema = normalize_schema(&batches[0].schema());
+        batches
+            .into_iter()
+            .map(|batch| {
+                RecordBatch::try_new(schema.clone(), batch.columns().to_vec())
+                    .expect("Error creating record batch")
+            })
+            .collect()
+    }
+    fn normalize_schema(schema: &Schema) -> Arc<Schema> {
+        let nullable = true;
+        let normalized_fields = schema
+            .fields()
+            .iter()
+            .map(|f| {
+                Field::new(f.name(), f.data_type().clone(), nullable)
+                    .with_metadata(f.metadata().cloned())
+            })
+            .collect();
+        Arc::new(Schema::new_with_metadata(
+            normalized_fields,
+            schema.metadata().clone(),
+        ))
+    }
+
     fn count_expr(schema: Option<&Schema>, col: Option<&str>) -> Arc<dyn AggregateExpr> {
-        // Return appropriate expr depending if COUNT is for col or table
-        let expr = match schema {
-            None => expressions::lit(ScalarValue::UInt8(Some(1))),
-            Some(s) => expressions::col(col.unwrap(), s).unwrap(),
+        // Return appropriate expr depending if COUNT is for col or table (*)
+        let (expr, name) = match schema {
+            None => (
+                expressions::lit(COUNT_STAR_EXPANSION),
+                COUNT_STAR_NAME.to_string(),
+            ),
+            Some(s) => (
+                expressions::col(col.unwrap(), s).unwrap(),
+                format!("COUNT({})", col.unwrap()),
+            ),
         };
-        Arc::new(Count::new(expr, "my_count_alias", DataType::UInt64))
+
+        Arc::new(Count::new(expr, name, DataType::Int64))

Review Comment:
   Now that the schema is checked, we can't use an arbitrary column name; we need to use the actual name the plan would generate



##########
datafusion/core/src/physical_optimizer/aggregate_statistics.rs:
##########
@@ -293,38 +297,80 @@ mod tests {
     /// Checks that the count optimization was applied and we still get the right result
     async fn assert_count_optim_success(plan: AggregateExec, nulls: bool) -> Result<()> {
         let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
         let conf = session_ctx.copied_config();
-        let optimized = AggregateStatistics::new().optimize(Arc::new(plan), &conf)?;
+        let plan = Arc::new(plan) as _;
+        let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &conf)?;
 
         let (col, count) = match nulls {
-            false => (Field::new("COUNT(UInt8(1))", DataType::UInt64, false), 3),
-            true => (Field::new("COUNT(a)", DataType::UInt64, false), 2),
+            false => (Field::new(COUNT_STAR_NAME, DataType::Int64, false), 3),
+            true => (Field::new("COUNT(a)", DataType::Int64, false), 2),
         };
 
         // A ProjectionExec is a sign that the count optimization was applied
         assert!(optimized.as_any().is::<ProjectionExec>());
+        let task_ctx = session_ctx.task_ctx();
         let result = common::collect(optimized.execute(0, task_ctx)?).await?;
         assert_eq!(result[0].schema(), Arc::new(Schema::new(vec![col])));
         assert_eq!(
             result[0]
                 .column(0)
                 .as_any()
-                .downcast_ref::<UInt64Array>()
+                .downcast_ref::<Int64Array>()
                 .unwrap()
                 .values(),
             &[count]
         );
+
+        // Validate that the optimized plan returns the exact same
+        // answer (both schema and data) as the original plan
+        let task_ctx = session_ctx.task_ctx();
+        let plan_result = common::collect(plan.execute(0, task_ctx)?).await?;
+        assert_eq!(normalize(result), normalize(plan_result));
         Ok(())
     }
 
+    /// Normalize record batches for comparison:
+    /// 1. Sets nullable to `true`
+    fn normalize(batches: Vec<RecordBatch>) -> Vec<RecordBatch> {

Review Comment:
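   This is awkward but necessary for the tests to pass: it sets every field to nullable so that differences in nullability between the optimized and original plans don't cause spurious mismatches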



##########
datafusion/sql/src/planner.rs:
##########
@@ -2197,14 +2197,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         schema: &DFSchema,
     ) -> Result<(AggregateFunction, Vec<Expr>)> {
         let args = match fun {
+            // Special case rewrite COUNT(*) to COUNT(constant)
             AggregateFunction::Count => function
                 .args
                 .into_iter()
                 .map(|a| match a {
                     FunctionArg::Unnamed(FunctionArgExpr::Expr(SQLExpr::Value(
                         Value::Number(_, _),
-                    ))) => Ok(lit(1_u8)),
-                    FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => Ok(lit(1_u8)),
+                    ))) => Ok(Expr::Literal(COUNT_STAR_EXPANSION.clone())),

Review Comment:
   This is a readability improvement: naming the constant makes what is happening more explicit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2674: Fix `AggregateStatistics` optimization so it doesn't change output type

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2674:
URL: https://github.com/apache/arrow-datafusion/pull/2674#discussion_r888298254


##########
datafusion/core/src/physical_optimizer/aggregate_statistics.rs:
##########
@@ -291,65 +296,132 @@ mod tests {
     }
 
     /// Checks that the count optimization was applied and we still get the right result
-    async fn assert_count_optim_success(plan: AggregateExec, nulls: bool) -> Result<()> {
+    async fn assert_count_optim_success(
+        plan: AggregateExec,
+        agg: TestAggregate,
+    ) -> Result<()> {
         let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
         let conf = session_ctx.copied_config();
-        let optimized = AggregateStatistics::new().optimize(Arc::new(plan), &conf)?;
-
-        let (col, count) = match nulls {
-            false => (Field::new("COUNT(UInt8(1))", DataType::UInt64, false), 3),
-            true => (Field::new("COUNT(a)", DataType::UInt64, false), 2),
-        };
+        let plan = Arc::new(plan) as _;
+        let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &conf)?;
 
         // A ProjectionExec is a sign that the count optimization was applied
         assert!(optimized.as_any().is::<ProjectionExec>());
-        let result = common::collect(optimized.execute(0, task_ctx)?).await?;
-        assert_eq!(result[0].schema(), Arc::new(Schema::new(vec![col])));
+
+        // run both the optimized and nonoptimized plan
+        let optimized_result =
+            common::collect(optimized.execute(0, session_ctx.task_ctx())?).await?;
+        let nonoptimized_result =
+            common::collect(plan.execute(0, session_ctx.task_ctx())?).await?;
+        assert_eq!(optimized_result.len(), nonoptimized_result.len());
+
+        //  and validate the results are the same and expected
+        assert_eq!(optimized_result.len(), 1);
+        check_batch(optimized_result.into_iter().next().unwrap(), &agg);
+        // check the non optimized one too to ensure types and names remain the same
+        assert_eq!(nonoptimized_result.len(), 1);
+        check_batch(nonoptimized_result.into_iter().next().unwrap(), &agg);
+
+        Ok(())
+    }
+
+    fn check_batch(batch: RecordBatch, agg: &TestAggregate) {
+        let schema = batch.schema();
+        let fields = schema.fields();
+        assert_eq!(fields.len(), 1);
+
+        let field = &fields[0];
+        assert_eq!(field.name(), agg.column_name());
+        assert_eq!(field.data_type(), &DataType::Int64);
+        // note that nullabiolity differs
+
         assert_eq!(
-            result[0]
+            batch
                 .column(0)
                 .as_any()
-                .downcast_ref::<UInt64Array>()
+                .downcast_ref::<Int64Array>()
                 .unwrap()
                 .values(),
-            &[count]
+            &[agg.expected_count()]
         );
-        Ok(())
     }
 
-    fn count_expr(schema: Option<&Schema>, col: Option<&str>) -> Arc<dyn AggregateExpr> {
-        // Return appropriate expr depending if COUNT is for col or table
-        let expr = match schema {
-            None => expressions::lit(ScalarValue::UInt8(Some(1))),
-            Some(s) => expressions::col(col.unwrap(), s).unwrap(),
-        };
-        Arc::new(Count::new(expr, "my_count_alias", DataType::UInt64))
+    /// Describe the type of aggregate being tested
+    enum TestAggregate {

Review Comment:
   This now parameterizes the difference between different tests into an explicit `enum` rather than implicit assumptions. I think it makes the tests easier to follow



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2674: Fix `AggregateStatistics` optimization so it doesn't change output type

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2674:
URL: https://github.com/apache/arrow-datafusion/pull/2674#discussion_r887861790


##########
datafusion/core/src/physical_optimizer/aggregate_statistics.rs:
##########
@@ -293,38 +297,80 @@ mod tests {
     /// Checks that the count optimization was applied and we still get the right result
     async fn assert_count_optim_success(plan: AggregateExec, nulls: bool) -> Result<()> {
         let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
         let conf = session_ctx.copied_config();
-        let optimized = AggregateStatistics::new().optimize(Arc::new(plan), &conf)?;
+        let plan = Arc::new(plan) as _;
+        let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &conf)?;
 
         let (col, count) = match nulls {

Review Comment:
   I was very confused by what this parameter controls, should it not be something like `column: Option<&str>` instead?



##########
datafusion/core/src/physical_optimizer/aggregate_statistics.rs:
##########
@@ -293,38 +297,80 @@ mod tests {
     /// Checks that the count optimization was applied and we still get the right result
     async fn assert_count_optim_success(plan: AggregateExec, nulls: bool) -> Result<()> {
         let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
         let conf = session_ctx.copied_config();
-        let optimized = AggregateStatistics::new().optimize(Arc::new(plan), &conf)?;
+        let plan = Arc::new(plan) as _;
+        let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &conf)?;
 
         let (col, count) = match nulls {
-            false => (Field::new("COUNT(UInt8(1))", DataType::UInt64, false), 3),
-            true => (Field::new("COUNT(a)", DataType::UInt64, false), 2),
+            false => (Field::new(COUNT_STAR_NAME, DataType::Int64, false), 3),
+            true => (Field::new("COUNT(a)", DataType::Int64, false), 2),
         };
 
         // A ProjectionExec is a sign that the count optimization was applied
         assert!(optimized.as_any().is::<ProjectionExec>());
+        let task_ctx = session_ctx.task_ctx();
         let result = common::collect(optimized.execute(0, task_ctx)?).await?;
         assert_eq!(result[0].schema(), Arc::new(Schema::new(vec![col])));
         assert_eq!(
             result[0]
                 .column(0)
                 .as_any()
-                .downcast_ref::<UInt64Array>()
+                .downcast_ref::<Int64Array>()
                 .unwrap()
                 .values(),
             &[count]
         );
+
+        // Validate that the optimized plan returns the exact same
+        // answer (both schema and data) as the original plan
+        let task_ctx = session_ctx.task_ctx();
+        let plan_result = common::collect(plan.execute(0, task_ctx)?).await?;
+        assert_eq!(normalize(result), normalize(plan_result));

Review Comment:
   A couple of lines above there is
   
   ```
   assert_eq!(result[0].schema(), Arc::new(Schema::new(vec![col])));
   ```
   
   This would suggest to me that the result has a single column and a single field. Perhaps we could just do something like
   
   ```
   let expected_a_schema = ..;
   let expected_b_schema = ..;
   for (a, b) in result.iter().zip(plan_result) {
     assert_eq!(a.column(0), b.column(0));
     assert_eq!(a.schema(), expected_a_schema);
     assert_eq!(b.schema(), expected_b_schema);
   }
   ```
   
   I think the normalization logic is a little bit hard to follow...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb merged pull request #2674: Fix `AggregateStatistics` optimization so it doesn't change output type

Posted by GitBox <gi...@apache.org>.
alamb merged PR #2674:
URL: https://github.com/apache/arrow-datafusion/pull/2674


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] codecov-commenter commented on pull request #2674: Fix `AggregateStatistics` optimization so it doesn't change output type

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #2674:
URL: https://github.com/apache/arrow-datafusion/pull/2674#issuecomment-1145267720

   # [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/2674?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#2674](https://codecov.io/gh/apache/arrow-datafusion/pull/2674?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (fde3cc4) into [master](https://codecov.io/gh/apache/arrow-datafusion/commit/f547262fc2c96dada24e08365ad3591997a9eb27?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f547262) will **increase** coverage by `0.01%`.
   > The diff coverage is `98.59%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #2674      +/-   ##
   ==========================================
   + Coverage   84.69%   84.70%   +0.01%     
   ==========================================
     Files         267      267              
     Lines       47004    47036      +32     
   ==========================================
   + Hits        39810    39843      +33     
   + Misses       7194     7193       -1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-datafusion/pull/2674?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [datafusion/expr/src/utils.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2674/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9leHByL3NyYy91dGlscy5ycw==) | `91.86% <ø> (ø)` | |
   | [datafusion/sql/src/planner.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2674/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcWwvc3JjL3BsYW5uZXIucnM=) | `81.56% <66.66%> (-0.04%)` | :arrow_down: |
   | [...ore/src/physical\_optimizer/aggregate\_statistics.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2674/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb3JlL3NyYy9waHlzaWNhbF9vcHRpbWl6ZXIvYWdncmVnYXRlX3N0YXRpc3RpY3MucnM=) | `100.00% <100.00%> (ø)` | |
   | [datafusion/core/tests/custom\_sources.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2674/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb3JlL3Rlc3RzL2N1c3RvbV9zb3VyY2VzLnJz) | `83.90% <100.00%> (ø)` | |
   | [datafusion/common/src/scalar.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2674/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb21tb24vc3JjL3NjYWxhci5ycw==) | `74.94% <0.00%> (+0.11%)` | :arrow_up: |
   | [datafusion/core/src/physical\_plan/metrics/value.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2674/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb3JlL3NyYy9waHlzaWNhbF9wbGFuL21ldHJpY3MvdmFsdWUucnM=) | `87.43% <0.00%> (+0.50%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/2674?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/2674?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [f547262...fde3cc4](https://codecov.io/gh/apache/arrow-datafusion/pull/2674?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2674: Fix `AggregateStatistics` optimization so it doesn't change output type

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2674:
URL: https://github.com/apache/arrow-datafusion/pull/2674#discussion_r886843286


##########
datafusion/core/src/physical_optimizer/aggregate_statistics.rs:
##########
@@ -148,10 +152,10 @@ fn take_optimizable_table_count(
                 .as_any()
                 .downcast_ref::<expressions::Literal>()
             {
-                if lit_expr.value() == &ScalarValue::UInt8(Some(1)) {
+                if lit_expr.value() == &COUNT_STAR_EXPANSION {
                     return Some((
-                        ScalarValue::UInt64(Some(num_rows as u64)),
-                        "COUNT(UInt8(1))",
+                        ScalarValue::Int64(Some(num_rows as i64)),

Review Comment:
   The change from `UInt64` to `Int64` here and a few lines below is the actual bug fix / change of behavior -- the rest of this PR is testing / readability improvements



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2674: Fix `AggregateStatistics` optimization so it doesn't change output type

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2674:
URL: https://github.com/apache/arrow-datafusion/pull/2674#discussion_r888250058


##########
datafusion/core/src/physical_optimizer/aggregate_statistics.rs:
##########
@@ -293,38 +297,80 @@ mod tests {
     /// Checks that the count optimization was applied and we still get the right result
     async fn assert_count_optim_success(plan: AggregateExec, nulls: bool) -> Result<()> {
         let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
         let conf = session_ctx.copied_config();
-        let optimized = AggregateStatistics::new().optimize(Arc::new(plan), &conf)?;
+        let plan = Arc::new(plan) as _;
+        let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &conf)?;
 
         let (col, count) = match nulls {

Review Comment:
   I believe it is really controlling `COUNT(*)` vs `COUNT(col)` -- I consolidated the differences in eb14658de7 into a `TestAggregate` enum and I think it is much more understandable now 



##########
datafusion/core/src/physical_optimizer/aggregate_statistics.rs:
##########
@@ -293,38 +297,80 @@ mod tests {
     /// Checks that the count optimization was applied and we still get the right result
     async fn assert_count_optim_success(plan: AggregateExec, nulls: bool) -> Result<()> {
         let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
         let conf = session_ctx.copied_config();
-        let optimized = AggregateStatistics::new().optimize(Arc::new(plan), &conf)?;
+        let plan = Arc::new(plan) as _;
+        let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &conf)?;
 
         let (col, count) = match nulls {
-            false => (Field::new("COUNT(UInt8(1))", DataType::UInt64, false), 3),
-            true => (Field::new("COUNT(a)", DataType::UInt64, false), 2),
+            false => (Field::new(COUNT_STAR_NAME, DataType::Int64, false), 3),
+            true => (Field::new("COUNT(a)", DataType::Int64, false), 2),
         };
 
         // A ProjectionExec is a sign that the count optimization was applied
         assert!(optimized.as_any().is::<ProjectionExec>());
+        let task_ctx = session_ctx.task_ctx();
         let result = common::collect(optimized.execute(0, task_ctx)?).await?;
         assert_eq!(result[0].schema(), Arc::new(Schema::new(vec![col])));
         assert_eq!(
             result[0]
                 .column(0)
                 .as_any()
-                .downcast_ref::<UInt64Array>()
+                .downcast_ref::<Int64Array>()
                 .unwrap()
                 .values(),
             &[count]
         );
+
+        // Validate that the optimized plan returns the exact same
+        // answer (both schema and data) as the original plan
+        let task_ctx = session_ctx.task_ctx();
+        let plan_result = common::collect(plan.execute(0, task_ctx)?).await?;
+        assert_eq!(normalize(result), normalize(plan_result));

Review Comment:
   I removed the normalization in 171c89901ecdadca6c2eccb2973bc7ad0990c92f and I think it is much simpler to follow now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org