You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 1048262223 <10...@qq.com> on 2020/06/01 15:03:34 UTC

回复: 关于使用IngressTime,window过小的问题

Hi
sum的实现是aggregatefunction,每次输出只会输出当前窗口的聚合结果,结果不同可能是因为上游时间戳提取是按照ingestion提取的把1 10 11分配到了两个窗口中1一个窗口,10和11一个窗口


Best
Yichao Yang



发自我的iPhone


------------------ 原始邮件 ------------------
发件人: xuefli@outlook.com <xuefli@outlook.com&gt;
发送时间: 2020年6月1日 22:41
收件人: user-zh@flink.apache.org <user-zh@flink.apache.org&gt;
主题: 回复: 关于使用IngressTime,window过小的问题



如果说window的10毫秒的状态ValueState被超时逐出了,可以理解。但不带window的聚合操作是否意味着所有的key的
ValueState都存在与StateBackend中,是否会无限制增长,超过集群的一些限制,比如内存、slot等会怎么样。
即使ValueState在window中被逐出,但代码中未明确指定TimeService和逐出器。

如果假设相同的key在被处理时在时空上间隔足够远,不带windows和带windows的输出结果是否还会不同?


发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986&gt;应用

发件人: xuefli@outlook.com<mailto:xuefli@outlook.com&gt;
发送时间: 2020年6月1日 22:27
收件人: user-zh@flink.apache.org<mailto:user-zh@flink.apache.org&gt;
主题: 回复: 关于使用IngressTime,window过小的问题

如果不使用window,那么输出会按照ValueState的存量的key的ValueState聚合
输出
```
+++++++++++++++++++++++++++:2&gt; (k1,1)
+++++++++++++++++++++++++++:1&gt; (k3,10)
+++++++++++++++++++++++++++:2&gt; (k1,11)
+++++++++++++++++++++++++++:8&gt; (k2,2)
+++++++++++++++++++++++++++:2&gt; (k1,22)
+++++++++++++++++++++++++++:8&gt; (k2,22)
```

发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986&gt;应用

发件人: xuefli@outlook.com<mailto:xuefli@outlook.com&gt;
发送时间: 2020年6月1日 22:22
收件人: user-zh<mailto:user-zh@flink.apache.org&gt;
主题: 关于使用IngressTime,window过小的问题

Flink 1.10,windows 10 flink api验证

代码如下
```

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.ArrayList;
import java.util.List;

public class KeyedStreamJob {
&nbsp;&nbsp;&nbsp; public static void main(String[] args) throws Exception {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; env.setParallelism(3);

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Tuple2<String, Integer&gt; item = null;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; List<Tuple2<String, Integer&gt;&gt; items = new ArrayList<&gt;();
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; item = new Tuple2<&gt;("k1", 1);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; items.add(item);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; item = new Tuple2<&gt;("k3", 10);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; items.add(item);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; item = new Tuple2<&gt;("k1", 10);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; items.add(item);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; item = new Tuple2<&gt;("k2", 2);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; items.add(item);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; item = new Tuple2<&gt;("k1", 11);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; items.add(item);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; item = new Tuple2<&gt;("k2", 20);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; items.add(item);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; DataStreamSource<Tuple2<String, Integer&gt;&gt; streamSource = env.fromCollection(items);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; streamSource
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //by 1
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; //.assignTimestampsAndWatermarks(new IngestionTimeExtractor())
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .keyBy(new KeySelector<Tuple2<String, Integer&gt;, String&gt;() {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; @Override
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; public String getKey(Tuple2<String, Integer&gt; value) throws Exception {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return value.f0;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; })
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .sum(1)
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .print("+++++++++++++++++++++++++++");

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; env.execute("keyedSteamJob");
&nbsp;&nbsp;&nbsp; }
}

```
输出
```
+++++++++++++++++++++++++++:1&gt; (k3,10)
+++++++++++++++++++++++++++:2&gt; (k1,1)
+++++++++++++++++++++++++++:8&gt; (k2,22)
+++++++++++++++++++++++++++:2&gt; (k1,21)
```
如果把

window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
改成

.window(TumblingEventTimeWindows.of(Time.seconds(10L)))
输出
```
+++++++++++++++++++++++++++:8&gt; (k2,22)
+++++++++++++++++++++++++++:1&gt; (k3,10)
+++++++++++++++++++++++++++:2&gt; (k1,22)
```
两次不同的windows窗口,第一次输出对于key=‘k1‘不聚集,第二次输出聚集

为什么会这样,如何验证怎么样的过程处理流程导致这样的结果区别

如果k1=1已经在ValueState中(2&gt;(k1,1)),
那么再次输出时currentKey=k1时,这个时候ValueState的value是1,那么输出应该是10+11+1,而不是10+11;


如果window改成1秒也是按照正常结果输出





发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986&gt;应用