You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@calcite.apache.org by Yang Liu <wh...@gmail.com> on 2020/03/06 10:12:36 UTC

"lazy" optmization?

Hi,

I am wondering if Calcite will support "lazy optimization" (execution time
optimization / runtime optimization).

For example, we want to do an inner join between an Elasticsearch table and
a MySQL table, like this:

WITH logic_table_2 AS
  (SELECT _MAP['status'] AS "status",
          _MAP['user'] AS "user"
   FROM "es"."insight-by-sql-v3"
   LIMIT 12345)
SELECT *
FROM "insight_user"."user_tab" AS t1
JOIN logic_table_2 AS t2 ON t1."email" = t2."user"
WHERE t2."status" = 'fail'
LIMIT 10

t2 is a ES table and t1 is a MySQL table, and it may generate a execution
plan like this:

EnumerableProject(id=[$2], name=[$3], email=[$4], desc=[$5], is_super=[$6],
create_time=[$7], has_all_access=[$8], status=[$0], user=[$1])
  EnumerableLimit(fetch=[10])
    EnumerableHashJoin(condition=[=($1, $4)], joinType=[inner])
      ElasticsearchToEnumerableConverter
        ElasticsearchProject(status=[ITEM($0, 'status')], user=[ITEM($0,
'user')])
          ElasticsearchFilter(condition=[=(ITEM($0, 'status'), 'fail')])
            ElasticsearchSort(fetch=[12345])
              ElasticsearchTableScan(table=[[es, insight-by-sql-v3]])
      JdbcToEnumerableConverter
        JdbcTableScan(table=[[insight_user, user_tab]])

since here ES query has a filter, in execution Calcite will do the ES query
first and get the build table, and then do JdbcTableScan and get the probe
table, and do the HashJoin finally.

But, since this is a INNER JOIN, there is an implicit filter on the later
JdbcTableScan:
``` t1.email in (select user from t2 where t2.status='fail') ```, if
applying this implicit filter, the dataset we will handle may become
extremely small (save memory) and running much faster since the full
JdbcTableScan is always time-wasting. But since Calcite do the optimization
in planner phase, this dynamic/lazy optimization seems missed ...

To summarize, serial execution with a "lazy optimization" may be faster and
use less memory than parallel execution with an optimized execution plan
since the former one can reduce dataset we handle.

Any ideas?

Re: "lazy" optmization?

Posted by Danny Chan <da...@apache.org>.
We already did this in Flink SQL where there multiple optimize programs,
each for specific purpose, we also have our own logical and physical
conventions.

Julian Hyde <jh...@apache.org>于2020年3月10日 周二上午4:30写道:

