You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 酷酷的浑蛋 <ap...@163.com> on 2020/04/15 09:40:42 UTC

关于状态TTL


我在flink sql中设置了        
tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
sql: select * from test t join test2 t2 on t.a=t2.a
当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明

Re: 关于状态TTL

Posted by Benchao Li <li...@gmail.com>.
而且https://issues.apache.org/jira/browse/FLINK-15938 和
https://issues.apache.org/jira/browse/FLINK-16581
这两个issue现在已经都merge了,你也可以cherry-pick过去。

Benchao Li <li...@gmail.com> 于2020年4月17日周五 下午2:54写道:

> 嗯,blink planner跟legacy planner是有一些实现上的差异。
> 如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中:
>
> static StateTtlConfig createTtlConfig(long retentionTime, boolean stateCleaningEnabled) {
>    if (stateCleaningEnabled) {
>       checkArgument(retentionTime > 0);
>       return StateTtlConfig
>          .newBuilder(Time.milliseconds(retentionTime))
>          .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>          .cleanupInBackground() // added this line
>          .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // changed this line
>          .build();
>    } else {
>       return StateTtlConfig.DISABLED;
>    }
> }
>
>
> 酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午2:47写道:
>
>> 我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期
>>
>>
>>
>>
>> 在2020年4月17日 14:16,Benchao Li<li...@gmail.com> 写道:
>> 这是两个问题,
>>
>> - 状态只访问一次,可能不会清理。
>>
>>
>> 这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。
>> - 状态已经过期了,但是会被使用到。
>> 这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-16581
>>
>> 酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午2:06写道:
>>
>> 其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了
>>
>>
>>
>>
>> 在2020年4月17日 13:07,Benchao Li<li...@gmail.com> 写道:
>> 我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`,
>> 所以这个问题现在是不能完全避免了。
>> 我已经建了一个jira[1]来跟踪和改进这一点。
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-17199
>>
>> 酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午12:51写道:
>>
>>
>>
>>
>>
>> 我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢?
>> 在2020年4月16日 15:28,酷酷的浑蛋<ap...@163.com> 写道:
>> 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个
>>
>>
>>
>>
>> 在2020年4月15日 18:04,Benchao Li<li...@gmail.com> 写道:
>> Hi,
>>
>> 你用的是哪个版本呢?
>> 在1.9版本里面的确是有点问题,默认没有开启cleanup in background
>> [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-15938
>>
>> 酷酷的浑蛋 <ap...@163.com> 于2020年4月15日周三 下午5:40写道:
>>
>>
>>
>> 我在flink sql中设置了
>> tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
>> sql: select * from test t join test2 t2 on t.a=t2.a
>>
>>
>>
>>
>> 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明
>>
>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>>
>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>>
>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>
>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

Re: 关于状态TTL

Posted by LakeShen <sh...@gmail.com>.
社区版的 Planner 针对 Key 状态的清理,使用的 Timer 来进行清理。
1.9.1 Blink planner 最底层状态清理 还是使用的 StateTTLConfig 来进行清理(不是
Background),所以存在部分状态后面没读,
状态没有清理的情况

Benchao Li <li...@gmail.com> 于2020年4月21日周二 下午11:15写道:

> 我对checkpoint不是很熟悉。 不过这个错误看起来是因为修改了TtlConfig导致serializer不兼容导致的,可能不是很好解决。
>
> 酷酷的浑蛋 <ap...@163.com> 于2020年4月21日周二 下午10:37写道:
>
> > hello,我按照您说的方式改了源码,增加了那两行代码,然后任务用savepoint停止,在从savepoint启动,就报下面的错:
> > java.lang.RuntimeException: Error while getting state
> >     at
> >
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
> >     at
> >
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179)
> >     at
> >
> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:221)
> >     at
> >
> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:205)
> >     at
> >
> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.create(JoinRecordStateViews.java:85)
> >     at
> >
> org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.open(StreamingJoinOperator.java:88)
> >     at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
> >     at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> >     at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.flink.util.StateMigrationException: The new state
> > serializer cannot be incompatible.
> >     at
> >
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534)
> >     at
> >
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
> >     at
> >
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
> >     at
> >
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
> >     at
> >
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155)
> >     at
> >
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
> >     at
> >
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
> >     at
> >
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
> >     at
> >
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
> >     at
> >
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> >     at
> >
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
> >     ... 10 more
> >
> >
> >
> >
> > 在2020年4月17日 15:27,酷酷的浑蛋<ap...@163.com> 写道:
> > 好的,非常感谢您,我去按照您说的代码改下,非常感谢
> >
> >
> >
> >
> > 在2020年4月17日 15:17,Benchao Li<li...@gmail.com> 写道:
> > 嗯,这个是需要修改flink源码的,不只是从应用层的配置来修改。这个修改是修改Flink SQL底层的实现的。
> >
> > 酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午3:09写道:
> >
> > 我这边用的是sql方式,可以像您说的那样修改吗,我看官网sql方式只有
> > tableConfig.setIdleStateRetentionTime(Time.minutes(1),
> > Time.minutes(6));这种方式设置ttl
> >
> >
> >
> >
> > 在2020年4月17日 14:54,Benchao Li<li...@gmail.com> 写道:
> > 嗯,blink planner跟legacy planner是有一些实现上的差异。
> > 如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中:
> >
> > static StateTtlConfig createTtlConfig(long retentionTime, boolean
> > stateCleaningEnabled) {
> > if (stateCleaningEnabled) {
> > checkArgument(retentionTime > 0);
> > return StateTtlConfig
> > .newBuilder(Time.milliseconds(retentionTime))
> > .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> > .cleanupInBackground() // added this line
> > .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> > // changed this line
> > .build();
> > } else {
> > return StateTtlConfig.DISABLED;
> > }
> > }
> >
> >
> > 酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午2:47写道:
> >
> > 我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期
> >
> >
> >
> >
> > 在2020年4月17日 14:16,Benchao Li<li...@gmail.com> 写道:
> > 这是两个问题,
> >
> > - 状态只访问一次,可能不会清理。
> >
> >
> >
> >
> >
> 这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。
> > - 状态已经过期了,但是会被使用到。
> > 这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-16581
> >
> > 酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午2:06写道:
> >
> > 其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了
> >
> >
> >
> >
> > 在2020年4月17日 13:07,Benchao Li<li...@gmail.com> 写道:
> > 我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`,
> > 所以这个问题现在是不能完全避免了。
> > 我已经建了一个jira[1]来跟踪和改进这一点。
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-17199
> >
> > 酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午12:51写道:
> >
> >
> >
> >
> >
> > 我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢?
> > 在2020年4月16日 15:28,酷酷的浑蛋<ap...@163.com> 写道:
> > 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个
> >
> >
> >
> >
> > 在2020年4月15日 18:04,Benchao Li<li...@gmail.com> 写道:
> > Hi,
> >
> > 你用的是哪个版本呢?
> > 在1.9版本里面的确是有点问题,默认没有开启cleanup in background
> > [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-15938
> >
> > 酷酷的浑蛋 <ap...@163.com> 于2020年4月15日周三 下午5:40写道:
> >
> >
> >
> > 我在flink sql中设置了
> > tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
> > sql: select * from test t join test2 t2 on t.a=t2.a
> >
> >
> >
> >
> >
> >
> >
> 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明
> >
> >
> >
> > --
> >
> > Benchao Li
> > School of Electronics Engineering and Computer Science, Peking University
> > Tel:+86-15650713730
> > Email: libenchao@gmail.com; libenchao@pku.edu.cn
> >
> >
> >
> > --
> >
> > Benchao Li
> > School of Electronics Engineering and Computer Science, Peking University
> > Tel:+86-15650713730
> > Email: libenchao@gmail.com; libenchao@pku.edu.cn
> >
> >
> >
> > --
> >
> > Benchao Li
> > School of Electronics Engineering and Computer Science, Peking University
> > Tel:+86-15650713730
> > Email: libenchao@gmail.com; libenchao@pku.edu.cn
> >
> >
> >
> > --
> >
> > Benchao Li
> > School of Electronics Engineering and Computer Science, Peking University
> > Tel:+86-15650713730
> > Email: libenchao@gmail.com; libenchao@pku.edu.cn
> >
> >
> >
> > --
> >
> > Benchao Li
> > School of Electronics Engineering and Computer Science, Peking University
> > Tel:+86-15650713730
> > Email: libenchao@gmail.com; libenchao@pku.edu.cn
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>

Re: 关于状态TTL

Posted by Benchao Li <li...@gmail.com>.
我对checkpoint不是很熟悉。 不过这个错误看起来是因为修改了TtlConfig导致serializer不兼容导致的,可能不是很好解决。

酷酷的浑蛋 <ap...@163.com> 于2020年4月21日周二 下午10:37写道:

> hello,我按照您说的方式改了源码,增加了那两行代码,然后任务用savepoint停止,在从savepoint启动,就报下面的错:
> java.lang.RuntimeException: Error while getting state
>     at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
>     at
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179)
>     at
> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:221)
>     at
> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:205)
>     at
> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.create(JoinRecordStateViews.java:85)
>     at
> org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.open(StreamingJoinOperator.java:88)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
>     at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: The new state
> serializer cannot be incompatible.
>     at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534)
>     at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
>     at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
>     at
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
>     at
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155)
>     at
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
>     at
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
>     at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
>     at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
>     at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>     at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
>     ... 10 more
>
>
>
>
> 在2020年4月17日 15:27,酷酷的浑蛋<ap...@163.com> 写道:
> 好的,非常感谢您,我去按照您说的代码改下,非常感谢
>
>
>
>
> 在2020年4月17日 15:17,Benchao Li<li...@gmail.com> 写道:
> 嗯,这个是需要修改flink源码的,不只是从应用层的配置来修改。这个修改是修改Flink SQL底层的实现的。
>
> 酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午3:09写道:
>
> 我这边用的是sql方式,可以像您说的那样修改吗,我看官网sql方式只有
> tableConfig.setIdleStateRetentionTime(Time.minutes(1),
> Time.minutes(6));这种方式设置ttl
>
>
>
>
> 在2020年4月17日 14:54,Benchao Li<li...@gmail.com> 写道:
> 嗯,blink planner跟legacy planner是有一些实现上的差异。
> 如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中:
>
> static StateTtlConfig createTtlConfig(long retentionTime, boolean
> stateCleaningEnabled) {
> if (stateCleaningEnabled) {
> checkArgument(retentionTime > 0);
> return StateTtlConfig
> .newBuilder(Time.milliseconds(retentionTime))
> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> .cleanupInBackground() // added this line
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> // changed this line
> .build();
> } else {
> return StateTtlConfig.DISABLED;
> }
> }
>
>
> 酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午2:47写道:
>
> 我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期
>
>
>
>
> 在2020年4月17日 14:16,Benchao Li<li...@gmail.com> 写道:
> 这是两个问题,
>
> - 状态只访问一次,可能不会清理。
>
>
>
>
> 这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。
> - 状态已经过期了,但是会被使用到。
> 这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。
>
> [1] https://issues.apache.org/jira/browse/FLINK-16581
>
> 酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午2:06写道:
>
> 其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了
>
>
>
>
> 在2020年4月17日 13:07,Benchao Li<li...@gmail.com> 写道:
> 我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`,
> 所以这个问题现在是不能完全避免了。
> 我已经建了一个jira[1]来跟踪和改进这一点。
>
> [1] https://issues.apache.org/jira/browse/FLINK-17199
>
> 酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午12:51写道:
>
>
>
>
>
> 我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢?
> 在2020年4月16日 15:28,酷酷的浑蛋<ap...@163.com> 写道:
> 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个
>
>
>
>
> 在2020年4月15日 18:04,Benchao Li<li...@gmail.com> 写道:
> Hi,
>
> 你用的是哪个版本呢?
> 在1.9版本里面的确是有点问题,默认没有开启cleanup in background
> [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。
>
> [1] https://issues.apache.org/jira/browse/FLINK-15938
>
> 酷酷的浑蛋 <ap...@163.com> 于2020年4月15日周三 下午5:40写道:
>
>
>
> 我在flink sql中设置了
> tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
> sql: select * from test t join test2 t2 on t.a=t2.a
>
>
>
>
>
>
> 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

回复: 关于状态TTL

Posted by 酷酷的浑蛋 <ap...@163.com>.
hello,我按照您说的方式改了源码,增加了那两行代码,然后任务用savepoint停止,在从savepoint启动,就报下面的错:
java.lang.RuntimeException: Error while getting state
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179)
    at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:221)
    at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:205)
    at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.create(JoinRecordStateViews.java:85)
    at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.open(StreamingJoinOperator.java:88)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
    ... 10 more




在2020年4月17日 15:27,酷酷的浑蛋<ap...@163.com> 写道:
好的,非常感谢您,我去按照您说的代码改下,非常感谢




在2020年4月17日 15:17,Benchao Li<li...@gmail.com> 写道:
嗯,这个是需要修改flink源码的,不只是从应用层的配置来修改。这个修改是修改Flink SQL底层的实现的。

酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午3:09写道:

我这边用的是sql方式,可以像您说的那样修改吗,我看官网sql方式只有
tableConfig.setIdleStateRetentionTime(Time.minutes(1),
Time.minutes(6));这种方式设置ttl




在2020年4月17日 14:54,Benchao Li<li...@gmail.com> 写道:
嗯,blink planner跟legacy planner是有一些实现上的差异。
如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中:

static StateTtlConfig createTtlConfig(long retentionTime, boolean
stateCleaningEnabled) {
if (stateCleaningEnabled) {
checkArgument(retentionTime > 0);
return StateTtlConfig
.newBuilder(Time.milliseconds(retentionTime))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.cleanupInBackground() // added this line
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
// changed this line
.build();
} else {
return StateTtlConfig.DISABLED;
}
}


酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午2:47写道:

我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期




在2020年4月17日 14:16,Benchao Li<li...@gmail.com> 写道:
这是两个问题,

- 状态只访问一次,可能不会清理。



这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。
- 状态已经过期了,但是会被使用到。
这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。

[1] https://issues.apache.org/jira/browse/FLINK-16581

酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午2:06写道:

其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了




在2020年4月17日 13:07,Benchao Li<li...@gmail.com> 写道:
我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`,
所以这个问题现在是不能完全避免了。
我已经建了一个jira[1]来跟踪和改进这一点。

