You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Alberto Ramón <a....@gmail.com> on 2016/11/07 20:33:50 UTC

Memory on Aggr

From "Relational Queries on Data Stream in Apache Flink" > Bounday Memory
Requirements
(
https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4konQPW4tnl8THw6rzGUdaqU/edit#
)


*SELECT user, page, COUNT(page) AS pCntFROM pageviews*

*GROUP BY user, page*

*-Versus-*


*SELECT user, page, COUNT(page) AS pCntFROM pageviews*

*WHERE rowtime BETWEEN now() - INTERVAL '1' HOUR AND now() // only last
hour*

*GROUP BY user, page*

I understand:

   - Not use WaterMark to pre-calculate agrr, and save memory
   - Store all events "as is" until the end of window

are My assumptions true ?

Re: Memory on Aggr

Posted by Alberto Ramón <a....@gmail.com>.
thanks ¡¡
Now its clear for me


2016-11-08 9:23 GMT+01:00 Fabian Hueske <fh...@gmail.com>:

> Given the semantics described in the document the query can be computed in
> principle.
> However, if the query is not bounded by time, the required state might
> grow very large if the number of distinct xx values grows over time.
> That's why we will probably enforce a time predicate or meta data that the
> value domain of xx is of constant size.
>
>
>
> 2016-11-08 9:04 GMT+01:00 Alberto Ramón <a....@gmail.com>:
>
>> Yes thanks
>>
>> Perhaps my example is too simple
>>
>> *select xx, count(), sum() from ttt group by xx*
>> Why the querie value can't be calculated each 2 secs / waterMark arrive ?
>>
>> I'm try to find the video of: http://flink-forward.org/kb_se
>> ssions/scaling-stream-processing-with-apache-flink-to-very-large-state/
>>
>> 2016-11-07 22:02 GMT+01:00 Fabian Hueske <fh...@gmail.com>:
>>
>>> First of all, the document only proposes semantics for Flink's support
>>> of relational queries on streams.
>>> It does not describe the implementation and in fact most of it is not
>>> implemented.
>>>
>>> How the queries will be executed would depend on the definition of the
>>> table, i.e., whether the tables are derived in append or replace mode.
>>> For the second query we do not necessarily need to "store all events as
>>> is" but could do some pre-aggregation depending on the configured update
>>> rate.
>>> Watermarks will be used to track time in a query, i.e., to evaluate a
>>> predicate like "*BETWEEN now() - INTERVAL '1' HOUR AND now()"* where
>>> now() would be the current watermark time.
>>>
>>> There are a couple of tricks one can play to reduce the memory
>>> requirements and the implementation should try to optimize for that.
>>> However, it is true that for some queries we will need to keep the
>>> complete input relation (within its time bounds) as state.
>>> The good news is that Flink is very good a managing large state and can
>>> easily scale to hundreds of nodes.
>>>
>>> Did that answer your questions?
>>>
>>> 2016-11-07 21:33 GMT+01:00 Alberto Ramón <a....@gmail.com>:
>>>
>>>> From "Relational Queries on Data Stream in Apache Flink" > Bounday
>>>> Memory Requirements
>>>> (https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4kon
>>>> QPW4tnl8THw6rzGUdaqU/edit#)
>>>>
>>>>
>>>> *SELECT user, page, COUNT(page) AS pCntFROM pageviews*
>>>>
>>>> *GROUP BY user, page*
>>>>
>>>> *-Versus-*
>>>>
>>>>
>>>> *SELECT user, page, COUNT(page) AS pCntFROM pageviews*
>>>>
>>>> *WHERE rowtime BETWEEN now() - INTERVAL '1' HOUR AND now() // only last
>>>> hour*
>>>>
>>>> *GROUP BY user, page*
>>>>
>>>> I understand:
>>>>
>>>>    - Not use WaterMark to pre-calculate agrr, and save memory
>>>>    - Store all events "as is" until the end of window
>>>>
>>>> are My assumptions true ?
>>>>
>>>>
>>>
>>
>

Re: Memory on Aggr

Posted by Fabian Hueske <fh...@gmail.com>.
Given the semantics described in the document the query can be computed in
principle.
However, if the query is not bounded by time, the required state might grow
very large if the number of distinct xx values grows over time.
That's why we will probably enforce a time predicate or meta data that the
value domain of xx is of constant size.



2016-11-08 9:04 GMT+01:00 Alberto Ramón <a....@gmail.com>:

> Yes thanks
>
> Perhaps my example is too simple
>
> *select xx, count(), sum() from ttt group by xx*
> Why the querie value can't be calculated each 2 secs / waterMark arrive ?
>
> I'm try to find the video of: http://flink-forward.org/kb_se
> ssions/scaling-stream-processing-with-apache-flink-to-very-large-state/
>
> 2016-11-07 22:02 GMT+01:00 Fabian Hueske <fh...@gmail.com>:
>
>> First of all, the document only proposes semantics for Flink's support of
>> relational queries on streams.
>> It does not describe the implementation and in fact most of it is not
>> implemented.
>>
>> How the queries will be executed would depend on the definition of the
>> table, i.e., whether the tables are derived in append or replace mode.
>> For the second query we do not necessarily need to "store all events as
>> is" but could do some pre-aggregation depending on the configured update
>> rate.
>> Watermarks will be used to track time in a query, i.e., to evaluate a
>> predicate like "*BETWEEN now() - INTERVAL '1' HOUR AND now()"* where
>> now() would be the current watermark time.
>>
>> There are a couple of tricks one can play to reduce the memory
>> requirements and the implementation should try to optimize for that.
>> However, it is true that for some queries we will need to keep the
>> complete input relation (within its time bounds) as state.
>> The good news is that Flink is very good a managing large state and can
>> easily scale to hundreds of nodes.
>>
>> Did that answer your questions?
>>
>> 2016-11-07 21:33 GMT+01:00 Alberto Ramón <a....@gmail.com>:
>>
>>> From "Relational Queries on Data Stream in Apache Flink" > Bounday
>>> Memory Requirements
>>> (https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4kon
>>> QPW4tnl8THw6rzGUdaqU/edit#)
>>>
>>>
>>> *SELECT user, page, COUNT(page) AS pCntFROM pageviews*
>>>
>>> *GROUP BY user, page*
>>>
>>> *-Versus-*
>>>
>>>
>>> *SELECT user, page, COUNT(page) AS pCntFROM pageviews*
>>>
>>> *WHERE rowtime BETWEEN now() - INTERVAL '1' HOUR AND now() // only last
>>> hour*
>>>
>>> *GROUP BY user, page*
>>>
>>> I understand:
>>>
>>>    - Not use WaterMark to pre-calculate agrr, and save memory
>>>    - Store all events "as is" until the end of window
>>>
>>> are My assumptions true ?
>>>
>>>
>>
>

Re: Memory on Aggr

Posted by Alberto Ramón <a....@gmail.com>.
Yes thanks

Perhaps my example is too simple

*select xx, count(), sum() from ttt group by xx*
Why the querie value can't be calculated each 2 secs / waterMark arrive ?

I'm try to find the video of: http://flink-forward.org/kb_
sessions/scaling-stream-processing-with-apache-flink-to-very-large-state/

2016-11-07 22:02 GMT+01:00 Fabian Hueske <fh...@gmail.com>:

> First of all, the document only proposes semantics for Flink's support of
> relational queries on streams.
> It does not describe the implementation and in fact most of it is not
> implemented.
>
> How the queries will be executed would depend on the definition of the
> table, i.e., whether the tables are derived in append or replace mode.
> For the second query we do not necessarily need to "store all events as
> is" but could do some pre-aggregation depending on the configured update
> rate.
> Watermarks will be used to track time in a query, i.e., to evaluate a
> predicate like "*BETWEEN now() - INTERVAL '1' HOUR AND now()"* where
> now() would be the current watermark time.
>
> There are a couple of tricks one can play to reduce the memory
> requirements and the implementation should try to optimize for that.
> However, it is true that for some queries we will need to keep the
> complete input relation (within its time bounds) as state.
> The good news is that Flink is very good a managing large state and can
> easily scale to hundreds of nodes.
>
> Did that answer your questions?
>
> 2016-11-07 21:33 GMT+01:00 Alberto Ramón <a....@gmail.com>:
>
>> From "Relational Queries on Data Stream in Apache Flink" > Bounday
>> Memory Requirements
>> (https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4kon
>> QPW4tnl8THw6rzGUdaqU/edit#)
>>
>>
>> *SELECT user, page, COUNT(page) AS pCntFROM pageviews*
>>
>> *GROUP BY user, page*
>>
>> *-Versus-*
>>
>>
>> *SELECT user, page, COUNT(page) AS pCntFROM pageviews*
>>
>> *WHERE rowtime BETWEEN now() - INTERVAL '1' HOUR AND now() // only last
>> hour*
>>
>> *GROUP BY user, page*
>>
>> I understand:
>>
>>    - Not use WaterMark to pre-calculate agrr, and save memory
>>    - Store all events "as is" until the end of window
>>
>> are My assumptions true ?
>>
>>
>

Re: Memory on Aggr

Posted by Fabian Hueske <fh...@gmail.com>.
First of all, the document only proposes semantics for Flink's support of
relational queries on streams.
It does not describe the implementation and in fact most of it is not
implemented.

How the queries will be executed would depend on the definition of the
table, i.e., whether the tables are derived in append or replace mode.
For the second query we do not necessarily need to "store all events as is"
but could do some pre-aggregation depending on the configured update rate.
Watermarks will be used to track time in a query, i.e., to evaluate a
predicate like "*BETWEEN now() - INTERVAL '1' HOUR AND now()"* where now()
would be the current watermark time.

There are a couple of tricks one can play to reduce the memory requirements
and the implementation should try to optimize for that.
However, it is true that for some queries we will need to keep the complete
input relation (within its time bounds) as state.
The good news is that Flink is very good a managing large state and can
easily scale to hundreds of nodes.

Did that answer your questions?

2016-11-07 21:33 GMT+01:00 Alberto Ramón <a....@gmail.com>:

> From "Relational Queries on Data Stream in Apache Flink" > Bounday Memory
> Requirements
> (https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_
> f4konQPW4tnl8THw6rzGUdaqU/edit#)
>
>
> *SELECT user, page, COUNT(page) AS pCntFROM pageviews*
>
> *GROUP BY user, page*
>
> *-Versus-*
>
>
> *SELECT user, page, COUNT(page) AS pCntFROM pageviews*
>
> *WHERE rowtime BETWEEN now() - INTERVAL '1' HOUR AND now() // only last
> hour*
>
> *GROUP BY user, page*
>
> I understand:
>
>    - Not use WaterMark to pre-calculate agrr, and save memory
>    - Store all events "as is" until the end of window
>
> are My assumptions true ?
>
>