> It's difficult to do "split processing into phases" entirely within
> Calcite. Generally a DBMS would manage these phases, and call Calcite
> at each step.
>
> I'd love to have some working code in Calcite that does this, but we'd
> be stepping over the line from "framework" into "platform". It's
> difficult to get contributions for these things.
>
> A more realistic ask is this: If anyone has implemented multi-phase
> optimization with Calcite, please write a blog post or a conference
> talk and share what you learned. Include a pointer to your code if
> your project is open source.
>
> Julian
>
> On Fri, Mar 6, 2020 at 7:16 PM Yang Liu <wh...@gmail.com> wrote:
> >
> > Thanks all!
> >
> > @Julian is the “split processing into phases” you are referring to like
> > this?
> >
> > with t1 as (select * from es_table where xxx limit xxx);
> > select * from t2 join t1 on (t2.key = t1.key) where t2.key in (select key
> > from t1)
> >
> > which means the SQL writer need to adapt to this specific form of SQL for
> > better performance? And Calcite will cache the t1 right?
> >
> > Or, maybe I can implement a RelRunner or EnumerableHashJoin myself to
> have
> > the specific rule: the query result of right table can be used as filters
> > for the left table?
> >
> > Thanks!
> >
> >
> > Julian Hyde <jh...@apache.org> 于2020年3月7日周六 上午1:48写道:
> >
> > > Runtime optimization is always necessary, because you just don’t have
> the
> > > stats until you run the query. The best DB algorithms are adaptive, and
> > > therefore hard to write. The adaptations require a lot of tricky
> support
> > > from the runtime - e.g. propagating bloom filters against the flow of
> data.
> > >
> > > Calcite can still help a little.
> > >
> > > One runtime optimization is where you split processing into phases.
> Only
> > > optimize the first part of your query. Build temp tables, analyze
> them, and
> > > use those stats to optimize the second part of your query.
> > >
> > > Another technique is to gather stats when as you run the query today,
> so
> > > that when you run it tomorrow Calcite can do a better job.
> > >
> > > Julian
> > >
> > >
> > > > On Mar 6, 2020, at 5:52 AM, Danny Chan <da...@apache.org> wrote:
> > > >
> > > > Sorry to tell that Calcite runtime does not support this, the
> "dynamic
> > > > partition pruning" or "runtime filter" called in Impala, would build
> a
> > > > bloom filter for the join keys for the build side table and push it
> down
> > > to
> > > > the probe table source, thus, in some cases, it can reduce the data.
> > > >
> > > > Yang Liu <wh...@gmail.com> 于2020年3月6日周五 下午6:54写道:
> > > >
> > > >> discussed with one of our user groups, in Spark 3.0, this is called
> > > >> "dynamic
> > > >> partition pruning"
> > > >>
> > > >> Yang Liu <wh...@gmail.com> 于2020年3月6日周五 下午6:12写道:
> > > >>
> > > >>> Hi,
> > > >>>
> > > >>> I am wondering if Calcite will support "lazy optimization"
> (execution
> > > >> time
> > > >>> optimization / runtime optimization).
> > > >>>
> > > >>> For example, we want to do an inner join between an Elasticsearch
> table
> > > >>> and a MySQL table, like this:
> > > >>>
> > > >>> WITH logic_table_2 AS
> > > >>>  (SELECT _MAP['status'] AS "status",
> > > >>>          _MAP['user'] AS "user"
> > > >>>   FROM "es"."insight-by-sql-v3"
> > > >>>   LIMIT 12345)
> > > >>> SELECT *
> > > >>> FROM "insight_user"."user_tab" AS t1
> > > >>> JOIN logic_table_2 AS t2 ON t1."email" = t2."user"
> > > >>> WHERE t2."status" = 'fail'
> > > >>> LIMIT 10
> > > >>>
> > > >>> t2 is a ES table and t1 is a MySQL table, and it may generate a
> > > execution
> > > >>> plan like this:
> > > >>>
> > > >>> EnumerableProject(id=[$2], name=[$3], email=[$4], desc=[$5],
> > > >>> is_super=[$6], create_time=[$7], has_all_access=[$8], status=[$0],
> > > >>> user=[$1])
> > > >>>  EnumerableLimit(fetch=[10])
> > > >>>    EnumerableHashJoin(condition=[=($1, $4)], joinType=[inner])
> > > >>>      ElasticsearchToEnumerableConverter
> > > >>>        ElasticsearchProject(status=[ITEM($0, 'status')],
> user=[ITEM($0,
> > > >>> 'user')])
> > > >>>          ElasticsearchFilter(condition=[=(ITEM($0, 'status'),
> 'fail')])
> > > >>>            ElasticsearchSort(fetch=[12345])
> > > >>>              ElasticsearchTableScan(table=[[es,
> insight-by-sql-v3]])
> > > >>>      JdbcToEnumerableConverter
> > > >>>        JdbcTableScan(table=[[insight_user, user_tab]])
> > > >>>
> > > >>> since here ES query has a filter, in execution Calcite will do the
> ES
> > > >>> query first and get the build table, and then do JdbcTableScan and
> get
> > > >> the
> > > >>> probe table, and do the HashJoin finally.
> > > >>>
> > > >>> But, since this is a INNER JOIN, there is an implicit filter on the
> > > later
> > > >>> JdbcTableScan:
> > > >>> ``` t1.email in (select user from t2 where t2.status='fail') ```,
> if
> > > >>> applying this implicit filter, the dataset we will handle may
> become
> > > >>> extremely small (save memory) and running much faster since the
> full
> > > >>> JdbcTableScan is always time-wasting. But since Calcite do the
> > > >> optimization
> > > >>> in planner phase, this dynamic/lazy optimization seems missed ...
> > > >>>
> > > >>> To summarize, serial execution with a "lazy optimization" may be
> faster
> > > >>> and use less memory than parallel execution with an optimized
> execution
> > > >>> plan since the former one can reduce dataset we handle.
> > > >>>
> > > >>> Any ideas?
> > > >>>
> > > >>>
> > > >>>
> > > >>
> > >
> > >
>

Re: "lazy" optmization?

Posted by Julian Hyde <jh...@apache.org>.
It's difficult to do "split processing into phases" entirely within
Calcite. Generally a DBMS would manage these phases, and call Calcite
at each step.

I'd love to have some working code in Calcite that does this, but we'd
be stepping over the line from "framework" into "platform". It's
difficult to get contributions for these things.

A more realistic ask is this: If anyone has implemented multi-phase
optimization with Calcite, please write a blog post or a conference
talk and share what you learned. Include a pointer to your code if
your project is open source.

Julian

