Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/05/08 21:49:22 UTC

[GitHub] [arrow-datafusion] alamb opened a new issue, #6290: Datafusion not using all cores during query with repartition

alamb opened a new issue, #6290:
URL: https://github.com/apache/arrow-datafusion/issues/6290

   ### Describe the bug
   
   DataFusion is not taking advantage of all cores on my machine, even though the plan contains a repartition.
   
   I ran this at commit 2e9beeba01b85afb6d4f6557201e673008ea9edd on main.
   
   ### To Reproduce
   
   ```sql
   -- step 1: save this as script.sql in an arrow-datafusion checkout
   --
   -- step 2: generate data:
   --  (cd benchmarks && ./bench.sh data all)
   --
   -- step 3: run this script:
   -- datafusion-cli -f script.sql
   --
   -- Expected: all cores are kept busy processing the query
   -- Actual: only one core seems to be busy
   
   -- load the data from lineitem a few times
   create table lineitem as select * from 'benchmarks/data/lineitem';
   
   insert into lineitem select * from lineitem;
   insert into lineitem select * from lineitem;
   insert into lineitem select * from lineitem;
   insert into lineitem select * from lineitem;
   
   
   -- +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   -- | plan_type     | plan                                                                                                                                                                                                                                                                                            |
   -- +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   -- | logical_plan  | Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS LAST                                                                                                                                                                                                                |
   -- |               |   Projection: lineitem.l_returnflag, lineitem.l_linestatus, SUM(lineitem.l_quantity) AS sum_qty, AVG(lineitem.l_quantity) AS avg_qty, AVG(lineitem.l_extendedprice) AS avg_price, AVG(lineitem.l_discount) AS avg_disc, COUNT(UInt8(1)) AS count_order                                          |
   -- |               |     Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], aggr=[[SUM(lineitem.l_quantity), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]]                                                                                  |
   -- |               |       Projection: lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_returnflag, lineitem.l_linestatus                                                                                                                                                              |
   -- |               |         Filter: lineitem.l_shipdate <= Date32("10471")                                                                                                                                                                                                                                          |
   -- |               |           TableScan: lineitem projection=[l_quantity, l_extendedprice, l_discount, l_returnflag, l_linestatus, l_shipdate]                                                                                                                                                                      |
   -- | physical_plan | SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]                                                                                                                                                                                                          |
   -- |               |   SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]                                                                                                                                                                                                                  |
   -- |               |     ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, SUM(lineitem.l_quantity)@2 as sum_qty, AVG(lineitem.l_quantity)@3 as avg_qty, AVG(lineitem.l_extendedprice)@4 as avg_price, AVG(lineitem.l_discount)@5 as avg_disc, COUNT(UInt8(1))@6 as count_order] |
   -- |               |       AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[SUM(lineitem.l_quantity), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]                                           |
   -- |               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                             |
   -- |               |           RepartitionExec: partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 16), input_partitions=16                                                                                                                                   |
   -- |               |             AggregateExec: mode=Partial, gby=[l_returnflag@3 as l_returnflag, l_linestatus@4 as l_linestatus], aggr=[SUM(lineitem.l_quantity), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))]                                              |
   -- |               |               ProjectionExec: expr=[l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_returnflag@3 as l_returnflag, l_linestatus@4 as l_linestatus]                                                                                               |
   -- |               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                     |
   -- |               |                   FilterExec: l_shipdate@5 <= 10471                                                                                                                                                                                                                                             |
   -- |               |                     RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1                                                                                                                                                                                                       |
   -- |               |                       MemoryExec: partitions=1, partition_sizes=[11728]                                                                                                                                                                                                                         |
   -- |               |                                                                                                                                                                                                                                                                                                 |
   -- +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   
   EXPLAIN select
       l_returnflag,
       l_linestatus,
       sum(l_quantity) as sum_qty,
       avg(l_quantity) as avg_qty,
       avg(l_extendedprice) as avg_price,
       avg(l_discount) as avg_disc,
       count(*) as count_order
   from
       lineitem
   where
       l_shipdate <= date '1998-09-02'
   group by
       l_returnflag,
       l_linestatus
   order by
       l_returnflag,
       l_linestatus;
   
   -- Run the actual query:
   --
   -- 0 rows in set. Query took 1.570 seconds.
   -- 0 rows in set. Query took 0.004 seconds.
   -- 0 rows in set. Query took 0.005 seconds.
   -- 0 rows in set. Query took 0.009 seconds.
   -- 0 rows in set. Query took 0.023 seconds.
   -- +--------------+--------------+---------------+-----------+--------------+----------+-------------+
   -- | l_returnflag | l_linestatus | sum_qty       | avg_qty   | avg_price    | avg_disc | count_order |
   -- +--------------+--------------+---------------+-----------+--------------+----------+-------------+
   -- | A            | F            | 603745712.00  | 25.522005 | 38273.129734 | 0.049985 | 23655888    |
   -- | N            | F            | 15862672.00   | 25.516471 | 38284.467760 | 0.050093 | 621664      |
   -- | N            | O            | 1191616640.00 | 25.502226 | 38249.117988 | 0.049996 | 46725984    |
   -- | R            | F            | 603516048.00  | 25.505793 | 38250.854626 | 0.050009 | 23661920    |
   -- +--------------+--------------+---------------+-----------+--------------+----------+-------------+
   -- 4 rows in set. Query took 9.980 seconds.
   
   select
       l_returnflag,
       l_linestatus,
       sum(l_quantity) as sum_qty,
       avg(l_quantity) as avg_qty,
       avg(l_extendedprice) as avg_price,
       avg(l_discount) as avg_disc,
       count(*) as count_order
   from
       lineitem
   where
       l_shipdate <= date '1998-09-02'
   group by
       l_returnflag,
       l_linestatus
   order by
       l_returnflag,
       l_linestatus;
   
   ```
   
   ### Expected behavior
   
   I expect all the cores on my machine to be in use during the query, but instead only a single core is used.
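   
   One way to see where the time goes, offered as a sketch rather than a verified diagnosis: `EXPLAIN ANALYZE` runs the query and annotates each physical operator with runtime metrics, and `SHOW` prints a configuration value. This assumes the `lineitem` table from the script above is already loaded in the same datafusion-cli session, and that the `datafusion.execution.target_partitions` setting is available in the version under test. Metric names and output layout may differ between versions.
   
   ```sql
   -- Assumption: run in the same datafusion-cli session as the script above,
   -- after the create table / insert statements have completed.
   
   -- Print the partition count the planner targets
   -- (the plan above shows 16 output partitions).
   SHOW datafusion.execution.target_partitions;
   
   -- Execute the query and report per-operator metrics instead of the rows.
   -- Comparing the metrics of RepartitionExec with those of the operators
   -- above it may indicate whether the work is spread across partitions.
   EXPLAIN ANALYZE select
       l_returnflag,
       l_linestatus,
       sum(l_quantity) as sum_qty,
       avg(l_quantity) as avg_qty,
       avg(l_extendedprice) as avg_price,
       avg(l_discount) as avg_disc,
       count(*) as count_order
   from
       lineitem
   where
       l_shipdate <= date '1998-09-02'
   group by
       l_returnflag,
       l_linestatus
   order by
       l_returnflag,
       l_linestatus;
   ```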
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on issue #6290: Datafusion not using all cores on a TPCH like query during query with repartition

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on issue #6290:
URL: https://github.com/apache/arrow-datafusion/issues/6290#issuecomment-1539099682

   @crepererum  do you have any insights / thoughts on this?




[GitHub] [arrow-datafusion] alamb commented on issue #6290: Datafusion not using all cores on a TPCH like query during query with repartition

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on issue #6290:
URL: https://github.com/apache/arrow-datafusion/issues/6290#issuecomment-1540280463

   Update: I think the repartition yielding on each batch is effectively serializing the processing. I will have a PR up shortly.




[GitHub] [arrow-datafusion] alamb closed issue #6290: Datafusion not using all cores on a TPCH like query during query with repartition

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb closed issue #6290: Datafusion not using all cores on a TPCH like query during query with repartition
URL: https://github.com/apache/arrow-datafusion/issues/6290




[GitHub] [arrow-datafusion] alamb commented on issue #6290: Datafusion not using all cores on a TPCH like query during query with repartition

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on issue #6290:
URL: https://github.com/apache/arrow-datafusion/issues/6290#issuecomment-1540362346

   https://github.com/apache/arrow-datafusion/pull/6310

