Posted to user@flink.apache.org by Alexis Sarda-Espinosa <sa...@gmail.com> on 2023/05/30 14:07:41 UTC

Interaction between idling sources and watermark alignment

Hello,

I see that, in Flink 1.17.1, watermark alignment will be supported (as
beta) within a single source's splits and across different sources. I don't
see this explicitly mentioned in the documentation, but I assume that the
concept of "maximal drift" used for alignment also takes idleness into
account, resuming any readers that were paused due to an idle split or
source. Is my understanding correct?

Also, something that isn't 100% clear to me when comparing to the previous
watermark alignment documentation: even if I only wanted alignment within a
single source's splits, I would still need to call withWatermarkAlignment in
the watermark strategy, right? Otherwise alignment would not take place,
regardless of pipeline.watermark-alignment.allow-unaligned-source-splits.

Regards,
Alexis.

Re: Interaction between idling sources and watermark alignment

Posted by Alexis Sarda-Espinosa <sa...@gmail.com>.
Thank you very much for the explanation, Hong.

On Thu, 15 Jun 2023, 15:55 Teoh, Hong, <li...@amazon.co.uk> wrote:


Re: Interaction between idling sources and watermark alignment

Posted by "Teoh, Hong" <li...@amazon.co.uk>.
Hi Alexis, below is my understanding:


> I see that, in Flink 1.17.1, watermark alignment will be supported (as beta) within a single source's splits and across different sources. I don't see this explicitly mentioned in the documentation, but I assume that the concept of "maximal drift" used for alignment also takes idleness into account, resuming any readers that were paused due to an idle split or source. Is my understanding correct?

As far as I understand, whether to “unpause” a split that may have been paused due to watermark alignment is evaluated at fixed intervals, here: [1]

We see that the SourceCoordinator periodically calls announceCombinedWatermark(), which calculates the global (combined) watermark and then sends a WatermarkAlignmentEvent to each subtask. Each subtask then evaluates whether to “wake up” the operator [2] [3].

This means that whether to “wake up” is evaluated periodically, at the update interval, which defaults to 1 s [4].
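A rough plain-Java model of one such round may make this concrete. It is a sketch, not Flink's implementation: it assumes, following the SourceCoordinator code in [1], that the combined watermark is the minimum watermark reported within the group and that each subtask is sent that value plus the configured maximal drift (in real code, the drift and update interval are the second and third arguments of WatermarkStrategy#withWatermarkAlignment). It further assumes that rounds fire at exact multiples of the update interval. The names AlignmentRoundSketch, pausedSubtasks and nextEvaluationAt are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of one alignment round; not Flink's implementation.
final class AlignmentRoundSketch {

    // Assumed to match the documented default update interval of 1 s.
    static final long UPDATE_INTERVAL_MS = 1_000L;

    // Combined watermark of the group: the minimum of all reported subtask watermarks.
    static long combinedWatermark(Map<Integer, Long> reported) {
        long min = Long.MAX_VALUE;
        for (long wm : reported.values()) {
            min = Math.min(min, wm);
        }
        return min;
    }

    // Bound announced to every subtask: combined watermark plus the allowed drift.
    static long maxAllowedWatermark(Map<Integer, Long> reported, long maxDriftMs) {
        long combined = combinedWatermark(reported);
        // Saturate instead of overflowing when nothing has been reported yet.
        return combined > Long.MAX_VALUE - maxDriftMs ? Long.MAX_VALUE : combined + maxDriftMs;
    }

    // A subtask stays paused while its own watermark is ahead of the announced bound.
    static boolean shouldPause(long subtaskWatermark, long maxAllowedWatermark) {
        return subtaskWatermark > maxAllowedWatermark;
    }

    // Ids of the subtasks that would be paused after one announcement round.
    static List<Integer> pausedSubtasks(Map<Integer, Long> reported, long maxDriftMs) {
        long bound = maxAllowedWatermark(reported, maxDriftMs);
        List<Integer> paused = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : reported.entrySet()) {
            if (shouldPause(e.getValue(), bound)) {
                paused.add(e.getKey());
            }
        }
        return paused;
    }

    // Next round strictly after nowMs, assuming rounds fire at multiples of the interval.
    static long nextEvaluationAt(long nowMs, long intervalMs) {
        return (nowMs / intervalMs + 1) * intervalMs;
    }

    public static void main(String[] args) {
        Map<Integer, Long> reported = new TreeMap<>();
        reported.put(0, 10_000L);
        reported.put(1, 12_500L);
        reported.put(2, 18_000L);
        // combined = 10_000, bound = 15_000: subtask 2 is too far ahead and is paused.
        System.out.println(pausedSubtasks(reported, 5_000L)); // [2]

        // Subtasks 0 and 1 catch up at t = 2_300 ms, but nothing changes until the next round.
        reported.put(0, 13_500L);
        reported.put(1, 14_000L);
        System.out.println(nextEvaluationAt(2_300L, UPDATE_INTERVAL_MS)); // 3000
        // combined = 13_500, bound = 18_500: subtask 2 is resumed at that round.
        System.out.println(pausedSubtasks(reported, 5_000L)); // []
    }
}
```

In this model a paused reader never resumes between rounds, so once the lagging subtasks have caught up, the extra delay before resuming is at most about one update interval.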

> Also, something that isn't 100% clear to me when comparing to the previous watermark alignment documentation: even if I only wanted alignment within a single source's splits, I would still need to call withWatermarkAlignment in the watermark strategy, right? Otherwise alignment would not take place, regardless of pipeline.watermark-alignment.allow-unaligned-source-splits.

Yes, this is correct. Watermark groups, which are set through the first argument of withWatermarkAlignment, are used to check whether multiple sources need to coordinate watermarks. If two sources A and B both belong to the same watermark group, their watermarks will be aligned.
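As a rough sketch of this answer, the plain-Java model below encodes three rules. First, alignment of any kind (within one source's splits or across sources) only happens when a watermark group is configured. Second, sources are aligned with each other only when they share a group. Third, pipeline.watermark-alignment.allow-unaligned-source-splits only changes what happens when a reader cannot pause individual splits. The third rule is based on the option's description in the Flink configuration docs, where leaving it false means an UnsupportedOperationException is thrown once a split pause is attempted. All class, enum and method names are hypothetical, and a null group stands in for Optional only to keep the sketch short.

```java
import java.util.Objects;

// Hypothetical model (not Flink code) of the rules discussed in this thread.
final class WatermarkGroupSketch {

    // Per-source settings; a null group stands for "withWatermarkAlignment was never called".
    static final class SourceAlignment {
        final String sourceName; // for readability only
        final String watermarkGroup;

        SourceAlignment(String sourceName, String watermarkGroup) {
            this.sourceName = sourceName;
            this.watermarkGroup = watermarkGroup;
        }
    }

    // Split-level behaviour of one source in this model.
    enum SplitAlignment { DISABLED, SPLITS_ALIGNED, SPLITS_UNALIGNED, FAILS_ON_PAUSE }

    static boolean alignmentEnabled(SourceAlignment s) {
        return s.watermarkGroup != null;
    }

    // Two sources coordinate their watermarks only when both declare the same group.
    static boolean alignedWith(SourceAlignment a, SourceAlignment b) {
        return alignmentEnabled(a) && alignmentEnabled(b)
                && Objects.equals(a.watermarkGroup, b.watermarkGroup);
    }

    // allowUnalignedSplits models pipeline.watermark-alignment.allow-unaligned-source-splits.
    // It only matters once alignment is enabled and the reader cannot pause individual splits.
    static SplitAlignment splitAlignment(
            SourceAlignment s, boolean readerCanPauseSplits, boolean allowUnalignedSplits) {
        if (!alignmentEnabled(s)) {
            return SplitAlignment.DISABLED; // the flag on its own never enables alignment
        }
        if (readerCanPauseSplits) {
            return SplitAlignment.SPLITS_ALIGNED;
        }
        return allowUnalignedSplits ? SplitAlignment.SPLITS_UNALIGNED : SplitAlignment.FAILS_ON_PAUSE;
    }

    public static void main(String[] args) {
        SourceAlignment a = new SourceAlignment("A", "group-1");
        SourceAlignment b = new SourceAlignment("B", "group-1");
        SourceAlignment c = new SourceAlignment("C", null);

        System.out.println(alignedWith(a, b));              // true
        System.out.println(alignedWith(a, c));              // false
        System.out.println(splitAlignment(c, true, true));  // DISABLED
        System.out.println(splitAlignment(a, false, true)); // SPLITS_UNALIGNED
    }
}
```

Source C illustrates Alexis's second question: with no group configured, setting the flag to true still yields DISABLED.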

Hope the above helps.


Cheers,
Hong


[1] https://github.com/apache/flink/blob/45ba7ee87caee63a0babfd421b7c5eabaa779baa/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L160
[2] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L556-L559
[3] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L659
[4] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithWatermarkAlignment.java#L29



On 13 Jun 2023, at 21:08, Alexis Sarda-Espinosa <sa...@gmail.com> wrote:



Re: Interaction between idling sources and watermark alignment

Posted by Alexis Sarda-Espinosa <sa...@gmail.com>.
Hi again, I'm not a fan of bumping questions, but I think this might be
relevant, maybe enough to include it in the official documentation?

Regards,
Alexis.
