You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Robert Metzger (Jira)" <ji...@apache.org> on 2020/12/08 06:42:10 UTC

[jira] [Updated] (FLINK-15876) The alternative code for GroupedProcessingTimeWindowExample don't compile pass current version

     [ https://issues.apache.org/jira/browse/FLINK-15876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Robert Metzger updated FLINK-15876:
-----------------------------------
    Fix Version/s:     (was: 1.12.0)
                   1.13.0

> The alternative code for GroupedProcessingTimeWindowExample don't compile pass current version
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-15876
>                 URL: https://issues.apache.org/jira/browse/FLINK-15876
>             Project: Flink
>          Issue Type: Bug
>          Components: Examples
>    Affects Versions: 1.6.3, 1.7.2, 1.9.2
>         Environment: Mac osx 10.14
> JDK 1.8.202
>            Reporter: Wong
>            Priority: Minor
>             Fix For: 1.13.0
>
>
> stream
>  .keyBy(0)
>  .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
>  .reduce(new SummingReducer())
>  // alternative: use a apply function which does not pre-aggregate
> // .keyBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
> // .window(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
> // .apply(new SummingWindowFunction())
>  .addSink(new SinkFunction<Tuple2<Long, Long>>() {
>  @Override
>  public void invoke(Tuple2<Long, Long> value) {
>  }
>  });
>  
>  
> if use The alternative code ,the compile doe'st comile it successfully. The api is used a serveral major version ago.
> I change it to this 
> .keyBy(new KeySelector<Tuple2<Long, Long>, Long>() {
>  @Override
>  public Long getKey(Tuple2<Long, Long> value) throws Exception {
>  return value.f0;
>  }
> })
> .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
>  
> private static class SummingWindowFunction implements WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, TimeWindow> {
>  @Override
>  public void apply(Long key, TimeWindow window, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
>  long sum = 0L;
>  for (Tuple2<Long, Long> value : values) {
>  sum += value.f1;
>  }
>  out.collect(new Tuple2<>(key, sum));
>  }
> } 
> And it passed
> .



--
This message was sent by Atlassian Jira
(v8.3.4#803005)