On Fri, Mar 6, 2020 at 7:16 PM Yang Liu <wh...@gmail.com> wrote:
>
> Thanks all!
>
> @Julian is the “split processing into phases” you are referring to like
> this?
>
> with t1 as (select * from es_table where xxx limit xxx);
> select * from t2 join t1 on (t2.key = t1.key) where t2.key in (select key
> from t1)
>
> which means the SQL writer need to adapt to this specific form of SQL for
> better performance? And Calcite will cache the t1 right?
>
> Or, maybe I can implement a RelRunner or EnumerableHashJoin myself to have
> the specific rule: the query result of right table can be used as filters
> for the left table?
>
> Thanks!
>
>
> Julian Hyde <jh...@apache.org> 于2020年3月7日周六 上午1:48写道:
>
> > Runtime optimization is always necessary, because you just don’t have the
> > stats until you run the query. The best DB algorithms are adaptive, and
> > therefore hard to write. The adaptations require a lot of tricky support
> > from the runtime - e.g. propagating bloom filters against the flow of data.
> >
> > Calcite can still help a little.
> >
> > One runtime optimization is where you split processing into phases. Only
> > optimize the first part of your query. Build temp tables, analyze them, and
> > use those stats to optimize the second part of your query.
> >
> > Another technique is to gather stats when as you run the query today, so
> > that when you run it tomorrow Calcite can do a better job.
> >
> > Julian
> >
> >
> > > On Mar 6, 2020, at 5:52 AM, Danny Chan <da...@apache.org> wrote:
> > >
> > > Sorry to tell that Calcite runtime does not support this, the "dynamic
> > > partition pruning" or "runtime filter" called in Impala, would build a
> > > bloom filter for the join keys for the build side table and push it down
> > to
> > > the probe table source, thus, in some cases, it can reduce the data.
> > >
> > > Yang Liu <wh...@gmail.com> 于2020年3月6日周五 下午6:54写道:
> > >
> > >> discussed with one of our user groups, in Spark 3.0, this is called
> > >> "dynamic
> > >> partition pruning"
> > >>
> > >> Yang Liu <wh...@gmail.com> 于2020年3月6日周五 下午6:12写道:
> > >>
> > >>> Hi,
> > >>>
> > >>> I am wondering if Calcite will support "lazy optimization" (execution
> > >> time
> > >>> optimization / runtime optimization).
> > >>>
> > >>> For example, we want to do an inner join between an Elasticsearch table
> > >>> and a MySQL table, like this:
> > >>>
> > >>> WITH logic_table_2 AS
> > >>>  (SELECT _MAP['status'] AS "status",
> > >>>          _MAP['user'] AS "user"
> > >>>   FROM "es"."insight-by-sql-v3"
> > >>>   LIMIT 12345)
> > >>> SELECT *
> > >>> FROM "insight_user"."user_tab" AS t1
> > >>> JOIN logic_table_2 AS t2 ON t1."email" = t2."user"
> > >>> WHERE t2."status" = 'fail'
> > >>> LIMIT 10
> > >>>
> > >>> t2 is a ES table and t1 is a MySQL table, and it may generate a
> > execution
> > >>> plan like this:
> > >>>
> > >>> EnumerableProject(id=[$2], name=[$3], email=[$4], desc=[$5],
> > >>> is_super=[$6], create_time=[$7], has_all_access=[$8], status=[$0],
> > >>> user=[$1])
> > >>>  EnumerableLimit(fetch=[10])
> > >>>    EnumerableHashJoin(condition=[=($1, $4)], joinType=[inner])
> > >>>      ElasticsearchToEnumerableConverter
> > >>>        ElasticsearchProject(status=[ITEM($0, 'status')], user=[ITEM($0,
> > >>> 'user')])
> > >>>          ElasticsearchFilter(condition=[=(ITEM($0, 'status'), 'fail')])
> > >>>            ElasticsearchSort(fetch=[12345])
> > >>>              ElasticsearchTableScan(table=[[es, insight-by-sql-v3]])
> > >>>      JdbcToEnumerableConverter
> > >>>        JdbcTableScan(table=[[insight_user, user_tab]])
> > >>>
> > >>> since here ES query has a filter, in execution Calcite will do the ES
> > >>> query first and get the build table, and then do JdbcTableScan and get
> > >> the
> > >>> probe table, and do the HashJoin finally.
> > >>>
> > >>> But, since this is a INNER JOIN, there is an implicit filter on the
> > later
> > >>> JdbcTableScan:
> > >>> ``` t1.email in (select user from t2 where t2.status='fail') ```, if
> > >>> applying this implicit filter, the dataset we will handle may become
> > >>> extremely small (save memory) and running much faster since the full
> > >>> JdbcTableScan is always time-wasting. But since Calcite do the
> > >> optimization
> > >>> in planner phase, this dynamic/lazy optimization seems missed ...
> > >>>
> > >>> To summarize, serial execution with a "lazy optimization" may be faster
> > >>> and use less memory than parallel execution with an optimized execution
> > >>> plan since the former one can reduce dataset we handle.
> > >>>
> > >>> Any ideas?
> > >>>
> > >>>
> > >>>
> > >>
> >
> >

Re: "lazy" optmization?

Posted by Scott Reynolds <sd...@gmail.com>.
Can this be achieved with EnumerableNestedBatchJoin? Would need to make the
JdbcFilterRule and it's relation handle Correlation Variables so they can
push down the filter's into the JDBC and Elastic Search RPCs. The
EnumerableNestedBatchJoinRule [1] pushes a filter relation on top of the
right hand scan. And if JdbcFilterRule translates Correlation Variables
into Conditions into the JDBC query string I think you can make it all
work.

I spent this past week doing something similar though not precisely this
thing. I wrote our own Rule and Relation that calls correlateBatchJoin [3]
directly

[1]
https://github.com/apache/calcite/blob/b80bb1cbceb11ed31b73e419916b5cc98610503e/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableBatchNestedLoopJoinRule.java#L124-L126
[2]
https://github.com/apache/calcite/blob/b80bb1cbceb11ed31b73e419916b5cc98610503e/core/src/main/java/org/apache/calcite/rex/RexCorrelVariable.java
[3]
https://github.com/apache/calcite/blob/b80bb1cbceb11ed31b73e419916b5cc98610503e/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java#L1492

On Sat, Mar 7, 2020 at 7:11 AM Stamatis Zampetakis <za...@gmail.com>
wrote:

