You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2022/10/25 03:25:00 UTC

[jira] [Assigned] (SPARK-40793) Row-level Runtime Filtering cannot be enabled when externalTable has no stats

     [ https://issues.apache.org/jira/browse/SPARK-40793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-40793:
------------------------------------

    Assignee:     (was: Apache Spark)

> Row-level Runtime Filtering cannot be enabled when externalTable has no stats
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-40793
>                 URL: https://issues.apache.org/jira/browse/SPARK-40793
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.3.0, 3.4.0
>         Environment:  
> {panel:title=Use tpcds to generate test data}
> test:runMain com.databricks.spark.sql.perf.tpcds.GenTPCDSData -d /data/project/tpcds-kit/tpcds-kit/tools -s 500 -l hdfs://hadoop01:8020/user/apple/tpcds -f parquet
> {panel}
>  
> {panel:title=Use spark-sql-perf's TPCDSTables class to create external tables to perform tpcds q24a tests like the following}
>    import com.databricks.spark.sql.perf.tpcds.TPCDSTables;
>    ....................
>     String dsdgenDir = "/data/project/tpcds-kit/tpcds-kit/tools";
>     String scaleFactor = "500";
>     String format = "parquet";
>     String rootDir = "hdfs://hadoop01:8020/user/apple/tpcds";
>     String databaseName = "tpcds_500g";
>     String q24a = ...............
>     TPCDSTables tables = new TPCDSTables(sqlContext, dsdgenDir, scaleFactor, false, false);
>     tables.createExternalTables(rootDir, format, databaseName, true, true, "");
>     //tables.analyzeTables(databaseName, true, "");
>     Dataset<Row> queryResult = spark.sql(testSql);
>     queryResult.show();
> {panel}
> {panel:title=Spark Sql generates the following execution plan that did not use Row-level Runtime Filtering}
> == Physical Plan ==
> AdaptiveSparkPlan (121)
> +- == Final Plan ==
>    * SerializeFromObject (75)
>    +- MapPartitions (74)
>       +- DeserializeToObject (73)
>          +- * Sort (72)
>             +- AQEShuffleRead (71)
>                +- ShuffleQueryStage (70), Statistics(sizeInBytes=119.6 KiB, rowCount=1.52E+3)
>                   +- Exchange (69)
>                      +- * Filter (68)
>                         +- * HashAggregate (67)
>                            +- AQEShuffleRead (66)
>                               +- ShuffleQueryStage (65), Statistics(sizeInBytes=139.7 KiB, rowCount=1.62E+3)
>                                  +- Exchange (64)
>                                     +- * HashAggregate (63)
>                                        +- * HashAggregate (62)
>                                           +- AQEShuffleRead (61)
>                                              +- ShuffleQueryStage (60), Statistics(sizeInBytes=1425.0 KiB, rowCount=9.15E+3)
>                                                 +- Exchange (59)
>                                                    +- * HashAggregate (58)
>                                                       +- * Project (57)
>                                                          +- * SortMergeJoin Inner (56)
>                                                             :- * Sort (48)
>                                                             :  +- AQEShuffleRead (47)
>                                                             :     +- ShuffleQueryStage (46), Statistics(sizeInBytes=60.9 MiB, rowCount=3.54E+5)
>                                                             :        +- Exchange (45)
>                                                             :           +- * Project (44)
>                                                             :              +- * SortMergeJoin Inner (43)
>                                                             :                 :- * Sort (35)
>                                                             :                 :  +- AQEShuffleRead (34)
>                                                             :                 :     +- ShuffleQueryStage (33), Statistics(sizeInBytes=47.7 MiB, rowCount=3.67E+5)
>                                                             :                 :        +- Exchange (32)
>                                                             :                 :           +- * Project (31)
>                                                             :                 :              +- * BroadcastHashJoin Inner BuildRight (30)
>                                                             :                 :                 :- * Project (24)
>                                                             :                 :                 :  +- * BroadcastHashJoin Inner BuildRight (23)
>                                                             :                 :                 :     :- * Project (16)
>                                                             :                 :                 :     :  +- * SortMergeJoin Inner (15)
>                                                             :                 :                 :     :     :- * Sort (7)
>                                                             :                 :                 :     :     :  +- ShuffleQueryStage (6), Statistics(sizeInBytes=60.0 GiB, rowCount=1.34E+9)
>                                                             :                 :                 :     :     :     +- Exchange (5)
>                                                             :                 :                 :     :     :        +- * Project (4)
> {color:#de350b}                                                            :                 :                 :     :     :           +- * Filter (3){color}
>                                                             :                 :                 :     :     :              +- * ColumnarToRow (2)
>                                                             :                 :                 :     :     :                 +- Scan parquet tpcds_500.store_sales (1)
>                                                             :                 :                 :     :     +- * Sort (14)
>                                                             :                 :                 :     :        +- ShuffleQueryStage (13), Statistics(sizeInBytes=3.2 GiB, rowCount=1.44E+8)
>                                                             :                 :                 :     :           +- Exchange (12)
>                                                             :                 :                 :     :              +- * Project (11)
>                                                             :                 :                 :     :                 +- * Filter (10)
>                                                             :                 :                 :     :                    +- * ColumnarToRow (9)
>                                                             :                 :                 :     :                       +- Scan parquet tpcds_500.store_returns (8)
>                                                             :                 :                 :     +- BroadcastQueryStage (22), Statistics(sizeInBytes=1030.3 KiB, rowCount=100)
>                                                             :                 :                 :        +- BroadcastExchange (21)
>                                                             :                 :                 :           +- * Project (20)
>                                                             :                 :                 :              +- * Filter (19)
>                                                             :                 :                 :                 +- * ColumnarToRow (18)
>                                                             :                 :                 :                    +- Scan parquet tpcds_500.store (17)
>                                                             :                 :                 +- BroadcastQueryStage (29), Statistics(sizeInBytes=1280.0 KiB, rowCount=5.97E+3)
>                                                             :                 :                    +- BroadcastExchange (28)
>                                                             :                 :                       +- * Filter (27)
>                                                             :                 :                          +- * ColumnarToRow (26)
>                                                             :                 :                             +- Scan parquet tpcds_500.item (25)
>                                                             :                 +- * Sort (42)
>                                                             :                    +- AQEShuffleRead (41)
>                                                             :                       +- ShuffleQueryStage (40), Statistics(sizeInBytes=438.8 MiB, rowCount=6.76E+6)
>                                                             :                          +- Exchange (39)
>                                                             :                             +- * Filter (38)
>                                                             :                                +- * ColumnarToRow (37)
>                                                             :                                   +- Scan parquet tpcds_500.customer (36)
>                                                             +- * Sort (55)
>                                                                +- AQEShuffleRead (54)
>                                                                   +- ShuffleQueryStage (53), Statistics(sizeInBytes=203.8 MiB, rowCount=3.34E+6)
>                                                                      +- Exchange (52)
>                                                                         +- * Filter (51)
>                                                                            +- * ColumnarToRow (50)
>                                                                               +- Scan parquet tpcds_500.customer_address (49)
>  
> {color:#de350b}(3) Filter [codegen id : 1]{color}
> {color:#de350b}Input [6]: [ss_item_sk#62, ss_customer_sk#63, ss_store_sk#67, ss_ticket_number#69L, ss_net_paid#80, ss_sold_date_sk#83]{color}
> {color:#de350b}Condition : (((isnotnull(ss_ticket_number#69L) AND isnotnull(ss_item_sk#62)) AND isnotnull(ss_store_sk#67)) AND isnotnull(ss_customer_sk#63)){color}
> {panel}
>  
>  
>  
>  
>            Reporter: chenminghua
>            Priority: Major
>              Labels: patch
>
> When using external tables, Row-level Runtime Filtering cannot be enabled anyway without performing analysis on the table to generate statistics. In actual use, external tables often do not have statistics, but it is also hoped that the execution efficiency can be improved through Row-level Runtime Filtering. The reason why Row-level Runtime Filtering cannot be enabled is: 'InjectRuntimeFilter' calls the 'satisfyByteSizeRequirement' method to determine whether the application side plan's aggregated scan size meets the requirements, and because there is no statistical data, the application side plan's aggregated scan size is equal to 0, which cannot satisfy the requirement to enable Row -level Runtime Filtering requirements.
> In order to enable Row-level Runtime Filtering even when the external table has no statistics, add the RUNTIME_FILTER_{_}ENABLED_WHEN_NO_STATS parameter. When RUNTIME_FILTER{_}_ENABLED_WHEN_NO_STATS is configured to true and the external table has no statistics, the 'satisfyByteSizeRequirement' method returns true so that Row-level Runtime Filtering may be enabled .



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org