Posted to dev@calcite.apache.org by Alireza Samadian <as...@google.com.INVALID> on 2019/07/02 21:22:54 UTC

Streams Query Optimization Using Rate and Window Size

Dear Members of Calcite Community,

I'm working on Apache Beam SQL, and we use Calcite for query optimization.
We represent both tables and streams as subclasses of
AbstractQueryableTable. In Calcite's implementation of the cost model and
statistics, one of the key elements is row count. All the nodes can also
present a row count estimate based on their inputs. For instance, in the
case of joins, the row count is estimated by:
left.rowCount*right.rowCount*selectivity_estimate.

My first question is: what is the correct way of representing streams in
Calcite's optimizer? Does Calcite still use row_count for streams? If so,
what does row count represent in the case of streams?

In [1], the authors suggest using both window (size of the window in terms
of tuples) and rate to represent the output of every node in a stream
processing system, and these two values are estimated for every node. For
instance, they suggest estimating the window and rate of a join using:
join_rate = (left_rate*right_window + right_rate*left_window)*selectivity
join_window = (left_window*right_window)*selectivity
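
The two formulas above can be sketched in a few lines of Java. This is only an illustration: the class name StreamStats and its fields are hypothetical, not part of Calcite or Beam, and the input numbers are made up.

```java
/** Hypothetical holder for the two per-node estimates (rate, window) described in [1]. */
final class StreamStats {
    final double rate;    // tuples per unit of time
    final double window;  // window size in tuples

    StreamStats(double rate, double window) {
        this.rate = rate;
        this.window = window;
    }

    /** Applies the join_rate and join_window formulas quoted above. */
    static StreamStats join(StreamStats left, StreamStats right, double selectivity) {
        double joinRate = (left.rate * right.window + right.rate * left.window) * selectivity;
        double joinWindow = (left.window * right.window) * selectivity;
        return new StreamStats(joinRate, joinWindow);
    }

    public static void main(String[] args) {
        StreamStats left = new StreamStats(10.0, 100.0);
        StreamStats right = new StreamStats(5.0, 20.0);
        StreamStats out = join(left, right, 0.5);
        // (10*20 + 5*100)*0.5 = 350 and (100*20)*0.5 = 1000
        System.out.println("join_rate=" + out.rate + ", join_window=" + out.window);
    }
}
```

Note that both estimates have to be propagated upward together, since the rate of a join depends on the windows of its inputs.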

We were thinking about using this approach for Beam SQL; however, I am
wondering where the point of extension would be. I was thinking of
implementing computeSelfCost() using a different cost model (rate,
window_size) for our physical RelNodes, in which we don't call
estimate_row_count and instead use the inputs' non-cumulative cost to
estimate the node's cost. However, I am not sure if this is a good approach
and whether it can potentially cause problems in the optimization
(because there will still be logical nodes that are implemented in Calcite
and may use row count estimation). Does Calcite use cost estimation for
logical nodes such as a logical join, or does it only calculate the cost
when the nodes are physical?

I would appreciate it if someone could help me. I would also appreciate any
other suggestions for stream query optimization.

Best,
Alireza Samadian

[1] Ayad, Ahmed M., and Jeffrey F. Naughton. "Static optimization of
conjunctive queries with sliding windows over infinite streams." *Proceedings
of the 2004 ACM SIGMOD international conference on Management of data*.
ACM, 2004.

Re: Streams Query Optimization Using Rate and Window Size

Posted by Julian Hyde <jh...@apache.org>.
Row-count is not the only statistic you can use in computing your cost function. You can use many statistics in your cost function (e.g. selectivity, column cardinality, predicates).

So, if you decide to use rows-per-minute as your cost function, you can compute rows-per-minute of the join based on a combination of rows-per-minute and row-count of the input streams.

Julian


> On Fri, Jul 12, 2019 at 5:43 PM Julian Hyde <jh...@apache.org> wrote:
> 
>> In practice, the rowCount is just a number. So you can think of it as
>> rows-per-second if you are optimizing a continuous query.
>> 
>> If you are using a table in a streaming query, does it have a “rows per
>> second?”. Yes - it is the number of rows in the table multiplied by the
>> number of times per second that the table is scanned.
>> 
>> I don’t know of any good academic work on optimizing continuous queries.
>> (There is work on self-tuning queries, as indeed there is plenty of work on
>> self-tuning sort & join algorithms for traditional DB, but nothing I know
>> that bridges streaming query to classical cost-based query optimization.) I
>> will remark that streaming queries have one big disadvantage and one big
>> advantage. The disadvantage is that there are no stats available at the
>> start, so you are not able to do cost-based optimization. The advantage is
>> that the query runs forever, so you can start off with a bad plan, gather
>> stats, and periodically re-optimize. The optimal plan might change over
>> time as the streams change volume.
>> 
>> Julian
>> 
>> 
>>> On Jul 10, 2019, at 4:23 PM, Stamatis Zampetakis <za...@gmail.com> wrote:
>>>
>>> Looking forward to the outcome :)
>>>
>>> Below are a few comments regarding Kenn's extensibility concerns.
>>>
>>> In order to find the best plan, the VolcanoPlanner just needs to know if one
>>> cost is less than another cost [1], and this is encapsulated in the
>>> isLe/isLt methods [2].
>>> Adding a new cost class (other than VolcanoCost) which implements the
>>> RelOptCost interface and respects its contract should work.
>>> The VolcanoPlanner can be instantiated with a custom RelOptCostFactory [3]
>>> which returns any kind of RelOptCost object.
>>>
>>> It is true that there is a strong link with CPU, I/O, and cardinality
>>> metrics, but it is hard to imagine an optimizer that does not take these
>>> into consideration.
>>> At the moment the cost comparison in Volcano is rather simple since it uses
>>> only the cardinality estimations [4], so I guess we could improve on this.
>>> However, recent studies [5] have shown that cardinality estimations matter
>>> much more than other metrics (such as I/O and CPU), so in the end it may not
>>> be worth the effort.
>>> 
>>> Best,
>>> Stamatis
>>> 
>>> [1] https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/volcano/RelSubset.java#L348
>>> [2] https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/RelOptCost.java#L83
>>> [3] https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java#L263
>>> [4] https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoCost.java#L113
>>> [5] Viktor Leis, Andrey Gubichev, Atanas Mirchev, Peter A. Boncz, Alfons
>>> Kemper, Thomas Neumann: How Good Are Query Optimizers, Really? PVLDB 9(3):
>>> 204-215 (2015) (http://www.vldb.org/pvldb/vol9/p204-leis.pdf)
>>> 
>>> On Wed, Jul 10, 2019 at 11:45 PM Alireza Samadian
>>> <as...@google.com.invalid> wrote:
>>> 
>>>> Dear Stamatis,
>>>> Thank you for your reply. I will probably go with overriding
>>>> computeSelfCost() as the first step. I checked it, and it seems to be
>>>> working.
>>>> 
>>>> Dear Kenn,
>>>> The cited paper estimates those two values for each node and passes them
>>>> up, but they are not the cost. The cost of a node depends on the operation
>>>> we are performing on the input and the rate of the input (input to that
>>>> relational node). So for all of the nodes the cost is modeled as c*rate,
>>>> where c is the number of operations per tuple and rate is the rate of the
>>>> input. It might be possible to have some other factors in the calculation
>>>> of the cost for each node. So in the end there will always be a single
>>>> scalar as the cost of a node. This single scalar can be a function of the
>>>> number of rows accessed, the number of I/O accesses, etc. In Calcite it is
>>>> assumed that any value we get as a cost is going to be a function of
>>>> (row_count, CPU, I/O). Note that in the streaming model there is no need
>>>> for the window to be in the cost (the cost does not depend on it). I am
>>>> including it in the cost model only because estimating the output rate of
>>>> the nodes depends on it, and I don't know any other way to get it from the
>>>> inputs of the RelNodes.
>>>> 
>>>> Best,
>>>> Alireza
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On Wed, Jul 10, 2019 at 12:40 PM Kenneth Knowles <ke...@apache.org> wrote:
>>>>
>>>>> Following this discussion, I have a question which I think is on topic.
>>>>> It seems there are two places that, from my brief reading, are not quite
>>>>> extensible enough.
>>>>>
>>>>> 1. RelNode.computeSelfCost returns RelOptCost, which has particular
>>>>> measures built in. Would Alireza's proposal require extensibility here to
>>>>> add/remove measures? The planner seems to depend on them being CPU, IO, rows.
>>>>> 2. But the VolcanoPlanner also just adds the above together to a single
>>>>> scalar. Does the cited paper avoid this practice and instead retain both
>>>>> measures?
>>>>> 
>>>>> Again, I'm just jumping around in the code to educate myself.
>>>>> 
>>>>> Kenn
>>>>> 
>>>>> On Mon, Jul 8, 2019 at 4:02 PM Stamatis Zampetakis <za...@gmail.com>
>>>>> wrote:
>>>>> 
>>>>>> Hi Alireza,
>>>>>> 
>>>>>> Cost models for streams are a very cool topic, but I don't have much
>>>>>> knowledge in the domain.
>>>>>>
>>>>>> Regarding the implementation details, if you have custom physical
>>>>>> operators then it makes sense to implement the computeSelfCost() function
>>>>>> as you see fit.
>>>>>>
>>>>>> Another option is to plug in your custom RelMetadataProvider [1]; you can
>>>>>> find a few examples in RelMetadataTest [2].
>>>>>> That way you can also change the cost function of existing operators
>>>>>> (logical or not) without changing the operators themselves.
>>>>>>
>>>>>> As far as the cost of logical operators is concerned, the behavior of the
>>>>>> planner can be customized [3].
>>>>>> The most common configuration is to ignore the cost of logical operators,
>>>>>> leaving it as infinite.
>>>>>>
>>>>>> Best,
>>>>>> Stamatis
>>>>>>
>>>>>> [1] https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/metadata/RelMetadataProvider.java
>>>>>> [2] https://github.com/apache/calcite/blob/e8d598a434e8dbadaf756f8c57c748f4d7e16fdf/core/src/test/java/org/apache/calcite/test/RelMetadataTest.java#L1005

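Two points from the quoted thread above, that the planner only needs an ordering between costs (isLe/isLt) and that a node's streaming cost can be modeled as the scalar c*rate with the window carried along only for estimation, might be combined roughly as follows. This is a sketch under assumptions: PlanCost is a hypothetical stand-in interface, not Calcite's RelOptCost (which declares more methods than these two), and RateWindowCost is an invented name.

```java
/** Stand-in for a planner cost contract; NOT Calcite's RelOptCost interface. */
interface PlanCost {
    boolean isLe(PlanCost other);
    boolean isLt(PlanCost other);
}

/** Hypothetical streaming cost: the scalar c*rate, plus a window kept only for estimation. */
final class RateWindowCost implements PlanCost {
    final double cost;    // c * rate: operations per unit of time
    final double window;  // used to estimate downstream rates, ignored when comparing

    private RateWindowCost(double cost, double window) {
        this.cost = cost;
        this.window = window;
    }

    static RateWindowCost of(double opsPerTuple, double inputRate, double window) {
        return new RateWindowCost(opsPerTuple * inputRate, window);
    }

    @Override public boolean isLe(PlanCost other) {
        return cost <= ((RateWindowCost) other).cost;
    }

    @Override public boolean isLt(PlanCost other) {
        return cost < ((RateWindowCost) other).cost;
    }

    public static void main(String[] args) {
        RateWindowCost a = of(2.0, 100.0, 5000.0); // cost 200, large window
        RateWindowCost b = of(3.0, 100.0, 10.0);   // cost 300, small window
        // The window does not influence the ordering, only c*rate does.
        System.out.println("a cheaper than b: " + a.isLt(b));
    }
}
```
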

Re: Streams Query Optimization Using Rate and Window Size

Posted by Alireza Samadian <as...@google.com.INVALID>.
Hi Julian,

Thank you for your reply.

I think the problem with interpreting rowCount as the rate is that we need
both the rate and the window size of the inputs to estimate the output rate
of the RelNodes, and these cannot be embedded into a single number.

As an example, let A and B be two windowed streams with rates r(A) and r(B)
that are joined (for simplicity, let's assume it's a cross join), and
suppose we want to estimate the output rate of the join. The output rate
cannot be estimated as r(A)r(B), because even if the rates are small, a
large window size causes us to produce many tuples per unit of time in
the output. Or take the example of an aggregation RelNode. The output rate
of an aggregation depends on both the rate and the window size of its input
because it emits one tuple for every window.
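
A small numeric sketch of the two cases above. The cross-join rate uses the join_rate formula from [1] with selectivity 1. The aggregation rate assumes tumbling windows measured in tuples and a single output tuple per window, which is my simplification for illustration. All names and numbers are hypothetical.

```java
final class WindowedRateExample {
    /** Cross-join output rate: join_rate formula from [1] with selectivity 1. */
    static double crossJoinRate(double rateA, double windowA, double rateB, double windowB) {
        return rateA * windowB + rateB * windowA;
    }

    /** Assumes tumbling windows of windowSize tuples, one output tuple per window. */
    static double aggregationRate(double inputRate, double windowSize) {
        return inputRate / windowSize;
    }

    public static void main(String[] args) {
        double naive = 1.0 * 1.0;                                        // r(A) * r(B) = 1
        double estimated = crossJoinRate(1.0, 1000.0, 1.0, 1000.0);      // 1*1000 + 1*1000 = 2000
        System.out.println("naive=" + naive + ", estimated=" + estimated);
        System.out.println("aggregation rate=" + aggregationRate(100.0, 50.0)); // 100/50 = 2
    }
}
```

With rates of one tuple per unit of time and windows of 1000 tuples, the estimate is 2000 tuples per unit of time, far from the product of the rates.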

For a combination of bounded and unbounded sources, in many cases a table
is scanned only once in the total execution. For
instance, let U1 be an unbounded source, and B1 and B2 be bounded sources.
Then in Join(U1, Join(B1,B2)) both B1 and B2 are scanned only once, which
means the suggested rows per second for them is zero. This can
potentially cause problems if part of the query plan consists of bounded
sources joined together (the row count for all of them will be zero).
Furthermore, for bounded sources, we can consider rate=0 and interpret the
row count as a window size (in terms of tuples), which can be used when
we are trying to estimate the output rate of Join(U1,B1); whereas I am not
sure how considering rows per second for bounded sources can help us
estimate the output rate.
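
To make the bounded-source convention concrete, here is a sketch that treats a bounded table as (rate=0, window=rowCount) and applies the join formulas from [1] to Join(U1, Join(B1,B2)). The class Est, the helper bounded(), and all numbers are assumptions made up for this illustration.

```java
/** Hypothetical (rate, window) estimate; a bounded table is rate 0 with window = row count. */
final class Est {
    final double rate;
    final double window;

    Est(double rate, double window) {
        this.rate = rate;
        this.window = window;
    }

    static Est bounded(double rowCount) {
        return new Est(0.0, rowCount);
    }

    /** Join formulas from [1]. */
    static Est join(Est l, Est r, double selectivity) {
        return new Est((l.rate * r.window + r.rate * l.window) * selectivity,
                       (l.window * r.window) * selectivity);
    }

    public static void main(String[] args) {
        Est u1 = new Est(10.0, 200.0);     // unbounded source
        Est b1 = bounded(1000.0);
        Est b2 = bounded(50.0);
        Est inner = join(b1, b2, 0.5);     // rate 0, window 1000*50*0.5 = 25000
        Est outer = join(u1, inner, 0.5);  // rate (10*25000 + 0*200)*0.5 = 125000
        System.out.println("inner: rate=" + inner.rate + ", window=" + inner.window);
        System.out.println("outer: rate=" + outer.rate + ", window=" + outer.window);
    }
}
```

The inner join of two bounded sources keeps a rate of zero but still carries a non-zero window, so the outer join with U1 gets a usable rate estimate.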

Best,
Alireza


Re: Streams Query Optimization Using Rate and Window Size

Posted by Kenneth Knowles <ke...@apache.org>.
On Fri, Jul 12, 2019 at 5:43 PM Julian Hyde <jh...@apache.org> wrote:

> In practice, the rowCount is just a number. So you can think of it as
> rows-per-second if you are optimizing a continuous query.
>
> If you are using a table in a streaming query, does it have a “rows per
> second?”. Yes - it is the number of rows in the table multiplied by the
> number of times per second that the table is scanned.
>
> I don’t know of any good academic work on optimizing continuous queries.
> (There is work on self-tuning queries, as indeed there is plenty of work on
> self-tuning sort & join algorithms for traditional DB, but nothing I know
> that bridges streaming query to classical cost-based query optimization.) I
> will remark that streaming queries have one big disadvantage and one big
> advantage. The disadvantage is that there are no stats available at the
> start, so you are not able to do cost-based optimization. The advantage is
> that the query runs forever, so you can start off with a bad plan, gather
> stats, and periodically re-optimize. The optimal plan might change over
> time as the streams change volume.
>

Yea, this is a very tricky point. For Beam SQL backends like Flink and
Dataflow, re-planning based on the gathered info or changes in the nature
of the inputs would require data migrations beyond current capabilities. Do
you know of a system that dynamically re-plans and migrates intermediate
state?

FWIW you might be able to have some stats at the start from a metadata
store, but it is of course subject to change.

Kenn


> Julian
>
>
> > On Jul 10, 2019, at 4:23 PM, Stamatis Zampetakis <za...@gmail.com>
> wrote:
> >
> > Looking forward for the outcome :)
> >
> > Below a few comments regarding the extensibility concerns of Kenn.
> >
> > In order to find the best plan the VolcanoPlanner just needs to know if
> one
> > cost is less than another cost [1] and this is encapsulated in the
> > isLe/isLt methods [2].
> > Adding a new cost class (other than VolcanoCost) which implements the
> > RelOptCost interface and respect its contract should work.
> > The VolcanoPlanner can be instantiated with a custom RelOptCostFactory
> [3]
> > which returns any kind of RelOptCost object.
> >
> > It is true that there is a strong link with CPU, I/O, and cardinality
> > metrics but it is hard to imagine an optimizer that does not take these
> > into consideration.
> > At the moment the cost comparison in Volcano is rather simple since it
> uses
> > only the cardinality estimations [4] so I guess we could improve on this.
> > However recent studies [5] have shown that cardinality estimations matter
> > much more than other metrics (such as I/O and CPU) so in the end it may
> not
> > worth the effort.
> >
> > Best,
> > Stamatis
> >
> > [1]
> >
> https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/volcano/RelSubset.java#L348
> > [2]
> >
> https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/RelOptCost.java#L83
> > [3]
> >
> https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java#L263
> > [4]
> >
> https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoCost.java#L113
> > [5] Viktor Leis, Andrey Gubichev, Atanas Mirchev, Peter A. Boncz, Alfons
> > Kemper, Thomas Neumann: How Good Are Query Optimizers, Really? PVLDB
> 9(3):
> > 204-215 (2015) (http://www.vldb.org/pvldb/vol9/p204-leis.pdf)
> >
> > On Wed, Jul 10, 2019 at 11:45 PM Alireza Samadian
> > <as...@google.com.invalid> wrote:
> >
> >> Dear Stamatis,
> >> Thank you for your reply. I will probably go with overriding
> >> computeSelfCost() as the first step. I checked it, and it seems to be
> >> working.
> >>
> >> Dear Kenn,
> >> The cited paper estimates those two values for each node and passes it
> up
> >> but they are not the cost. The cost of a node depends on the operation
> we
> >> are performing on the input and the rate of the input (input to that
> >> relational node). So for all of the nodes the cost is modeled as c*rate
> >> where c is the number of operations per tuple and rate is the rate of
> the
> >> input. It might be possible to have some other factors in the
> calculation
> >> of the cost for each node. So at the end there will be always a single
> >> scalar as the cost of a node. This single scalar can be a function of
> >> number of rows accessed, number of I/O access, and etc.. In calcite it
> is
> >> assumed any value that we are getting as a cost is going to be function
> of
> >> (row_count, CPU, I/O). Note that in the streaming model there is no need
> >> for window to be in the cost (the cost does not depend on it), I am
> >> including it in the cost model only because estimating the output rate
> of
> >> the nodes depends on it, and I don't know any other way to get it from
> the
> >> inputs of the RelNodes.
> >>
> >> Best,
> >> Alireza

Re: Streams Query Optimization Using Rate and Window Size

Posted by Julian Hyde <jh...@apache.org>.
In practice, the rowCount is just a number. So you can think of it as rows-per-second if you are optimizing a continuous query.

If you are using a table in a streaming query, does it have a “rows per second”? Yes - it is the number of rows in the table multiplied by the number of times per second that the table is scanned.

I don’t know of any good academic work on optimizing continuous queries. (There is work on self-tuning queries, as indeed there is plenty of work on self-tuning sort & join algorithms for traditional databases, but nothing I know of that bridges streaming queries to classical cost-based query optimization.) I will remark that streaming queries have one big disadvantage and one big advantage. The disadvantage is that there are no stats available at the start, so you are not able to do cost-based optimization. The advantage is that the query runs forever, so you can start off with a bad plan, gather stats, and periodically re-optimize. The optimal plan might change over time as the streams change volume.
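
To make the re-optimization idea concrete, here is a minimal sketch in Java. It is not Calcite code: the class ReoptimizeSketch, its methods, the stream names A, B, C, and all the numbers are made up for illustration. It treats rowCount as rows per second, reuses the left*right*selectivity join estimate from the original post, and picks the join order with the smaller intermediate result.

```java
final class ReoptimizeSketch {
    /** Rows per second produced by a join, using the rowCount-style estimate. */
    static double joinRate(double leftRate, double rightRate, double selectivity) {
        return leftRate * rightRate * selectivity;
    }

    /** Returns the join order whose first join has the smaller estimated output. */
    static String choosePlan(double rateA, double rateB, double rateC,
                             double selAB, double selBC) {
        double abFirst = joinRate(rateA, rateB, selAB);
        double bcFirst = joinRate(rateB, rateC, selBC);
        return abFirst <= bcFirst ? "(A join B) join C" : "A join (B join C)";
    }

    public static void main(String[] args) {
        // At start-up there are no stats, so every stream gets the same guessed rate.
        System.out.println(choosePlan(100, 100, 100, 0.01, 0.01));
        // After gathering stats, A turns out to be much busier than C (assumed values).
        System.out.println(choosePlan(5000, 100, 20, 0.01, 0.01));
    }
}
```

With the guessed rates both orders tie and the first one is kept; with the observed rates the other order wins, which is the kind of plan change that periodic re-optimization would pick up.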

Julian


> On Jul 10, 2019, at 4:23 PM, Stamatis Zampetakis <za...@gmail.com> wrote:
> 
> Looking forward to the outcome :)
> 
> Below are a few comments regarding Kenn's extensibility concerns.
> 
> In order to find the best plan the VolcanoPlanner just needs to know if one
> cost is less than another cost [1] and this is encapsulated in the
> isLe/isLt methods [2].
> Adding a new cost class (other than VolcanoCost) which implements the
> RelOptCost interface and respects its contract should work.
> The VolcanoPlanner can be instantiated with a custom RelOptCostFactory [3]
> which returns any kind of RelOptCost object.
> 
> It is true that there is a strong link with CPU, I/O, and cardinality
> metrics but it is hard to imagine an optimizer that does not take these
> into consideration.
> At the moment the cost comparison in Volcano is rather simple since it uses
> only the cardinality estimations [4] so I guess we could improve on this.
> However, recent studies [5] have shown that cardinality estimations matter
> much more than other metrics (such as I/O and CPU), so in the end it may not
> be worth the effort.
> 
> Best,
> Stamatis
> 
> [1]
> https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/volcano/RelSubset.java#L348
> [2]
> https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/RelOptCost.java#L83
> [3]
> https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java#L263
> [4]
> https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoCost.java#L113
> [5] Viktor Leis, Andrey Gubichev, Atanas Mirchev, Peter A. Boncz, Alfons
> Kemper, Thomas Neumann: How Good Are Query Optimizers, Really? PVLDB 9(3):
> 204-215 (2015) (http://www.vldb.org/pvldb/vol9/p204-leis.pdf)


Re: Streams Query Optimization Using Rate and Window Size

Posted by Stamatis Zampetakis <za...@gmail.com>.
Looking forward to the outcome :)

Below are a few comments regarding Kenn's extensibility concerns.

In order to find the best plan the VolcanoPlanner just needs to know if one
cost is less than another cost [1] and this is encapsulated in the
isLe/isLt methods [2].
Adding a new cost class (other than VolcanoCost) which implements the
RelOptCost interface and respects its contract should work.
The VolcanoPlanner can be instantiated with a custom RelOptCostFactory [3]
which returns any kind of RelOptCost object.
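
As a rough illustration of the point above, the following Java sketch shows a cost class that is compared only through isLe/isLt and is created through a factory. It is self-contained and does not use Calcite: the interfaces Cost and CostFactory are hypothetical stand-ins for the comparison part of RelOptCost and for RelOptCostFactory, and RateCost is an assumed (rate, window) cost in which only the rate is compared.

```java
final class StreamCostSketch {
    /** Hypothetical stand-in for the comparison part of a planner cost contract. */
    interface Cost {
        boolean isLe(Cost other);
        boolean isLt(Cost other);
    }

    /** Hypothetical factory, playing the role a cost factory plays for a planner. */
    interface CostFactory {
        Cost makeCost(double rate, double window);
    }

    /** Carries a rate and a window; only the rate takes part in comparisons. */
    static final class RateCost implements Cost {
        final double rate;
        final double window;

        RateCost(double rate, double window) {
            this.rate = rate;
            this.window = window;
        }

        @Override public boolean isLe(Cost other) {
            return rate <= ((RateCost) other).rate;
        }

        @Override public boolean isLt(Cost other) {
            return rate < ((RateCost) other).rate;
        }
    }

    /** All a planner needs in order to pick between two alternatives. */
    static Cost cheaper(Cost a, Cost b) {
        return a.isLe(b) ? a : b;
    }

    public static void main(String[] args) {
        CostFactory factory = RateCost::new;
        Cost a = factory.makeCost(10, 500);
        Cost b = factory.makeCost(20, 5);
        System.out.println(cheaper(a, b) == a);
    }
}
```

A real implementation would have to implement the full RelOptCost interface and honour its contract; the sketch only shows why the comparison methods are the part the planner relies on.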

It is true that there is a strong link with CPU, I/O, and cardinality
metrics but it is hard to imagine an optimizer that does not take these
into consideration.
At the moment the cost comparison in Volcano is rather simple since it uses
only the cardinality estimations [4] so I guess we could improve on this.
However, recent studies [5] have shown that cardinality estimations matter
much more than other metrics (such as I/O and CPU), so in the end it may not
be worth the effort.
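
The difference between a cardinality-only comparison and one that also looks at CPU and I/O can be sketched as follows. This is an illustrative model of the behaviour described above, not a copy of VolcanoCost; the class RowsOnlyComparison and the method isLeAllMetrics are invented names.

```java
final class RowsOnlyComparison {
    /** Simplified (rows, cpu, io) cost triple. */
    static final class Cost {
        final double rows;
        final double cpu;
        final double io;

        Cost(double rows, double cpu, double io) {
            this.rows = rows;
            this.cpu = cpu;
            this.io = io;
        }

        /** Compares on the cardinality estimate alone; cpu and io are ignored. */
        boolean isLe(Cost that) {
            return this == that || rows <= that.rows;
        }

        /** Hypothetical stricter variant: every component must be no worse. */
        boolean isLeAllMetrics(Cost that) {
            return rows <= that.rows && cpu <= that.cpu && io <= that.io;
        }
    }

    public static void main(String[] args) {
        Cost expensive = new Cost(100, 9000, 50);
        Cost cheap = new Cost(100, 10, 1);
        // Same row count, so the rows-only comparison cannot tell them apart.
        System.out.println(expensive.isLe(cheap));
        System.out.println(expensive.isLeAllMetrics(cheap));
    }
}
```

Under the rows-only rule the two costs are interchangeable even though one does far more CPU work, which is the simplification the paragraph above refers to.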

Best,
Stamatis

[1]
https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/volcano/RelSubset.java#L348
[2]
https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/RelOptCost.java#L83
[3]
https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoPlanner.java#L263
[4]
https://github.com/apache/calcite/blob/72c52f46eab7fbe57ae5236ecc3831113684ff71/core/src/main/java/org/apache/calcite/plan/volcano/VolcanoCost.java#L113
[5] Viktor Leis, Andrey Gubichev, Atanas Mirchev, Peter A. Boncz, Alfons
Kemper, Thomas Neumann: How Good Are Query Optimizers, Really? PVLDB 9(3):
204-215 (2015) (http://www.vldb.org/pvldb/vol9/p204-leis.pdf)

On Wed, Jul 10, 2019 at 11:45 PM Alireza Samadian
<as...@google.com.invalid> wrote:

> Dear Stamatis,
> Thank you for your reply. I will probably go with overriding
> computeSelfCost() as the first step. I checked it, and it seems to be
> working.
>
> Dear Kenn,
> The cited paper estimates those two values for each node and passes them up,
> but they are not the cost. The cost of a node depends on the operation we
> are performing on the input and the rate of the input (input to that
> relational node). So for all of the nodes the cost is modeled as c*rate,
> where c is the number of operations per tuple and rate is the rate of the
> input. It might be possible to have some other factors in the calculation
> of the cost for each node. So in the end there will always be a single
> scalar as the cost of a node. This single scalar can be a function of the
> number of rows accessed, the number of I/O accesses, and so on. In Calcite it
> is assumed that any value we get as a cost is going to be a function of
> (row_count, CPU, I/O). Note that in the streaming model there is no need
> for the window to be in the cost (the cost does not depend on it). I am
> including it in the cost model only because estimating the output rate of
> the nodes depends on it, and I don't know any other way to get it from the
> inputs of the RelNodes.
>
> Best,
> Alireza

Re: Streams Query Optimization Using Rate and Window Size

Posted by Alireza Samadian <as...@google.com.INVALID>.
Dear Stamatis,
Thank you for your reply. I will probably go with overriding
computeSelfCost() as the first step. I checked it, and it seems to be
working.

Dear Kenn,
The cited paper estimates those two values for each node and passes them up,
but they are not the cost. The cost of a node depends on the operation we
are performing on the input and the rate of the input (input to that
relational node). So for all of the nodes the cost is modeled as c*rate,
where c is the number of operations per tuple and rate is the rate of the
input. It might be possible to have some other factors in the calculation
of the cost for each node. So in the end there will always be a single
scalar as the cost of a node. This single scalar can be a function of the
number of rows accessed, the number of I/O accesses, and so on. In Calcite it
is assumed that any value we get as a cost is going to be a function of
(row_count, CPU, I/O). Note that in the streaming model there is no need
for the window to be in the cost (the cost does not depend on it). I am
including it in the cost model only because estimating the output rate of
the nodes depends on it, and I don't know any other way to get it from the
inputs of the RelNodes.
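
The model described above can be sketched in a few lines of Java. The join formulas are the ones from the original post; everything else is an assumption made for illustration: the class and method names are invented, and summing the input rates for a node with several inputs is one possible reading of "the rate of the input", not something the thread specifies.

```java
final class StreamEstimates {
    /** Output estimate of a node: tuples per time unit and window size in tuples. */
    static final class Estimate {
        final double rate;
        final double window;

        Estimate(double rate, double window) {
            this.rate = rate;
            this.window = window;
        }
    }

    /** Join estimate: rate and window are both derived from the inputs' rate and window. */
    static Estimate join(Estimate left, Estimate right, double selectivity) {
        double rate = (left.rate * right.window + right.rate * left.window) * selectivity;
        double window = left.window * right.window * selectivity;
        return new Estimate(rate, window);
    }

    /** Cost as c * rate; the window does not appear (input rates are summed, by assumption). */
    static double cost(double opsPerTuple, Estimate... inputs) {
        double inputRate = 0;
        for (Estimate in : inputs) {
            inputRate += in.rate;
        }
        return opsPerTuple * inputRate;
    }

    public static void main(String[] args) {
        Estimate left = new Estimate(10, 100);
        Estimate right = new Estimate(5, 200);
        Estimate out = join(left, right, 0.5);
        System.out.println(out.rate + " " + out.window + " " + cost(2, left, right));
    }
}
```

The window is carried along in Estimate only so that the parent node can compute its own output rate, which matches the reason given above for keeping it next to the cost.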

Best,
Alireza




On Wed, Jul 10, 2019 at 12:40 PM Kenneth Knowles <ke...@apache.org> wrote:

> Following this discussion, I have a question which I think is on topic.
> It seems like there are two places that, from my brief reading, are not
> quite extensible enough.
>
> 1. RelNode.computeSelfCost returns RelOptCost, which has particular
> measures built in. Would Alireza's proposal require extensibility here to
> add/remove measures? The planner seems to depend on them being CPU, IO, rows.
> 2. But the VolcanoPlanner also just adds the above together to a single
> scalar. Does the cited paper avoid this practice and instead retain both
> measures?
>
> Again, I'm just jumping around in the code to educate myself.
>
> Kenn

Re: Streams Query Optimization Using Rate and Window Size

Posted by Kenneth Knowles <ke...@apache.org>.
Following this discussion, I have a question which I think is on topic.
It seems like there are two places that, from my brief reading, are not
quite extensible enough.

1. RelNode.computeSelfCost returns RelOptCost, which has particular
measures built in. Would Alireza's proposal require extensibility here to
add/remove measures? The planner seems to depend on them being CPU, IO, rows.
2. But the VolcanoPlanner also just adds the above together to a single
scalar. Does the cited paper avoid this practice and instead retain both
measures?

Again, I'm just jumping around in the code to educate myself.

Kenn

On Mon, Jul 8, 2019 at 4:02 PM Stamatis Zampetakis <za...@gmail.com>
wrote:

> Hi Alireza,
>
> Cost models for streams are a very cool topic but I don't have much
> knowledge in the domain.
>
> Regarding the implementation details, if you have custom physical operators
> then it makes sense to implement the computeSelfCost() function as you see fit.
>
> Another option is to plug in your custom RelMetadataProvider [1]; you can
> find a few examples in RelMetadataTest [2].
> That way you can also change the cost function of existing operators
> (logical or not) without changing the operators themselves.
>
> As far as the cost of logical operators is concerned, the behavior of the
> planner can be customized [3].
> The most common configuration is to ignore the cost of logical operators,
> thus leaving it as infinite.
>
> Best,
> Stamatis
>
> [1]
>
> https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/metadata/RelMetadataProvider.java
> [2]
>
> https://github.com/apache/calcite/blob/e8d598a434e8dbadaf756f8c57c748f4d7e16fdf/core/src/test/java/org/apache/calcite/test/RelMetadataTest.java#L1005

Re: Streams Query Optimization Using Rate and Window Size

Posted by Stamatis Zampetakis <za...@gmail.com>.
Hi Alireza,

Cost models for streams are a very cool topic but I don't have much
knowledge in the domain.

Regarding the implementation details, if you have custom physical operators
then it makes sense to implement the computeSelfCost() function as you see fit.

Another option is to plug in your custom RelMetadataProvider [1]; you can
find a few examples in RelMetadataTest [2].
That way you can also change the cost function of existing operators
(logical or not) without changing the operators themselves.

As far as the cost of logical operators is concerned, the behavior of the
planner can be customized [3].
The most common configuration is to ignore the cost of logical operators by
leaving it as infinite.
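
To show why an infinite cost effectively removes logical operators from
consideration, here is a small self-contained sketch. It does not use the
Calcite API; PlanAlternative, plannerCost() and cheapest() are hypothetical
names, and the selection loop is only a simplified stand-in for how a
cost-based planner picks the cheapest alternative.

```java
// Hypothetical plan alternative; illustrative only, not a Calcite class.
final class PlanAlternative {
    final String name;
    final boolean logical;  // true if the node has no physical implementation
    final double selfCost;  // cost the node would report for itself

    PlanAlternative(String name, boolean logical, double selfCost) {
        this.name = name;
        this.logical = logical;
        this.selfCost = selfCost;
    }

    // Cost as seen by the planner: logical nodes are treated as infinitely
    // expensive, so whatever row-count-based cost they report is ignored.
    double plannerCost() {
        return logical ? Double.POSITIVE_INFINITY : selfCost;
    }

    // Returns the alternative with the lowest plannerCost(),
    // or null if no alternatives are given.
    static PlanAlternative cheapest(PlanAlternative... alternatives) {
        PlanAlternative best = null;
        for (PlanAlternative a : alternatives) {
            if (best == null || a.plannerCost() < best.plannerCost()) {
                best = a;
            }
        }
        return best;
    }
}
```

With this setup, a logical join that reports a self cost of 1 still loses to a
physical join with cost 500, because the logical node's cost is replaced by
infinity. Under that assumption, row count estimates made by logical nodes
would not influence which physical plan is chosen through their own cost.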

Best,
Stamatis

[1]
https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/rel/metadata/RelMetadataProvider.java
[2]
https://github.com/apache/calcite/blob/e8d598a434e8dbadaf756f8c57c748f4d7e16fdf/core/src/test/java/org/apache/calcite/test/RelMetadataTest.java#L1005