> Hi Yang,
>
> Another term that is used for the optimization that you mention is
> "selective join pushdown" which essentially relies on Bloom/Cuckoo and
> other probabilistic filters. You can check [1] for more details about this
> kind of techniques.
>
> In the example that you outlined between JDBC and Elastic maybe you could
> achieve the same result with a slightly different approach by using a
> correlated join. If the scan + filter on Elastic does not bring back many
> results then you could use this results to probe the JDBC datasource. For
> more details check the discussion in [2], I think it refers to the same
> problem.
>
> Best,
> Stamatis
>
> [1] http://www.vldb.org/pvldb/vol12/p502-lang.pdf
> [2]
>
> https://lists.apache.org/thread.html/d9f95683e66009872a53e7e617295158b98746b550d2bf68230b3096%40%3Cdev.calcite.apache.org%3E
>
> On Sat, Mar 7, 2020 at 4:16 AM Yang Liu <wh...@gmail.com> wrote:
>
> > Thanks all!
> >
> > @Julian is the “split processing into phases” you are referring to like
> > this?
> >
> > with t1 as (select * from es_table where xxx limit xxx);
> > select * from t2 join t1 on (t2.key = t1.key) where t2.key in (select key
> > from t1)
> >
> > which means the SQL writer need to adapt to this specific form of SQL for
> > better performance? And Calcite will cache the t1 right?
> >
> > Or, maybe I can implement a RelRunner or EnumerableHashJoin myself to
> have
> > the specific rule: the query result of right table can be used as filters
> > for the left table?
> >
> > Thanks!
> >
> >
> > Julian Hyde <jh...@apache.org> 于2020年3月7日周六 上午1:48写道:
> >
> > > Runtime optimization is always necessary, because you just don’t have
> the
> > > stats until you run the query. The best DB algorithms are adaptive, and
> > > therefore hard to write. The adaptations require a lot of tricky
> support
> > > from the runtime - e.g. propagating bloom filters against the flow of
> > data.
> > >
> > > Calcite can still help a little.
> > >
> > > One runtime optimization is where you split processing into phases.
> Only
> > > optimize the first part of your query. Build temp tables, analyze them,
> > and
> > > use those stats to optimize the second part of your query.
> > >
> > > Another technique is to gather stats when as you run the query today,
> so
> > > that when you run it tomorrow Calcite can do a better job.
> > >
> > > Julian
> > >
> > >
> > > > On Mar 6, 2020, at 5:52 AM, Danny Chan <da...@apache.org> wrote:
> > > >
> > > > Sorry to tell that Calcite runtime does not support this, the
> "dynamic
> > > > partition pruning" or "runtime filter" called in Impala, would build
> a
> > > > bloom filter for the join keys for the build side table and push it
> > down
> > > to
> > > > the probe table source, thus, in some cases, it can reduce the data.
> > > >
> > > > Yang Liu <wh...@gmail.com> 于2020年3月6日周五 下午6:54写道:
> > > >
> > > >> discussed with one of our user groups, in Spark 3.0, this is called
> > > >> "dynamic
> > > >> partition pruning"
> > > >>
> > > >> Yang Liu <wh...@gmail.com> 于2020年3月6日周五 下午6:12写道:
> > > >>
> > > >>> Hi,
> > > >>>
> > > >>> I am wondering if Calcite will support "lazy optimization"
> (execution
> > > >> time
> > > >>> optimization / runtime optimization).
> > > >>>
> > > >>> For example, we want to do an inner join between an Elasticsearch
> > table
> > > >>> and a MySQL table, like this:
> > > >>>
> > > >>> WITH logic_table_2 AS
> > > >>>  (SELECT _MAP['status'] AS "status",
> > > >>>          _MAP['user'] AS "user"
> > > >>>   FROM "es"."insight-by-sql-v3"
> > > >>>   LIMIT 12345)
> > > >>> SELECT *
> > > >>> FROM "insight_user"."user_tab" AS t1
> > > >>> JOIN logic_table_2 AS t2 ON t1."email" = t2."user"
> > > >>> WHERE t2."status" = 'fail'
> > > >>> LIMIT 10
> > > >>>
> > > >>> t2 is a ES table and t1 is a MySQL table, and it may generate a
> > > execution
> > > >>> plan like this:
> > > >>>
> > > >>> EnumerableProject(id=[$2], name=[$3], email=[$4], desc=[$5],
> > > >>> is_super=[$6], create_time=[$7], has_all_access=[$8], status=[$0],
> > > >>> user=[$1])
> > > >>>  EnumerableLimit(fetch=[10])
> > > >>>    EnumerableHashJoin(condition=[=($1, $4)], joinType=[inner])
> > > >>>      ElasticsearchToEnumerableConverter
> > > >>>        ElasticsearchProject(status=[ITEM($0, 'status')],
> > user=[ITEM($0,
> > > >>> 'user')])
> > > >>>          ElasticsearchFilter(condition=[=(ITEM($0, 'status'),
> > 'fail')])
> > > >>>            ElasticsearchSort(fetch=[12345])
> > > >>>              ElasticsearchTableScan(table=[[es,
> insight-by-sql-v3]])
> > > >>>      JdbcToEnumerableConverter
> > > >>>        JdbcTableScan(table=[[insight_user, user_tab]])
> > > >>>
> > > >>> since here ES query has a filter, in execution Calcite will do the
> ES
> > > >>> query first and get the build table, and then do JdbcTableScan and
> > get
> > > >> the
> > > >>> probe table, and do the HashJoin finally.
> > > >>>
> > > >>> But, since this is a INNER JOIN, there is an implicit filter on the
> > > later
> > > >>> JdbcTableScan:
> > > >>> ``` t1.email in (select user from t2 where t2.status='fail') ```,
> if
> > > >>> applying this implicit filter, the dataset we will handle may
> become
> > > >>> extremely small (save memory) and running much faster since the
> full
> > > >>> JdbcTableScan is always time-wasting. But since Calcite do the
> > > >> optimization
> > > >>> in planner phase, this dynamic/lazy optimization seems missed ...
> > > >>>
> > > >>> To summarize, serial execution with a "lazy optimization" may be
> > faster
> > > >>> and use less memory than parallel execution with an optimized
> > execution
> > > >>> plan since the former one can reduce dataset we handle.
> > > >>>
> > > >>> Any ideas?
> > > >>>
> > > >>>
> > > >>>
> > > >>
> > >
> > >
> >
>