[1] https://issues.apache.org/jira/browse/FLINK-17199

酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午12:51写道:





我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢?
在2020年4月16日 15:28,酷酷的浑蛋<ap...@163.com> 写道:
好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个




在2020年4月15日 18:04,Benchao Li<li...@gmail.com> 写道:
Hi,

你用的是哪个版本呢?
在1.9版本里面的确是有点问题,默认没有开启cleanup in background
[1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。

[1] https://issues.apache.org/jira/browse/FLINK-15938

酷酷的浑蛋 <ap...@163.com> 于2020年4月15日周三 下午5:40写道:



我在flink sql中设置了
tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
sql: select * from test t join test2 t2 on t.a=t2.a





当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

回复: 关于状态TTL

Posted by 酷酷的浑蛋 <ap...@163.com>.
hello,我按照您说的方式改了源码,增加了那两行代码,然后任务用savepoint停止,在从savepoint启动,就报下面的错:
java.lang.RuntimeException: Error while getting state
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179)
    at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:221)
    at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.<init>(JoinRecordStateViews.java:205)
    at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.create(JoinRecordStateViews.java:85)
    at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.open(StreamingJoinOperator.java:88)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
    ... 10 more




在2020年4月17日 15:27,酷酷的浑蛋<ap...@163.com> 写道:
好的,非常感谢您,我去按照您说的代码改下,非常感谢




