Posted to user@flink.apache.org by Ever <43...@qq.com> on 2019/07/13 07:37:12 UTC

[flink 1.8.1]window closed unexpectedly and data drop

I have a streaming job based on event time, which uses a 60-second window that slides every 10 seconds.
Data arrives in batches every 10 seconds.


Here's the code.
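```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// source, watermarkGenInterval, parallel, innerReducer, wsHost, InvokingInfoWrapper
// and WebSocketSink are defined elsewhere in the job and are not shown here.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Periodic watermarks are generated every watermarkGenInterval milliseconds.
env.getConfig.setAutoWatermarkInterval(watermarkGenInterval)
env.setParallelism(parallel)

env.addSource(source)
  .map(json => new InvokingInfoWrapper(xxx))
  // Event time comes from each record; the watermark trails the largest timestamp seen by 5 s.
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[InvokingInfoWrapper](Time.seconds(5)) {
      override def extractTimestamp(invoking: InvokingInfoWrapper): Long =
        invoking.timestamp
    })
  .keyBy(invokingInfo => s"${invokingInfo.caller}_${invokingInfo.callee}")
  // 60-second windows that slide every 10 seconds.
  .timeWindow(Time.seconds(60), Time.seconds(10))
  .reduce(innerReducer)
  .map(invokingInfo => {
    // ##2map =================================================
    // some mapping code
    invokingInfo
  })
  .addSink(new WebSocketSink[InvokingInfoWrapper](wsHost))
  .name("Pangolin-websocket-sink")
```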




And I have noticed that something is wrong:
1. The first record arrived with timestamp 03:15:48.
2. The second record arrived with timestamp 03:15:59 and triggered the reduce operation (5 reduce operations; there should be 5 windows).
3. The third record arrived with timestamp 03:16:06 and also triggered reduce operations.
4. Then the fourth record arrived with timestamp 03:17:55. At that point a new window should have opened, the previous window should have closed, and its result should have reached the "##2map" line in the code above. But it didn't.
5. The fifth record arrived with timestamp 03:18:01 and triggered the reduce operation together with the fourth record.


So it seems that the first three records were silently dropped.

Can somebody help with this?
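
The window counts in item 2 can be checked with a small model of sliding-window assignment. This is a plain-Scala sketch, not Flink's code: it assumes Flink's default window offset of 0 and aligns window starts the way SlidingEventTimeWindows does, and it writes times as seconds since midnight for readability (Flink itself uses epoch milliseconds). With a 60-second size and a 10-second slide, each record falls into 6 windows, and 03:15:48 and 03:15:59 share 5 of them, which is consistent with the 5 reduce calls observed.

```scala
// Sketch (plain Scala, no Flink dependency) of sliding event-time window
// assignment, modeled on SlidingEventTimeWindows with offset 0.
// Times are seconds since midnight purely for readability.
object SlidingWindowSketch {
  // A window covers [start, end).
  final case class Window(start: Long, end: Long)

  def hms(h: Int, m: Int, s: Int): Long = (h * 60L + m) * 60L + s

  def fmt(t: Long): String = f"${t / 3600}%02d:${t / 60 % 60}%02d:${t % 60}%02d"

  // Every window of length `size`, starting on a multiple of `slide`, that contains ts.
  def assignWindows(ts: Long, size: Long, slide: Long): List[Window] = {
    val lastStart = ts - (ts + slide) % slide // latest aligned start <= ts
    Iterator
      .iterate(lastStart)(_ - slide)
      .takeWhile(_ > ts - size) // stop once a window would end at or before ts
      .map(start => Window(start, start + size))
      .toList
  }

  def main(args: Array[String]): Unit = {
    val first = assignWindows(hms(3, 15, 48), 60L, 10L)
    val second = assignWindows(hms(3, 15, 59), 60L, 10L)
    // Windows that already hold the first record are the ones in which the
    // second record gets combined with an existing value by the reducer.
    val shared = first.intersect(second)
    println(s"first record in ${first.size} windows, starting ${first.map(w => fmt(w.start)).mkString(", ")}")
    println(s"windows shared with the second record: ${shared.size}")
  }
}
```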

Re: [flink 1.8.1]window closed unexpectedly and data drop

Posted by Ever <43...@qq.com>.
The timestamp of the fourth record is 03:17:55, so the watermark should be 03:17:50 (the watermark lags by 5 seconds). At that point, the window of the first record (ts: 03:15:48) should have closed.

What's more, there are many sliding windows, and some of them should have closed too.

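Ever's arithmetic can be checked with a small single-stream model. The sketch below is plain Scala, not Flink code. It is modeled on how BoundedOutOfOrdernessTimestampExtractor computes the watermark (largest timestamp seen minus the bound) and on the event-time firing rule (a window fires once the watermark reaches its last timestamp, end minus 1 ms). It assumes the window operator sees only this one stream, and it ignores that the watermark is emitted only every `watermarkGenInterval` ms and never moves backwards. Under those assumptions, the latest window holding the first three records, [03:16:00, 03:17:00), should fire once the watermark reaches 03:17:50.

```scala
// Sketch (plain Scala, no Flink dependency): bounded-out-of-orderness watermark
// and the event-time firing rule for ONE input stream.
// Timestamps are milliseconds since midnight, purely for readability.
object WatermarkFiringSketch {
  // A time window covers [start, end); its last timestamp is end - 1.
  final case class Window(start: Long, end: Long) {
    def maxTimestamp: Long = end - 1
  }

  def ms(h: Int, m: Int, s: Int): Long = ((h * 60L + m) * 60L + s) * 1000L

  // Watermark = largest timestamp seen so far minus the out-of-orderness bound.
  def watermark(seenTimestamps: Seq[Long], boundMs: Long): Long =
    seenTimestamps.max - boundMs

  // A window fires once the watermark has reached its last timestamp.
  def fires(w: Window, watermarkMs: Long): Boolean = watermarkMs >= w.maxTimestamp

  def main(args: Array[String]): Unit = {
    val seen = Seq(ms(3, 15, 48), ms(3, 15, 59), ms(3, 16, 6), ms(3, 17, 55))
    val wm = watermark(seen, 5000L)
    // The latest-ending window that contains the third record (03:16:06).
    val lastWindowOfThird = Window(ms(3, 16, 0), ms(3, 17, 0))
    println(s"watermark is 03:17:50: ${wm == ms(3, 17, 50)}")
    println(s"window [03:16:00, 03:17:00) can fire: ${fires(lastWindowOfThird, wm)}")
  }
}
```

Since every earlier window ends before this one, the model says none of the first three records' windows should still be open, which is why the missing output points at the watermark actually reaching the window operator.
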
------------------ Original Message ------------------
From: "Hequn Cheng"<ch...@gmail.com>;
Sent: Sunday, July 14, 2019, 11:49 AM
To: "Ever"<43...@qq.com>;
Cc: "user"<us...@flink.apache.org>;
Subject: Re: [flink 1.8.1]window closed unexpectedly and data drop




Re: [flink 1.8.1]window closed unexpectedly and data drop

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Ever,

The window only fires when the watermark passes the end of a window.

> Then the fourth record arrived with timestamp 03:17:55. At that point a new window should have opened, and the previous window should have closed
The previous window may not close if the watermark hasn't passed the end of the window. More information about watermarks can be found here [1].
You can also monitor event time by checking the watermarks in the web dashboard [2].

Best,
Hequn

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams
[2] https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html#monitoring-current-event-time
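
Hequn's first link covers watermarks in parallel streams: an operator with several input channels advances its event time to the minimum of the watermarks on those channels. The plain-Scala sketch below (not Flink code) shows why that could matter here, under a hypothetical assumption the thread does not confirm: that `parallel` is greater than 1 and one parallel instance upstream of the window has received no records, so its watermark is still Long.MinValue. In that case the window [03:16:00, 03:17:00) cannot fire, even though the instance that did receive records has reached 03:17:50.

```scala
// Sketch (plain Scala, no Flink dependency): an operator with several input
// channels uses the MINIMUM watermark over all channels as its event time.
// A channel that has not emitted a watermark yet counts as Long.MinValue.
object ParallelWatermarkSketch {
  final case class Window(start: Long, end: Long) {
    def maxTimestamp: Long = end - 1 // last millisecond in [start, end)
  }

  def ms(h: Int, m: Int, s: Int): Long = ((h * 60L + m) * 60L + s) * 1000L

  // Combined watermark seen by the downstream (window) operator.
  def operatorWatermark(channelWatermarks: Seq[Long]): Long = channelWatermarks.min

  // A window fires once the operator's watermark reaches its last timestamp.
  def fires(w: Window, watermarkMs: Long): Boolean = watermarkMs >= w.maxTimestamp

  def main(args: Array[String]): Unit = {
    val window = Window(ms(3, 16, 0), ms(3, 17, 0))
    val busy = ms(3, 17, 50)  // subtask that received the records
    val idle = Long.MinValue  // hypothetical subtask that received no records
    println(s"all subtasks busy: fires = ${fires(window, operatorWatermark(Seq(busy, busy)))}")
    println(s"one subtask idle:  fires = ${fires(window, operatorWatermark(Seq(busy, idle)))}")
  }
}
```

Whether this applies can be checked with the per-subtask watermarks shown in the web dashboard, as described in [2].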