Re: "lazy" optmization?

Posted by Stamatis Zampetakis <za...@gmail.com>.
Hi Yang,

Another term that is used for the optimization that you mention is
"selective join pushdown" which essentially relies on Bloom/Cuckoo and
other probabilistic filters. You can check [1] for more details about this
kind of techniques.

In the example that you outlined between JDBC and Elastic maybe you could
achieve the same result with a slightly different approach by using a
correlated join. If the scan + filter on Elastic does not bring back many
results then you could use this results to probe the JDBC datasource. For
more details check the discussion in [2], I think it refers to the same
problem.

Best,
Stamatis

[1] http://www.vldb.org/pvldb/vol12/p502-lang.pdf
[2]
https://lists.apache.org/thread.html/d9f95683e66009872a53e7e617295158b98746b550d2bf68230b3096%40%3Cdev.calcite.apache.org%3E

On Sat, Mar 7, 2020 at 4:16 AM Yang Liu <wh...@gmail.com> wrote:

> Thanks all!
>
> @Julian is the “split processing into phases” you are referring to like
> this?
>
> with t1 as (select * from es_table where xxx limit xxx);
> select * from t2 join t1 on (t2.key = t1.key) where t2.key in (select key
> from t1)
>
> which means the SQL writer need to adapt to this specific form of SQL for
> better performance? And Calcite will cache the t1 right?
>
> Or, maybe I can implement a RelRunner or EnumerableHashJoin myself to have
> the specific rule: the query result of right table can be used as filters
> for the left table?
>
> Thanks!
>
>
> Julian Hyde <jh...@apache.org> 于2020年3月7日周六 上午1:48写道:
>
> > Runtime optimization is always necessary, because you just don’t have the
> > stats until you run the query. The best DB algorithms are adaptive, and
> > therefore hard to write. The adaptations require a lot of tricky support
> > from the runtime - e.g. propagating bloom filters against the flow of
> data.
> >
> > Calcite can still help a little.
> >
> > One runtime optimization is where you split processing into phases. Only
> > optimize the first part of your query. Build temp tables, analyze them,
> and
> > use those stats to optimize the second part of your query.
> >
> > Another technique is to gather stats when as you run the query today, so
> > that when you run it tomorrow Calcite can do a better job.
> >
> > Julian
> >
> >
> > > On Mar 6, 2020, at 5:52 AM, Danny Chan <da...@apache.org> wrote:
> > >
> > > Sorry to tell that Calcite runtime does not support this, the "dynamic
> > > partition pruning" or "runtime filter" called in Impala, would build a
> > > bloom filter for the join keys for the build side table and push it
> down
> > to
> > > the probe table source, thus, in some cases, it can reduce the data.
> > >
> > > Yang Liu <wh...@gmail.com> 于2020年3月6日周五 下午6:54写道:
> > >
> > >> discussed with one of our user groups, in Spark 3.0, this is called
> > >> "dynamic
> > >> partition pruning"
> > >>
> > >> Yang Liu <wh...@gmail.com> 于2020年3月6日周五 下午6:12写道:
> > >>
> > >>> Hi,
> > >>>
> > >>> I am wondering if Calcite will support "lazy optimization" (execution
> > >> time
> > >>> optimization / runtime optimization).
> > >>>
> > >>> For example, we want to do an inner join between an Elasticsearch
> table
> > >>> and a MySQL table, like this:
> > >>>
> > >>> WITH logic_table_2 AS
> > >>>  (SELECT _MAP['status'] AS "status",
> > >>>          _MAP['user'] AS "user"
> > >>>   FROM "es"."insight-by-sql-v3"
> > >>>   LIMIT 12345)
> > >>> SELECT *
> > >>> FROM "insight_user"."user_tab" AS t1
> > >>> JOIN logic_table_2 AS t2 ON t1."email" = t2."user"
> > >>> WHERE t2."status" = 'fail'
> > >>> LIMIT 10
> > >>>
> > >>> t2 is a ES table and t1 is a MySQL table, and it may generate a
> > execution
> > >>> plan like this:
> > >>>
> > >>> EnumerableProject(id=[$2], name=[$3], email=[$4], desc=[$5],
> > >>> is_super=[$6], create_time=[$7], has_all_access=[$8], status=[$0],
> > >>> user=[$1])
> > >>>  EnumerableLimit(fetch=[10])
> > >>>    EnumerableHashJoin(condition=[=($1, $4)], joinType=[inner])
> > >>>      ElasticsearchToEnumerableConverter
> > >>>        ElasticsearchProject(status=[ITEM($0, 'status')],
> user=[ITEM($0,
> > >>> 'user')])
> > >>>          ElasticsearchFilter(condition=[=(ITEM($0, 'status'),
> 'fail')])
> > >>>            ElasticsearchSort(fetch=[12345])
> > >>>              ElasticsearchTableScan(table=[[es, insight-by-sql-v3]])
> > >>>      JdbcToEnumerableConverter
> > >>>        JdbcTableScan(table=[[insight_user, user_tab]])
> > >>>
> > >>> since here ES query has a filter, in execution Calcite will do the ES
> > >>> query first and get the build table, and then do JdbcTableScan and
> get
> > >> the
> > >>> probe table, and do the HashJoin finally.
> > >>>
> > >>> But, since this is a INNER JOIN, there is an implicit filter on the
> > later
> > >>> JdbcTableScan:
> > >>> ``` t1.email in (select user from t2 where t2.status='fail') ```, if
> > >>> applying this implicit filter, the dataset we will handle may become
> > >>> extremely small (save memory) and running much faster since the full
> > >>> JdbcTableScan is always time-wasting. But since Calcite do the
> > >> optimization
> > >>> in planner phase, this dynamic/lazy optimization seems missed ...
> > >>>
> > >>> To summarize, serial execution with a "lazy optimization" may be
> faster
> > >>> and use less memory than parallel execution with an optimized
> execution
> > >>> plan since the former one can reduce dataset we handle.
> > >>>
> > >>> Any ideas?
> > >>>
> > >>>
> > >>>
> > >>
> >
> >
>

