You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/08/13 14:23:07 UTC

[GitHub] [arrow-datafusion] andygrove opened a new issue #871: DataFusion should scan Parquet statistics once per query

andygrove opened a new issue #871:
URL: https://github.com/apache/arrow-datafusion/issues/871


   **Is your feature request related to a problem or challenge? Please describe what you are trying to do.**
   
   When running the benchmarks with DataFusion I noticed that we scan statistics for all tables early on (even tables not referenced in the query). This happens in `ExecutionContext::register_table`. We then scan statistics again later on for the tables that are actually used in the query.
   
   ```
   ../target/release/tpch benchmark datafusion   --path /mnt/bigdata/tpch-sf1000-parquet/   --format parquet   --iterations 1   --debug   --concurrency 24   --query 3
   Running benchmarks with the following options: DataFusionBenchmarkOpt { query: 3, debug: true, iterations: 1, concurrency: 24, batch_size: 8192, path: "/mnt/bigdata/tpch-sf1000-parquet/", file_format: "parquet", mem_table: false, partitions: 8 }
   ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//part)
   Scanned 48 Parquet files for statistics in 0 seconds
   ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//supplier)
   Scanned 48 Parquet files for statistics in 0 seconds
   ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//partsupp)
   Scanned 48 Parquet files for statistics in 1 seconds
   ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//customer)
   Scanned 48 Parquet files for statistics in 0 seconds
   ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//orders)
   Scanned 48 Parquet files for statistics in 4 seconds
   ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//lineitem)
   Scanned 48 Parquet files for statistics in 30 seconds
   ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//nation)
   Scanned 1 Parquet files for statistics in 0 seconds
   ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//region)
   Scanned 1 Parquet files for statistics in 0 seconds
   === Logical plan ===
   Sort: #revenue DESC NULLS FIRST, #orders.o_orderdate ASC NULLS FIRST
     Projection: #lineitem.l_orderkey, #SUM(lineitem.l_extendedprice Multiply Int64(1) Minus lineitem.l_discount) AS revenue, #orders.o_orderdate, #orders.o_shippriority
       Aggregate: groupBy=[[#lineitem.l_orderkey, #orders.o_orderdate, #orders.o_shippriority]], aggr=[[SUM(#lineitem.l_extendedprice Multiply Int64(1) Minus #lineitem.l_discount)]]
         Filter: #customer.c_mktsegment Eq Utf8("BUILDING") And #orders.o_orderdate Lt CAST(Utf8("1995-03-15") AS Date32) And #lineitem.l_shipdate Gt CAST(Utf8("1995-03-15") AS Date32)
           Join: #orders.o_orderkey = #lineitem.l_orderkey
             Join: #customer.c_custkey = #orders.o_custkey
               TableScan: customer projection=None
               TableScan: orders projection=None
             TableScan: lineitem projection=None
   
   === Optimized logical plan ===
   Sort: #revenue DESC NULLS FIRST, #orders.o_orderdate ASC NULLS FIRST
     Projection: #lineitem.l_orderkey, #SUM(lineitem.l_extendedprice Multiply Int64(1) Minus lineitem.l_discount) AS revenue, #orders.o_orderdate, #orders.o_shippriority
       Aggregate: groupBy=[[#lineitem.l_orderkey, #orders.o_orderdate, #orders.o_shippriority]], aggr=[[SUM(#lineitem.l_extendedprice Multiply Int64(1) Minus #lineitem.l_discount)]]
         Join: #orders.o_orderkey = #lineitem.l_orderkey
           Join: #customer.c_custkey = #orders.o_custkey
             Filter: #customer.c_mktsegment Eq Utf8("BUILDING")
               TableScan: customer projection=Some([0, 6]), filters=[#customer.c_mktsegment Eq Utf8("BUILDING")]
             Filter: #orders.o_orderdate Lt Date32("9204")
               TableScan: orders projection=Some([0, 1, 4, 7]), filters=[#orders.o_orderdate Lt Date32("9204")]
           Filter: #lineitem.l_shipdate Gt Date32("9204")
             TableScan: lineitem projection=Some([0, 5, 6, 10]), filters=[#lineitem.l_shipdate Gt Date32("9204")]
   
   ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//customer)
   Scanned 48 Parquet files for statistics in 0 seconds
   ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//orders)
   Scanned 48 Parquet files for statistics in 4 seconds
   ParquetExec::try_from_path(/mnt/bigdata/tpch-sf1000-parquet//lineitem)
   Scanned 48 Parquet files for statistics in 30 seconds
   === Physical plan ===
   SortExec: [revenue@1 DESC,o_orderdate@2 ASC]
     CoalescePartitionsExec
       ProjectionExec: expr=[l_orderkey@0 as l_orderkey, SUM(lineitem.l_extendedprice Multiply Int64(1) Minus lineitem.l_discount)@3 as revenue, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority]
         HashAggregateExec: mode=FinalPartitioned, gby=[l_orderkey@0 as l_orderkey, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority], aggr=[SUM(l_extendedprice Multiply Int64(1) Minus l_discount)]
           CoalesceBatchesExec: target_batch_size=4096
             RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderdate", index: 1 }, Column { name: "o_shippriority", index: 2 }], 24)
               HashAggregateExec: mode=Partial, gby=[l_orderkey@6 as l_orderkey, o_orderdate@4 as o_orderdate, o_shippriority@5 as o_shippriority], aggr=[SUM(l_extendedprice Multiply Int64(1) Minus l_discount)]
                 CoalesceBatchesExec: target_batch_size=4096
                   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "o_orderkey", index: 2 }, Column { name: "l_orderkey", index: 0 })]
                     CoalesceBatchesExec: target_batch_size=4096
                       RepartitionExec: partitioning=Hash([Column { name: "o_orderkey", index: 2 }], 24)
                         CoalesceBatchesExec: target_batch_size=4096
                           HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })]
                             CoalesceBatchesExec: target_batch_size=4096
                               RepartitionExec: partitioning=Hash([Column { name: "c_custkey", index: 0 }], 24)
                                 CoalesceBatchesExec: target_batch_size=4096
                                   FilterExec: c_mktsegment@1 = BUILDING
                                     ParquetExec: batch_size=8192, limit=None, partitions=[...]
                             CoalesceBatchesExec: target_batch_size=4096
                               RepartitionExec: partitioning=Hash([Column { name: "o_custkey", index: 1 }], 24)
                                 CoalesceBatchesExec: target_batch_size=4096
                                   FilterExec: o_orderdate@2 < 9204
                                     ParquetExec: batch_size=8192, limit=None, partitions=[...]
                     CoalesceBatchesExec: target_batch_size=4096
                       RepartitionExec: partitioning=Hash([Column { name: "l_orderkey", index: 0 }], 24)
                         CoalesceBatchesExec: target_batch_size=4096
                           FilterExec: l_shipdate@3 > 9204
                             ParquetExec: batch_size=8192, limit=None, partitions=[...]
   
   ```
   
   **Describe the solution you'd like**
   - We should only scan statistics for tables that are used in the query
   - We should only scan statistics once
   
   **Describe alternatives you've considered**
   N/A
   
   **Additional context**
   N/A
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org