You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 沉醉寒風 <10...@qq.com> on 2021/02/07 06:40:43 UTC
关于flink窗口state
import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; ... DataStream<Integer> orangeStream = ... DataStream<Integer> greenStream = ... orangeStream.join(greenStream) .where(<KeySelector>) .equalTo(<KeySelector>) .window(TumblingEventTimeWindows.of(Time.milliseconds(2))) .apply (new JoinFunction<Integer, Integer, String> (){ @Override public String join(Integer first, Integer second) { return first + "," + second; } });简写这个例子, 我在richJoinFunction使用valueState,给每个窗口做一个汇总 发现state会一直增长不会随窗口的销毁而销毁, 请问下有什么办法实现?
Re: 关于flink窗口state
Posted by HunterXHunter <13...@qq.com>.
你这代码贴的乱七八糟。。。
你需要再richjoinfunction里面设置valuestate的生命周期,他不随着窗口而销毁,窗口只会销毁自己设定的state,有空你可以看看window的源码,里面有清理state的逻辑
--
Sent from: http://apache-flink.147419.n8.nabble.com/