Re: "lazy" optmization?

Posted by Yang Liu <wh...@gmail.com>.
Thanks all!

@Julian is the “split processing into phases” you are referring to like
this?

with t1 as (select * from es_table where xxx limit xxx);
select * from t2 join t1 on (t2.key = t1.key) where t2.key in (select key
from t1)

which means the SQL writer need to adapt to this specific form of SQL for
better performance? And Calcite will cache the t1 right?

Or, maybe I can implement a RelRunner or EnumerableHashJoin myself to have
the specific rule: the query result of right table can be used as filters
for the left table?

Thanks!


Julian Hyde <jh...@apache.org> 于2020年3月7日周六 上午1:48写道:

> Runtime optimization is always necessary, because you just don’t have the
> stats until you run the query. The best DB algorithms are adaptive, and
> therefore hard to write. The adaptations require a lot of tricky support
> from the runtime - e.g. propagating bloom filters against the flow of data.
>
> Calcite can still help a little.
>
> One runtime optimization is where you split processing into phases. Only
> optimize the first part of your query. Build temp tables, analyze them, and
> use those stats to optimize the second part of your query.
>
> Another technique is to gather stats when as you run the query today, so
> that when you run it tomorrow Calcite can do a better job.
>
> Julian
>
>
> > On Mar 6, 2020, at 5:52 AM, Danny Chan <da...@apache.org> wrote:
> >
> > Sorry to tell that Calcite runtime does not support this, the "dynamic
> > partition pruning" or "runtime filter" called in Impala, would build a
> > bloom filter for the join keys for the build side table and push it down
> to
> > the probe table source, thus, in some cases, it can reduce the data.
> >
> > Yang Liu <wh...@gmail.com> 于2020年3月6日周五 下午6:54写道:
> >
> >> discussed with one of our user groups, in Spark 3.0, this is called
> >> "dynamic
> >> partition pruning"
> >>
> >> Yang Liu <wh...@gmail.com> 于2020年3月6日周五 下午6:12写道:
> >>
> >>> Hi,
> >>>
> >>> I am wondering if Calcite will support "lazy optimization" (execution
> >> time
> >>> optimization / runtime optimization).
> >>>
> >>> For example, we want to do an inner join between an Elasticsearch table
> >>> and a MySQL table, like this:
> >>>
> >>> WITH logic_table_2 AS
> >>>  (SELECT _MAP['status'] AS "status",
> >>>          _MAP['user'] AS "user"
> >>>   FROM "es"."insight-by-sql-v3"
> >>>   LIMIT 12345)
> >>> SELECT *
> >>> FROM "insight_user"."user_tab" AS t1
> >>> JOIN logic_table_2 AS t2 ON t1."email" = t2."user"
> >>> WHERE t2."status" = 'fail'
> >>> LIMIT 10
> >>>
> >>> t2 is a ES table and t1 is a MySQL table, and it may generate a
> execution
> >>> plan like this:
> >>>
> >>> EnumerableProject(id=[$2], name=[$3], email=[$4], desc=[$5],
> >>> is_super=[$6], create_time=[$7], has_all_access=[$8], status=[$0],
> >>> user=[$1])
> >>>  EnumerableLimit(fetch=[10])
> >>>    EnumerableHashJoin(condition=[=($1, $4)], joinType=[inner])
> >>>      ElasticsearchToEnumerableConverter
> >>>        ElasticsearchProject(status=[ITEM($0, 'status')], user=[ITEM($0,
> >>> 'user')])
> >>>          ElasticsearchFilter(condition=[=(ITEM($0, 'status'), 'fail')])
> >>>            ElasticsearchSort(fetch=[12345])
> >>>              ElasticsearchTableScan(table=[[es, insight-by-sql-v3]])
> >>>      JdbcToEnumerableConverter
> >>>        JdbcTableScan(table=[[insight_user, user_tab]])
> >>>
> >>> since here ES query has a filter, in execution Calcite will do the ES
> >>> query first and get the build table, and then do JdbcTableScan and get
> >> the
> >>> probe table, and do the HashJoin finally.
> >>>
> >>> But, since this is a INNER JOIN, there is an implicit filter on the
> later
> >>> JdbcTableScan:
> >>> ``` t1.email in (select user from t2 where t2.status='fail') ```, if
> >>> applying this implicit filter, the dataset we will handle may become
> >>> extremely small (save memory) and running much faster since the full
> >>> JdbcTableScan is always time-wasting. But since Calcite do the
> >> optimization
> >>> in planner phase, this dynamic/lazy optimization seems missed ...
> >>>
> >>> To summarize, serial execution with a "lazy optimization" may be faster
> >>> and use less memory than parallel execution with an optimized execution
> >>> plan since the former one can reduce dataset we handle.
> >>>
> >>> Any ideas?
> >>>
> >>>
> >>>
> >>
>
>

