You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Wong (Jira)" <ji...@apache.org> on 2020/02/04 05:58:00 UTC

[jira] [Created] (FLINK-15876) The alternative code for GroupedProcessingTimeWindowExample don't compile pass current version

Wong created FLINK-15876:
----------------------------

             Summary: The alternative code for GroupedProcessingTimeWindowExample don't compile pass current version
                 Key: FLINK-15876
                 URL: https://issues.apache.org/jira/browse/FLINK-15876
             Project: Flink
          Issue Type: Bug
          Components: Examples
    Affects Versions: 1.9.2, 1.7.2, 1.6.3
         Environment: Mac osx 10.14

JDK 1.8.202
            Reporter: Wong
             Fix For: 1.10.0


stream
 .keyBy(0)
 .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
 .reduce(new SummingReducer())

 // alternative: use a apply function which does not pre-aggregate
// .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
// .window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
// .apply(new SummingWindowFunction())

 .addSink(new SinkFunction<Tuple2<Long, Long>>() {
 @Override
 public void invoke(Tuple2<Long, Long> value) {
 }
 });

 

 

if use The alternative code ,the compile doe'st comile it successfully. The api is used a serveral major version ago.

I change it to this 

.keyBy(new KeySelector<Tuple2<Long, Long>, Long>() {
 @Override
 public Long getKey(Tuple2<Long, Long> value) throws Exception {
 return value.f0;
 }
})
.timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))

 

private static class SummingWindowFunction implements WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, TimeWindow> {

 @Override
 public void apply(Long key, TimeWindow window, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
 long sum = 0L;
 for (Tuple2<Long, Long> value : values) {
 sum += value.f1;
 }

 out.collect(new Tuple2<>(key, sum));
 }
} 

And it passed


.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)