Posted to user@flink.apache.org by Reem Razak via user <us...@flink.apache.org> on 2023/03/29 15:17:58 UTC

Re: Unexpected behaviour when configuring both `withWatermarkAlignment` & `withIdleness`

Hey there!

We are seeing a second Flink pipeline encountering similar issues when
configuring both `withWatermarkAlignment` and `withIdleness`. The
unexpected behaviour gets triggered after a Kafka cluster failover. Any
thoughts on there being an incompatibility between the two?

Thanks!

On Wed, Nov 9, 2022 at 6:42 PM Reem Razak <re...@shopify.com> wrote:

> Hi there,
>
> We are integrating the watermark alignment feature into a pipeline with a
> Kafka source during a "backfill", i.e. replaying from an earlier Kafka
> offset. While testing, we noticed some unexpected behaviour in the
> watermark advancement, which was resolved by removing `withIdleness` from
> our watermark strategy.
>
>
>     val watermarkStrategy = WatermarkStrategy
>       .forBoundedOutOfOrderness(Duration.ofMinutes(1))
>       .withTimestampAssigner(new TimestampedEventTimestampAssigner[Event])
>       .withWatermarkAlignment("alignment-group-1", Duration.ofMinutes(1))
>       .withIdleness(Duration.ofMinutes(5))
>
> I have attached a couple of screenshots of the watermarkAlignmentDrift
> metric. As you can see, the behaviour seems normal until a sudden drop in
> the value to ~ Long.MIN_VALUE, which causes the pipeline to stop emitting
> records completely from the source. Furthermore, the logs originating from
> https://github.com/dawidwys/flink/blob/888162083fbb5f62f809e5270ad968db58cc9c5b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L174
> also indicate that the `maxAllowedWatermark` switches to ~ Long.MIN_VALUE.
>
> We found that modifying the `updateInterval` passed into the alignment
> parameters seemed to correlate with how long the pipeline would operate
> before stopping - a larger interval of 20 minutes would encounter the issue
> later than an interval of 1 second.
>
> We are wondering if a bug exists when using both `withIdleness` and
> `withWatermarkAlignment`. Might it be related to
> https://issues.apache.org/jira/browse/FLINK-28975, or is there possibly a
> race condition in the watermark emission? We do not necessarily need to
> have both configured at the same time, but we were also surprised by the
> behaviour of the application. Has anyone run into a similar issue, or does
> anyone have further insight?
>
> Much Appreciated,
> - Reem
>
>
>
>
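
For reference, a minimal sketch of how the `updateInterval` mentioned above can be passed explicitly, using the three-argument `withWatermarkAlignment` overload. The `Event` case class, its `timestampMs` field, and the inline timestamp assigner are hypothetical stand-ins (the original message does not show `TimestampedEventTimestampAssigner`), and the 1-second interval is only an example value. It assumes Flink's `flink-core` is on the classpath.

```scala
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

// Hypothetical event type; the real `Event` class is not shown in the thread.
final case class Event(timestampMs: Long)

object AlignmentConfigSketch {
  val watermarkStrategy: WatermarkStrategy[Event] = WatermarkStrategy
    // Tolerate events arriving up to 1 minute out of order.
    .forBoundedOutOfOrderness[Event](Duration.ofMinutes(1))
    // Hypothetical assigner: read the event time from the record itself.
    .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
      override def extractTimestamp(event: Event, recordTimestamp: Long): Long =
        event.timestampMs
    })
    // Arguments: alignment group, max allowed watermark drift, update interval.
    .withWatermarkAlignment(
      "alignment-group-1",
      Duration.ofMinutes(1),
      Duration.ofSeconds(1)
    )
  // `.withIdleness(Duration.ofMinutes(5))` is left out on purpose, matching
  // the workaround described in the message above.
}
```

The third argument is the interval the message above reports varying between 1 second and 20 minutes.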

Re: Unexpected behaviour when configuring both `withWatermarkAlignment` & `withIdleness`

Posted by Reem Razak via user <us...@flink.apache.org>.
This sounds very much like our issue, thank you! We will follow along with
the bug.

Much appreciated,
- Reem

On Thu, Mar 30, 2023 at 9:20 AM Martijn Visser <ma...@apache.org>
wrote:

> Hi Reem
>
> My thinking is that this might be related to recently reported
> https://issues.apache.org/jira/browse/FLINK-31632.
>
> Best regards,
>
> Martijn

Re: Unexpected behaviour when configuring both `withWatermarkAlignment` & `withIdleness`

Posted by Martijn Visser <ma...@apache.org>.
Hi Reem

My thinking is that this might be related to recently reported
https://issues.apache.org/jira/browse/FLINK-31632.

Best regards,

Martijn
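
One way a value near `Long.MIN_VALUE` can appear is signed 64-bit overflow: adding a positive drift to a watermark that is already at `Long.MAX_VALUE` wraps around. Whether this is what happens in FLINK-31632 is an assumption here, not something confirmed in this thread. The sketch below is plain Scala arithmetic, not Flink code, and every name in it is hypothetical.

```scala
import java.time.Duration

object DriftOverflowSketch {
  // Assumption: a combined watermark of Long.MaxValue stands in for
  // "no source is currently reporting a real watermark".
  val combinedWatermark: Long = Long.MaxValue

  // Same drift as in the thread's strategy: 1 minute, in milliseconds.
  val maxAllowedDriftMs: Long = Duration.ofMinutes(1).toMillis

  // Plain addition silently wraps around to a value just above Long.MinValue.
  val wrapped: Long = combinedWatermark + maxAllowedDriftMs

  // Overflow-safe alternative: clamp to the Long range instead of wrapping.
  def saturatingAdd(a: Long, b: Long): Long =
    try Math.addExact(a, b)
    catch {
      case _: ArithmeticException =>
        if (b > 0) Long.MaxValue else Long.MinValue
    }

  def main(args: Array[String]): Unit = {
    println(s"wrapped   = $wrapped")
    println(s"saturated = ${saturatingAdd(combinedWatermark, maxAllowedDriftMs)}")
  }
}
```

With the wrapped value as an upper bound, any watermark a source has already emitted would exceed it, which is consistent with the source no longer emitting records as described in the original report.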

On Wed, Mar 29, 2023 at 7:07 PM Reem Razak via user <us...@flink.apache.org>
wrote:

> Hey Martijn,
>
> The version is 1.16.0

Re: Unexpected behaviour when configuring both `withWatermarkAlignment` & `withIdleness`

Posted by Reem Razak via user <us...@flink.apache.org>.
Hey Martijn,

The version is 1.16.0

On Wed, Mar 29, 2023 at 5:43 PM Martijn Visser <ma...@apache.org>
wrote:

> Hi Reem,
>
> What's the Flink version where you're encountering this issue?
>
> Best regards,
>
> Martijn

Re: Unexpected behaviour when configuring both `withWatermarkAlignment` & `withIdleness`

Posted by Martijn Visser <ma...@apache.org>.
Hi Reem,

What's the Flink version where you're encountering this issue?

Best regards,

Martijn

On Wed, Mar 29, 2023 at 5:18 PM Reem Razak via user <us...@flink.apache.org>
wrote:

> Hey there!
>
> We are seeing a second Flink pipeline encountering similar issues when
> configuring both `withWatermarkAlignment` and `withIdleness`. The
> unexpected behaviour gets triggered after a Kafka cluster failover. Any
> thoughts on there being an incompatibility between the two?
>
> Thanks!