Posted to user@flink.apache.org by Fabian Hueske <fh...@gmail.com> on 2019/09/02 13:25:07 UTC

Re: OVER operator filtering out records

Hi Vinod,

The metrics that Flink collects are not consistent across the job; for
example, the reporting intervals of the operators are not synchronized.
The records might currently sit in a send or receive buffer, be "on the
wire", or still be buffered in state because the watermark has not advanced yet.

I'd recommend comparing the actual input and output data.
The metrics are good for rough checks (is the application processing data,
are watermarks as expected, etc.).
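
For a quick check, something along these lines might help (an untested
sketch; it assumes the Table API with a StreamTableEnvironment "tableEnv"
and that the OVER query result is available as a Table "result"):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

// untested sketch: materialize the query result and print it, then compare
// against a print() of the raw input stream
Table result = tableEnv.sqlQuery("<your OVER query>");  // or however the Table is obtained
DataStream<Row> out = tableEnv.toAppendStream(result, Row.class);
out.print();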

Best, Fabian




On Thu, Aug 29, 2019 at 12:38 AM Vinod Mehra <vm...@lyft.com> wrote:

> Thanks Fabian for your detailed reply!
>
> > My recommendation would be to ensure that you have only one source that
> reads the records and assigns watermarks (maybe even keep the parallelism
> of the whole query to 1 if possible).
>
> Actually I have already experimented by reducing the parallelism to 1 as
> you can see in the following snapshot:
>
> [image: image.png]
> records received = 122
> records sent = 115
> parallelism = 1
>
> This implies there are 7 stuck records (unless it's a stats issue???). We
> are using event-time based watermarking and keep the watermark 5 seconds
> behind the latest event time. I do see that the watermark advances properly
> whenever a new event arrives. For this specific job the events arrive once
> every few hours, so I was expecting at most one undelivered event, but the
> above stats show a larger number. I am experimenting with reducing the
> 5 second delay to 0 to see if that changes anything.
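>
> For context, the watermark assignment is roughly along these lines (a
> simplified sketch, not our exact code; the Event type and getEventTime()
> accessor are placeholder names):
>
> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> // the watermark trails the maximum event time seen so far by 5 seconds
> stream.assignTimestampsAndWatermarks(
>     new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(5)) {
>         @Override
>         public long extractTimestamp(Event e) {
>             return e.getEventTime();  // placeholder accessor for the event timestamp
>         }
>     });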
>
> > Moreover, you might want to think about a more aggressive watermarking
> strategy that advances even if there is no data received based on
> processing time.
>
> Yes I am planning to experiment with this as well.
>
> Thanks,
> Vinod
>
> On Mon, Aug 26, 2019 at 2:31 AM Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi Vinod,
>>
>> This sounds like a watermark issue to me.
>> The commonly used watermark strategies (like bounded out-of-order) only
>> advance when a new record arrives.
>> Moreover, the current watermark is the minimum of the current watermarks
>> of all input partitions.
>> So, the watermark only moves forward if the watermark of the
>> "most-behind" partition advances.
>> If you have many parallel partitions and only very few records every
>> hour, it might take a long time until "the right" partition processes a new
>> record and hence advances its watermark.
>>
>> My recommendation would be to ensure that you have only one source that
>> reads the records and assigns watermarks (maybe even keep the parallelism
>> of the whole query to 1 if possible).
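>>
>> As a rough sketch (the Kafka 0.11 source, topic name, schema, properties,
>> Event type and timestamp extractor below are placeholders; env is the
>> StreamExecutionEnvironment):
>>
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
>>
>> // a single source instance reads the records and assigns the watermarks,
>> // so no idle partition can hold the job's watermark back
>> DataStream<Event> events = env
>>     .addSource(new FlinkKafkaConsumer011<>("events", schema, props))
>>     .setParallelism(1)
>>     .assignTimestampsAndWatermarks(new MyTimestampExtractor())  // e.g. bounded out-of-order
>>     .setParallelism(1);
>>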
>> Moreover, you might want to think about a more aggressive watermarking
>> strategy that advances even if there is no data received based on
>> processing time.
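>>
>> For example, something like this (an untested sketch; the Event type, its
>> getEventTime() accessor and the idle timeout are assumptions, and advancing
>> the watermark on processing time means genuinely late data may be dropped):
>>
>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>> import org.apache.flink.streaming.api.watermark.Watermark;
>>
>> public class IdleAwareWatermarkAssigner implements AssignerWithPeriodicWatermarks<Event> {
>>
>>     private static final long MAX_OUT_OF_ORDERNESS = 5000L; // 5 seconds
>>     private static final long MAX_IDLE_TIME = 60000L;       // advance after 1 min without events
>>
>>     private long maxTimestampSeen = Long.MIN_VALUE;
>>     private long lastEventProcessingTime = Long.MIN_VALUE;
>>
>>     @Override
>>     public long extractTimestamp(Event element, long previousElementTimestamp) {
>>         long ts = element.getEventTime();            // placeholder accessor
>>         maxTimestampSeen = Math.max(maxTimestampSeen, ts);
>>         lastEventProcessingTime = System.currentTimeMillis();
>>         return ts;
>>     }
>>
>>     @Override
>>     public Watermark getCurrentWatermark() {
>>         if (maxTimestampSeen == Long.MIN_VALUE) {
>>             return null; // no events seen yet, emit no watermark
>>         }
>>         long now = System.currentTimeMillis();
>>         if (now - lastEventProcessingTime > MAX_IDLE_TIME) {
>>             // idle for a while: move the watermark forward based on processing time
>>             return new Watermark(Math.max(maxTimestampSeen, now) - MAX_OUT_OF_ORDERNESS);
>>         }
>>         return new Watermark(maxTimestampSeen - MAX_OUT_OF_ORDERNESS);
>>     }
>> }
>>
>> (getCurrentWatermark() is only polled periodically, according to
>> env.getConfig().setAutoWatermarkInterval(...), when event time is enabled.)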
>>
>> Best, Fabian
>>
On Sun, Aug 25, 2019 at 8:51 PM Vinod Mehra <vm...@lyft.com> wrote:
>>
>>> [image: image.png]
>>>
>>> Even when new events arrive, the old events just stay stuck for many hours
>>> (more than a day). So if there is buffering going on, it seems to be size
>>> based rather than time based (?). It looks like the buffered events don't
>>> get flushed out unless they exceed a certain threshold (?). Is that what is
>>> going on? Can someone confirm? Is there a way to flush them out periodically?
>>>
>>> Thanks,
>>> Vinod
>>>
>>> On Fri, Aug 23, 2019 at 10:37 PM Vinod Mehra <vm...@lyft.com> wrote:
>>>
>>>> Things improved during bootstrapping and while the event volume was
>>>> larger, but as soon as the traffic slowed down the events started getting
>>>> stuck (buffered?) at the OVER operator for a very long time. Several hours.
>>>>
>>>> On Fri, Aug 23, 2019 at 5:04 PM Vinod Mehra <vm...@lyft.com> wrote:
>>>>
>>>>> (Forgot to mention that we are using Flink 1.4)
>>>>>
>>>>> Update: Earlier the OVER operator was assigned a parallelism of 64. I
>>>>> reduced it to 1 and the problem went away! Now the OVER operator is not
>>>>> filtering/buffering the events anymore.
>>>>>
>>>>> Can someone explain this please?
>>>>>
>>>>> Thanks,
>>>>> Vinod
>>>>>
>>>>> On Fri, Aug 23, 2019 at 3:09 PM Vinod Mehra <vm...@lyft.com> wrote:
>>>>>
>>>>>> We have a SQL based Flink job which consumes a very low volume
>>>>>> stream (1 or 2 events every few hours):
>>>>>>
>>>>>> SELECT
>>>>>>     user_id,
>>>>>>     COUNT(*) OVER (
>>>>>>         PARTITION BY user_id
>>>>>>         ORDER BY rowtime
>>>>>>         RANGE INTERVAL '30' DAY PRECEDING) AS count_30_days,
>>>>>>     COALESCE(occurred_at, logged_at) AS latency_marker,
>>>>>>     rowtime
>>>>>> FROM event_foo
>>>>>> WHERE user_id IS NOT NULL
>>>>>>
>>>>>> The OVER operator seems to filter out events as per the Flink
>>>>>> dashboard (records received = <non-zero-number>, records sent = 0).
>>>>>>
>>>>>> The operator looks like this:
>>>>>>
>>>>>> over: (PARTITION BY: $1, ORDER BY: rowtime, RANGEBETWEEN 2592000000
>>>>>> PRECEDING AND CURRENT ROW, select: (rowtime, $1, $2, COUNT(*) AS w0$o0)) ->
>>>>>> select: ($1 AS user_id, w0$o0 AS count_30_days, $2 AS latency_marker,
>>>>>> rowtime) -> to: Tuple2 -> Filter -> group_counter_count_30d.1.sqlRecords ->
>>>>>> sample_without_formatter
>>>>>>
>>>>>> I know that the OVER operator can discard late arriving events, but
>>>>>> these events are definitely not arriving late. The watermark for all
>>>>>> operators stays at 0 because the number of output events is 0.
>>>>>>
>>>>>> We have an identical SQL job running against a high volume stream that
>>>>>> is working fine. Watermarks progress in a timely manner and events are
>>>>>> delivered in a timely manner as well.
>>>>>>
>>>>>> Any idea what could be going wrong? Are the events getting buffered
>>>>>> while waiting for a certain number of events? If so, what is the threshold?
>>>>>>
>>>>>> Thanks,
>>>>>> Vinod
>>>>>>
>>>>>