Posted to user@flink.apache.org by "Sofya T. Irwin" <so...@gmail.com> on 2020/07/30 14:42:48 UTC

Options for limiting state size in TableAPI&SQL

Hi,
I'm investigating a SQL job that uses a time-windowed join and whose state
is large and keeps growing. The join syntax is most similar to the interval
join (https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html).

A few questions:
1. Am I correct in understanding that State TTL is generally not applicable
to TableAPI&SQL, so that we cannot use State TTL to limit the state size of a join?

2. Based on the following, it seems that Flink should be able to expire state
even without explicit settings: "In TableAPI&SQL and DataStream, the window
aggregation and time-windowed join will clear expired state using Timers
which is triggered by watermark." (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/join-state-TTL-td34760.html)

To clarify: Does the above mean that Flink is expected to detect expired
state and clear it without explicit configuration to allow it to do so?

3. I've looked into setting the idle state retention time. From what I can
understand, this particular setting is appropriate for my use case.
"TableConfig#setIdleStateRetentionTime in TableAPI&SQL is a job level
configuration which will enable state ttl for all non-time-based operator
states." (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/join-state-TTL-td34760.html)

To clarify: Would enabling this setting control state growth? Is it only
available for the Blink planner? We are currently using the StreamPlanner. Is
there any way to ensure that idle state has limited retention for
applications using the StreamPlanner?
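As a rough illustration of what a min/max idle retention setting such as `TableConfig#setIdleStateRetentionTime` (which takes a minimum and a maximum retention time) is meant to achieve, here is a small Python model. The class `IdleRetentionModel` and its methods are hypothetical, not a Flink API, and this is a simplified sketch under stated assumptions rather than Flink's implementation: a key's state is dropped once it has gone unaccessed for somewhere between the minimum and the maximum retention time.

```python
import heapq
import itertools


class IdleRetentionModel:
    """Hypothetical model of min/max idle-state retention (not Flink code).

    Each key's state is cleared after it has been idle (not accessed) for at
    least `min_retention` and at most `max_retention` time units.
    """

    def __init__(self, min_retention, max_retention):
        if max_retention < min_retention:
            raise ValueError("max_retention must be >= min_retention")
        self.min_retention = min_retention
        self.max_retention = max_retention
        self.state = {}        # key -> latest value
        self.cleanup_at = {}   # key -> currently valid cleanup time
        self._timers = []      # min-heap of (time, seq, key)
        self._seq = itertools.count()

    def access(self, key, value, now):
        self.state[key] = value
        current = self.cleanup_at.get(key)
        # Re-register only if the pending timer would fire before
        # now + min_retention; the slack between min and max avoids
        # creating a new timer on every access.
        if current is None or now + self.min_retention > current:
            new_time = now + self.max_retention
            self.cleanup_at[key] = new_time
            heapq.heappush(self._timers, (new_time, next(self._seq), key))

    def advance_time(self, now):
        # Fire due timers; a timer superseded by a later one is ignored.
        while self._timers and self._timers[0][0] <= now:
            t, _, key = heapq.heappop(self._timers)
            if self.cleanup_at.get(key) == t:
                del self.state[key]
                del self.cleanup_at[key]


m = IdleRetentionModel(min_retention=10, max_retention=20)
m.access("k1", "v1", now=0)   # cleanup scheduled at 20
m.access("k1", "v2", now=5)   # 5 + 10 <= 20: keep the timer at 20
m.access("k1", "v3", now=15)  # 15 + 10 > 20: reschedule to 35
m.advance_time(20)            # stale timer at 20 is ignored
print("k1" in m.state)        # True
m.advance_time(35)            # 20 idle time units since the last access
print("k1" in m.state)        # False
```

Per the description quoted above, this kind of retention targets non-time-based operator state, so it would not be expected to bound the state of an interval join, whose cleanup is driven by watermarks instead.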

Thanks in advance,
Sofya

Re: Options for limiting state size in TableAPI&SQL

Posted by Danny Chan <yu...@gmail.com>.
Thanks for sharing ~

The query you gave is actually an interval join [1]; a window join joins two windowed streams together, see [2].

In theory, for an interval join, the state is cleaned up periodically based on the watermark and the allowed lateness, once the time range of the right-hand side (RHS) is considered “late”.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#joins
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/joining.html
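To illustrate the watermark-based cleanup described above, here is a simplified Python model. The class `IntervalJoinModel` and its methods are hypothetical, not a Flink API. It assumes no late rows and ignores allowed lateness: a buffered row is dropped once the watermark shows that no row from the other side that could still match it can arrive. For a left outer join, an unmatched left row is emitted null-padded when it expires.

```python
class IntervalJoinModel:
    """Hypothetical, simplified model of an event-time interval join on

        right.t + lower <= left.t <= right.t + upper

    Rows are buffered per join key and dropped once the watermark guarantees
    that no future row from the other side can still match them. Late rows
    and allowed lateness are ignored. This is not Flink's implementation.
    """

    def __init__(self, lower, upper, left_outer=False):
        if lower > upper:
            raise ValueError("lower bound must not exceed upper bound")
        self.lower, self.upper = lower, upper
        self.left_outer = left_outer
        self.left = {}    # key -> list of [time, value, matched]
        self.right = {}   # key -> list of (time, value)
        self.output = []  # emitted (left_value, right_value) pairs

    def on_left(self, key, t, value):
        row = [t, value, False]
        for rt, rv in self.right.get(key, []):
            if rt + self.lower <= t <= rt + self.upper:
                self.output.append((value, rv))
                row[2] = True
        self.left.setdefault(key, []).append(row)

    def on_right(self, key, t, value):
        for row in self.left.get(key, []):
            if t + self.lower <= row[0] <= t + self.upper:
                self.output.append((row[1], value))
                row[2] = True
        self.right.setdefault(key, []).append((t, value))

    def on_watermark(self, wm):
        # A left row at time t can only match right rows up to t - lower.
        for key in list(self.left):
            keep = []
            for row in self.left[key]:
                if row[0] - self.lower <= wm:
                    if self.left_outer and not row[2]:
                        self.output.append((row[1], None))  # null-padded
                else:
                    keep.append(row)
            if keep:
                self.left[key] = keep
            else:
                del self.left[key]
        # A right row at time t can only match left rows up to t + upper.
        for key in list(self.right):
            keep = [r for r in self.right[key] if r[0] + self.upper > wm]
            if keep:
                self.right[key] = keep
            else:
                del self.right[key]

    def buffered_rows(self):
        return (sum(len(v) for v in self.left.values())
                + sum(len(v) for v in self.right.values()))


# The query in this thread, in minutes: A is the left side, B the right side,
# A.rowtime BETWEEN B.rowtime - 60 AND B.rowtime  ->  lower=-60, upper=0.
j = IntervalJoinModel(lower=-60, upper=0, left_outer=True)
j.on_left("x", 10, "a1")
j.on_right("x", 30, "b1")  # matches a1: 30 - 60 <= 10 <= 30
j.on_left("y", 20, "a2")   # never matched
j.on_watermark(30)         # b1 is dropped; a1 and a2 wait until 70 and 80
print(j.buffered_rows())   # 2
j.on_watermark(80)         # a1 dropped; a2 dropped and emitted null-padded
print(j.output)            # [('a1', 'b1'), ('a2', None)]
```

In this model nothing is removed unless the watermark advances, so a watermark that stalls leaves every row buffered.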

Best,
Danny Chan
On Aug 29, 2020 at 12:59 AM +0800, Sofya T. Irwin <so...@gmail.com> wrote:
> Hi Danny,
>
> Thank you for your response.
> I'm trying to join two streams that are both fairly high volume. My join looks like this:
>
>   SELECT
>     A.rowtime as rowtime,
>     A.foo,
>     B.bar
>   FROM A
>   LEFT JOIN B
>     ON A.foo = B.foo
>     AND A.rowtime BETWEEN B.rowtime - INTERVAL '1' HOUR AND B.rowtime
>
> When I run this SQL, the state size metric looks like a sawtooth that gradually keeps growing.
> For now I have disabled this query because I'm concerned it could impact other jobs.
>
> Based on your statement above, is the SQL timed-window join not supported?
> Is there another way to make sure that the state only holds recent data?
>
> Thank you,
> Sofya

Re: Options for limiting state size in TableAPI&SQL

Posted by "Sofya T. Irwin" <so...@gmail.com>.
Hi Danny,

Thank you for your response.
I'm trying to join two streams that are both fairly high volume. My join
looks like this:

  SELECT
    A.rowtime as rowtime,
    A.foo,
    B.bar
  FROM A
  LEFT JOIN B
    ON A.foo = B.foo
    AND A.rowtime BETWEEN B.rowtime - INTERVAL '1' HOUR AND B.rowtime

When I run this SQL, the state size metric looks like a sawtooth that
gradually keeps growing.
For now I have disabled this query because I'm concerned it could impact
other jobs.

Based on your statement above, is the SQL timed-window join not supported?
Is there another way to make sure that the state only holds recent data?
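A rough sizing sketch for this query, under simplifying assumptions: constant input rates, a watermark that trails the newest event time by a fixed delay, and no late data. The helper names and all numbers below are hypothetical. For `A.rowtime BETWEEN B.rowtime - INTERVAL '1' HOUR AND B.rowtime`, an A row can still be matched by B rows up to one hour later, while a B row only needs to wait for A rows up to its own timestamp.

```python
def buffered_duration(lower, upper):
    """Hypothetical helper for the condition
        right.t + lower <= left.t <= right.t + upper
    Returns how far past its own event time the watermark must advance
    before a row from each side can no longer find a match."""
    if lower > upper:
        raise ValueError("lower bound must not exceed upper bound")
    return {"left": max(0, -lower), "right": max(0, upper)}


def estimated_buffered_rows(rate_left, rate_right, lower, upper, watermark_delay):
    """Rough steady-state row count, assuming constant input rates (rows per
    time unit) and a watermark that trails the newest event time by a fixed
    `watermark_delay`. All inputs are assumptions, not measured values."""
    d = buffered_duration(lower, upper)
    return (rate_left * (d["left"] + watermark_delay)
            + rate_right * (d["right"] + watermark_delay))


# The query above with A as the left side and B as the right side, in minutes:
#   A.rowtime BETWEEN B.rowtime - INTERVAL '1' HOUR AND B.rowtime
# i.e. lower = -60 and upper = 0.
print(buffered_duration(-60, 0))  # {'left': 60, 'right': 0}
# Hypothetical rates: 60,000 A rows/min, 120,000 B rows/min, 5 min watermark delay.
print(estimated_buffered_rows(60_000, 120_000, -60, 0, 5))  # 4500000
```

Under these assumptions the estimate does not depend on how long the job has been running: buffered state should level off at roughly rate × (retention + watermark delay) per side, so steady growth would suggest something this model leaves out, for example a watermark that stops advancing.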

Thank you,
Sofya

On Thu, Aug 27, 2020 at 10:49 PM Danny Chan <da...@apache.org> wrote:

> Hi, Sofya T. Irwin ~
>
> Can you share your use case, i.e. why you need a timed-window join there?
>
> The SQL timed-window join is not supported yet, and I'd like to hear from
> you whether it needs to be supported in SQL.

Re: Options for limiting state size in TableAPI&SQL

Posted by Danny Chan <da...@apache.org>.
Hi, Sofya T. Irwin ~

Can you share your use case, i.e. why you need a timed-window join there?

The SQL timed-window join is not supported yet, and I'd like to hear from
you whether it needs to be supported in SQL.

