Posted to user@flink.apache.org by "Yuan,Youjun" <yu...@baidu.com> on 2018/07/12 07:35:03 UTC

TumblingProcessingTimeWindow emits extra results for a same window

Hi community,

I have a job that counts events every 2 minutes, using a tumbling window in processing time. However, it occasionally produces extra, DUPLICATED records. For instance, for timestamp 1531368480000 below, it emits a normal result (cnt=1641161), followed by a few more records with very small counts (2, 3, etc.).

Can anyone shed some light on the possible reason, or how to fix it?

Below is the sample output.
-----------------------------------------------------------
{"timestamp":1531368240000,"cnt":1537821,"userId":"user01"}
{"timestamp":1531368360000,"cnt":1521464,"userId":"user01"}
{"timestamp":1531368480000,"cnt":1641161,"userId":"user01"}
{"timestamp":1531368480000,"cnt":2,"userId":"user01"}
{"timestamp":1531368480000,"cnt":3,"userId":"user01"}
{"timestamp":1531368480000,"cnt":3,"userId":"user01"}

And here is the job SQL:
-----------------------------------------------------------
INSERT INTO sink
SELECT
    TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS `timestamp`,
    COUNT(vehicleId) AS cnt,
    userId
FROM source
GROUP BY
    TUMBLE(rowtime, INTERVAL '2' MINUTE),
    userId
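
As a side illustration (not part of the original job): the timestamp column in the sample output is consistent with plain modular arithmetic on a millisecond timestamp, where a 2-minute tumbling window starts at the largest multiple of 120000 ms that is not greater than the timestamp. The Java sketch below assumes epoch-aligned windows with no offset and non-negative timestamps. The class and method names are hypothetical and are not Flink APIs.

```java
/** Hypothetical helper, not a Flink class: epoch-aligned tumbling-window arithmetic. */
final class TumblingWindowMath {

    /** Start of the window containing timestampMs (assumes timestampMs >= 0, no offset). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Exclusive end of the window containing timestampMs. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    /** Last millisecond that still belongs to the window (end - 1). */
    static long maxTimestamp(long timestampMs, long sizeMs) {
        return windowEnd(timestampMs, sizeMs) - 1;
    }

    public static void main(String[] args) {
        long sizeMs = 2 * 60 * 1000L;       // INTERVAL '2' MINUTE in milliseconds
        long ts = 1531368480000L + 42;      // an instant inside the window from the sample output
        System.out.println("start = " + windowStart(ts, sizeMs));
        System.out.println("maxTs = " + maxTimestamp(ts, sizeMs));
    }
}
```

Any instant up to and including maxTimestamp maps to the same window start, which is why records counted in the very last millisecond still carry the timestamp of the window that has just been emitted.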

Thanks,
Youjun Yuan

Re: TumblingProcessingTimeWindow emits extra results for a same window

Posted by Hequn Cheng <ch...@gmail.com>.
Ah, cool. I was thinking that a timer registered at T would be triggered at
T+1ms.
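
One way to read the T+1ms idea, sketched in Java under assumptions: when scheduling a timer for timestamp T, add one millisecond to the computed delay so that the callback cannot run while the clock still reads T. The helper below is hypothetical and only illustrates the arithmetic; it is not taken from the Flink code base.

```java
/** Hypothetical helper: computes a scheduling delay that fires strictly after the target millisecond. */
final class TimerDelay {

    /**
     * Delay in milliseconds for a timer registered at timestampMs when the clock reads nowMs.
     * The extra 1 ms means the timer is due at timestampMs + 1 at the earliest.
     */
    static long delayFor(long timestampMs, long nowMs) {
        return Math.max(timestampMs - nowMs, 0L) + 1L;
    }

    public static void main(String[] args) {
        System.out.println(delayFor(1000L, 990L));   // timer in the future
        System.out.println(delayFor(1000L, 1000L));  // timer for the current millisecond
        System.out.println(delayFor(1000L, 1200L));  // timer already overdue
    }
}
```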

On Mon, Jul 16, 2018 at 7:26 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> On a side note: even if we fix this off-by-one bug, I think there can
> still be races because current processing time is queried using
> System.currentTimeMillis() and we set timers using a ScheduledThreadPoolExecutor
> (currently). If there's any race between those two you can also get weird
> results.
>
> For these reasons, I would always suggest going with event time or
> ingestion time, but I think the latter is currently not possible with the
> Table API/SQL.
>
>
> On 16. Jul 2018, at 11:39, Aljoscha Krettek <al...@apache.org> wrote:
>
> I think there is a bug in how processing-time timers work. For event time,
> we fire timers when the watermark is >= the timestamp; this is correct
> because a watermark T says that we will not see elements with a timestamp
> *smaller than or equal* to T. For processing time, a time of T does not say
> that we won't see an element with timestamp T. Therefore the triggering
> behaviour is wrong for processing time. I created a Jira issue for this:
> https://issues.apache.org/jira/browse/FLINK-9857
>
> Best,
> Aljoscha

Re: TumblingProcessingTimeWindow emits extra results for a same window

Posted by Aljoscha Krettek <al...@apache.org>.
On a side note: even if we fix this off-by-one bug, I think there can still be races, because the current processing time is queried using System.currentTimeMillis() and we set timers using a ScheduledThreadPoolExecutor (currently). If there's any race between those two, you can also get weird results.

For these reasons, I would always suggest going with event time or ingestion time, but I think the latter is currently not possible with the Table API/SQL.
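
To make the two-clock concern concrete, here is a small Java sketch, written under assumptions, that schedules a task on a ScheduledThreadPoolExecutor and compares the wall-clock time seen when the task runs with the intended target time. The class and method names are hypothetical. The measured skew depends on the machine and its load, so no particular value should be expected.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: how far the wall clock is from the target when a scheduled task runs. */
final class TimerClockSkewDemo {

    /** Returns (wall clock observed inside the task) - (intended firing time), in milliseconds. */
    static long observedSkewMs(long delayMs) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        try {
            // The target is derived from the wall clock ...
            long targetMs = System.currentTimeMillis() + delayMs;
            // ... while the executor decides on its own when the delay has elapsed.
            Callable<Long> readClock = System::currentTimeMillis;
            ScheduledFuture<Long> fired =
                    executor.schedule(readClock, delayMs, TimeUnit.MILLISECONDS);
            return fired.get() - targetMs;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("timer task did not complete", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            System.out.println("skew (ms): " + observedSkewMs(20));
        }
    }
}
```

Any logic that assumes the task runs at exactly the target millisecond is therefore fragile, which is the reason given above for preferring event time.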


> On 16. Jul 2018, at 11:39, Aljoscha Krettek <al...@apache.org> wrote:
> 
> I think there is a bug in how processing-time timers work. For event time, we fire timers when the watermark is >= the timestamp; this is correct because a watermark T says that we will not see elements with a timestamp smaller than or equal to T. For processing time, a time of T does not say that we won't see an element with timestamp T. Therefore the triggering behaviour is wrong for processing time. I created a Jira issue for this: https://issues.apache.org/jira/browse/FLINK-9857
> 
> Best,
> Aljoscha


Re: TumblingProcessingTimeWindow emits extra results for a same window

Posted by Aljoscha Krettek <al...@apache.org>.
I think there is a bug in how processing-time timers work. For event time, we fire timers when the watermark is >= the timestamp; this is correct because a watermark T says that we will not see elements with a timestamp smaller than or equal to T. For processing time, a time of T does not say that we won't see an element with timestamp T. Therefore the triggering behaviour is wrong for processing time. I created a Jira issue for this: https://issues.apache.org/jira/browse/FLINK-9857
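
The effect described above can be reproduced with a toy model. The Java sketch below is a simplified simulation under stated assumptions, not Flink's implementation: window state is a per-window counter that is dropped when the window fires, an element is assigned to the window containing the current clock reading, and a timer is due once the clock is >= maxTimestamp + fireDelayMs. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of processing-time tumbling windows; not Flink code. */
final class ProcessingTimeWindowSim {
    static final long SIZE_MS = 120_000L;

    /** Pending count per window start; the entry is removed when the window fires. */
    private final Map<Long, Long> counts = new TreeMap<>();
    /** Emitted results as {windowStart, count}, in firing order. */
    final List<long[]> emitted = new ArrayList<>();
    /** 0 = fire when the clock reaches maxTimestamp, 1 = fire one millisecond later. */
    private final long fireDelayMs;

    ProcessingTimeWindowSim(long fireDelayMs) {
        this.fireDelayMs = fireDelayMs;
    }

    /** An element arriving at clock reading nowMs joins the window containing nowMs. */
    void onElement(long nowMs) {
        counts.merge(nowMs - (nowMs % SIZE_MS), 1L, Long::sum);
    }

    /** Fires every window whose timer is due at clock reading nowMs. */
    void onTimerCheck(long nowMs) {
        Iterator<Map.Entry<Long, Long>> it = counts.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> e = it.next();
            long maxTimestamp = e.getKey() + SIZE_MS - 1;
            if (nowMs >= maxTimestamp + fireDelayMs) {
                emitted.add(new long[] {e.getKey(), e.getValue()});
                it.remove();
            }
        }
    }

    /** Three early elements, then two more that arrive in the last millisecond after the first check. */
    static List<long[]> run(long fireDelayMs) {
        ProcessingTimeWindowSim sim = new ProcessingTimeWindowSim(fireDelayMs);
        long start = 1_531_448_280_000L;
        long maxTs = start + SIZE_MS - 1;
        for (int i = 0; i < 3; i++) {
            sim.onElement(start + 10);
        }
        sim.onTimerCheck(maxTs);      // clock reaches the last millisecond of the window
        sim.onElement(maxTs);         // late arrivals, still within the same millisecond
        sim.onElement(maxTs);
        sim.onTimerCheck(maxTs);      // their freshly registered timer is already due
        sim.onTimerCheck(maxTs + 1);  // clock moves on to the next window
        return sim.emitted;
    }

    public static void main(String[] args) {
        for (long delay = 0; delay <= 1; delay++) {
            for (long[] r : run(delay)) {
                System.out.println("fireDelayMs=" + delay + " window=" + r[0] + " cnt=" + r[1]);
            }
        }
    }
}
```

With fireDelayMs = 0 the model emits two results for the same window (counts 3 and 2), which mirrors the small extra records reported in this thread. With fireDelayMs = 1 it emits a single result with count 5.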

Best,
Aljoscha

> On 16. Jul 2018, at 07:36, Yuan,Youjun <yu...@baidu.com> wrote:
> 
> Hi Hequn,
>  
> To my understanding, a processing-time window is fired at the last millisecond of the window (maxTimestamp). What happens, then, if more elements arrive in that last millisecond, but AFTER the window has fired?
>  
> Thanks,
> Youjun
> From: Hequn Cheng <ch...@gmail.com>
> Sent: Friday, July 13, 2018 9:44 PM
> To: Yuan,Youjun <yu...@baidu.com>
> Cc: Timo Walther <tw...@apache.org>; user@flink.apache.org
> Subject: Re: Re: Re: TumblingProcessingTimeWindow emits extra results for a same window
>  
> Hi Youjun,
>  
> The rowtime value in the udf EXTRACT(EPOCH FROM rowtime) is different from the rowtime value of the window. The SQL is parsed and translated into a chain of nodes: Source -> Calc -> Window -> Sink. Calc is the input node of Window, and the udf is part of Calc rather than Window. So max_ts and min_ts are actually the time before entering the window, i.e., not the time in the window.
> 
> However, I still can't find anything that helps to solve the problem. It seems the window has been triggered many times for the same key. I will think more about it.
>  
> Best, Hequn.
>  
> On Fri, Jul 13, 2018 at 11:53 AM, Yuan,Youjun <yuanyoujun@baidu.com> wrote:
> Hi Hequn,
>  
> I am using Flink 1.4. The job was running with a parallelism of 1.
>  
> I don’t think the extra records are caused by different keys, because:
> 1. I ran 2 jobs consuming the same source: jobA with a 2-minute window and jobB with a 4-minute window. If there were weird keys, then jobA would get no more records than jobB for the same period. But that is not true: jobA got 17 records while jobB got 11. The relevant results can be found below.
> 2. For each window, I output the min and max timestamps, and found that those extra records always start in the last few milliseconds of the 2- or 4-minute windows, just before the window closed. I also noticed that the windows did not have a clean cut between minutes: as we can see in jobA’s output, ts 1531448399978 appears in 18 result records, either as start, or end, or both.
>  
> jobA(2-minute window) output
> {"timestamp":1531448040000,"cnt":1668052,"userId":"user01","min_ts":1531448040003,"max_ts":1531448159985}
> {"timestamp":1531448160000,"cnt":1613188,"userId":"user01","min_ts":1531448159985,"max_ts":1531448279979}
> {"timestamp":1531448280000,"cnt":1664652,"userId":"user01","min_ts":1531448280004,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":4,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448400000,"cnt":1593435,"userId":"user01","min_ts":1531448399978,"max_ts":1531448519978}
>  
> jobB(4-minute window) output
> {"timestamp":1531447920000,"cnt":3306838,"userId":"user01","min_ts":1531447919981,"max_ts":1531448159975}
> {"timestamp":1531448160000,"cnt":3278178,"userId":"user01","min_ts":1531448159098,"max_ts":1531448399977}
> {"timestamp":1531448160000,"cnt":4,"userId":"user01","min_ts":1531448399977,"max_ts":1531448399977}
> {"timestamp":1531448160000,"cnt":5,"userId":"user01","min_ts":1531448399977,"max_ts":1531448399977}
> {"timestamp":1531448160000,"cnt":8,"userId":"user01","min_ts":1531448399977,"max_ts":1531448399978}
> {"timestamp":1531448160000,"cnt":7,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448160000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448160000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448160000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448160000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448160000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448160000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
> {"timestamp":1531448400000,"cnt":3226735,"userId":"user01","min_ts":1531448399978,"max_ts":1531448639916}
>  
> Thanks
> Youjun
>  
> From: Hequn Cheng <chenghequn@gmail.com>
> Sent: Thursday, July 12, 2018 11:31 PM
> To: Yuan,Youjun <yuanyoujun@baidu.com>
> Cc: Timo Walther <twalthr@apache.org>; user@flink.apache.org
> Subject: Re: Re: TumblingProcessingTimeWindow emits extra results for a same window
>  
> Hi Yuan,
>  
> I haven't heard of this before. Which Flink version are you using? Possible causes:
> 1. The userId values are not 100% identical, for example one contains invisible characters.
> 2. The machine clock jittered or jumped.
>  
> Otherwise, there may be a bug we don't know about.
>  
> Best, Hequn
>  
> On Thu, Jul 12, 2018 at 8:00 PM, Yuan,Youjun <yuanyoujun@baidu.com> wrote:
> Hi Timo,
>  
> This problem happens 4-5 times a day on our online server, under a load of ~15k events per second, and the job uses PROCESSING time. So I don’t think I can reliably reproduce the issue on my local machine.
> The user IDs are actually the same; I have double-checked that.
>  
> Now I am wondering whether, after a window fires, some last events arrive that still fall within the time range of the just-fired window, so that a new window is assigned and fired later. This would explain why the extra records always contain only a few events (cnt is small).
>  
> To verify that, I modified the SQL to also output the MIN timestamp of each window, and I found that the MIN timestamp of the extra records always points to the LAST second of the window.
> Here is the output I just got. Note that 1531395119 is the last second of the 2-minute window starting at 1531395000.
> --------------------------------------------------------------------------------------------------------------------------------
> {"timestamp":1531394760000,"cnt":1536013,"userId":"user01","min_sec":1531394760}
> {"timestamp":1531394880000,"cnt":1459623,"userId":"user01","min_sec":1531394879}
> {"timestamp":1531395000000,"cnt":1446010,"userId":"user01","min_sec":1531395000}
> {"timestamp":1531395000000,"cnt":7,"userId":"user01","min_sec":1531395119}
> {"timestamp":1531395000000,"cnt":3,"userId":"user01","min_sec":1531395119}
> {"timestamp":1531395000000,"cnt":3,"userId":"user01","min_sec":1531395119}
> {"timestamp":1531395000000,"cnt":6,"userId":"user01","min_sec":1531395119}
> {"timestamp":1531395000000,"cnt":3,"userId":"user01","min_sec":1531395119}
> {"timestamp":1531395000000,"cnt":2,"userId":"user01","min_sec":1531395119}
> {"timestamp":1531395000000,"cnt":2,"userId":"user01","min_sec":1531395119}
> {"timestamp":1531395000000,"cnt":2,"userId":"user01","min_sec":1531395119}
>  
> The modified SQL:
> INSERT INTO sink
> SELECT
>                 TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS `timestamp`, 
>                 count(vehicleId) AS cnt, userId,
>                 MIN(EXTRACT(EPOCH FROM rowtime)) AS min_sec
> FROM source
> GROUP BY
>                 TUMBLE(rowtime, INTERVAL '2' MINUTE),
>                 userId
>  
> thanks
> Youjun
>  
> From: Timo Walther <twalthr@apache.org>
> Sent: Thursday, July 12, 2018 5:02 PM
> To: user@flink.apache.org
> Subject: Re: TumblingProcessingTimeWindow emits extra results for a same window
>  
> Hi Yuan,
> 
> this does sound weird. The SQL API uses regular DataStream API windows underneath, so this problem should have come up earlier if it were a problem in the implementation. Is this behavior reproducible on your local machine?
> 
> One thing that comes to mind is that the "userId"s might not be 100% identical (same hashCode/equals method), because otherwise they would be properly grouped.
> 
> Regards,
> Timo
> 
> Am 12.07.18 um 09:35 schrieb Yuan,Youjun:
> Hi community,
>  
> I have a job that counts events every 2 minutes, using a TumblingWindow in ProcessingTime. However, it occasionally produces extra DUPLICATED records. For instance, for timestamp 1531368480000 below, it emits a normal result (cnt=1641161), followed by a few more records with very small counts (2, 3, etc.).
>  
> Can anyone shed some light on the possible reason, or how to fix it?
>  
> Below is the sample output.
> -----------------------------------------------------------
> {"timestamp":1531368240000,"cnt":1537821,"userId":"user01"}
> {"timestamp":1531368360000,"cnt":1521464,"userId":"user01"}
> {"timestamp":1531368480000,"cnt":1641161,"userId":"user01"}
> {"timestamp":1531368480000,"cnt":2,"userId":"user01"}
> {"timestamp":1531368480000,"cnt":3,"userId":"user01"}
> {"timestamp":1531368480000,"cnt":3,"userId":"user01"}
>  
> And here is the job SQL:
> -----------------------------------------------------------
> INSERT INTO sink
> SELECT
>                 TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS `timestamp`,
>                 count(vehicleId) AS cnt,
>                 userId
> FROM source
>                 GROUP BY TUMBLE(rowtime, INTERVAL '2' MINUTE),
>                 userId
>  
> Thanks,
> Youjun Yuan


Re: Re: Re: TumblingProcessingTimeWindow emits extra results for a same window

Posted by "Yuan,Youjun" <yu...@baidu.com>.
Hi Hequn,

As I understand it, a processing-time window is fired at the last millisecond of the window (maxTimestamp). What happens, then, if more elements arrive during that last millisecond, but AFTER the window has fired?
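
To make the question concrete, here is a minimal Python sketch of the suspected sequence. It is a toy model, not Flink's window operator: the class ToyTumblingWindow and its methods are hypothetical, and it assumes that the timer can fire while the clock still reads the window's last millisecond and that elements arriving afterwards in that same millisecond are assigned to the same window again.

```python
class ToyTumblingWindow:
    """Toy model of a processing-time tumbling window (not Flink's operator)."""

    def __init__(self, size_ms):
        self.size_ms = size_ms
        self.counts = {}    # window start -> elements seen since the last fire
        self.emitted = []   # (window start, count) results, in emission order

    def on_element(self, now_ms):
        # The element is assigned to a window by the current clock reading.
        start = now_ms - now_ms % self.size_ms
        self.counts[start] = self.counts.get(start, 0) + 1

    def on_timer(self, now_ms):
        # Fire and purge every window whose last millisecond has been reached.
        for start in sorted(self.counts):
            if start + self.size_ms - 1 <= now_ms:
                self.emitted.append((start, self.counts.pop(start)))


op = ToyTumblingWindow(size_ms=120_000)
last_ms = 1531448399999          # last millisecond of window 1531448280000

op.on_element(1531448280004)
op.on_element(1531448399000)
op.on_element(last_ms)
op.on_timer(last_ms)             # regular fire
op.on_element(last_ms)           # same millisecond, but after the fire
op.on_element(last_ms)
op.on_timer(last_ms)             # second fire for the same window
op.on_element(1531448400000)     # first element of the next window
op.on_timer(1531448519999)

for start, cnt in op.emitted:
    print(start, cnt)
```

Under these assumptions the same window start is emitted twice, first with the bulk count and then with a small count, which is the same shape as the extra records in the job output.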

Thanks,
Youjun





Re: Re: Re: TumblingProcessingTimeWindow emits extra results for a same window

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Youjun,

The rowtime value in the UDF EXTRACT(EPOCH FROM rowtime) is different from
the rowtime value of the window. The SQL is parsed and translated into a
chain of nodes: Source -> Calc -> Window -> Sink. Calc is the input node of
Window, and the UDF is part of Calc rather than Window. So max_ts and min_ts
are actually the times before entering the window, i.e., not the times in
the window.
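
A small Python sketch of this point, under stated assumptions: calc and assign_window are hypothetical stand-ins for the Calc and Window nodes (they are not Flink APIs), and each of them reads the clock on its own. The two clock readings are made up so that they straddle a 2-minute boundary.

```python
SIZE_MS = 120_000  # 2-minute tumbling window


def make_clock(readings):
    """Return a fake clock that yields the given readings one by one."""
    it = iter(readings)
    return lambda: next(it)


def calc(row, clock):
    """Hypothetical Calc node: materializes the current time as a column."""
    return dict(row, ts=clock())


def assign_window(clock, size_ms=SIZE_MS):
    """Hypothetical Window node: assigns by its own, later clock reading."""
    now = clock()
    return now - now % size_ms


# Calc sees the record at ...399978; Window sees it 23 ms later.
clock = make_clock([1531448399978, 1531448400001])
row = calc({"userId": "user01"}, clock)
window = assign_window(clock)

print(row["ts"], window)
```

The record carries ts 1531448399978 but lands in window 1531448400000, which would explain why a window's min_ts can be earlier than its own start in the output.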

However, I still haven't found anything that helps solve the problem. It
seems the window has been triggered many times for the same key. I will
think more about it.

Best, Hequn.


Re: Re: TumblingProcessingTimeWindow emits extra results for a same window

Posted by "Yuan,Youjun" <yu...@baidu.com>.
Hi Hequn,

I am using Flink 1.4. The job was running with a parallelism of 1.

I don’t think the extra records are caused by different keys, because:

  1.  I ran 2 jobs consuming the same source: jobA with a 2-minute window and jobB with a 4-minute window. If there were weird keys, jobA would get no more records than jobB for the same period. But that is not true: jobA got 17 records while jobB got 11. The relevant results can be found below.
  2.  For each window, I output the min and max timestamps, and found that those extra records always start in the last few milliseconds of the 2- or 4-minute windows, just before the window is closed. I also noticed that the windows do not have a clean cut between minutes: as we can see in jobA’s output, ts 1531448399978 appears in 18 result records, either as start, or end, or both.

jobA(2-minute window) output
{"timestamp":1531448040000,"cnt":1668052,"userId":"user01","min_ts":1531448040003,"max_ts":1531448159985}
{"timestamp":1531448160000,"cnt":1613188,"userId":"user01","min_ts":1531448159985,"max_ts":1531448279979}
{"timestamp":1531448280000,"cnt":1664652,"userId":"user01","min_ts":1531448280004,"max_ts":1531448399978}
{"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448280000,"cnt":4,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448280000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448280000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448400000,"cnt":1593435,"userId":"user01","min_ts":1531448399978,"max_ts":1531448519978}

jobB(4-minute window) output
{"timestamp":1531447920000,"cnt":3306838,"userId":"user01","min_ts":1531447919981,"max_ts":1531448159975}
{"timestamp":1531448160000,"cnt":3278178,"userId":"user01","min_ts":1531448159098,"max_ts":1531448399977}
{"timestamp":1531448160000,"cnt":4,"userId":"user01","min_ts":1531448399977,"max_ts":1531448399977}
{"timestamp":1531448160000,"cnt":5,"userId":"user01","min_ts":1531448399977,"max_ts":1531448399977}
{"timestamp":1531448160000,"cnt":8,"userId":"user01","min_ts":1531448399977,"max_ts":1531448399978}
{"timestamp":1531448160000,"cnt":7,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448160000,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448160000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448160000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448160000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448160000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448160000,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":1531448400000,"cnt":3226735,"userId":"user01","min_ts":1531448399978,"max_ts":1531448639916}

Thanks
Youjun

From: Hequn Cheng <ch...@gmail.com>
Sent: Thursday, July 12, 2018 11:31 PM
To: Yuan,Youjun <yu...@baidu.com>
Cc: Timo Walther <tw...@apache.org>; user@flink.apache.org
Subject: Re: Re: TumblingProcessingTimeWindow emits extra results for a same window

Hi Yuan,

I haven't heard of this before. Which Flink version are you using? Possible causes:
1. The userId values are not 100% identical, for example one contains invisible characters.
2. The machine clock jittered or jumped.

Otherwise, there may be a bug we don't know about.

Best, Hequn

On Thu, Jul 12, 2018 at 8:00 PM, Yuan,Youjun <yu...@baidu.com> wrote:
Hi Timo,

This problem happens 4-5 times a day on our online server, under a load of ~15k events per second, and the job uses PROCESSING time. So I don’t think I can reliably reproduce the issue on my local machine.
The user IDs are actually the same; I have double-checked that.

Now I am wondering whether, after a window fires, some last events arrive that still fall within the time range of the just-fired window, so that a new window is assigned and fired later. This would explain why the extra records always contain only a few events (cnt is small).

To verify that, I modified the SQL to also output the MIN timestamp of each window, and I found that the MIN timestamp of the extra records always points to the LAST second of the window.
Here is the output I just got. Note that 1531395119 is the last second of the 2-minute window starting at 1531395000.
--------------------------------------------------------------------------------------------------------------------------------
{"timestamp":1531394760000,"cnt":1536013,"userId":"user01","min_sec":1531394760}
{"timestamp":1531394880000,"cnt":1459623,"userId":"user01","min_sec":1531394879}
{"timestamp":1531395000000,"cnt":1446010,"userId":"user01","min_sec":1531395000}
{"timestamp":1531395000000,"cnt":7,"userId":"user01","min_sec":1531395119}
{"timestamp":1531395000000,"cnt":3,"userId":"user01","min_sec":1531395119}
{"timestamp":1531395000000,"cnt":3,"userId":"user01","min_sec":1531395119}
{"timestamp":1531395000000,"cnt":6,"userId":"user01","min_sec":1531395119}
{"timestamp":1531395000000,"cnt":3,"userId":"user01","min_sec":1531395119}
{"timestamp":1531395000000,"cnt":2,"userId":"user01","min_sec":1531395119}
{"timestamp":1531395000000,"cnt":2,"userId":"user01","min_sec":1531395119}
{"timestamp":1531395000000,"cnt":2,"userId":"user01","min_sec":1531395119}

The modified SQL:
INSERT INTO sink
SELECT
                TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS `timestamp`,
                count(vehicleId) AS cnt, userId,
                MIN(EXTRACT(EPOCH FROM rowtime)) AS min_sec
FROM source
GROUP BY
                TUMBLE(rowtime, INTERVAL '2' MINUTE),
                userId

thanks
Youjun

发件人: Timo Walther <tw...@apache.org>>
发送时间: Thursday, July 12, 2018 5:02 PM
收件人: user@flink.apache.org<ma...@flink.apache.org>
主题: Re: TumblingProcessingTimeWindow emits extra results for a same window

Hi Yuan,

this sounds indeed weird. The SQL API uses regular DataStream API windows underneath so this problem should have come up earlier if this is problem in the implementation. Does this behavior reproducible on your local machine?

One thing that comes to my mind is that the "userId"s might not be 100% identical (same hashCode/equals method) because otherwise they would be properly grouped.

Regards,
Timo

Am 12.07.18 um 09:35 schrieb Yuan,Youjun:
Hi community,

I have a job which counts event number every 2 minutes, with TumblingWindow in ProcessingTime. However, it occasionally produces extra DUPLICATED records. For instance, for timestamp 1531368480000 below, it emits a normal result (cnt=1641161), and then followed by a few more records with very small result (2, 3, etc).

Can anyone shed some light on the possible reason, or how to fix it?

Below are the sample output.
-----------------------------------------------------------
{"timestamp":1531368240000,"cnt":1537821,"userId":"user01"}
{"timestamp":1531368360000,"cnt":1521464,"userId":"user01"}
{"timestamp":1531368480000,"cnt":1641161,"userId":"user01"}
{"timestamp":1531368480000,"cnt":2,"userId":"user01"}
{"timestamp":1531368480000,"cnt":3,"userId":"user01"}
{"timestamp":1531368480000,"cnt":3,"userId":"user01"}

And here is the job SQL:
-----------------------------------------------------------
INSERT INTO sink
SELECT
                TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS `timestamp`,
                count(vehicleId) AS cnt,
                userId
FROM source
                GROUP BY TUMBLE(rowtime, INTERVAL '2' MINUTE),
                userId

Thanks,
Youjun Yuan




Re: Reply: TumblingProcessingTimeWindow emits extra results for a same window

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Yuan,

I haven't heard of this before. Which Flink version are you using? Possible
causes:
1. The userId values are not 100% identical, for example one contains
invisible characters.
2. The machine clock was unstable (it jumped or drifted).

Otherwise, there may be a bug we don't know about yet.
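
Cause 1 can be checked outside the job. Below is a minimal Python sketch of how two userId values that print identically can still differ; the `describe` helper and the `suspect` value are hypothetical and not taken from the actual data.

```python
import unicodedata

def describe(user_id):
    """Return one 'U+XXXX NAME' entry per code point so hidden characters become visible."""
    return [f"U+{ord(ch):04X} {unicodedata.name(ch, 'UNNAMED')}" for ch in user_id]

clean = "user01"
suspect = "user01\u200b"  # hypothetical value with a trailing ZERO WIDTH SPACE

print(clean == suspect)       # False, although both values look like user01
print(describe(suspect)[-1])  # names the hidden trailing character
```

If every code point of the grouped values matches, cause 1 can probably be ruled out.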

Best, Hequn

On Thu, Jul 12, 2018 at 8:00 PM, Yuan,Youjun <yu...@baidu.com> wrote:

> Hi Timo,
>
> This problem happens 4-5 times a day on our online server, under a load of
> ~15k events per second, and the job uses PROCESSING time. So I don’t think
> I can reliably reproduce the issue on my local machine.
>
> The user ids are actually the same; I have double-checked that.
>
> Now I am wondering whether it is possible that, after a window fires, some
> late events arrive that still fall into the time range of the just-fired
> window, so new windows are assigned and fired later. This would explain
> why the extra records always contain only a few events (cnt is small).
>
> To verify that, I modified the SQL to also output the MIN timestamp of each
> window, and I found that the MIN timestamp of the extra records always
> points to the LAST second of the window.
>
> Here is the output I just got; note that 1531395119 is the last second of
> the 2-minute window starting at 1531395000.
>
> --------------------------------------------------------------------
> {"timestamp":1531394760000,"cnt":1536013,"userId":"user01","min_sec":1531394760}
> {"timestamp":1531394880000,"cnt":1459623,"userId":"user01","min_sec":1531394879}
> {"timestamp":1531395000000,"cnt":1446010,"userId":"user01","min_sec":1531395000}
> {"timestamp":1531395000000,"cnt":7,"userId":"user01","min_sec":1531395119}
> {"timestamp":1531395000000,"cnt":3,"userId":"user01","min_sec":1531395119}
> {"timestamp":1531395000000,"cnt":3,"userId":"user01","min_sec":1531395119}
> {"timestamp":1531395000000,"cnt":6,"userId":"user01","min_sec":1531395119}
> {"timestamp":1531395000000,"cnt":3,"userId":"user01","min_sec":1531395119}
> {"timestamp":1531395000000,"cnt":2,"userId":"user01","min_sec":1531395119}
> {"timestamp":1531395000000,"cnt":2,"userId":"user01","min_sec":1531395119}
> {"timestamp":1531395000000,"cnt":2,"userId":"user01","min_sec":1531395119}
>
> The modified SQL:
>
> INSERT INTO sink
> SELECT
>                 TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS `timestamp`,
>                 count(vehicleId) AS cnt, userId,
>                 MIN(EXTRACT(EPOCH FROM rowtime)) AS min_sec
> FROM source
> GROUP BY
>                 TUMBLE(rowtime, INTERVAL '2' MINUTE),
>                 userId
>
> thanks
> Youjun
>

Reply: TumblingProcessingTimeWindow emits extra results for a same window

Posted by "Yuan,Youjun" <yu...@baidu.com>.
Hi Timo,

This problem happens 4-5 times a day on our online server, under a load of ~15k events per second, and the job uses PROCESSING time. So I don’t think I can reliably reproduce the issue on my local machine.
The user ids are actually the same; I have double-checked that.

Now I am wondering whether it is possible that, after a window fires, some late events arrive that still fall into the time range of the just-fired window, so new windows are assigned and fired later. This would explain why the extra records always contain only a few events (cnt is small).
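
To make the suspected sequence concrete, here is a small Python simulation. It is only a sketch of the hypothesis: the rule that the timer fires as soon as the clock reaches the window's last millisecond is an assumption made for illustration, not a confirmed description of Flink's internals, and `simulate` and `actions` are hypothetical names.

```python
from collections import defaultdict

WINDOW_MS = 120_000  # 2-minute tumbling window

def window_start(ts_ms, size_ms=WINDOW_MS):
    """Start of the tumbling window [start, start + size) that contains ts_ms."""
    return ts_ms - ts_ms % size_ms

def simulate(actions, size_ms=WINDOW_MS):
    """Replay ("event", now_ms) and ("fire", now_ms) actions in order.

    Assumption: a window may fire once the clock has reached its last
    millisecond (start + size - 1); firing emits the count and clears the state.
    """
    counts = defaultdict(int)  # window start -> events buffered since the last firing
    emitted = []               # (window start, count) in emission order
    for kind, now_ms in actions:
        if kind == "event":
            counts[window_start(now_ms, size_ms)] += 1
        elif kind == "fire":
            for start in sorted(counts):
                if start + size_ms - 1 <= now_ms:
                    emitted.append((start, counts.pop(start)))
    return emitted

start = 1531395000000
last_ms = start + WINDOW_MS - 1
actions = [
    ("event", start + 1_000),
    ("event", start + 60_000),
    ("event", start + 90_000),
    ("fire", last_ms),               # regular firing of the window
    ("event", last_ms),              # arrives in the same millisecond, after the firing
    ("event", last_ms),
    ("fire", last_ms),               # the re-created window fires again
    ("event", start + WINDOW_MS),    # belongs to the next window, stays buffered
]
print(simulate(actions))
```

Under this assumption the same window start is emitted twice, first with the bulk of the events and then with a small remainder, which has the same shape as the extra records.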

To verify that, I modified the SQL to also output the MIN timestamp of each window, and I found that the MIN timestamp of the extra records always points to the LAST second of the window.
Here is the output I just got; note that 1531395119 is the last second of the 2-minute window starting at 1531395000.
--------------------------------------------------------------------------------------------------------------------------------
{"timestamp":1531394760000,"cnt":1536013,"userId":"user01","min_sec":1531394760}
{"timestamp":1531394880000,"cnt":1459623,"userId":"user01","min_sec":1531394879}
{"timestamp":1531395000000,"cnt":1446010,"userId":"user01","min_sec":1531395000}
{"timestamp":1531395000000,"cnt":7,"userId":"user01","min_sec":1531395119}
{"timestamp":1531395000000,"cnt":3,"userId":"user01","min_sec":1531395119}
{"timestamp":1531395000000,"cnt":3,"userId":"user01","min_sec":1531395119}
{"timestamp":1531395000000,"cnt":6,"userId":"user01","min_sec":1531395119}
{"timestamp":1531395000000,"cnt":3,"userId":"user01","min_sec":1531395119}
{"timestamp":1531395000000,"cnt":2,"userId":"user01","min_sec":1531395119}
{"timestamp":1531395000000,"cnt":2,"userId":"user01","min_sec":1531395119}
{"timestamp":1531395000000,"cnt":2,"userId":"user01","min_sec":1531395119}
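
The window arithmetic behind that note can be checked with a short Python sketch. It assumes tumbling windows aligned to the epoch; `window_bounds` is a hypothetical helper, not part of any Flink API.

```python
WINDOW_SEC = 120  # 2-minute tumbling window

def window_bounds(ts_sec, size_sec=WINDOW_SEC):
    """Return (start, end) of the epoch-aligned window [start, end) containing ts_sec."""
    start = ts_sec - ts_sec % size_sec
    return start, start + size_sec

start, end = window_bounds(1531395119)
print(start, end, end - 1)  # 1531395000 1531395120 1531395119
```

So min_sec = 1531395119 is the final second before the window starting at 1531395000 closes.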

The modified SQL:
INSERT INTO sink
SELECT
                TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS `timestamp`,
                count(vehicleId) AS cnt, userId,
                MIN(EXTRACT(EPOCH FROM rowtime)) AS min_sec
FROM source
GROUP BY
                TUMBLE(rowtime, INTERVAL '2' MINUTE),
                userId

thanks
Youjun

From: Timo Walther <tw...@apache.org>
Sent: Thursday, July 12, 2018 5:02 PM
To: user@flink.apache.org
Subject: Re: TumblingProcessingTimeWindow emits extra results for a same window

Hi Yuan,

This indeed sounds weird. The SQL API uses regular DataStream API windows underneath, so this problem should have come up earlier if it were a problem in the implementation. Is this behavior reproducible on your local machine?

One thing that comes to my mind is that the "userId"s might not be 100% identical (same hashCode/equals method) because otherwise they would be properly grouped.

Regards,
Timo




Re: TumblingProcessingTimeWindow emits extra results for a same window

Posted by Timo Walther <tw...@apache.org>.
Hi Yuan,

This indeed sounds weird. The SQL API uses regular DataStream API
windows underneath, so this problem should have come up earlier if it
were a problem in the implementation. Is this behavior reproducible on
your local machine?

One thing that comes to my mind is that the "userId"s might not be 100% 
identical (same hashCode/equals method) because otherwise they would be 
properly grouped.

Regards,
Timo

On 12.07.18 at 09:35, Yuan,Youjun wrote:
> Hi community,
>
> I have a job which counts event number every 2 minutes, with
> TumblingWindow in ProcessingTime. However, it occasionally produces
> extra DUPLICATED records. For instance, for timestamp 1531368480000
> below, it emits a normal result (cnt=1641161), and then followed by a
> few more records with very small result (2, 3, etc).
>
> Can anyone shed some light on the possible reason, or how to fix it?
>
> Below are the sample output.
> -----------------------------------------------------------
> {"timestamp":1531368240000,"cnt":1537821,"userId":"user01"}
> {"timestamp":1531368360000,"cnt":1521464,"userId":"user01"}
> {"timestamp":1531368480000,"cnt":1641161,"userId":"user01"}
> {"timestamp":1531368480000,"cnt":2,"userId":"user01"}
> {"timestamp":1531368480000,"cnt":3,"userId":"user01"}
> {"timestamp":1531368480000,"cnt":3,"userId":"user01"}
>
> And here is the job SQL:
> -----------------------------------------------------------
> INSERT INTO sink
> SELECT
>                 TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS `timestamp`,
>                 count(vehicleId) AS cnt,
>                 userId
> FROM source
>                 GROUP BY TUMBLE(rowtime, INTERVAL '2' MINUTE),
>                 userId
>
> Thanks,
> Youjun Yuan