Re: "lazy" optmization?

Posted by Julian Hyde <jh...@apache.org>.
Runtime optimization is always necessary, because you just don’t have the stats until you run the query. The best DB algorithms are adaptive, and therefore hard to write. The adaptations require a lot of tricky support from the runtime - e.g. propagating bloom filters against the flow of data.

Calcite can still help a little.

One runtime optimization is where you split processing into phases. Only optimize the first part of your query. Build temp tables, analyze them, and use those stats to optimize the second part of your query.

Another technique is to gather stats when as you run the query today, so that when you run it tomorrow Calcite can do a better job.

Julian


> On Mar 6, 2020, at 5:52 AM, Danny Chan <da...@apache.org> wrote:
> 
> Sorry to tell that Calcite runtime does not support this, the "dynamic
> partition pruning" or "runtime filter" called in Impala, would build a
> bloom filter for the join keys for the build side table and push it down to
> the probe table source, thus, in some cases, it can reduce the data.
> 
> Yang Liu <wh...@gmail.com> 于2020年3月6日周五 下午6:54写道:
> 
>> discussed with one of our user groups, in Spark 3.0, this is called
>> "dynamic
>> partition pruning"
>> 
>> Yang Liu <wh...@gmail.com> 于2020年3月6日周五 下午6:12写道:
>> 
>>> Hi,
>>> 
>>> I am wondering if Calcite will support "lazy optimization" (execution
>> time
>>> optimization / runtime optimization).
>>> 
>>> For example, we want to do an inner join between an Elasticsearch table
>>> and a MySQL table, like this:
>>> 
>>> WITH logic_table_2 AS
>>>  (SELECT _MAP['status'] AS "status",
>>>          _MAP['user'] AS "user"
>>>   FROM "es"."insight-by-sql-v3"
>>>   LIMIT 12345)
>>> SELECT *
>>> FROM "insight_user"."user_tab" AS t1
>>> JOIN logic_table_2 AS t2 ON t1."email" = t2."user"
>>> WHERE t2."status" = 'fail'
>>> LIMIT 10
>>> 
>>> t2 is a ES table and t1 is a MySQL table, and it may generate a execution
>>> plan like this:
>>> 
>>> EnumerableProject(id=[$2], name=[$3], email=[$4], desc=[$5],
>>> is_super=[$6], create_time=[$7], has_all_access=[$8], status=[$0],
>>> user=[$1])
>>>  EnumerableLimit(fetch=[10])
>>>    EnumerableHashJoin(condition=[=($1, $4)], joinType=[inner])
>>>      ElasticsearchToEnumerableConverter
>>>        ElasticsearchProject(status=[ITEM($0, 'status')], user=[ITEM($0,
>>> 'user')])
>>>          ElasticsearchFilter(condition=[=(ITEM($0, 'status'), 'fail')])
>>>            ElasticsearchSort(fetch=[12345])
>>>              ElasticsearchTableScan(table=[[es, insight-by-sql-v3]])
>>>      JdbcToEnumerableConverter
>>>        JdbcTableScan(table=[[insight_user, user_tab]])
>>> 
>>> since here ES query has a filter, in execution Calcite will do the ES
>>> query first and get the build table, and then do JdbcTableScan and get
>> the
>>> probe table, and do the HashJoin finally.
>>> 
>>> But, since this is a INNER JOIN, there is an implicit filter on the later
>>> JdbcTableScan:
>>> ``` t1.email in (select user from t2 where t2.status='fail') ```, if
>>> applying this implicit filter, the dataset we will handle may become
>>> extremely small (save memory) and running much faster since the full
>>> JdbcTableScan is always time-wasting. But since Calcite do the
>> optimization
>>> in planner phase, this dynamic/lazy optimization seems missed ...
>>> 
>>> To summarize, serial execution with a "lazy optimization" may be faster
>>> and use less memory than parallel execution with an optimized execution
>>> plan since the former one can reduce dataset we handle.
>>> 
>>> Any ideas?
>>> 
>>> 
>>> 
>> 


Re: "lazy" optmization?

Posted by Danny Chan <da...@apache.org>.
Sorry to tell that Calcite runtime does not support this, the "dynamic
partition pruning" or "runtime filter" called in Impala, would build a
bloom filter for the join keys for the build side table and push it down to
the probe table source, thus, in some cases, it can reduce the data.

Yang Liu <wh...@gmail.com> 于2020年3月6日周五 下午6:54写道:

