You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/09 23:27:25 UTC

[GitHub] [arrow-datafusion] isidentical opened a new pull request, #4162: Derive filter statistic estimates from the predicate expression

isidentical opened a new pull request, #4162:
URL: https://github.com/apache/arrow-datafusion/pull/4162

   # Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123.
   -->
   
   Closes #3845.
   
   # Rationale for this change
   
   <!--
    Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.  
   -->
   Being able to estimate a filter's cardinality and therefore populate the physical plan with more statistics to leverage cost based optimizations.
   
   
   # What changes are included in this PR?
   
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   Physical filter operations can now estimate their end cardinality when we know the predicate's selectivity and the cardinality of the input. The selectivity analysis is pretty limited at the moment, but the good part is that, every addition to the selectivity analysis will be automatically used here so this should be all the code we need for estimating filter's cardinality during planning.
   
   # Are these changes tested?
   
   <!--
   We typically require tests for all PRs in order to:
   1. Prevent the code from being accidentally broken by subsequent changes
   2. Serve as another way to document the expected behavior of the code
   
   If tests are not included in your PR, please explain why (for example, are they covered by existing tests)?
   -->
   Yes.
   
   # Are there any user-facing changes?
   
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   More statistics.
   
   <!--
   If there are any breaking changes to public APIs, please add the `api change` label.
   -->


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #4162: Derive filter statistic estimates from the predicate expression

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #4162:
URL: https://github.com/apache/arrow-datafusion/pull/4162#issuecomment-1312479358

   Thanks @isidentical  -- I plan to review this carefully later today or tomorrow


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ursabot commented on pull request #4162: Derive filter statistic estimates from the predicate expression

Posted by GitBox <gi...@apache.org>.
ursabot commented on PR #4162:
URL: https://github.com/apache/arrow-datafusion/pull/4162#issuecomment-1317709033

   Benchmark runs are scheduled for baseline = 74199d632d9d5625686b76a7f981cc8641a1f2a8 and contender = f0359a797d73c05d72d02a17dc398a924ced2742. f0359a797d73c05d72d02a17dc398a924ced2742 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/83561c3e02e846d48bea9801d1d6fc14...eb947d3c13954ed986dac3ac2aeb6982/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] [test-mac-arm](https://conbench.ursa.dev/compare/runs/bb734fb0f02f44eda8cae9bbb40ddbdb...6022899f434640e2b817459aea4c99ff/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/25a56775ee40408487c4db7dbdc31426...086eb58022bd45d9a74e9525532a69a6/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/10bef71c3c614d4dad3db1d477130db1...b28879cf8e4d454980b765465b5762a3/)
   Buildkite builds:
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] isidentical commented on pull request #4162: Derive filter statistic estimates from the predicate expression

Posted by GitBox <gi...@apache.org>.
isidentical commented on PR #4162:
URL: https://github.com/apache/arrow-datafusion/pull/4162#issuecomment-1317700921

   Didn't notice the reviews, thank you a lot @alamb for them! Re the problem on column level, I'll try to write back to you soon on my fork (is it OK to keep it there until we have a design, then I can create a PR directly against apache/arrow-datafusion)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] isidentical commented on a diff in pull request #4162: Derive filter statistic estimates from the predicate expression

Posted by GitBox <gi...@apache.org>.
isidentical commented on code in PR #4162:
URL: https://github.com/apache/arrow-datafusion/pull/4162#discussion_r1020611956


