Posted to user@flink.apache.org by Reem Razak via user <us...@flink.apache.org> on 2022/11/09 16:42:29 UTC

Unexpected behaviour when configuring both `withWatermarkAlignment` & `withIdleness`

Hi there,

We are integrating the watermark alignment feature into a pipeline with a
Kafka source during a "backfill", i.e. replaying from an earlier Kafka
offset. While testing, we noticed some unexpected behaviour in the
watermark advancement, which was resolved by removing `withIdleness` from
our watermark strategy.


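    import java.time.Duration
    import org.apache.flink.api.common.eventtime.WatermarkStrategy

    // Event and TimestampedEventTimestampAssigner are application types, not Flink classes.
    val watermarkStrategy = WatermarkStrategy
      .forBoundedOutOfOrderness[Event](Duration.ofMinutes(1))
      .withTimestampAssigner(new TimestampedEventTimestampAssigner[Event])
      .withWatermarkAlignment("alignment-group-1", Duration.ofMinutes(1))
      .withIdleness(Duration.ofMinutes(5))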

I have attached a couple of screenshots of the watermarkAlignmentDrift
metric. As you can see, the behaviour seems normal until the value
suddenly drops to ~ Long.MIN_VALUE, which causes the source to stop
emitting records completely. Furthermore, the logs originating from
https://github.com/dawidwys/flink/blob/888162083fbb5f62f809e5270ad968db58cc9c5b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L174
also indicate that `maxAllowedWatermark` switches to ~ Long.MIN_VALUE.
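
A quick way to see how a bound could land near Long.MIN_VALUE: if
`maxAllowedWatermark` were computed as the alignment group's combined
watermark plus the allowed drift, and that combined watermark were ever
Long.MAX_VALUE, plain 64-bit addition would wrap around. The Scala sketch
below only demonstrates that arithmetic under this assumed formula;
`DriftOverflow` and `naiveMaxAllowed` are hypothetical names, not Flink
APIs, and this is not a reading of SourceCoordinator's actual code.

```scala
// Hypothetical sketch of the arithmetic only; this is not Flink's SourceCoordinator code.
object DriftOverflow {
  // Assumed formula: max allowed watermark = combined (minimum) watermark of
  // the alignment group + maximum allowed drift, all in milliseconds.
  def naiveMaxAllowed(combinedWatermark: Long, maxDriftMs: Long): Long =
    combinedWatermark + maxDriftMs // plain Long addition silently wraps on overflow

  def main(args: Array[String]): Unit = {
    val maxDriftMs = 60000L // Duration.ofMinutes(1), as passed to withWatermarkAlignment
    // Ordinary case: the bound sits one minute ahead of the combined watermark.
    println(naiveMaxAllowed(1668000000000L, maxDriftMs)) // 1668000060000
    // Combined watermark of Long.MaxValue: the sum wraps to just above Long.MinValue.
    println(naiveMaxAllowed(Long.MaxValue, maxDriftMs) == Long.MinValue + 59999L) // true
  }
}
```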

We found that the `updateInterval` passed into the alignment parameters
seemed to correlate with how long the pipeline would run before stopping:
with an interval of 20 minutes the issue appeared later than with an
interval of 1 second.

We are wondering if a bug exists when using both `withIdleness` and
`withWatermarkAlignment`. Might it be related to
https://issues.apache.org/jira/browse/FLINK-28975, or is there possibly a
race condition in the watermark emission? We do not necessarily need to
have both configured at the same time, but we were surprised by the
behaviour of the application. Has anyone run into a similar issue, or
does anyone have further insight?
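
To make the question concrete, here is a toy model of how idleness could
interact with alignment. It assumes (an assumption, not something stated
in this thread) that a split marked idle by `withIdleness` contributes
Long.MAX_VALUE to its group, that the combined watermark is the minimum
over all splits, and that the drift is added without an overflow check.
Under those assumptions one active split keeps the bound sensible, but
once every split is idle the bound wraps to a large negative number.
`IdleAlignmentModel`, `SplitReport`, and both methods are hypothetical.

```scala
// Toy model; SplitReport, combinedWatermark and maxAllowedWatermark are hypothetical names.
object IdleAlignmentModel {
  final case class SplitReport(watermark: Long, idle: Boolean)

  // Assumption: an idle split counts as Long.MaxValue, and the group's
  // combined watermark is the minimum over all splits.
  def combinedWatermark(reports: Seq[SplitReport]): Long =
    reports
      .map(r => if (r.idle) Long.MaxValue else r.watermark)
      .foldLeft(Long.MaxValue)((a, b) => math.min(a, b))

  // Unguarded addition: wraps around if the combined watermark is Long.MaxValue.
  def maxAllowedWatermark(reports: Seq[SplitReport], maxDriftMs: Long): Long =
    combinedWatermark(reports) + maxDriftMs

  def main(args: Array[String]): Unit = {
    val driftMs = 60000L
    val oneActive = Seq(SplitReport(1000L, idle = false), SplitReport(5000L, idle = true))
    val allIdle = Seq(SplitReport(1000L, idle = true), SplitReport(5000L, idle = true))
    println(maxAllowedWatermark(oneActive, driftMs)) // 61000
    println(maxAllowedWatermark(allIdle, driftMs))   // -9223372036854715809
  }
}
```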

Much Appreciated,
- Reem

Re: Unexpected behaviour when configuring both `withWatermarkAlignment` & `withIdleness`

Posted by Reem Razak via user <us...@flink.apache.org>.
This sounds very much like our issue, thank you! We will follow along with
the bug.

Much appreciated,
- Reem

On Thu, Mar 30, 2023 at 9:20 AM Martijn Visser <ma...@apache.org>
wrote:

> Hi Reem
>
> My thinking is that this might be related to the recently reported
> https://issues.apache.org/jira/browse/FLINK-31632.
>
> Best regards,
>
> Martijn

Re: Unexpected behaviour when configuring both `withWatermarkAlignment` & `withIdleness`

Posted by Martijn Visser <ma...@apache.org>.
Hi Reem

My thinking is that this might be related to the recently reported
https://issues.apache.org/jira/browse/FLINK-31632.
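
For illustration only: the drop of `maxAllowedWatermark` to
~ Long.MIN_VALUE reported in this thread would match what unguarded Long
addition produces when a positive drift is added to a watermark of
Long.MAX_VALUE. A saturating addition clamps at Long.MAX_VALUE instead.
The helper below is hypothetical and is not the change made for
FLINK-31632; the ticket is the authority on the actual fix.

```scala
// Hypothetical helper; not taken from the FLINK-31632 patch.
object SaturatingDrift {
  // Adds a non-negative drift to a watermark, clamping at Long.MaxValue
  // instead of wrapping around to negative values.
  def saturatingAdd(watermark: Long, driftMs: Long): Long = {
    require(driftMs >= 0, "drift must be non-negative")
    if (watermark > Long.MaxValue - driftMs) Long.MaxValue
    else watermark + driftMs
  }

  def main(args: Array[String]): Unit = {
    val driftMs = 60000L
    println(Long.MaxValue + driftMs)               // wraps: -9223372036854715809
    println(saturatingAdd(Long.MaxValue, driftMs)) // 9223372036854775807
    println(saturatingAdd(1000L, driftMs))         // 61000
  }
}
```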

Best regards,

Martijn

On Wed, Mar 29, 2023 at 7:07 PM Reem Razak via user <us...@flink.apache.org>
wrote:

> Hey Martijn,
>
> The version is 1.16.0

Re: Unexpected behaviour when configuring both `withWatermarkAlignment` & `withIdleness`

Posted by Reem Razak via user <us...@flink.apache.org>.
Hey Martijn,

The version is 1.16.0

On Wed, Mar 29, 2023 at 5:43 PM Martijn Visser <ma...@apache.org>
wrote:

> Hi Reem,
>
> What's the Flink version where you're encountering this issue?
>
> Best regards,
>
> Martijn

Re: Unexpected behaviour when configuring both `withWatermarkAlignment` & `withIdleness`

Posted by Martijn Visser <ma...@apache.org>.
Hi Reem,

What's the Flink version where you're encountering this issue?

Best regards,

Martijn

On Wed, Mar 29, 2023 at 5:18 PM Reem Razak via user <us...@flink.apache.org>
wrote:

> Hey there!
>
> We are seeing a second Flink pipeline encountering similar issues when
> configuring both `withWatermarkAlignment` and `withIdleness`. The
> unexpected behaviour gets triggered after a Kafka cluster failover. Any
> thoughts on there being an incompatibility between the two?
>
> Thanks!

Re: Unexpected behaviour when configuring both `withWatermarkAlignment` & `withIdleness`

Posted by Reem Razak via user <us...@flink.apache.org>.
Hey there!

We are seeing a second Flink pipeline encountering similar issues when
configuring both `withWatermarkAlignment` and `withIdleness`. The
unexpected behaviour gets triggered after a Kafka cluster failover. Any
thoughts on there being an incompatibility between the two?
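
One way a Kafka cluster failover could lead to every split being idle at
the same time: if no partition delivers records for longer than the idle
timeout, all of them cross the threshold together. The sketch below
assumes, for illustration, the same 5-minute timeout as the first
pipeline's `withIdleness(Duration.ofMinutes(5))`; the second pipeline's
settings are not given in the thread. `FailoverIdleness` and its methods
are hypothetical, and Flink's actual idleness detection may differ in
detail from a single timestamp comparison.

```scala
// Simplified, hypothetical model of idleness detection; not Flink's implementation.
object FailoverIdleness {
  // Assumption: a split counts as idle once it has received no records
  // for at least idleTimeoutMs.
  def isIdle(lastRecordAtMs: Long, nowMs: Long, idleTimeoutMs: Long): Boolean =
    nowMs - lastRecordAtMs >= idleTimeoutMs

  // True only if there is at least one split and every split is idle.
  def allIdle(lastRecordAtMs: Seq[Long], nowMs: Long, idleTimeoutMs: Long): Boolean =
    lastRecordAtMs.nonEmpty && lastRecordAtMs.forall(t => isIdle(t, nowMs, idleTimeoutMs))

  def main(args: Array[String]): Unit = {
    val idleTimeoutMs = 5 * 60 * 1000L        // assumed: withIdleness(Duration.ofMinutes(5))
    val lastRecords = Seq(0L, 10000L, 20000L) // last record per partition before the failover
    println(allIdle(lastRecords, 4 * 60 * 1000L, idleTimeoutMs)) // false
    println(allIdle(lastRecords, 6 * 60 * 1000L, idleTimeoutMs)) // true
  }
}
```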

Thanks!