> discussed with one of our user groups, in Spark 3.0, this is called
> "dynamic
> partition pruning"
>
> Yang Liu <wh...@gmail.com> 于2020年3月6日周五 下午6:12写道:
>
> > Hi,
> >
> > I am wondering if Calcite will support "lazy optimization" (execution
> time
> > optimization / runtime optimization).
> >
> > For example, we want to do an inner join between an Elasticsearch table
> > and a MySQL table, like this:
> >
> > WITH logic_table_2 AS
> >   (SELECT _MAP['status'] AS "status",
> >           _MAP['user'] AS "user"
> >    FROM "es"."insight-by-sql-v3"
> >    LIMIT 12345)
> > SELECT *
> > FROM "insight_user"."user_tab" AS t1
> > JOIN logic_table_2 AS t2 ON t1."email" = t2."user"
> > WHERE t2."status" = 'fail'
> > LIMIT 10
> >
> > t2 is a ES table and t1 is a MySQL table, and it may generate a execution
> > plan like this:
> >
> > EnumerableProject(id=[$2], name=[$3], email=[$4], desc=[$5],
> > is_super=[$6], create_time=[$7], has_all_access=[$8], status=[$0],
> > user=[$1])
> >   EnumerableLimit(fetch=[10])
> >     EnumerableHashJoin(condition=[=($1, $4)], joinType=[inner])
> >       ElasticsearchToEnumerableConverter
> >         ElasticsearchProject(status=[ITEM($0, 'status')], user=[ITEM($0,
> > 'user')])
> >           ElasticsearchFilter(condition=[=(ITEM($0, 'status'), 'fail')])
> >             ElasticsearchSort(fetch=[12345])
> >               ElasticsearchTableScan(table=[[es, insight-by-sql-v3]])
> >       JdbcToEnumerableConverter
> >         JdbcTableScan(table=[[insight_user, user_tab]])
> >
> > since here ES query has a filter, in execution Calcite will do the ES
> > query first and get the build table, and then do JdbcTableScan and get
> the
> > probe table, and do the HashJoin finally.
> >
> > But, since this is a INNER JOIN, there is an implicit filter on the later
> > JdbcTableScan:
> > ``` t1.email in (select user from t2 where t2.status='fail') ```, if
> > applying this implicit filter, the dataset we will handle may become
> > extremely small (save memory) and running much faster since the full
> > JdbcTableScan is always time-wasting. But since Calcite do the
> optimization
> > in planner phase, this dynamic/lazy optimization seems missed ...
> >
> > To summarize, serial execution with a "lazy optimization" may be faster
> > and use less memory than parallel execution with an optimized execution
> > plan since the former one can reduce dataset we handle.
> >
> > Any ideas?
> >
> >
> >
>

Re: "lazy" optmization?

Posted by Yang Liu <wh...@gmail.com>.
discussed with one of our user groups, in Spark 3.0, this is called "dynamic
partition pruning"

Yang Liu <wh...@gmail.com> 于2020年3月6日周五 下午6:12写道:

> Hi,
>
> I am wondering if Calcite will support "lazy optimization" (execution time
> optimization / runtime optimization).
>
> For example, we want to do an inner join between an Elasticsearch table
> and a MySQL table, like this:
>
> WITH logic_table_2 AS
>   (SELECT _MAP['status'] AS "status",
>           _MAP['user'] AS "user"
>    FROM "es"."insight-by-sql-v3"
>    LIMIT 12345)
> SELECT *
> FROM "insight_user"."user_tab" AS t1
> JOIN logic_table_2 AS t2 ON t1."email" = t2."user"
> WHERE t2."status" = 'fail'
> LIMIT 10
>
> t2 is a ES table and t1 is a MySQL table, and it may generate a execution
> plan like this:
>
> EnumerableProject(id=[$2], name=[$3], email=[$4], desc=[$5],
> is_super=[$6], create_time=[$7], has_all_access=[$8], status=[$0],
> user=[$1])
>   EnumerableLimit(fetch=[10])
>     EnumerableHashJoin(condition=[=($1, $4)], joinType=[inner])
>       ElasticsearchToEnumerableConverter
>         ElasticsearchProject(status=[ITEM($0, 'status')], user=[ITEM($0,
> 'user')])
>           ElasticsearchFilter(condition=[=(ITEM($0, 'status'), 'fail')])
>             ElasticsearchSort(fetch=[12345])
>               ElasticsearchTableScan(table=[[es, insight-by-sql-v3]])
>       JdbcToEnumerableConverter
>         JdbcTableScan(table=[[insight_user, user_tab]])
>
> since here ES query has a filter, in execution Calcite will do the ES
> query first and get the build table, and then do JdbcTableScan and get the
> probe table, and do the HashJoin finally.
>
> But, since this is a INNER JOIN, there is an implicit filter on the later
> JdbcTableScan:
> ``` t1.email in (select user from t2 where t2.status='fail') ```, if
> applying this implicit filter, the dataset we will handle may become
> extremely small (save memory) and running much faster since the full
> JdbcTableScan is always time-wasting. But since Calcite do the optimization
> in planner phase, this dynamic/lazy optimization seems missed ...
>
> To summarize, serial execution with a "lazy optimization" may be faster
> and use less memory than parallel execution with an optimized execution
> plan since the former one can reduce dataset we handle.
>
> Any ideas?
>
>
>