在2020年4月17日 15:17,Benchao Li<li...@gmail.com> 写道:
嗯,这个是需要修改flink源码的,不只是从应用层的配置来修改。这个修改是修改Flink SQL底层的实现的。

酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午3:09写道:

我这边用的是sql方式,可以像您说的那样修改吗,我看官网sql方式只有
tableConfig.setIdleStateRetentionTime(Time.minutes(1),
Time.minutes(6));这种方式设置ttl




在2020年4月17日 14:54,Benchao Li<li...@gmail.com> 写道:
嗯,blink planner跟legacy planner是有一些实现上的差异。
如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中:

static StateTtlConfig createTtlConfig(long retentionTime, boolean
stateCleaningEnabled) {
if (stateCleaningEnabled) {
checkArgument(retentionTime > 0);
return StateTtlConfig
.newBuilder(Time.milliseconds(retentionTime))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.cleanupInBackground() // added this line
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
// changed this line
.build();
} else {
return StateTtlConfig.DISABLED;
}
}


酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午2:47写道:

我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期




在2020年4月17日 14:16,Benchao Li<li...@gmail.com> 写道:
这是两个问题,

- 状态只访问一次,可能不会清理。



这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。
- 状态已经过期了,但是会被使用到。
这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。

[1] https://issues.apache.org/jira/browse/FLINK-16581

酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午2:06写道:

其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了




在2020年4月17日 13:07,Benchao Li<li...@gmail.com> 写道:
我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`,
所以这个问题现在是不能完全避免了。
我已经建了一个jira[1]来跟踪和改进这一点。

[1] https://issues.apache.org/jira/browse/FLINK-17199

酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午12:51写道:





我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢?
在2020年4月16日 15:28,酷酷的浑蛋<ap...@163.com> 写道:
好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个




在2020年4月15日 18:04,Benchao Li<li...@gmail.com> 写道:
Hi,

你用的是哪个版本呢?
在1.9版本里面的确是有点问题,默认没有开启cleanup in background
[1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。

[1] https://issues.apache.org/jira/browse/FLINK-15938

酷酷的浑蛋 <ap...@163.com> 于2020年4月15日周三 下午5:40写道:



我在flink sql中设置了
tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
sql: select * from test t join test2 t2 on t.a=t2.a





当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

回复: 关于状态TTL

Posted by 酷酷的浑蛋 <ap...@163.com>.
好的,非常感谢您,我去按照您说的代码改下,非常感谢




在2020年4月17日 15:17,Benchao Li<li...@gmail.com> 写道:
嗯,这个是需要修改flink源码的,不只是从应用层的配置来修改。这个修改是修改Flink SQL底层的实现的。

酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午3:09写道:

我这边用的是sql方式,可以像您说的那样修改吗,我看官网sql方式只有
tableConfig.setIdleStateRetentionTime(Time.minutes(1),
Time.minutes(6));这种方式设置ttl




在2020年4月17日 14:54,Benchao Li<li...@gmail.com> 写道:
嗯,blink planner跟legacy planner是有一些实现上的差异。
如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中:

static StateTtlConfig createTtlConfig(long retentionTime, boolean
stateCleaningEnabled) {
if (stateCleaningEnabled) {
checkArgument(retentionTime > 0);
return StateTtlConfig
.newBuilder(Time.milliseconds(retentionTime))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.cleanupInBackground() // added this line
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
// changed this line
.build();
} else {
return StateTtlConfig.DISABLED;
}
}


酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午2:47写道:

我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期




在2020年4月17日 14:16,Benchao Li<li...@gmail.com> 写道:
这是两个问题,

- 状态只访问一次,可能不会清理。



这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。
- 状态已经过期了,但是会被使用到。
这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。

[1] https://issues.apache.org/jira/browse/FLINK-16581

酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午2:06写道:

其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了




在2020年4月17日 13:07,Benchao Li<li...@gmail.com> 写道:
我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`,
所以这个问题现在是不能完全避免了。
我已经建了一个jira[1]来跟踪和改进这一点。

