You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Robert Metzger (Jira)" <ji...@apache.org> on 2020/12/08 06:42:10 UTC
[jira] [Updated] (FLINK-15876) The alternative code for
GroupedProcessingTimeWindowExample don't compile pass current version
[ https://issues.apache.org/jira/browse/FLINK-15876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Robert Metzger updated FLINK-15876:
-----------------------------------
Fix Version/s: (was: 1.12.0)
1.13.0
> The alternative code for GroupedProcessingTimeWindowExample don't compile pass current version
> ----------------------------------------------------------------------------------------------
>
> Key: FLINK-15876
> URL: https://issues.apache.org/jira/browse/FLINK-15876
> Project: Flink
> Issue Type: Bug
> Components: Examples
> Affects Versions: 1.6.3, 1.7.2, 1.9.2
> Environment: Mac osx 10.14
> JDK 1.8.202
> Reporter: Wong
> Priority: Minor
> Fix For: 1.13.0
>
>
> stream
> .keyBy(0)
> .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
> .reduce(new SummingReducer())
> // alternative: use a apply function which does not pre-aggregate
> // .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
> // .window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
> // .apply(new SummingWindowFunction())
> .addSink(new SinkFunction<Tuple2<Long, Long>>() {
> @Override
> public void invoke(Tuple2<Long, Long> value) {
> }
> });
>
>
> if use The alternative code ,the compile doe'st comile it successfully. The api is used a serveral major version ago.
> I change it to this
> .keyBy(new KeySelector<Tuple2<Long, Long>, Long>() {
> @Override
> public Long getKey(Tuple2<Long, Long> value) throws Exception {
> return value.f0;
> }
> })
> .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
>
> private static class SummingWindowFunction implements WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, TimeWindow> {
> @Override
> public void apply(Long key, TimeWindow window, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
> long sum = 0L;
> for (Tuple2<Long, Long> value : values) {
> sum += value.f1;
> }
> out.collect(new Tuple2<>(key, sum));
> }
> }
> And it passed
> .
--
This message was sent by Atlassian Jira
(v8.3.4#803005)