##########
datafusion/core/src/physical_plan/filter.rs:
##########
@@ -380,4 +403,108 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_filter_statistics_basic_expr() -> Result<()> {
+        // Table:
+        //      a: min=1, max=100
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(100),
+                column_statistics: Some(vec![ColumnStatistics {
+                    min_value: Some(ScalarValue::Int32(Some(1))),
+                    max_value: Some(ScalarValue::Int32(Some(100))),
+                    ..Default::default()
+                }]),
+                ..Default::default()
+            },
+            schema.clone(),
+        ));
+
+        // a <= 25
+        let predicate: Arc<dyn PhysicalExpr> =
+            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
+
+        // WHERE a <= 25
+        let filter: Arc<dyn ExecutionPlan> =
+            Arc::new(FilterExec::try_new(predicate, input)?);
+
+        let statistics = filter.statistics();
+        assert_eq!(statistics.num_rows, Some(25));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    #[ignore]
+    // This test requires propagation of column boundaries from the comparison analysis
+    // to the analysis context. This is not yet implemented.
+    async fn test_filter_statistics_column_level_basic_expr() -> Result<()> {

Review Comment:
   @alamb while working on this, I've noticed the initial application of propagation of new column limits. Since we don't have an API to represent changes to the boundaries during an expression's analysis (like `a` becomes `[1, 25]` in the example below) we can't generate the `column_statistics` which is essentially rendering nested join optimizations unusable (and potentially any other analysis that needs column level stats).
   
   This doesn't mean it is completely ineffecttive as is, since we can at least find the cardinality of filter itself and do the local filter <-> table switch in the case below. But I think it might make sense to at least investigate potential ways to deal with this.
   
   ![image](https://user-images.githubusercontent.com/47358913/201444253-d55a7565-bf67-4583-88bf-df87a4b8b9f2.png)
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #4162: Derive filter statistic estimates from the predicate expression

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #4162:
URL: https://github.com/apache/arrow-datafusion/pull/4162#discussion_r1023174920


##########
datafusion/core/tests/statistics.rs:
##########
@@ -238,8 +238,9 @@ async fn sql_filter() -> Result<()> {
         .await
         .unwrap();
 
-    // with a filtering condition we loose all knowledge about the statistics
-    assert_eq!(Statistics::default(), physical_plan.statistics());
+    let stats = physical_plan.statistics();
+    assert!(!stats.is_exact);
+    assert_eq!(stats.num_rows, Some(1));

Review Comment:
   Nice



##########
datafusion/core/src/physical_plan/filter.rs:
##########
@@ -380,4 +403,108 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_filter_statistics_basic_expr() -> Result<()> {
+        // Table:
+        //      a: min=1, max=100
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(100),
+                column_statistics: Some(vec![ColumnStatistics {
+                    min_value: Some(ScalarValue::Int32(Some(1))),
+                    max_value: Some(ScalarValue::Int32(Some(100))),
+                    ..Default::default()
+                }]),
+                ..Default::default()
+            },
+            schema.clone(),
+        ));
+
+        // a <= 25
+        let predicate: Arc<dyn PhysicalExpr> =
+            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
+
+        // WHERE a <= 25
+        let filter: Arc<dyn ExecutionPlan> =
+            Arc::new(FilterExec::try_new(predicate, input)?);
+
+        let statistics = filter.statistics();
+        assert_eq!(statistics.num_rows, Some(25));

Review Comment:
   👨‍🍳 👌 
   
   Very nice



##########
datafusion/core/src/physical_plan/filter.rs:
##########
@@ -168,9 +168,27 @@ impl ExecutionPlan for FilterExec {
         Some(self.metrics.clone_inner())
     }
 
-    /// The output statistics of a filtering operation are unknown
+    /// The output statistics of a filtering operation can be estimated if the
+    /// predicate's selectivity value can be determined for the incoming data.
     fn statistics(&self) -> Statistics {
-        Statistics::default()
+        let input_stats = self.input.statistics();
+        let analysis_ctx =
+            AnalysisContext::from_statistics(self.input.schema().as_ref(), &input_stats);
+
+        let predicate_selectivity = self
+            .predicate
+            .boundaries(&analysis_ctx)
+            .and_then(|bounds| bounds.selectivity);
+
+        match predicate_selectivity {
+            Some(selectivity) => Statistics {
+                num_rows: input_stats
+                    .num_rows
+                    .map(|num_rows| (num_rows as f64 * selectivity).ceil() as usize),
+                ..Default::default()

Review Comment:
   I wonder if we should explicitly list out `is_exact: false` here? `Default::default()` gets the same result but maybe being explicit would be better 🤔 



##########
datafusion/core/src/physical_plan/filter.rs:
##########
@@ -380,4 +403,108 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_filter_statistics_basic_expr() -> Result<()> {
+        // Table:
+        //      a: min=1, max=100
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(100),
+                column_statistics: Some(vec![ColumnStatistics {
+                    min_value: Some(ScalarValue::Int32(Some(1))),
+                    max_value: Some(ScalarValue::Int32(Some(100))),
+                    ..Default::default()
+                }]),
+                ..Default::default()
+            },
+            schema.clone(),
+        ));
+
+        // a <= 25
+        let predicate: Arc<dyn PhysicalExpr> =
+            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
+
+        // WHERE a <= 25
+        let filter: Arc<dyn ExecutionPlan> =
+            Arc::new(FilterExec::try_new(predicate, input)?);
+
+        let statistics = filter.statistics();
+        assert_eq!(statistics.num_rows, Some(25));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    #[ignore]
+    // This test requires propagation of column boundaries from the comparison analysis
+    // to the analysis context. This is not yet implemented.
+    async fn test_filter_statistics_column_level_basic_expr() -> Result<()> {

Review Comment:
   I don't understand what about this test requires column level analysis -- your figure has a join in it, but thietest just seems to be the same as `test_filter_statistics_basic_expr` above it. I will look at https://github.com/isidentical/arrow-datafusion/pull/5 shortly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #4162: Derive filter statistic estimates from the predicate expression

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #4162:
URL: https://github.com/apache/arrow-datafusion/pull/4162#issuecomment-1317698678

   🚀 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] isidentical commented on a diff in pull request #4162: Derive filter statistic estimates from the predicate expression

Posted by GitBox <gi...@apache.org>.
isidentical commented on code in PR #4162:
URL: https://github.com/apache/arrow-datafusion/pull/4162#discussion_r1020612588


##########
datafusion/core/src/physical_plan/filter.rs:
##########
@@ -380,4 +403,108 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_filter_statistics_basic_expr() -> Result<()> {
+        // Table:
+        //      a: min=1, max=100
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(100),
+                column_statistics: Some(vec![ColumnStatistics {
+                    min_value: Some(ScalarValue::Int32(Some(1))),
+                    max_value: Some(ScalarValue::Int32(Some(100))),
+                    ..Default::default()
+                }]),
+                ..Default::default()
+            },
+            schema.clone(),
+        ));
+
+        // a <= 25
+        let predicate: Arc<dyn PhysicalExpr> =
+            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
+
+        // WHERE a <= 25
+        let filter: Arc<dyn ExecutionPlan> =
+            Arc::new(FilterExec::try_new(predicate, input)?);
+
+        let statistics = filter.statistics();
+        assert_eq!(statistics.num_rows, Some(25));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    #[ignore]
+    // This test requires propagation of column boundaries from the comparison analysis
+    // to the analysis context. This is not yet implemented.
+    async fn test_filter_statistics_column_level_basic_expr() -> Result<()> {

Review Comment:
   I have a simple solution for this problem (https://github.com/isidentical/arrow-datafusion/pull/5) that essentially implements a much more narrow-scoped version of the `apply()` API from the previous iteration. It doesn't add any new methods to the physical expressions, but it still shares a mutable context reference (I kind of resonate this with other similiar APIs in datafusion like `expr_to_columns`) so not sure if the same reservations still apply. I'd be really interested in your feedback on this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] isidentical commented on a diff in pull request #4162: Derive filter statistic estimates from the predicate expression

Posted by GitBox <gi...@apache.org>.
isidentical commented on code in PR #4162:
URL: https://github.com/apache/arrow-datafusion/pull/4162#discussion_r1020612588


##########
datafusion/core/src/physical_plan/filter.rs:
##########
@@ -380,4 +403,108 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_filter_statistics_basic_expr() -> Result<()> {
+        // Table:
+        //      a: min=1, max=100
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(100),
+                column_statistics: Some(vec![ColumnStatistics {
+                    min_value: Some(ScalarValue::Int32(Some(1))),
+                    max_value: Some(ScalarValue::Int32(Some(100))),
+                    ..Default::default()
+                }]),
+                ..Default::default()
+            },
+            schema.clone(),
+        ));
+
+        // a <= 25
+        let predicate: Arc<dyn PhysicalExpr> =
+            binary(col("a", &schema)?, Operator::LtEq, lit(25i32), &schema)?;
+
+        // WHERE a <= 25
+        let filter: Arc<dyn ExecutionPlan> =
+            Arc::new(FilterExec::try_new(predicate, input)?);
+
+        let statistics = filter.statistics();
+        assert_eq!(statistics.num_rows, Some(25));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    #[ignore]
+    // This test requires propagation of column boundaries from the comparison analysis
+    // to the analysis context. This is not yet implemented.
+    async fn test_filter_statistics_column_level_basic_expr() -> Result<()> {

Review Comment:
   I made a PoC https://github.com/isidentical/arrow-datafusion/pull/5 that essentially implements a much more simple version of the `apply()` API from the previous iteration without adding any new methods. I'd be really interested in your feedback on this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb merged pull request #4162: Derive filter statistic estimates from the predicate expression

Posted by GitBox <gi...@apache.org>.
alamb merged PR #4162:
URL: https://github.com/apache/arrow-datafusion/pull/4162


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org