Posted to dev@calcite.apache.org by weijie tong <to...@gmail.com> on 2017/01/09 08:46:02 UTC

how to write a better performance rule?

hi all:
  I have written a Drill rule to push a count(distinct) query down to
Druid, but the query takes 2~7 seconds just to get through Volcano physical
planning. Are there any tips for writing a rule that plans faster?
  I have implemented two rules, matching the following patterns (a rough
sketch of the first pattern follows the list):
 1st:

agg->agg->scan

agg->agg->project->scan


2nd:

agg->project->scan

agg->scan



 The SQL query is:

select count(distinct position_title) from testDruid.employee where
birth_date='1961-09-24' group by employee_id

The initial phase output is:

LogicalProject(EXPR$0=[$1]): rowcount = 1.5, cumulative cost = {133.1875
rows, 232.5 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 9
  LogicalAggregate(group=[{0}], EXPR$0=[COUNT(DISTINCT $1)]): rowcount =
1.5, cumulative cost = {131.6875 rows, 231.0 cpu, 0.0 io, 0.0 network, 0.0
memory}, id = 8
    LogicalProject(employee_id=[$4], position_title=[$14]): rowcount =
15.0, cumulative cost = {130.0 rows, 231.0 cpu, 0.0 io, 0.0 network, 0.0
memory}, id = 7
      LogicalFilter(condition=[=($0, '1961-09-24')]): rowcount = 15.0,
cumulative cost = {115.0 rows, 201.0 cpu, 0.0 io, 0.0 network, 0.0 memory},
id = 6
        EnumerableTableScan(table=[[testDruid, employee]]): rowcount =
100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io, 0.0 network, 0.0
memory}, id = 5

The Volcano physical planning output is:

15:59:58.201 [278cbd08-f2c6-306d-e78e-b479d088cd5d:foreman] DEBUG
o.a.d.e.p.s.h.DefaultSqlHandler - VOLCANO:Physical Planning (7315ms):
ScreenPrel: rowcount = 2500.0, cumulative cost = {77750.0 rows, 970250.0
cpu, 0.0 io, 2.1504E8 network, 440000.00000000006 memory}, id = 115180
  UnionExchangePrel: rowcount = 2500.0, cumulative cost = {77500.0 rows,
970000.0 cpu, 0.0 io, 2.1504E8 network, 440000.00000000006 memory}, id =
115179
    ProjectPrel(EXPR$0=[$1]): rowcount = 2500.0, cumulative cost = {75000.0
rows, 950000.0 cpu, 0.0 io, 2.048E8 network, 440000.00000000006 memory}, id
= 115178
      HashAggPrel(group=[{0}], EXPR$0=[$SUM0($1)]): rowcount = 2500.0,
cumulative cost = {75000.0 rows, 950000.0 cpu, 0.0 io, 2.048E8 network,
440000.00000000006 memory}, id = 115177
        HashToRandomExchangePrel(dist0=[[$0]]): rowcount = 25000.0,
cumulative cost = {50000.0 rows, 450000.0 cpu, 0.0 io, 2.048E8 network, 0.0
memory}, id = 115176
          ProjectPrel(employee_id=[$0], EXPR$0=[$1]): rowcount = 25000.0,
cumulative cost = {25000.0 rows, 50000.0 cpu, 0.0 io, 0.0 network, 0.0
memory}, id = 115175

ScanPrel(groupscan=[DruidGroupScan{groupScanSpec=DruidGroupScanSpec{dataSource='employee',
druidQueryConvertState=MODIFIED, filter=null, allFiltersConverted=true,
hasMultiValueFilters=false, groupbyColumns=[employee_id, position_title],
query=StreamingGroupByQuery{groupByQuery=GroupByQuery{dataSource='employee',
querySegmentSpec=LegacySegmentSpec{intervals=[1000-01-01T00:00:00.000+08:05:43/3000-01-01T00:00:00.000+08:00]},
limitSpec=NoopLimitSpec, dimFilter=null, granularity=AllGranularity,
dimensions=[DefaultDimensionSpec{dimension='employee_id',
outputName='employee_id'}],
aggregatorSpecs=[DistinctCountAggregatorFactory{name='EXPR$0',
fieldName='position_title'}], postAggregatorSpecs=[], havingSpec=null},
batchSize=50000}}, columns=[`employee_id`, `EXPR$0`],
segmentIntervals=[1961-09-24T00:00:00.000+08:00/1961-09-25T00:00:00.000+08:00]}]):
rowcount = 25000.0, cumulative cost = {25000.0 rows, 50000.0 cpu, 0.0 io,
0.0 network, 0.0 memory}, id = 115081

Re: how to write a better performance rule?

Posted by weijie tong <to...@gmail.com>.
Thanks Julian for your reply. I would be glad to contribute the rule to
Calcite. For now, I am puzzled by Drill's and Calcite's VolcanoPlanner.
In my experience with Drill, cost-based query planning is preferred but
can perform poorly: for some simple SQL queries, generating the physical
plan takes longer than actually executing the query.

So I am wondering why Drill makes the VolcanoPlanner the only choice for
generating the logical and physical plans in DefaultSqlHandler, and whether
there is any tuning knob or planned improvement to the VolcanoPlanner that
would cut the planning time down.


On Tue, Jan 10, 2017 at 2:40 AM, Julian Hyde <jh...@apache.org> wrote:

> The problem is probably not the rule (individual rules tend to run very
> quickly) but the cost of a planner phase that has a lot of rules enabled
> and thus collectively they generate a lot of possibilities. Planning
> complexity is a dragon that is difficult to slay.
>
> One trick is to use the HepPlanner to apply a small set of rules, without
> reference to cost.
>
> Are you intending to contribute the rule to Calcite?
>
> Julian

Re: how to write a better performance rule?

Posted by Julian Hyde <jh...@apache.org>.
The problem is probably not the rule (individual rules tend to run very quickly) but the cost of a planner phase that has a lot of rules enabled and thus collectively they generate a lot of possibilities. Planning complexity is a dragon that is difficult to slay.

One trick is to use the HepPlanner to apply a small set of rules, without reference to cost.
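
For example, a minimal HepPlanner setup looks something like the sketch
below (the rule listed is only a placeholder for whatever small set you
pick; substitute your own push-down rule):

import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.FilterProjectTransposeRule;

public class HepExample {
  /** Applies a fixed, small rule set heuristically, with no cost model. */
  static RelNode applyHeuristicRules(RelNode logicalPlan) {
    HepProgram program = new HepProgramBuilder()
        .addRuleInstance(FilterProjectTransposeRule.INSTANCE)
        // .addRuleInstance(YourDruidPushDownRule.INSTANCE)
        .build();
    HepPlanner hepPlanner = new HepPlanner(program);
    hepPlanner.setRoot(logicalPlan);
    return hepPlanner.findBestExp();
  }
}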

Are you intending to contribute the rule to Calcite? 

Julian

 
