You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by ゞ野蠻遊戲χ <zh...@vip.qq.com> on 2020/09/12 09:57:01 UTC

时间窗口和reduce算子问题

大家好


&nbsp; &nbsp; &nbsp; &nbsp;在window算子之后使用reduce算子,是否是把当前窗口的所有元素根据reduce算子计算完成之后,仅仅输出一条到下游,还是当前窗口前后2个元素每次进入reduce算子,计算完成之后就往下游输出一条?







Thanks
嘉治

Re:时间窗口和reduce算子问题

Posted by allanqinjy <al...@163.com>.
你好,
       reduce在没有开窗口的时候,是一条一条来处理的。因为keyby以后是根据key分组以后的,不开窗口是无限流的形式走的。当开了window窗口以后,你可以理解为一个batch,然后对这一块数据进行了keyby后就会有一条数据了,如果你reduce里面再有个规则,比如按照time进行大小比较,只要最近的那一条重复的,那么最后就是那一条最新的数据了。这个自己也可以做个demo。如下是自己本地的测试,你也可以体验一下。希望可以帮助到你,能力有限,哪儿说的不对请见谅!!
public class ReduceTest {

private static final Logger logger = LoggerFactory.getLogger(ReduceTest.class);

    public static void main(String[] args) {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

DataStream<String> source = env.readTextFile("/Users/allanqin/myprojects/spend-report/demo/src/main/resources/reduce.txt");

source
                .keyBy(new KeySelector<String, String>() {
@Override
public String getKey(String s) throws Exception {
                        ObjectMapper mapper = new ObjectMapper();
ReduceEntity reduceEntity = mapper.readValue(s, ReduceEntity.class);
                        return reduceEntity.getName();
}
                }, TypeInformation.of(new TypeHint<String>() {
                }))

                .timeWindow(Time.seconds(5))
                .reduce(new ReduceFunction<String>() {

                    ReduceEntity entity = new ReduceEntity();

@Override
public String reduce(String v1, String v2) throws Exception {

//                        ObjectMapper mapper = new ObjectMapper();
//                        ReduceEntity reduceEntity = mapper.readValue(v1, ReduceEntity.class);
//                        ReduceEntity reduceEntity2 = mapper.readValue(v2, ReduceEntity.class);
//                        if (reduceEntity.getAge() > reduceEntity2.getAge()) {
//                            return v1;
//                        }

return v2;

}
                })

                .print();

env.execute("test");

}
}
txt文件内容:
{ "name" : "allanqinjy", "age" : 4 }
{ "name" : "allanqinjy", "age" : 45 }
{ "name" : "allanqinjy", "age" : 6 }
{ "name" : "allanqinjy", "age" : 9 }










在 2020-09-12 17:57:01,"ゞ野蠻遊戲χ" <zh...@vip.qq.com> 写道:

大家好



       在window算子之后使用reduce算子,是否是把当前窗口的所有元素根据reduce算子计算完成之后,仅仅输出一条到下游,还是当前窗口前后2个元素每次进入reduce算子,计算完成之后就往下游输出一条?







Thanks
嘉治