You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@calcite.apache.org by Yoon-Min Nam <zk...@gmail.com> on 2020/01/21 10:29:12 UTC

Overly complex query plan generated by Calcite for a query with correlated subqueries (TPC-H query)

I have the following TPC-H Q21 query and the query plan that Apache
Calcite generated for it.

- Query:

SELECT s.s_name, count(*) as numwait
FROM supplier s, lineitem l1, orders o, nation n
WHERE s.s_suppkey = l1.l_suppkey
AND o.o_orderkey = l1.l_orderkey
AND o.o_orderstatus = 'F'
AND l1.l_receiptdate > l1.l_commitdate
   AND EXISTS (
      SELECT *
      FROM lineitem l2
      WHERE l2.l_orderkey = l1.l_orderkey
      AND l2.l_suppkey <> l1.l_suppkey
   )
   AND NOT EXISTS (
      SELECT *
      FROM lineitem l3
      WHERE l3.l_orderkey = l1.l_orderkey
      AND l3.l_suppkey <> l1.l_suppkey
      AND l3.l_receiptdate > l3.l_commitdate
   )
AND s.s_nationkey = n.n_nationkey
AND n.n_name = 'BRAZIL'
GROUP BY s.s_name
ORDER BY numwait desc, s.s_name
LIMIT 100;

- Generated query plan (before decorrelation):