[1] https://issues.apache.org/jira/browse/FLINK-17199

酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午12:51写道:





我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢?
在2020年4月16日 15:28,酷酷的浑蛋<ap...@163.com> 写道:
好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个




在2020年4月15日 18:04,Benchao Li<li...@gmail.com> 写道:
Hi,

你用的是哪个版本呢?
在1.9版本里面的确是有点问题,默认没有开启cleanup in background
[1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。

[1] https://issues.apache.org/jira/browse/FLINK-15938

酷酷的浑蛋 <ap...@163.com> 于2020年4月15日周三 下午5:40写道:



我在flink sql中设置了
tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
sql: select * from test t join test2 t2 on t.a=t2.a





当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

Re: 关于状态TTL

Posted by Benchao Li <li...@gmail.com>.
嗯,这个是需要修改flink源码的,不只是从应用层的配置来修改。这个修改是修改Flink SQL底层的实现的。

酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午3:09写道:

> 我这边用的是sql方式,可以像您说的那样修改吗,我看官网sql方式只有
>  tableConfig.setIdleStateRetentionTime(Time.minutes(1),
> Time.minutes(6));这种方式设置ttl
>
>
>
>
> 在2020年4月17日 14:54,Benchao Li<li...@gmail.com> 写道:
> 嗯,blink planner跟legacy planner是有一些实现上的差异。
> 如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中:
>
> static StateTtlConfig createTtlConfig(long retentionTime, boolean
> stateCleaningEnabled) {
> if (stateCleaningEnabled) {
> checkArgument(retentionTime > 0);
> return StateTtlConfig
> .newBuilder(Time.milliseconds(retentionTime))
> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> .cleanupInBackground() // added this line
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> // changed this line
> .build();
> } else {
> return StateTtlConfig.DISABLED;
> }
> }
>
>
> 酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午2:47写道:
>
> 我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期
>
>
>
>
> 在2020年4月17日 14:16,Benchao Li<li...@gmail.com> 写道:
> 这是两个问题,
>
> - 状态只访问一次,可能不会清理。
>
>
>
> 这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。
> - 状态已经过期了,但是会被使用到。
> 这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。
>
> [1] https://issues.apache.org/jira/browse/FLINK-16581
>
> 酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午2:06写道:
>
> 其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了
>
>
>
>
> 在2020年4月17日 13:07,Benchao Li<li...@gmail.com> 写道:
> 我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`,
> 所以这个问题现在是不能完全避免了。
> 我已经建了一个jira[1]来跟踪和改进这一点。
>
> [1] https://issues.apache.org/jira/browse/FLINK-17199
>
> 酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午12:51写道:
>
>
>
>
>
> 我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢?
> 在2020年4月16日 15:28,酷酷的浑蛋<ap...@163.com> 写道:
> 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个
>
>
>
>
> 在2020年4月15日 18:04,Benchao Li<li...@gmail.com> 写道:
> Hi,
>
> 你用的是哪个版本呢?
> 在1.9版本里面的确是有点问题,默认没有开启cleanup in background
> [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。
>
> [1] https://issues.apache.org/jira/browse/FLINK-15938
>
> 酷酷的浑蛋 <ap...@163.com> 于2020年4月15日周三 下午5:40写道:
>
>
>
> 我在flink sql中设置了
> tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
> sql: select * from test t join test2 t2 on t.a=t2.a
>
>
>
>
>
> 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

回复: 关于状态TTL

Posted by 酷酷的浑蛋 <ap...@163.com>.
我这边用的是sql方式,可以像您说的那样修改吗,我看官网sql方式只有         tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));这种方式设置ttl




在2020年4月17日 14:54,Benchao Li<li...@gmail.com> 写道:
嗯,blink planner跟legacy planner是有一些实现上的差异。
如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中:

static StateTtlConfig createTtlConfig(long retentionTime, boolean
stateCleaningEnabled) {
if (stateCleaningEnabled) {
checkArgument(retentionTime > 0);
return StateTtlConfig
.newBuilder(Time.milliseconds(retentionTime))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.cleanupInBackground() // added this line
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
// changed this line
.build();
} else {
return StateTtlConfig.DISABLED;
}
}


酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午2:47写道:

我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期




在2020年4月17日 14:16,Benchao Li<li...@gmail.com> 写道:
这是两个问题,

- 状态只访问一次,可能不会清理。


这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。
- 状态已经过期了,但是会被使用到。
这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。

[1] https://issues.apache.org/jira/browse/FLINK-16581

酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午2:06写道:

其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了




在2020年4月17日 13:07,Benchao Li<li...@gmail.com> 写道:
我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`,
所以这个问题现在是不能完全避免了。
我已经建了一个jira[1]来跟踪和改进这一点。

[1] https://issues.apache.org/jira/browse/FLINK-17199

酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午12:51写道:





我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢?
在2020年4月16日 15:28,酷酷的浑蛋<ap...@163.com> 写道:
好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个




在2020年4月15日 18:04,Benchao Li<li...@gmail.com> 写道:
Hi,

你用的是哪个版本呢?
在1.9版本里面的确是有点问题,默认没有开启cleanup in background
[1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。

[1] https://issues.apache.org/jira/browse/FLINK-15938

酷酷的浑蛋 <ap...@163.com> 于2020年4月15日周三 下午5:40写道:



我在flink sql中设置了
tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
sql: select * from test t join test2 t2 on t.a=t2.a




当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

Re: 关于状态TTL

Posted by Benchao Li <li...@gmail.com>.
嗯,blink planner跟legacy planner是有一些实现上的差异。
如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中:

static StateTtlConfig createTtlConfig(long retentionTime, boolean
stateCleaningEnabled) {
   if (stateCleaningEnabled) {
      checkArgument(retentionTime > 0);
      return StateTtlConfig
         .newBuilder(Time.milliseconds(retentionTime))
         .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
         .cleanupInBackground() // added this line
         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
// changed this line
         .build();
   } else {
      return StateTtlConfig.DISABLED;
   }
}


酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午2:47写道:

> 我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期
>
>
>
>
> 在2020年4月17日 14:16,Benchao Li<li...@gmail.com> 写道:
> 这是两个问题,
>
> - 状态只访问一次,可能不会清理。
>
>
> 这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。
> - 状态已经过期了,但是会被使用到。
> 这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。
>
> [1] https://issues.apache.org/jira/browse/FLINK-16581
>
> 酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午2:06写道:
>
> 其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了
>
>
>
>
> 在2020年4月17日 13:07,Benchao Li<li...@gmail.com> 写道:
> 我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`,
> 所以这个问题现在是不能完全避免了。
> 我已经建了一个jira[1]来跟踪和改进这一点。
>
> [1] https://issues.apache.org/jira/browse/FLINK-17199
>
> 酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午12:51写道:
>
>
>
>
>
> 我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢?
> 在2020年4月16日 15:28,酷酷的浑蛋<ap...@163.com> 写道:
> 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个
>
>
>
>
> 在2020年4月15日 18:04,Benchao Li<li...@gmail.com> 写道:
> Hi,
>
> 你用的是哪个版本呢?
> 在1.9版本里面的确是有点问题,默认没有开启cleanup in background
> [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。
>
> [1] https://issues.apache.org/jira/browse/FLINK-15938
>
> 酷酷的浑蛋 <ap...@163.com> 于2020年4月15日周三 下午5:40写道:
>
>
>
> 我在flink sql中设置了
> tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
> sql: select * from test t join test2 t2 on t.a=t2.a
>
>
>
>
> 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

回复: 关于状态TTL

Posted by 酷酷的浑蛋 <ap...@163.com>.
我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期




在2020年4月17日 14:16,Benchao Li<li...@gmail.com> 写道:
这是两个问题,

- 状态只访问一次,可能不会清理。

这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。
- 状态已经过期了,但是会被使用到。
这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。

[1] https://issues.apache.org/jira/browse/FLINK-16581

酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午2:06写道:

其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了




在2020年4月17日 13:07,Benchao Li<li...@gmail.com> 写道:
我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`,
所以这个问题现在是不能完全避免了。
我已经建了一个jira[1]来跟踪和改进这一点。

[1] https://issues.apache.org/jira/browse/FLINK-17199

酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午12:51写道:





我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢?
在2020年4月16日 15:28,酷酷的浑蛋<ap...@163.com> 写道:
好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个




在2020年4月15日 18:04,Benchao Li<li...@gmail.com> 写道:
Hi,

你用的是哪个版本呢?
在1.9版本里面的确是有点问题,默认没有开启cleanup in background
[1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。

[1] https://issues.apache.org/jira/browse/FLINK-15938

酷酷的浑蛋 <ap...@163.com> 于2020年4月15日周三 下午5:40写道:



我在flink sql中设置了
tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
sql: select * from test t join test2 t2 on t.a=t2.a



当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

Re: 关于状态TTL

Posted by Benchao Li <li...@gmail.com>.
这是两个问题,

- 状态只访问一次,可能不会清理。

这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。
- 状态已经过期了,但是会被使用到。
  这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。

[1] https://issues.apache.org/jira/browse/FLINK-16581

酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午2:06写道:

> 其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了
>
>
>
>
> 在2020年4月17日 13:07,Benchao Li<li...@gmail.com> 写道:
> 我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`,
> 所以这个问题现在是不能完全避免了。
> 我已经建了一个jira[1]来跟踪和改进这一点。
>
> [1] https://issues.apache.org/jira/browse/FLINK-17199
>
> 酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午12:51写道:
>
>
>
>
>
> 我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢?
> 在2020年4月16日 15:28,酷酷的浑蛋<ap...@163.com> 写道:
> 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个
>
>
>
>
> 在2020年4月15日 18:04,Benchao Li<li...@gmail.com> 写道:
> Hi,
>
> 你用的是哪个版本呢?
> 在1.9版本里面的确是有点问题,默认没有开启cleanup in background
> [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。
>
> [1] https://issues.apache.org/jira/browse/FLINK-15938
>
> 酷酷的浑蛋 <ap...@163.com> 于2020年4月15日周三 下午5:40写道:
>
>
>
> 我在flink sql中设置了
> tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
> sql: select * from test t join test2 t2 on t.a=t2.a
>
>
>
> 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

回复: 关于状态TTL

Posted by 酷酷的浑蛋 <ap...@163.com>.
其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了




在2020年4月17日 13:07,Benchao Li<li...@gmail.com> 写道:
我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`,
所以这个问题现在是不能完全避免了。
我已经建了一个jira[1]来跟踪和改进这一点。

[1] https://issues.apache.org/jira/browse/FLINK-17199

酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午12:51写道:





我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢?
在2020年4月16日 15:28,酷酷的浑蛋<ap...@163.com> 写道:
好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个




在2020年4月15日 18:04,Benchao Li<li...@gmail.com> 写道:
Hi,

你用的是哪个版本呢?
在1.9版本里面的确是有点问题,默认没有开启cleanup in background
[1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。

[1] https://issues.apache.org/jira/browse/FLINK-15938

酷酷的浑蛋 <ap...@163.com> 于2020年4月15日周三 下午5:40写道:



我在flink sql中设置了
tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
sql: select * from test t join test2 t2 on t.a=t2.a


当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

Re: 关于状态TTL

Posted by Benchao Li <li...@gmail.com>.
我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`,
所以这个问题现在是不能完全避免了。
我已经建了一个jira[1]来跟踪和改进这一点。

[1] https://issues.apache.org/jira/browse/FLINK-17199

酷酷的浑蛋 <ap...@163.com> 于2020年4月17日周五 下午12:51写道:

>
>
>
>
> 我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢?
> 在2020年4月16日 15:28,酷酷的浑蛋<ap...@163.com> 写道:
> 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个
>
>
>
>
> 在2020年4月15日 18:04,Benchao Li<li...@gmail.com> 写道:
> Hi,
>
> 你用的是哪个版本呢?
> 在1.9版本里面的确是有点问题,默认没有开启cleanup in background
> [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。
>
> [1] https://issues.apache.org/jira/browse/FLINK-15938
>
> 酷酷的浑蛋 <ap...@163.com> 于2020年4月15日周三 下午5:40写道:
>
>
>
> 我在flink sql中设置了
> tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
> sql: select * from test t join test2 t2 on t.a=t2.a
>
>
> 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenchao@gmail.com; libenchao@pku.edu.cn
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

回复: 关于状态TTL

Posted by 酷酷的浑蛋 <ap...@163.com>.



我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢?
在2020年4月16日 15:28,酷酷的浑蛋<ap...@163.com> 写道:
好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个




在2020年4月15日 18:04,Benchao Li<li...@gmail.com> 写道:
Hi,

你用的是哪个版本呢?
在1.9版本里面的确是有点问题,默认没有开启cleanup in background
[1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。

[1] https://issues.apache.org/jira/browse/FLINK-15938

酷酷的浑蛋 <ap...@163.com> 于2020年4月15日周三 下午5:40写道:



我在flink sql中设置了
tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
sql: select * from test t join test2 t2 on t.a=t2.a

当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

回复: 关于状态TTL

Posted by 酷酷的浑蛋 <ap...@163.com>.
好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个




在2020年4月15日 18:04,Benchao Li<li...@gmail.com> 写道:
Hi,

你用的是哪个版本呢?
在1.9版本里面的确是有点问题,默认没有开启cleanup in background
[1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。

[1] https://issues.apache.org/jira/browse/FLINK-15938

酷酷的浑蛋 <ap...@163.com> 于2020年4月15日周三 下午5:40写道:



我在flink sql中设置了
tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
sql: select * from test t join test2 t2 on t.a=t2.a

当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

Re: 关于状态TTL

Posted by Benchao Li <li...@gmail.com>.
Hi,

你用的是哪个版本呢?
在1.9版本里面的确是有点问题,默认没有开启cleanup in background
[1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。

[1] https://issues.apache.org/jira/browse/FLINK-15938

酷酷的浑蛋 <ap...@163.com> 于2020年4月15日周三 下午5:40写道:

>
>
> 我在flink sql中设置了
> tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
> sql: select * from test t join test2 t2 on t.a=t2.a
>
> 当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明



-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn