Posted to jira@arrow.apache.org by "Andy Grove (Jira)" <ji...@apache.org> on 2020/12/28 20:00:00 UTC

[jira] [Issue Comment Deleted] (ARROW-11030) [Rust] [DataFusion] TPC-H q12 does not finish with SF=100 Parquet input

     [ https://issues.apache.org/jira/browse/ARROW-11030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andy Grove updated ARROW-11030:
-------------------------------
    Comment: was deleted

(was: With batch size 4096 the query never seems to finish. With batch size 32768 it completes in ~60 seconds, which is around 2x longer than Spark takes locally.

 
{code:java}
RUST_LOG=debug cargo run --release --bin tpch -- benchmark --iterations 3 --path /mnt/tpch/parquet-sf100-partitioned/ --format parquet --query 12 --batch-size 32768 --concurrency 48 --debug 

Logical plan:
Sort: #l_shipmode ASC NULLS FIRST
  Projection: #l_shipmode, #SUM(CASE WHEN #o_orderpriority Eq Utf8("1-URGENT") Or #o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS high_line_count, #SUM(CASE WHEN #o_orderpriority NotEq Utf8("1-URGENT") And #o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS low_line_count
    Aggregate: groupBy=[[#l_shipmode]], aggr=[[SUM(CASE WHEN #o_orderpriority Eq Utf8("1-URGENT") Or #o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #o_orderpriority NotEq Utf8("1-URGENT") And #o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]]
      Filter: #l_shipmode Eq Utf8("MAIL") Or #l_shipmode Eq Utf8("SHIP") And #l_commitdate Lt #l_receiptdate And #l_shipdate Lt #l_commitdate And #l_receiptdate GtEq CAST(Utf8("1994-01-01") AS Date32(Day)) And #l_receiptdate Lt CAST(Utf8("1995-01-01") AS Date32(Day))
        Join: l_orderkey = o_orderkey
          TableScan: lineitem projection=None
          TableScan: orders projection=None
[2020-12-28T19:37:31Z DEBUG datafusion::execution::context] Logical plan:
     Sort: #l_shipmode ASC NULLS FIRST
      Projection: #l_shipmode, #SUM(CASE WHEN #o_orderpriority Eq Utf8("1-URGENT") Or #o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS high_line_count, #SUM(CASE WHEN #o_orderpriority NotEq Utf8("1-URGENT") And #o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS low_line_count
        Aggregate: groupBy=[[#l_shipmode]], aggr=[[SUM(CASE WHEN #o_orderpriority Eq Utf8("1-URGENT") Or #o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #o_orderpriority NotEq Utf8("1-URGENT") And #o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]]
          Filter: #l_shipmode Eq Utf8("MAIL") Or #l_shipmode Eq Utf8("SHIP") And #l_commitdate Lt #l_receiptdate And #l_shipdate Lt #l_commitdate And #l_receiptdate GtEq CAST(Utf8("1994-01-01") AS Date32(Day)) And #l_receiptdate Lt CAST(Utf8("1995-01-01") AS Date32(Day))
            Join: l_orderkey = o_orderkey
              TableScan: lineitem projection=None
              TableScan: orders projection=None
[2020-12-28T19:37:31Z DEBUG datafusion::execution::context] Optimized logical plan:
     Sort: #l_shipmode ASC NULLS FIRST
      Projection: #l_shipmode, #SUM(CASE WHEN #o_orderpriority Eq Utf8("1-URGENT") Or #o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS high_line_count, #SUM(CASE WHEN #o_orderpriority NotEq Utf8("1-URGENT") And #o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS low_line_count
        Aggregate: groupBy=[[#l_shipmode]], aggr=[[SUM(CASE WHEN #o_orderpriority Eq Utf8("1-URGENT") Or #o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #o_orderpriority NotEq Utf8("1-URGENT") And #o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]]
          Join: l_orderkey = o_orderkey
            Filter: #l_shipmode Eq Utf8("MAIL") Or #l_shipmode Eq Utf8("SHIP") And #l_commitdate Lt #l_receiptdate And #l_shipdate Lt #l_commitdate And #l_receiptdate GtEq CAST(Utf8("1994-01-01") AS Date32(Day)) And #l_receiptdate Lt CAST(Utf8("1995-01-01") AS Date32(Day))
              TableScan: lineitem projection=Some([0, 10, 11, 12, 14])
            TableScan: orders projection=Some([0, 5])
Optimized logical plan:
Sort: #l_shipmode ASC NULLS FIRST
  Projection: #l_shipmode, #SUM(CASE WHEN #o_orderpriority Eq Utf8("1-URGENT") Or #o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS high_line_count, #SUM(CASE WHEN #o_orderpriority NotEq Utf8("1-URGENT") And #o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS low_line_count
    Aggregate: groupBy=[[#l_shipmode]], aggr=[[SUM(CASE WHEN #o_orderpriority Eq Utf8("1-URGENT") Or #o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #o_orderpriority NotEq Utf8("1-URGENT") And #o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]]
      Join: l_orderkey = o_orderkey
        Filter: #l_shipmode Eq Utf8("MAIL") Or #l_shipmode Eq Utf8("SHIP") And #l_commitdate Lt #l_receiptdate And #l_shipdate Lt #l_commitdate And #l_receiptdate GtEq CAST(Utf8("1994-01-01") AS Date32(Day)) And #l_receiptdate Lt CAST(Utf8("1995-01-01") AS Date32(Day))
          TableScan: lineitem projection=Some([0, 10, 11, 12, 14])
        TableScan: orders projection=Some([0, 5])
+------------+-----------------+----------------+
| l_shipmode | high_line_count | low_line_count |
+------------+-----------------+----------------+
| MAIL       | 623115          | 934713         |
| SHIP       | 622979          | 934534         |
+------------+-----------------+----------------+
Query 12 iteration 0 took 63780.1 ms
{code})

> [Rust] [DataFusion] TPC-H q12 does not finish with SF=100 Parquet input
> -----------------------------------------------------------------------
>
>                 Key: ARROW-11030
>                 URL: https://issues.apache.org/jira/browse/ARROW-11030
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Rust - DataFusion
>            Reporter: Andy Grove
>            Assignee: Andy Grove
>            Priority: Major
>             Fix For: 3.0.0
>
>
> TPC-H q12 does not finish with SF=100 Parquet input.
> Either I have something wrong with my local setup or there is a major performance regression. I will investigate more after the holidays.
> It is also possible that I am getting confused about which queries and scale factors were working well for me before. We really need to start tracking benchmark results somewhere.
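
One minimal way to start tracking benchmark results, sketched here as an assumption and not as anything that exists in the tpch benchmark binary, is to append each iteration's timing to a CSV file. The `BenchResult` struct, its field names, the CSV layout, and `append_result` are all hypothetical names introduced for illustration.

```rust
use std::fs::OpenOptions;
use std::io::{self, Write};
use std::path::Path;

/// One benchmark measurement (hypothetical record layout, not part of DataFusion).
#[derive(Debug, Clone, PartialEq)]
pub struct BenchResult {
    pub query: u32,
    pub scale_factor: u32,
    pub batch_size: usize,
    pub concurrency: usize,
    pub iteration: usize,
    pub elapsed_ms: f64,
}

impl BenchResult {
    /// Header row written once at the top of a new results file.
    pub const CSV_HEADER: &'static str =
        "query,scale_factor,batch_size,concurrency,iteration,elapsed_ms";

    /// Format the record as a single CSV line, with one decimal for the timing.
    pub fn to_csv_line(&self) -> String {
        format!(
            "{},{},{},{},{},{:.1}",
            self.query,
            self.scale_factor,
            self.batch_size,
            self.concurrency,
            self.iteration,
            self.elapsed_ms
        )
    }
}

/// Append one result to `path`, writing the header first if the file is new.
pub fn append_result(path: &Path, result: &BenchResult) -> io::Result<()> {
    let is_new = !path.exists();
    let mut file = OpenOptions::new().create(true).append(true).open(path)?;
    if is_new {
        writeln!(file, "{}", BenchResult::CSV_HEADER)?;
    }
    writeln!(file, "{}", result.to_csv_line())
}
```

With this layout, the run quoted in the deleted comment (query 12, SF=100, batch size 32768, concurrency 48, iteration 0, 63780.1 ms) would be recorded as the line `12,100,32768,48,0,63780.1`. Keeping one row per iteration makes it possible to compare batch sizes and scale factors across runs without relying on memory.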



--
This message was sent by Atlassian Jira
(v8.3.4#803005)