LogicalSort(sort0=[$1], sort1=[$0], dir0=[DESC], dir1=[ASC], fetch=[100])
  LogicalAggregate(group=[{0}], numwait=[COUNT()])
    LogicalProject(s_name=[$1])
      LogicalFilter(condition=[AND(=($0, $11), =($27, $9), =($29,
'F'), >($21, $20), IS NOT NULL($44), NOT(IS NOT NULL($45)), =($3,
$38), =($39, 'BRAZIL'))])
        LogicalCorrelate(correlation=[$cor2], joinType=[left],
requiredColumns=[{9, 11}])
          LogicalCorrelate(correlation=[$cor0], joinType=[left],
requiredColumns=[{9, 11}])
            LogicalJoin(condition=[true], joinType=[inner])
              LogicalJoin(condition=[true], joinType=[inner])
                LogicalJoin(condition=[true], joinType=[inner])
                  EnumerableTableScan(table=[[tpch_sf10, supplier]])
                  EnumerableTableScan(table=[[tpch_sf10, lineitem]])
                EnumerableTableScan(table=[[tpch_sf10, orders]])
              EnumerableTableScan(table=[[tpch_sf10, nation]])
            LogicalAggregate(group=[{}], agg#0=[MIN($0)])
              LogicalProject($f0=[true])
                LogicalFilter(condition=[AND(=($0, $cor0.l_orderkey),
<>($2, $cor0.l_suppkey))])
                  EnumerableTableScan(table=[[tpch_sf10, lineitem]])
          LogicalAggregate(group=[{}], agg#0=[MIN($0)])
            LogicalProject($f0=[true])
              LogicalFilter(condition=[AND(=($0, $cor2.l_orderkey),
<>($2, $cor2.l_suppkey), >($12, $11))])
                EnumerableTableScan(table=[[tpch_sf10, lineitem]])

Calcite seems to detect the correlated subqueries correctly.
However, applying decorrelation to that plan produces an overly complex
query plan, so it seems I need some extra optimization to make it
reasonable.
In particular, the whole series of inner joins is pushed down into each
correlated subquery, i.e., the three joins among supplier, lineitem,
orders and nation are repeated there, so the processing cost increases
significantly.

- Decorrelated Query Plan:

LogicalSort(sort0=[$1], sort1=[$0], dir0=[DESC], dir1=[ASC], fetch=[100])
  LogicalAggregate(group=[{0}], numwait=[COUNT()])
    LogicalProject(s_name=[$0])
      LogicalProject(s_name=[$1])
        LogicalFilter(condition=[IS NULL($47)])
          LogicalJoin(condition=[AND(=($45, $9), =($46, $11))], joinType=[left])
            LogicalFilter(condition=[AND(=($0, $11), =($27, $9),
=($29, 'F'), >($21, $20), =($3, $38), =($39, 'BRAZIL'), IS NOT
NULL($44))])
              LogicalProject(...)
                LogicalJoin(condition=[AND(=($44, $9), =($45, $11))],
joinType=[left])
                  LogicalJoin(condition=[true], joinType=[inner])
                    LogicalJoin(condition=[true], joinType=[inner])
                      LogicalJoin(condition=[true], joinType=[inner])
                        EnumerableTableScan(table=[[tpch_sf10, supplier]])
                        EnumerableTableScan(table=[[tpch_sf10, lineitem]])
                      EnumerableTableScan(table=[[tpch_sf10, orders]])
                    EnumerableTableScan(table=[[tpch_sf10, nation]])
                  LogicalAggregate(group=[{0, 1}], agg#0=[MIN($2)])
                    LogicalProject(l_orderkey0=[$1], l_suppkey0=[$2], $f0=[$0])
                      LogicalProject($f0=[true], l_orderkey0=[$18],
l_suppkey0=[$19])
                        LogicalJoin(condition=[AND(=($0, $18), <>($2,
$19))], joinType=[inner])
                          EnumerableTableScan(table=[[tpch_sf10, lineitem]])
                          LogicalAggregate(group=[{0, 1}])
                            LogicalProject(l_orderkey=[$9], l_suppkey=[$11])
                              LogicalJoin(condition=[true], joinType=[inner])
                                LogicalJoin(condition=[true], joinType=[inner])
                                  LogicalJoin(condition=[true], joinType=[inner])
                                    EnumerableTableScan(table=[[tpch_sf10, supplier]])
                                    EnumerableTableScan(table=[[tpch_sf10, lineitem]])
                                  EnumerableTableScan(table=[[tpch_sf10, orders]])
                                EnumerableTableScan(table=[[tpch_sf10, nation]])
            LogicalAggregate(group=[{0, 1}], agg#0=[MIN($2)])
              LogicalProject(l_orderkey0=[$1], l_suppkey0=[$2], $f0=[$0])
                LogicalProject($f0=[true], l_orderkey0=[$18], l_suppkey0=[$19])
                  LogicalJoin(condition=[AND(=($0, $18), <>($2,
$19))], joinType=[inner])
                    LogicalFilter(condition=[>($12, $11)])
                      EnumerableTableScan(table=[[tpch_sf10, lineitem]])
                    LogicalAggregate(group=[{0, 1}])
                      LogicalProject(l_orderkey=[$9], l_suppkey=[$11])
                        LogicalJoin(condition=[AND(=($44, $9), =($45,
$11))], joinType=[left])
                          LogicalJoin(condition=[true], joinType=[inner])
                            LogicalJoin(condition=[true], joinType=[inner])
                              LogicalJoin(condition=[true], joinType=[inner])
                                EnumerableTableScan(table=[[tpch_sf10,
supplier]])
                                EnumerableTableScan(table=[[tpch_sf10,
lineitem]])
                              EnumerableTableScan(table=[[tpch_sf10, orders]])
                            EnumerableTableScan(table=[[tpch_sf10, nation]])
                          LogicalAggregate(group=[{0, 1}], agg#0=[MIN($2)])
                            LogicalProject(l_orderkey0=[$1],
l_suppkey0=[$2], $f0=[$0])
                              LogicalProject($f0=[true],
l_orderkey0=[$18], l_suppkey0=[$19])
                                LogicalJoin(condition=[AND(=($0, $18), <>($2, $19))], joinType=[inner])
                                  EnumerableTableScan(table=[[tpch_sf10, lineitem]])
                                  LogicalAggregate(group=[{0, 1}])
                                    LogicalProject(l_orderkey=[$9],
l_suppkey=[$11])
                                      LogicalJoin(condition=[true],
joinType=[inner])
                                        LogicalJoin(condition=[true], joinType=[inner])
                                          LogicalJoin(condition=[true], joinType=[inner])
                                            EnumerableTableScan(table=[[tpch_sf10, supplier]])
                                            EnumerableTableScan(table=[[tpch_sf10, lineitem]])
                                          EnumerableTableScan(table=[[tpch_sf10, orders]])
                                        EnumerableTableScan(table=[[tpch_sf10, nation]])

Note that this query plan was generated using the PlannerImpl class
that exists in the Calcite code base, i.e.,
...
  assert validatedSqlNode != null;
  final RexBuilder rexBuilder = createRexBuilder();
  final RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);

  final SqlToRelConverter.Config config = SqlToRelConverter.configBuilder()
        .withConfig(sqlToRelConverterConfig)
        .withTrimUnusedFields(false)
        .withConvertTableAccess(false)
        .build();
  final SqlToRelConverter sqlToRelConverter =
        new SqlToRelConverter(this, validator,
                createCatalogReader(), cluster, convertletTable, config);
  root =
        sqlToRelConverter.convertQuery(validatedSqlNode, false, true);
  root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true));
...

Any suggestions?
Thank you!


(P.S. For comparison, here is the query plan that Apache Hive produces with Calcite for the same query.)

- Before decorrelation and subquery removal:

HiveSortLimit(fetch=[100])
  HiveProject(s_name=[$0], numwait=[$1])
    HiveSortLimit(sort0=[$1], sort1=[$0], dir0=[DESC-nulls-last], dir1=[ASC])
      HiveProject(s_name=[$0], numwait=[$1], _o__col2=[$0],
(tok_functionstar count)=[$1])
        HiveAggregate(group=[{0}], agg#0=[count()])
          HiveProject($f0=[$1])
            HiveFilter(condition=[AND(=($0, $12), =($29, $10), =($31,
_UTF-16LE'F'), >($22, $21), EXISTS({
HiveProject(...)
  HiveFilter(condition=[AND(=($0, $cor0._col10), <>($2, $cor0._col12))])
    HiveTableScan(table=[[tpch_sf10, lineitem]], table:alias=[l2])
}), NOT(EXISTS({
HiveProject(...)
  HiveFilter(condition=[AND(=($0, $cor1._col10), <>($2, $cor1._col12),
>($12, $11))])
    HiveTableScan(table=[[tpch_sf10, lineitem]], table:alias=[l3])})),
=($3, $41), =($42, _UTF-16LE'BRAZIL'))])
              HiveJoin(condition=[true], joinType=[inner],
algorithm=[none], cost=[not available])
                HiveJoin(condition=[true], joinType=[inner],
algorithm=[none], cost=[not available])
                  HiveJoin(condition=[true], joinType=[inner],
algorithm=[none], cost=[not available])
                    HiveTableScan(table=[[tpch_sf10, supplier]],
table:alias=[s])
                    HiveTableScan(table=[[tpch_sf10, lineitem]],
table:alias=[l1])
                  HiveTableScan(table=[[tpch_sf10, orders]], table:alias=[o])
                HiveTableScan(table=[[tpch_sf10, nation]], table:alias=[n])


- After decorrelation and subquery removal:

HiveSortLimit(fetch=[100])
  HiveProject(s_name=[$0], numwait=[$1])
    HiveSortLimit(sort0=[$1], sort1=[$0], dir0=[DESC-nulls-last], dir1=[ASC])
      HiveProject(s_name=[$0], numwait=[$1], _o__col2=[$0],
(tok_functionstar count)=[$1])
        HiveAggregate(group=[{0}], agg#0=[count()])
          HiveProject($f0=[$0])
            HiveProject($f0=[$1])
              HiveProject(...)
                HiveFilter(condition=[IS NULL($48)])
                  HiveJoin(condition=[AND(=($49, $10), <>($50, $12))],
joinType=[left], algorithm=[none], cost=[not available])
                    HiveSemiJoin(condition=[AND(=($49, $10), <>($50,
$12))], joinType=[semi])
                      HiveJoin(condition=[=($3, $41)],
joinType=[inner], algorithm=[none], cost=[not available])
                        HiveJoin(condition=[=($29, $10)],
joinType=[inner], algorithm=[none], cost=[not available])
                          HiveJoin(condition=[=($0, $12)],
joinType=[inner], algorithm=[none], cost=[not available])
                            HiveTableScan(table=[[tpch_sf10,
supplier]], table:alias=[s])
                            HiveFilter(condition=[>($12, $11)])
                              HiveTableScan(table=[[tpch_sf10,
lineitem]], table:alias=[l1])
                          HiveFilter(condition=[=($2, _UTF-16LE'F')])
                            HiveTableScan(table=[[tpch_sf10, orders]],
table:alias=[o])
                        HiveFilter(condition=[=($1, _UTF-16LE'BRAZIL')])
                          HiveTableScan(table=[[tpch_sf10, nation]],
table:alias=[n])
                      HiveProject(literalTrue=[true],
l_orderkey0=[$16], l_suppkey0=[$17])
                        HiveProject(...)
                          HiveFilter(condition=[AND(IS NOT NULL($0),
IS NOT NULL($2))])
                            HiveTableScan(table=[[tpch_sf10,
lineitem]], table:alias=[l2])
                    HiveAggregate(group=[{0, 1, 2}])
                      HiveProject(literalTrue=[$0], l_orderkey0=[$1],
l_suppkey0=[$2])
                        HiveProject(literalTrue=[true],
l_orderkey0=[$16], l_suppkey0=[$17])
                          HiveProject(...)
                            HiveFilter(condition=[AND(>($12, $11), IS
NOT NULL($0), IS NOT NULL($2))])
                              HiveTableScan(table=[[tpch_sf10,
lineitem]], table:alias=[l3])