Posted to issues@flink.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2018/03/01 13:07:00 UTC

[jira] [Closed] (FLINK-8815) EventTime won't work as reduce

     [ https://issues.apache.org/jira/browse/FLINK-8815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek closed FLINK-8815.
-----------------------------------
    Resolution: Not A Problem

Please use the mailing lists for questions about Flink in the future: http://flink.apache.org/community.html#mailing-lists

I think the problem in your case is that the watermark doesn't match the timestamps in your stream. You emit the current wall-clock time as the watermark, but your events carry their own timestamps (bitRate.eventTime). I'm guessing that those timestamps are well behind the current time, so all of the events are considered late and are dropped.
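
To illustrate the point, here is a minimal sketch in plain Java without any Flink dependency. It assumes the usual lateness rule for event-time windows with zero allowed lateness: an element is late once the watermark has reached the last timestamp of its window. The names WatermarkSketch and MaxTimestampWatermark, the 60-second lag and the 500 ms out-of-orderness bound are made up for the example, and the timestamp is taken from the sample records on the assumption that their last field is the event time.

```java
public class WatermarkSketch {

    /** Derives the watermark from the highest event timestamp seen so far. */
    public static final class MaxTimestampWatermark {
        private final long maxOutOfOrdernessMs; // hypothetical bound, not from the issue
        private long maxSeenTs = Long.MIN_VALUE;

        public MaxTimestampWatermark(long maxOutOfOrdernessMs) {
            this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        }

        /** Remembers the largest timestamp and returns the event's own timestamp. */
        public long extractTimestamp(long eventTime) {
            maxSeenTs = Math.max(maxSeenTs, eventTime);
            return eventTime;
        }

        /** Watermark trails the largest seen timestamp, never the wall clock. */
        public long currentWatermark() {
            return maxSeenTs == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeenTs - maxOutOfOrdernessMs;
        }
    }

    /** Exclusive end of the tumbling window of the given size that contains ts (ts >= 0). */
    public static long windowEnd(long ts, long sizeMs) {
        return ts - (ts % sizeMs) + sizeMs;
    }

    /** Assumed rule: late when the watermark has reached the window's last timestamp. */
    public static boolean isLate(long ts, long sizeMs, long watermark) {
        return windowEnd(ts, sizeMs) - 1 <= watermark;
    }

    public static void main(String[] args) {
        long windowMs = 1000L;               // timeWindow(Time.seconds(1))
        long eventTs = 1519827320460L;       // timestamp seen in the sample records
        long wallClock = eventTs + 60_000L;  // assumption: job runs 60 s behind the events

        // Watermark = wall clock, as in CustomWatermarkEmitter: the event is late.
        System.out.println("wall-clock watermark, late: "
            + isLate(eventTs, windowMs, wallClock));

        // Watermark derived from the events themselves: the event is accepted.
        MaxTimestampWatermark wm = new MaxTimestampWatermark(500L);
        wm.extractTimestamp(eventTs);
        System.out.println("event-derived watermark, late: "
            + isLate(eventTs, windowMs, wm.currentWatermark()));
    }
}
```

In a real job the same idea would live in getCurrentWatermark() and extractTimestamp() of the assigner. Flink also ships BoundedOutOfOrdernessTimestampExtractor, which, as far as I know, follows this pattern, so that may be a simpler starting point than a hand-written assigner.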

> EventTime won't work as reduce
> ------------------------------
>
>                 Key: FLINK-8815
>                 URL: https://issues.apache.org/jira/browse/FLINK-8815
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.4.1
>         Environment:  
> Main Code 
>  
> {code:java}
> public class StreamingJob {
>   public static void main(String[] args) throws Exception {
>     // set up the streaming execution environment
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     ObjectMapper jsonMapper = new ObjectMapper();
>     Properties properties = new Properties();
> //    String brokers = "172.27.138.8:9092";
>     String brokers = "localhost:9092";
>     properties.setProperty("bootstrap.servers", brokers);
>     properties.setProperty("group.id", "test_fink");
>     String topic = "stream_test";
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>     FlinkKafkaConsumer010<BitRate> myConsumer =
>         new FlinkKafkaConsumer010<>(topic, new BitRate.BitRateDeserializtionSchema(), properties);
>     DataStream<BitRate> stream = env.addSource(myConsumer).assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
>     DataStream<BitRate>
>         reduceItems =
>         stream
>             .keyBy(a -> a.gameId)
>             .timeWindow(Time.seconds(1))
>             .reduce((a, b) -> a.add(b));
>     reduceItems.print(); // never prints anything
>     reduceItems.addSink(new FlinkKafkaProducer010<>(brokers, "stream_sink", (tuple) -> {
>       try {
>         tuple.end();
>         System.out.println(tuple.rate + "\t" + tuple.user); // never prints anything
>         return jsonMapper.writeValueAsBytes(tuple);
>       } catch (JsonProcessingException e) {
>         e.printStackTrace();
>         return "".getBytes();
>       }
>     }));
>     env.execute("Flink Streaming Java API Skeleton");
>   }
> }
> {code}
>  
> reduceItems never prints anything, but the BitRate add method does print its logs.
> My input is simple, like this; all of the records are generated in time.
> {code:java}
> 4281_783_1519827320460
> 7347_939_1519827320460
> 3281_984_1519827320460
> 8225_810_1519827320460
> 3956_920_1519827320460
> 6594_815_1519827320460
> 5962_925_1519827320460
> 4028_854_1519827320460
> 811_875_1519827320460
> 3837_974_1519827320460
> {code}
>  
>  
>  
> My Event BitRate
>  
> {code:java}
> public class BitRate {
>   public long eventTime;
>   public long gameId;
>   public long rate;
>   public long user;
>   public long startTs;
>   public long endTs;
>   public int type;
>   public BitRate() {
>   }
>   public BitRate(long eventTime, long gameId, long rate, long user) {
>     this.eventTime = eventTime;
>     this.gameId = gameId;
>     this.rate = rate;
>     this.user = user;
>     this.startTs = System.currentTimeMillis();
>     this.type = 0;
>   }
>   public void end() {
>     this.endTs = System.currentTimeMillis();
> //    System.out.println("end" + this.user);
>   }
>   public BitRate add(BitRate b) {
>     this.rate += b.rate;
>     this.user += b.user;
> //    System.out.println("add" + b.user);
>     return this;
>   }
> }
> {code}
>  
>  
> My CustomWatermarkEmitter
> {code:java}
> public class CustomWatermarkEmitter implements AssignerWithPeriodicWatermarks<BitRate> {
>   @Nullable
>   @Override
>   public Watermark getCurrentWatermark() {
>     // System.out.println("get=>" + currentTs +maxTimeLag);
>     return new Watermark(System.currentTimeMillis());
>   }
>   @Override
>   public long extractTimestamp(BitRate bitRate, long l) {
>     // System.out.println("extract"+bitRate.startTs + ":" + l);
>     return bitRate.eventTime;
>   }
> }
> {code}
>  
>  
>            Reporter: benwang li
>            Priority: Major
>
> I use the EventTime option to do the windowed reduce operation, but the resulting stream emits nothing.
>  
>  


