Posted to user-zh@flink.apache.org by "xuefli@outlook.com" <xu...@outlook.com> on 2020/06/01 14:22:41 UTC
On using IngestionTime: the problem with a too-small window
Flink 1.10, testing the Flink API on Windows 10
The code is as follows:
```
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.ArrayList;
import java.util.List;

public class KeyedStreamJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        // env.setParallelism(3);

        Tuple2<String, Integer> item = null;
        List<Tuple2<String, Integer>> items = new ArrayList<>();
        item = new Tuple2<>("k1", 1);
        items.add(item);
        item = new Tuple2<>("k3", 10);
        items.add(item);
        item = new Tuple2<>("k1", 10);
        items.add(item);
        item = new Tuple2<>("k2", 2);
        items.add(item);
        item = new Tuple2<>("k1", 11);
        items.add(item);
        item = new Tuple2<>("k2", 20);
        items.add(item);
        DataStreamSource<Tuple2<String, Integer>> streamSource = env.fromCollection(items);
        streamSource
                //by 1
                //.assignTimestampsAndWatermarks(new IngestionTimeExtractor())
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
                .sum(1)
                .print("+++++++++++++++++++++++++++");

        env.execute("keyedSteamJob");
    }
}
```
Output:
```
+++++++++++++++++++++++++++:1> (k3,10)
+++++++++++++++++++++++++++:2> (k1,1)
+++++++++++++++++++++++++++:8> (k2,22)
+++++++++++++++++++++++++++:2> (k1,21)
```
If I change
.window(TumblingEventTimeWindows.of(Time.milliseconds(10L)))
to
.window(TumblingEventTimeWindows.of(Time.seconds(10L)))
the output is:
```
+++++++++++++++++++++++++++:8> (k2,22)
+++++++++++++++++++++++++++:1> (k3,10)
+++++++++++++++++++++++++++:2> (k1,22)
```
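With the two different window sizes, the first run does not aggregate key 'k1' into a single result, while the second run does.
Why does this happen, and how can I verify which processing steps cause this difference in results?
If k1=1 is already in the ValueState (the line 2> (k1,1)),
then when a result for currentKey=k1 is emitted again, the ValueState value should be 1, so the output should be 10+11+1, not 10+11.
If the window is changed to 1 second, the output is also as expected.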
Sent from Mail<https://go.microsoft.com/fwlink/?LinkId=550986> for Windows 10
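Re: Re: On using IngestionTime: the problem with a too-small window
Posted by chaojianok <ch...@163.com>.
Changing Time.milliseconds(10L) to Time.seconds(10L) changes the size of the time window. With the 10 ms window, records of the same key that fall into one 10 s window can be split across two windows, and because the aggregation is computed per window, the result changes.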
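To make the reply concrete, here is a plain-Java sketch (no Flink dependency; the class name TumblingWindowSim is hypothetical) of what a tumbling event-time window does with ingestion timestamps. It uses the same window-start arithmetic as Flink's TimeWindow.getWindowStartWithOffset with offset 0, then sums values per (key, window). The timestamps are assumptions: they model a source that stamps all six records within a few milliseconds, with a 10 ms boundary falling right after the first "k1" record. Under that assumption it reproduces both outputs in the question: (k1,1) and (k1,21) for 10 ms windows, a single (k1,22) for 10 s windows.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java simulation (not Flink code) of tumbling event-time windows followed
// by a per-window sum. Timestamps passed in are hypothetical ingestion times.
class TumblingWindowSim {

    // Same arithmetic as Flink's TimeWindow.getWindowStartWithOffset(ts, offset, size).
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Sums values per (key, window start); map key is "key@windowStart", in first-seen order.
    static Map<String, Integer> sumPerWindow(String[] keys, int[] values,
                                             long[] timestamps, long windowSize) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            long start = windowStart(timestamps[i], 0L, windowSize);
            sums.merge(keys[i] + "@" + start, values[i], Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        String[] keys = {"k1", "k3", "k1", "k2", "k1", "k2"};
        int[] values = {1, 10, 10, 2, 11, 20};
        // Assumed ingestion timestamps (ms): the 10 ms boundary at 1010 splits the burst.
        long[] ts = {1008L, 1009L, 1010L, 1011L, 1011L, 1012L};
        // prints: 10 ms windows: {k1@1000=1, k3@1000=10, k1@1010=21, k2@1010=22}
        System.out.println("10 ms windows: " + sumPerWindow(keys, values, ts, 10L));
        // prints: 10 s windows:  {k1@0=22, k3@0=10, k2@0=22}
        System.out.println("10 s windows:  " + sumPerWindow(keys, values, ts, 10_000L));
    }
}
```

Each window starts with an empty sum, so the first k1 value never reaches the second window. To check this in the real job, one option is to replace .sum(1) with a ProcessWindowFunction and print context.window() next to each result, which shows the start and end of the window each sum belongs to.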
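On 2020-06-01 22:41:14, "xuefli@outlook.com" <xu...@outlook.com> wrote:
>If the ValueState of the 10 ms window was evicted after it timed out, I can understand that. But does an aggregation without a window mean that the ValueState of every key
>stays in the StateBackend? Would it grow without limit, and what happens when it exceeds a cluster limit such as memory or slots?
>Even if the ValueState is evicted inside a window, the code does not explicitly specify a TimerService or an evictor.
>
>If the same key is processed at points that are far enough apart in time, would the outputs with and without windows still differ?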
Re: On using IngestionTime: the problem with a too-small window
Posted by "xuefli@outlook.com" <xu...@outlook.com>.
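If the ValueState of the 10 ms window was evicted after it timed out, I can understand that. But does an aggregation without a window mean that the ValueState of every key
stays in the StateBackend? Would it grow without limit, and what happens when it exceeds a cluster limit such as memory or slots?
Even if the ValueState is evicted inside a window, the code does not explicitly specify a TimerService or an evictor.
If the same key is processed at points that are far enough apart in time, would the outputs with and without windows still differ?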
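On the eviction question: no evictor or explicit timer is needed, because Flink's window operator registers a cleanup timer for each (key, window) at the window's max timestamp plus the allowed lateness (0 by default) and clears that window's contents when the watermark reaches it. When a bounded source such as fromCollection finishes, a final Long.MAX_VALUE watermark fires any remaining windows. The following simplified model (hypothetical class WindowStateCleanupSim, not Flink's actual WindowOperator, assuming allowedLateness = 0) sketches that behaviour. It also touches the last question: two records of the same key that are far apart in time land in different windows and are summed separately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model, NOT Flink's WindowOperator: per-(window, key) sums that are emitted
// and then discarded once the watermark reaches the window's max timestamp.
// Assumes allowedLateness = 0, so firing and cleanup happen at the same watermark.
class WindowStateCleanupSim {
    private final long windowSize;
    // window start -> (key -> sum of values in that window)
    private final TreeMap<Long, Map<String, Integer>> state = new TreeMap<>();

    WindowStateCleanupSim(long windowSize) {
        this.windowSize = windowSize;
    }

    void add(String key, int value, long timestamp) {
        long start = timestamp - (timestamp + windowSize) % windowSize;
        state.computeIfAbsent(start, s -> new TreeMap<>()).merge(key, value, Integer::sum);
    }

    // Emits "(key,sum)" for every window whose max timestamp (start + size - 1) <= watermark,
    // then removes that window's state, like a cleanup timer would.
    List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        while (!state.isEmpty() && state.firstKey() + windowSize - 1 <= watermark) {
            Map<String, Integer> window = state.pollFirstEntry().getValue();
            window.forEach((k, v) -> fired.add("(" + k + "," + v + ")"));
        }
        return fired;
    }

    int liveWindows() {
        return state.size();
    }

    public static void main(String[] args) {
        WindowStateCleanupSim sim = new WindowStateCleanupSim(10L);
        sim.add("k1", 1, 1008L);
        System.out.println(sim.advanceWatermark(5_000L)); // prints [(k1,1)]
        System.out.println(sim.liveWindows());            // prints 0: window state is gone
        sim.add("k1", 10, 9_003L); // same key, much later: a fresh window
        sim.add("k1", 11, 9_004L);
        // End of bounded input: a Long.MAX_VALUE watermark flushes everything.
        System.out.println(sim.advanceWatermark(Long.MAX_VALUE)); // prints [(k1,21)]
    }
}
```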
Re: On using IngestionTime: the problem with a too-small window
Posted by "xuefli@outlook.com" <xu...@outlook.com> (sent 2020-06-01 22:27).
If no window is used, each output is aggregated onto the key's existing ValueState.
Output:
```
+++++++++++++++++++++++++++:2> (k1,1)
+++++++++++++++++++++++++++:1> (k3,10)
+++++++++++++++++++++++++++:2> (k1,11)
+++++++++++++++++++++++++++:8> (k2,2)
+++++++++++++++++++++++++++:2> (k1,22)
+++++++++++++++++++++++++++:8> (k2,22)
```
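For comparison, keyBy(...).sum(1) without a window is a rolling aggregation: it keeps one running total per key for the lifetime of the job and emits an updated result for every record. The plain-Java model below (hypothetical class RollingKeyedSum, single-threaded, so there are no subtask prefixes) reproduces the six lines above. It also shows why this kind of state grows with the number of distinct keys: nothing ever removes an entry from the map. If the key space is unbounded, that growth is real; for state you declare yourself in a custom function, Flink's StateTtlConfig can expire idle entries.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of keyBy(...).sum(1) without a window: one running total per key,
// never cleared, with an updated result emitted for every input record.
class RollingKeyedSum {
    private final Map<String, Integer> totals = new HashMap<>();

    String process(String key, int value) {
        int updated = totals.merge(key, value, Integer::sum);
        return "(" + key + "," + updated + ")";
    }

    static List<String> run(String[] keys, int[] values) {
        RollingKeyedSum op = new RollingKeyedSum();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            out.add(op.process(keys[i], values[i]));
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [(k1,1), (k3,10), (k1,11), (k2,2), (k1,22), (k2,22)]
        System.out.println(run(new String[]{"k1", "k3", "k1", "k2", "k1", "k2"},
                               new int[]{1, 10, 10, 2, 11, 20}));
    }
}
```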