You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "chenminghua (Jira)" <ji...@apache.org> on 2022/10/14 02:54:00 UTC

[jira] [Created] (SPARK-40793) Row-level Runtime Filtering cannot be enabled when externalTable has no stats

chenminghua created SPARK-40793:
-----------------------------------

             Summary: Row-level Runtime Filtering cannot be enabled when externalTable has no stats
                 Key: SPARK-40793
                 URL: https://issues.apache.org/jira/browse/SPARK-40793
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.3.0
         Environment:  
{panel:title=Use tpcds to generate test data}
test:runMain com.databricks.spark.sql.perf.tpcds.GenTPCDSData -d /data/project/tpcds-kit/tpcds-kit/tools -s 500 -l hdfs://hadoop01:8020/user/apple/tpcds -f parquet
{panel}
 
{panel:title=Use spark-sql-perf's TPCDSTables class to create external tables to perform tpcds q24a tests like the following}
   import com.databricks.spark.sql.perf.tpcds.TPCDSTables;

   ....................

    String dsdgenDir = "/data/project/tpcds-kit/tpcds-kit/tools";
    String scaleFactor = "500";
    String format = "parquet";
    String rootDir = "hdfs://hadoop01:8020/user/apple/tpcds";
    String databaseName = "tpcds_500g";
    String q24a = ...............
    TPCDSTables tables = new TPCDSTables(sqlContext, dsdgenDir, scaleFactor, false, false);
    tables.createExternalTables(rootDir, format, databaseName, true, true, "");
    //tables.analyzeTables(databaseName, true, "");
    Dataset<Row> queryResult = spark.sql(testSql);
    queryResult.show();
{panel}
{panel:title=Spark Sql generates the following execution plan that did not use Row-level Runtime Filtering}
== Physical Plan ==
AdaptiveSparkPlan (121)
+- == Final Plan ==
   * SerializeFromObject (75)
   +- MapPartitions (74)
      +- DeserializeToObject (73)
         +- * Sort (72)
            +- AQEShuffleRead (71)
               +- ShuffleQueryStage (70), Statistics(sizeInBytes=119.6 KiB, rowCount=1.52E+3)
                  +- Exchange (69)
                     +- * Filter (68)
                        +- * HashAggregate (67)
                           +- AQEShuffleRead (66)
                              +- ShuffleQueryStage (65), Statistics(sizeInBytes=139.7 KiB, rowCount=1.62E+3)
                                 +- Exchange (64)
                                    +- * HashAggregate (63)
                                       +- * HashAggregate (62)
                                          +- AQEShuffleRead (61)
                                             +- ShuffleQueryStage (60), Statistics(sizeInBytes=1425.0 KiB, rowCount=9.15E+3)
                                                +- Exchange (59)
                                                   +- * HashAggregate (58)
                                                      +- * Project (57)
                                                         +- * SortMergeJoin Inner (56)
                                                            :- * Sort (48)
                                                            :  +- AQEShuffleRead (47)
                                                            :     +- ShuffleQueryStage (46), Statistics(sizeInBytes=60.9 MiB, rowCount=3.54E+5)
                                                            :        +- Exchange (45)
                                                            :           +- * Project (44)
                                                            :              +- * SortMergeJoin Inner (43)
                                                            :                 :- * Sort (35)
                                                            :                 :  +- AQEShuffleRead (34)
                                                            :                 :     +- ShuffleQueryStage (33), Statistics(sizeInBytes=47.7 MiB, rowCount=3.67E+5)
                                                            :                 :        +- Exchange (32)
                                                            :                 :           +- * Project (31)
                                                            :                 :              +- * BroadcastHashJoin Inner BuildRight (30)
                                                            :                 :                 :- * Project (24)
                                                            :                 :                 :  +- * BroadcastHashJoin Inner BuildRight (23)
                                                            :                 :                 :     :- * Project (16)
                                                            :                 :                 :     :  +- * SortMergeJoin Inner (15)
                                                            :                 :                 :     :     :- * Sort (7)
                                                            :                 :                 :     :     :  +- ShuffleQueryStage (6), Statistics(sizeInBytes=60.0 GiB, rowCount=1.34E+9)
                                                            :                 :                 :     :     :     +- Exchange (5)
                                                            :                 :                 :     :     :        +- * Project (4)
{color:#de350b}                                                            :                 :                 :     :     :           +- * Filter (3){color}
                                                            :                 :                 :     :     :              +- * ColumnarToRow (2)
                                                            :                 :                 :     :     :                 +- Scan parquet tpcds_500.store_sales (1)
                                                            :                 :                 :     :     +- * Sort (14)
                                                            :                 :                 :     :        +- ShuffleQueryStage (13), Statistics(sizeInBytes=3.2 GiB, rowCount=1.44E+8)
                                                            :                 :                 :     :           +- Exchange (12)
                                                            :                 :                 :     :              +- * Project (11)
                                                            :                 :                 :     :                 +- * Filter (10)
                                                            :                 :                 :     :                    +- * ColumnarToRow (9)
                                                            :                 :                 :     :                       +- Scan parquet tpcds_500.store_returns (8)
                                                            :                 :                 :     +- BroadcastQueryStage (22), Statistics(sizeInBytes=1030.3 KiB, rowCount=100)
                                                            :                 :                 :        +- BroadcastExchange (21)
                                                            :                 :                 :           +- * Project (20)
                                                            :                 :                 :              +- * Filter (19)
                                                            :                 :                 :                 +- * ColumnarToRow (18)
                                                            :                 :                 :                    +- Scan parquet tpcds_500.store (17)
                                                            :                 :                 +- BroadcastQueryStage (29), Statistics(sizeInBytes=1280.0 KiB, rowCount=5.97E+3)
                                                            :                 :                    +- BroadcastExchange (28)
                                                            :                 :                       +- * Filter (27)
                                                            :                 :                          +- * ColumnarToRow (26)
                                                            :                 :                             +- Scan parquet tpcds_500.item (25)
                                                            :                 +- * Sort (42)
                                                            :                    +- AQEShuffleRead (41)
                                                            :                       +- ShuffleQueryStage (40), Statistics(sizeInBytes=438.8 MiB, rowCount=6.76E+6)
                                                            :                          +- Exchange (39)
                                                            :                             +- * Filter (38)
                                                            :                                +- * ColumnarToRow (37)
                                                            :                                   +- Scan parquet tpcds_500.customer (36)
                                                            +- * Sort (55)
                                                               +- AQEShuffleRead (54)
                                                                  +- ShuffleQueryStage (53), Statistics(sizeInBytes=203.8 MiB, rowCount=3.34E+6)
                                                                     +- Exchange (52)
                                                                        +- * Filter (51)
                                                                           +- * ColumnarToRow (50)
                                                                              +- Scan parquet tpcds_500.customer_address (49)

 

{color:#de350b}(3) Filter [codegen id : 1]{color}
{color:#de350b}Input [6]: [ss_item_sk#62, ss_customer_sk#63, ss_store_sk#67, ss_ticket_number#69L, ss_net_paid#80, ss_sold_date_sk#83]{color}
{color:#de350b}Condition : (((isnotnull(ss_ticket_number#69L) AND isnotnull(ss_item_sk#62)) AND isnotnull(ss_store_sk#67)) AND isnotnull(ss_customer_sk#63)){color}
{panel}
 

 

 

 
            Reporter: chenminghua


When using external tables, Row-level Runtime Filtering cannot be enabled anyway without performing analysis on the table to generate statistics. In actual use, external tables often do not have statistical data, but it is also hoped that the execution efficiency can be improved through Row-level Runtime Filtering. The reason why Row-level Runtime Filtering cannot be enabled is: 'InjectRuntimeFilter' calls the 'satisfyByteSizeRequirement' method to determine whether the application side plan's aggregated scan size meets the requirements, and because there is no statistical data, the application side plan's aggregated scan size is equal to 0, which cannot satisfy the requirement to enable Row -level Runtime Filtering requirements.
In order to enable Row-level Runtime Filtering even when the external table has no statistics, add the RUNTIME_FILTER__ENABLED_WHEN_NO_STATS parameter. When RUNTIME_FILTER__ENABLED_WHEN_NO_STATS is configured to true and the external table has no statistics, the 'satisfyByteSizeRequirement' method returns true so that Row-level Runtime Filtering may be enabled .



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org