Posted to user@flink.apache.org by Jin Yi <ji...@promoted.ai> on 2022/04/08 07:54:10 UTC

Re: Weird Flink Kafka source watermark behavior

what should the code look like to verify we're using per-partition
watermarks if we moved away from FlinkKafkaConsumer to KafkaSource in
1.14.4?

we currently have it looking like:

streamExecutionEnvironment.fromSource(
   KafkaSource.<T>builder().....build(),
   watermarkStrategy,
   "whatever",
   typeInfo);

when running this job with the streamExecutionEnvironment parallelism set
to 1, and the kafka source having 30 partitions, i'm seeing weird behavior
where the first operator after this source consumes events out of order
(and therefore violates watermarks).  the operator simply checks
what "type" of event something is and uses side outputs to emit the
type-specific messages.  here's a snippet of the event timestamp going back
before the current watermark (first instance of going backwards in time in
*bold*):

2022-04-08 05:47:06,315 WARN
 ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
[] - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
2022-04-08 05:47:06,315 WARN
 ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
[] - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
2022-04-08 05:47:06,315 WARN
 ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
[] - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
2022-04-08 05:47:06,315 WARN
 ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
[] - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
2022-04-08 05:47:06,315 WARN
 ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
[] - LogRequestFilter ts: 1649284271139 watermark: 1649284187140

*2022-04-08 05:47:06,315 WARN
 ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
[] - LogRequestFilter ts: 1649284171037 watermark: 1649284187140*
2022-04-08 05:47:06,316 WARN
 ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
[] - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
2022-04-08 05:47:06,316 WARN
 ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
[] - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
2022-04-08 05:47:06,316 WARN
 ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
[] - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
2022-04-08 05:47:06,316 WARN
 ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
[] - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
2022-04-08 05:47:06,317 WARN
 ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
[] - LogRequestFilter ts: 1649284172666 watermark: 1649284187140
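
The comparison behind those log lines can be sketched in plain Java. This is a minimal illustration only: LateEventCheck and its methods are hypothetical names, not part of Flink or of the job above. It assumes the usual convention that a record is late when its timestamp is not greater than the current watermark.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of Flink): flags logged records that are behind the watermark. */
final class LateEventCheck {
    // Matches the "ts: <millis> watermark: <millis>" tail of the log lines above.
    private static final Pattern TS_AND_WATERMARK =
            Pattern.compile("ts: (\\d+) watermark: (\\d+)");

    /** A record is late when its timestamp is not greater than the current watermark. */
    static boolean isLate(long timestampMs, long watermarkMs) {
        return timestampMs <= watermarkMs;
    }

    /** Returns how far behind the watermark a logged record is, or -1 if it is on time or unparseable. */
    static long latenessMs(String logLine) {
        Matcher m = TS_AND_WATERMARK.matcher(logLine);
        if (!m.find()) {
            return -1;
        }
        long ts = Long.parseLong(m.group(1));
        long watermark = Long.parseLong(m.group(2));
        return isLate(ts, watermark) ? watermark - ts : -1;
    }

    public static void main(String[] args) {
        // Last on-time line and first late (bold) line from the snippet above.
        System.out.println(latenessMs("[] - LogRequestFilter ts: 1649284271139 watermark: 1649284187140"));
        System.out.println(latenessMs("[] - LogRequestFilter ts: 1649284171037 watermark: 1649284187140"));
    }
}
```

Applied to the bold line, the record sits 16103 ms behind the watermark, which is more than a few seconds of out-of-orderness tolerance would absorb.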



On Sat, Mar 19, 2022 at 10:51 AM Dan Hill <qu...@gmail.com> wrote:

> I dove deeper.  I wasn't actually using per-partition watermarks.  Thank
> you for the help!
>
> On Fri, Mar 18, 2022 at 12:11 PM Dan Hill <qu...@gmail.com> wrote:
>
>> Thanks, Thias and Dongwon.
>>
>> I'll keep debugging this with the idle watermark turned off.
>>
>> Next TODOs:
>> - Verify that we’re using per-partition watermarks.  Our code matches
>> the example but maybe something is disabling it.
>> - Enable logging of partition-consumer assignment, to see if that is the
>> cause of the problem.
>> - Look at adding flags to set the source parallelism to see if that fixes
>> the issue.
>>
>> Yes, I've seen Flink talks on creating our own watermarks through Kafka.
>> Sounds like a good idea.
>>
>> On Fri, Mar 18, 2022 at 1:17 AM Dongwon Kim <ea...@gmail.com>
>> wrote:
>>
>>> I totally agree with Schwalbe that per-partition watermarking allows #
>>> source tasks < # kafka partitions.
>>>
>>> Otherwise, Dan, you should suspect other possibilities like what
>>> Schwalbe said.
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Fri, Mar 18, 2022 at 5:01 PM Schwalbe Matthias <
>>> Matthias.Schwalbe@viseca.ch> wrote:
>>>
>>>> Hi Dan, Dongwon,
>>>>
>>>>
>>>>
>>>> I share the opinion that when per-partition watermarking is enabled,
>>>> you should observe correct behavior … would be interesting to see why it
>>>> does not work for you.
>>>>
>>>>
>>>>
>>>> I’d like to clear up one tiny misconception here when you write:
>>>>
>>>>
>>>>
>>>> >> - The same issue happens even if I use an idle watermark.
>>>>
>>>>
>>>>
>>>> You would expect to see glitches with watermarking when you enable
>>>> idleness.
>>>>
>>>> Idleness sort of trades watermark correctness for reduced latency when
>>>> processing timers (much simplified).
>>>>
>>>> With idleness enabled you have no guarantees whatsoever as to the
>>>> quality of watermarks (which might be ok in some cases).
>>>>
>>>> BTW we predominantly use a mix of fast and slow sources (the slow ones
>>>> update only once a day) with hand-tuned watermarking and late event
>>>> processing, and enabling idleness would break everything.
>>>>
>>>>
>>>>
>>>> Oversights aside, things should work the way you implemented them.
>>>>
>>>>
>>>>
>>>> One thing I could imagine to be a cause is
>>>>
>>>>    - that over time the kafka partitions get reassigned to different
>>>>    consumer subtasks, which would probably stress the correct
>>>>    recalculation of watermarks. Hence # partitions == # subtasks might
>>>>    reduce the problem
>>>>    - can you enable logging of partition-consumer assignment, to see
>>>>    if that is the cause of the problem?
>>>>    - also involuntary restarts of the job can cause havoc, as this
>>>>    resets watermarking
>>>>
>>>>
>>>>
>>>> I’ll be off next week, unable to take part in the active discussion …
>>>>
>>>>
>>>>
>>>> Sincere greetings
>>>>
>>>>
>>>>
>>>> Thias
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> *From:* Dan Hill <qu...@gmail.com>
>>>> *Sent:* Friday, 18 March 2022 08:23
>>>> *To:* Dongwon Kim <ea...@gmail.com>
>>>> *Cc:* user <us...@flink.apache.org>
>>>> *Subject:* Re: Weird Flink Kafka source watermark behavior
>>>>
>>>>
>>>>
>>>>
>>>> I'll try forcing # source tasks = # partitions tomorrow.
>>>>
>>>>
>>>>
>>>> Thank you, Dongwon, for all of your help!
>>>>
>>>>
>>>>
>>>> On Fri, Mar 18, 2022 at 12:20 AM Dongwon Kim <ea...@gmail.com>
>>>> wrote:
>>>>
>>>> I believe your job with per-partition watermarking should be working
>>>> okay even in a backfill scenario.
>>>>
>>>>
>>>>
>>>> BTW, is the problem still observed even with # source tasks = #
>>>> partitions?
>>>>
>>>>
>>>>
>>>> For committers:
>>>>
>>>> Is there a way to confirm that per-partition watermarking is used in TM
>>>> log?
>>>>
>>>>
>>>>
>>>> On Fri, Mar 18, 2022 at 4:14 PM Dan Hill <qu...@gmail.com> wrote:
>>>>
>>>> I hit this using event processing and no idleness detection.  The same
>>>> issue happens if I enable idleness.
>>>>
>>>>
>>>>
>>>> My code matches the code example for per-partition watermarking
>>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector>
>>>> .
>>>>
>>>>
>>>>
>>>> On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <ea...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi Dan,
>>>>
>>>>
>>>>
>>>> I'm quite confused as you already use per-partition watermarking.
>>>>
>>>>
>>>>
>>>> What I meant in the reply is
>>>>
>>>> - If you don't use per-partition watermarking, # tasks < # partitions
>>>> can cause the problem for backfill jobs.
>>>>
>>>> - If you don't use per-partition watermarking, # tasks = # partitions
>>>> is going to be okay even for backfill jobs.
>>>>
>>>> - If you use per-partition watermarking, # tasks < # partitions
>>>> shouldn't cause any problems unless you turn on the idleness detection.
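
The three cases above can be illustrated with a toy model in plain Java. It is a sketch under stated assumptions, not Flink code: BackfillWatermarkModel is a hypothetical name, the single source task is assumed to drain its partitions one after another (a real consumer interleaves batches), and the watermark is recomputed per record as "max timestamp seen minus the bound minus 1".

```java
import java.util.Arrays;

/** Toy model (not Flink code) of one source task reading several partitions in a backfill. */
final class BackfillWatermarkModel {
    // Out-of-orderness bound in milliseconds, mirroring forBoundedOutOfOrderness(5s).
    static final long BOUND_MS = 5_000;

    private static long watermarkFor(long maxTs) {
        return maxTs == Long.MIN_VALUE ? Long.MIN_VALUE : maxTs - BOUND_MS - 1;
    }

    /** One generator over the merged stream: counts records at or behind the watermark. */
    static int lateWithSharedWatermark(long[][] partitions) {
        long maxTs = Long.MIN_VALUE;
        int late = 0;
        for (long[] partition : partitions) {       // assumption: partitions are drained in turn
            for (long ts : partition) {
                if (ts <= watermarkFor(maxTs)) {
                    late++;
                }
                maxTs = Math.max(maxTs, ts);
            }
        }
        return late;
    }

    /** One generator per partition; the task's watermark is the minimum across all of them. */
    static int lateWithPerPartitionWatermark(long[][] partitions) {
        long[] maxTs = new long[partitions.length];
        Arrays.fill(maxTs, Long.MIN_VALUE);
        int late = 0;
        for (int p = 0; p < partitions.length; p++) {
            for (long ts : partitions[p]) {
                long watermark = Long.MAX_VALUE;
                for (long m : maxTs) {
                    watermark = Math.min(watermark, watermarkFor(m));
                }
                if (ts <= watermark) {
                    late++;
                }
                maxTs[p] = Math.max(maxTs[p], ts);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        long[][] partitions = {
            {100_000, 200_000, 300_000},
            {110_000, 210_000, 310_000},
        };
        System.out.println("shared watermark, late records: " + lateWithSharedWatermark(partitions));
        System.out.println("per-partition, late records:    " + lateWithPerPartitionWatermark(partitions));
    }
}
```

With these inputs the shared generator marks two records of the second partition as late, while the per-partition variant marks none, because a partition that has not produced anything yet holds the minimum back.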
>>>>
>>>>
>>>>
>>>> Regarding the idleness detection, which is based on processing time,
>>>> what is your setting? If you set the value to 10 seconds for example,
>>>> you'll face the same problem unless the watermark of your backfill job
>>>> catches up to real time within 10 seconds. If you increase the value to 1
>>>> minute, your backfill job has to catch up to real time within 1 minute.
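
Why the timeout value matters can be sketched as follows. This is a simplified model, not Flink's implementation: IdlenessModel and its parameters are hypothetical, and it assumes only that a partition with no record for at least the timeout (in processing time) is excluded from the minimum.

```java
/** Toy model (not Flink code) of combining per-partition watermarks with an idle timeout. */
final class IdlenessModel {
    /**
     * Minimum watermark over partitions that produced a record within the idle timeout.
     * Returns Long.MIN_VALUE, meaning "no watermark" in this sketch, when every partition is idle.
     */
    static long combinedWatermark(long[] partitionWatermarks, long[] lastRecordAtMs,
                                  long nowMs, long idleTimeoutMs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int p = 0; p < partitionWatermarks.length; p++) {
            boolean idle = nowMs - lastRecordAtMs[p] >= idleTimeoutMs;
            if (idle) {
                continue;                           // idle partitions no longer hold the watermark back
            }
            anyActive = true;
            min = Math.min(min, partitionWatermarks[p]);
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        long[] watermarks = {1_000, 500_000};       // partition 0 is far behind partition 1
        long[] lastRecordAt = {0, 9_000};           // processing time of each partition's last record
        System.out.println(combinedWatermark(watermarks, lastRecordAt, 10_000, 10_000));
        System.out.println(combinedWatermark(watermarks, lastRecordAt, 10_000, 60_000));
    }
}
```

With a 10 second timeout the lagging partition is treated as idle and the combined watermark jumps to 500000, so its remaining records arrive late; with a 60 second timeout it still counts and the watermark stays at 1000.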
>>>>
>>>>
>>>>
>>>> Best,
>>>>
>>>>
>>>>
>>>> Dongwon
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <qu...@gmail.com> wrote:
>>>>
>>>> Thanks Dongwon!
>>>>
>>>>
>>>>
>>>> Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source
>>>> tasks < # kafka partitions.  This should be called out in the docs or the
>>>> bug should be fixed.
>>>>
>>>>
>>>>
>>>> On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <ea...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi Dan,
>>>>
>>>>
>>>>
>>>> Do you use the per-partition watermarking explained in [1]?
>>>>
>>>> I've also experienced a similar problem when running backfill jobs
>>>> specifically when # source tasks < # kafka partitions.
>>>>
>>>> - When # source tasks = # kafka partitions, the backfill job works as
>>>> expected.
>>>>
>>>> - When # source tasks < # kafka partitions, a Kafka consumer consumes
>>>> multiple partitions. This case can destroy the per-partition patterns as
>>>> explained in [2].
>>>>
>>>>
>>>>
>>>> Hope this helps.
>>>>
>>>>
>>>>
>>>> p.s. If you plan to use the per-partition watermarking, be aware that
>>>> idleness detection [3] can cause another problem when you run a backfill
>>>> job. Kafka source tasks in a backfill job seem to read a batch of records
>>>> from Kafka and then wait for downstream tasks to catch up, which can be
>>>> counted as idleness.
>>>>
>>>>
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
>>>>
>>>> [2]
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>>>
>>>> [3]
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>>>>
>>>>
>>>>
>>>> Best,
>>>>
>>>>
>>>>
>>>> Dongwon
>>>>
>>>>
>>>>
>>>> On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <qu...@gmail.com> wrote:
>>>>
>>>> I'm following the example from this section:
>>>>
>>>>
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>>>
>>>>
>>>>
>>>> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <qu...@gmail.com>
>>>> wrote:
>>>>
>>>> Other points
>>>>
>>>> - I'm using the kafka timestamp as event time.
>>>>
>>>> - The same issue happens even if I use an idle watermark.
>>>>
>>>>
>>>>
>>>> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <qu...@gmail.com>
>>>> wrote:
>>>>
>>>> There are 12 Kafka partitions (to keep the structure similar to other
>>>> low traffic environments).
>>>>
>>>>
>>>>
>>>> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <qu...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi.
>>>>
>>>>
>>>>
>>>> I'm running a backfill from a kafka topic with very few records spread
>>>> across a few days.  I'm seeing a case where the records coming from a kafka
>>>> source have a watermark that's more recent (by hours) than the event time.
>>>> I haven't seen this before when running this.  This violates what I'd
>>>> assume the kafka source would do.
>>>>
>>>>
>>>>
>>>> Example problem:
>>>>
>>>> 1. I have kafka records at ts=1000, 2000, ... 500000.  The actual times
>>>> are separated by a longer time period.
>>>>
>>>> 2.  My first operator after the FlinkKafkaConsumer sees:
>>>>
>>>>    context.timestamp() = 1000
>>>>
>>>>    context.timerService().currentWatermark() = 500000
>>>>
>>>>
>>>>
>>>> Details about how I'm running this:
>>>>
>>>> - I'm on Flink 1.12.3 that's running on EKS and using MSK as the source.
>>>>
>>>> - I'm using FlinkKafkaConsumer
>>>>
>>>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No
>>>> idleness settings.
>>>>
>>>> - I'm running similar code in all the environments.  The main
>>>> difference is low traffic.  I have not been able to reproduce this
>>>> outside of this environment.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> I put the following process function right after my kafka source.
>>>>
>>>>
>>>>
>>>> --------
>>>>
>>>>
>>>> AfterSource
>>>>
>>>> ts=1647274892728
>>>> watermark=1647575140007
>>>> record=...
>>>>
>>>>
>>>>
>>>>
>>>> public static class TextLog extends ProcessFunction<Record, Record> {
>>>>     private final String label;
>>>>     // Constructor name must match the class name.
>>>>     public TextLog(String label) {
>>>>         this.label = label;
>>>>     }
>>>>     @Override
>>>>     public void processElement(Record record, Context context,
>>>>             Collector<Record> collector) throws Exception {
>>>>         // Log the record's event timestamp next to the current watermark.
>>>>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>>>>                 label, context.timestamp(),
>>>>                 context.timerService().currentWatermark(), record);
>>>>         // Forward the record unchanged.
>>>>         collector.collect(record);
>>>>     }
>>>> }
>>>>
>>>>
>>>

Re: Weird Flink Kafka source watermark behavior

Posted by Martijn Visser <ma...@apache.org>.
Hi Jin,

Flink is an open source project, so the community works on a best-effort
basis. There's no guaranteed or quick support available. There are companies
that provide commercial support if needed.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser


On Fri, 8 Apr 2022 at 12:13, Jin Yi <ji...@promoted.ai> wrote:

> confirmed that moving back to FlinkKafkaConsumer fixes things.
>
> is there some notification channel/medium that readily highlights critical
> bugs/issues in intended features like this?
>
> On Fri, Apr 8, 2022 at 2:18 AM Jin Yi <ji...@promoted.ai> wrote:
>
>> based on symptoms/observations on the first operator (LogRequestFilter)
>> watermark and event timestamps, it does seem like it's the bug.  things
>> track fine (timestamp > watermark) for the first batch of events, then the
>> event timestamps go back into the past and are "late".
>>
>> looks like the 1.14 backport just got in 11 days ago (
>> https://github.com/apache/flink/pull/19128).  is there a way to easily
>> test this fix locally?  based on the threads, should i just move back to
>> FlinkKafkaConsumer until 1.14.5?
>>
>> On Fri, Apr 8, 2022 at 1:34 AM Qingsheng Ren <re...@gmail.com> wrote:
>>
>>> Hi Jin,
>>>
>>> If you are using new FLIP-27 sources like KafkaSource, per-partition
>>> watermarking (or per-split watermarking) is a default feature integrated
>>> in SourceOperator. You might have hit the bug described in FLINK-26018 [1],
>>> which happens during the first fetch of the source: records in the first
>>> split push the watermark far ahead, and records from other splits are
>>> then treated as late events.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-26018
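
The effect described above can be reproduced with a toy model in plain Java. It is only an illustration of the symptom, not Flink's SourceOperator: FirstFetchModel is a hypothetical class, per-split watermarks are simplified to the maximum timestamp seen, and it assumes for illustration that a split starts to count toward the minimum only once it is known to the model.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model (not Flink's SourceOperator) of per-split watermarks combined by minimum. */
final class FirstFetchModel {
    private final Map<String, Long> splitWatermarks = new LinkedHashMap<>();

    /** Registers a split up front; until it emits, it holds the combined watermark back. */
    void register(String split) {
        splitWatermarks.putIfAbsent(split, Long.MIN_VALUE);
    }

    /** Returns true if the record is late, then advances (or lazily creates) the split's watermark. */
    boolean emit(String split, long timestampMs) {
        boolean late = timestampMs <= combinedWatermark();
        splitWatermarks.merge(split, timestampMs, Math::max);
        return late;
    }

    /** Minimum over all known splits, or Long.MIN_VALUE when no split is known yet. */
    long combinedWatermark() {
        return splitWatermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Splits become known only when their first record arrives.
        FirstFetchModel lazy = new FirstFetchModel();
        lazy.emit("partition-0", 500_000);
        System.out.println("lazy, second split late:  " + lazy.emit("partition-1", 1_000));

        // Both splits are known before any record is emitted.
        FirstFetchModel eager = new FirstFetchModel();
        eager.register("partition-0");
        eager.register("partition-1");
        eager.emit("partition-0", 500_000);
        System.out.println("eager, second split late: " + eager.emit("partition-1", 1_000));
    }
}
```

In the lazy case the first split alone defines the watermark, so the first record of the second split is already late. When both splits are registered before the first record, the untouched split keeps the minimum at its initial value and nothing is late.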
>>>
>>> Best regards,
>>>
>>> Qingsheng

Re: Weird Flink Kafka source watermark behavior

Posted by Jin Yi <ji...@promoted.ai>.
i ended up just going back to FlinkKafkaConsumer instead of the new
KafkaSource

On Wed, Apr 13, 2022 at 3:01 AM Qingsheng Ren <re...@gmail.com> wrote:

> Another solution would be setting the parallelism = #partitions, so that
> each parallel subtask would be responsible for reading exactly one partition.
>
> Qingsheng
>
> > On Apr 13, 2022, at 17:52, Qingsheng Ren <re...@gmail.com> wrote:
> >
> > Hi Jin,
> >
> > Unfortunately I don’t have any quick workaround in mind except increasing
> the out-of-orderness tolerance.
> >
> > Best regards,
> >
> > Qingsheng
> >
> >> On Apr 8, 2022, at 18:12, Jin Yi <ji...@promoted.ai> wrote:
> >>
> >> confirmed that moving back to FlinkKafkaConsumer fixes things.
> >>
> >> is there some notification channel/medium that readily highlights critical
> bugs/issues in intended features like this?
> >>
> >> On Fri, Apr 8, 2022 at 2:18 AM Jin Yi <ji...@promoted.ai> wrote:
> >> based on symptoms/observations on the first operator (LogRequestFilter)
> watermark and event timestamps, it does seem like it's the bug.  things
> track fine (timestamp > watermark) for the first batch of events, then the
> event timestamps go back into the past and are "late".
> >>
> >> looks like the 1.14 backport just got in 11 days ago (
> https://github.com/apache/flink/pull/19128).  is there a way to easily
> test this fix locally?  based on the threads, should i just move back to
> FlinkKafkaConsumer until 1.14.5?
> >>
> >> On Fri, Apr 8, 2022 at 1:34 AM Qingsheng Ren <re...@gmail.com>
> wrote:
> >> Hi Jin,
> >>
> >> If you are using new FLIP-27 sources like KafkaSource, per-partition
> watermarking (or per-split watermarking) is a default feature integrated
> in SourceOperator. You might have hit the bug described in FLINK-26018 [1],
> which happens during the first fetch of the source: records in the first
> split push the watermark far ahead, and records from other splits are
> then treated as late events.
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-26018
> >>
> >> Best regards,
> >>
> >> Qingsheng
> >>
> >>
> >>> On Apr 8, 2022, at 15:54, Jin Yi <ji...@promoted.ai> wrote:
> >>>
> >>> how should the code look like to verify we're using per-partition
> watermarks if we moved away from FlinkKafkaConsumer to KafkaSource in
> 1.14.4?
> >>>
> >>> we currently have it looking like:
> >>>
> >>> streamExecutionEnvironment.fromSource(
> >>>   KafkaSource.<T>builder().....build(),
> >>>   watermarkStrategy,
> >>>   "whatever",
> >>>   typeInfo);
> >>>
> >>> when running this job with the streamExecutionEnviornment parallelism
> set to 1, and the kafka source having 30 partitions, i'm seeing weird
> behaviors where the first operator after this source consumes events out of
> order (and therefore, violates watermarks).  the operator simply checks to
> see what "type" of event something is and uses side outputs to output the
> type-specific messages.  here's a snippet of the event timestamp going back
> before the current watermark (first instance of going backwards in time in
> bold):
> >>>
> >>> 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284271139 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284171037 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,316 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,316 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,316 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,316 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,317 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172666 watermark: 1649284187140
> >>>
> >>>
> >>>
> >>> On Sat, Mar 19, 2022 at 10:51 AM Dan Hill <qu...@gmail.com>
> wrote:
> >>> I dove deeper.  I wasn't actually using per-partition watermarks.
> Thank you for the help!
> >>>
> >>> On Fri, Mar 18, 2022 at 12:11 PM Dan Hill <qu...@gmail.com>
> wrote:
> >>> Thanks, Thias and Dongwon.
> >>>
> >>> I'll keep debugging this with the idle watermark turned off.
> >>>
> >>> Next TODOs:
> >>> - Verify that we’re using per-partition watermarks.  Our code matches
> the example but maybe something is disabling it.
> >>> - Enable logging of partition-consumer assignment, to see if that is
> the cause of the problem.
> >>> - Look at adding flags to set the source parallelism to see if that
> fixes the issue.
> >>>
> >>> Yes, I've seen Flink talks on creating our own watermarks through
> Kafka.  Sounds like a good idea.
> >>>
> >>> On Fri, Mar 18, 2022 at 1:17 AM Dongwon Kim <ea...@gmail.com>
> wrote:
> >>> I totally agree with Schwalbe that per-partition watermarking allows #
> source tasks < # kafka partitions.
> >>>
> >>> Otherwise, Dan, you should suspect other possibilities like what
> Schwalbe said.
> >>>
> >>> Best,
> >>>
> >>> Dongwon
> >>>
> >>> On Fri, Mar 18, 2022 at 5:01 PM Schwalbe Matthias <
> Matthias.Schwalbe@viseca.ch> wrote:
> >>> Hi San, Dongwon,
> >>>
> >>>
> >>>
> >>> I share the opinion that when per-partition watermarking is enabled,
> you should observe correct behavior … would be interesting to see why it
> does not work for you.
> >>>
> >>>
> >>>
> >>> I’d like to clear one tiny misconception here when you write:
> >>>
> >>>
> >>>
> >>>>> - The same issue happens even if I use an idle watermark.
> >>>
> >>>
> >>>
> >>> You would expect to see glitches with watermarking when you enable
> idleness.
> >>>
> >>> Idleness sort of trades watermark correctness for reduces latency when
> processing timers (much simplified).
> >>>
> >>> With idleness enabled you have no guaranties whatsoever as to the
> quality of watermarks (which might be ok in some cases).
> >>>
> >>> BTW we dominantly use a mix of fast and slow sources (that only update
> once a day) which hand-pimped watermarking and late event processing, and
> enabling idleness would break everything.
> >>>
> >>>
> >>>
> >>> Oversight put aside things should work the way you implemented it.
> >>>
> >>>
> >>>
> >>> One thing I could imagine to be a cause is
> >>>
> >>>      • that over time the kafka partitions get reassigned  to
> different consumer subtasks which would probably stress correct
> recalculation of watermarks. Hence #partition == number subtask might
> reduce the problem
> >>>      • can you enable logging of partition-consumer assignment, to see
> if that is the cause of the problem
> >>>      • also involuntary restarts of the job can cause havoc as this
> resets watermarking
> >>>
> >>>
> >>> I’ll be off next week, unable to take part in the active discussion …
> >>>
> >>>
> >>>
> >>> Sincere greetings
> >>>
> >>>
> >>>
> >>> Thias
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> From: Dan Hill <qu...@gmail.com>
> >>> Sent: Freitag, 18. März 2022 08:23
> >>> To: Dongwon Kim <ea...@gmail.com>
> >>> Cc: user <us...@flink.apache.org>
> >>> Subject: Re: Weird Flink Kafka source watermark behavior
> >>>
> >>>
> >>>
> >>> ⚠EXTERNAL MESSAGE – CAUTION: Think Before You Click ⚠
> >>>
> >>>
> >>>
> >>> I'll try forcing # source tasks = # partitions tomorrow.
> >>>
> >>>
> >>>
> >>> Thank you, Dongwon, for all of your help!
> >>>
> >>>
> >>>
> >>> On Fri, Mar 18, 2022 at 12:20 AM Dongwon Kim <ea...@gmail.com>
> wrote:
> >>>
> >>> I believe your job with per-partition watermarking should be working
> okay even in a backfill scenario.
> >>>
> >>>
> >>>
> >>> BTW, is the problem still observed even with # sour tasks = #
> partitions?
> >>>
> >>>
> >>>
> >>> For committers:
> >>>
> >>> Is there a way to confirm that per-partition watermarking is used in
> TM log?
> >>>
> >>>
> >>>
> >>> On Fri, Mar 18, 2022 at 4:14 PM Dan Hill <qu...@gmail.com>
> wrote:
> >>>
> >>> I hit this using event processing and no idleness detection.  The same
> issue happens if I enable idleness.
> >>>
> >>>
> >>>
> >>> My code matches the code example for per-partition watermarking.
> >>>
> >>>
> >>>
> >>> On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <ea...@gmail.com>
> wrote:
> >>>
> >>> Hi Dan,
> >>>
> >>>
> >>>
> >>> I'm quite confused as you already use per-partition watermarking.
> >>>
> >>>
> >>>
> >>> What I meant in the reply is
> >>>
> >>> - If you don't use per-partition watermarking, # tasks < # partitions
> can cause the problem for backfill jobs.
> >>>
> >>> - If you don't use per-partition watermarking, # tasks = # partitions
> is going to be okay even for backfill jobs.
> >>>
> >>> - If you use per-partition watermarking, # tasks < # partitions
> shouldn't cause any problems unless you turn on the idleness detection.
> >>>
> >>>
> >>>
> >>> Regarding the idleness detection which is based on processing time,
> what is your setting? If you set the value to 10 seconds for example,
> you'll face the same problem unless the watermark of your backfill job
> catches up real-time within 10 seconds. If you increase the value to 1
> minute, your backfill job should catch up real-time within 1 minute.
> >>>
> >>>
> >>>
> >>> Best,
> >>>
> >>>
> >>>
> >>> Dongwon
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <qu...@gmail.com>
> wrote:
> >>>
> >>> Thanks Dongwon!
> >>>
> >>>
> >>>
> >>> Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source
> tasks < # kafka partitions.  This should be called out in the docs or the
> bug should be fixed.
> >>>
> >>>
> >>>
> >>> On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <ea...@gmail.com>
> wrote:
> >>>
> >>> Hi Dan,
> >>>
> >>>
> >>>
> >>> Do you use the per-partition watermarking explained in [1]?
> >>>
> >>> I've also experienced a similar problem when running backfill jobs
> specifically when # source tasks < # kafka partitions.
> >>>
> >>> - When # source tasks = # kafka partitions, the backfill job works as
> expected.
> >>>
> >>> - When # source tasks < # kafka partitions, a Kafka consumer consumes
> multiple partitions. This case can destroying the per-partition patterns as
> explained in [2].
> >>>
> >>>
> >>>
> >>> Hope this helps.
> >>>
> >>>
> >>>
> >>> p.s. If you plan to use the per-partition watermarking, be aware that
> idleness detection [3] can cause another problem when you run a backfill
> job. Kafka source tasks in a backfill job seem to read a batch of records
> from Kafka and then wait for downstream tasks to catch up the progress,
> which can be counted as idleness.
> >>>
> >>>
> >>>
> >>> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
> >>>
> >>> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
> >>>
> >>> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
> >>>
> >>>
> >>>
> >>> Best,
> >>>
> >>>
> >>>
> >>> Dongwon
> >>>
> >>>
> >>>
> >>> On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <qu...@gmail.com>
> wrote:
> >>>
> >>> I'm following the example from this section:
> >>>
> >>>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
> >>>
> >>>
> >>>
> >>> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <qu...@gmail.com>
> wrote:
> >>>
> >>> Other points
> >>>
> >>> - I'm using the kafka timestamp as event time.
> >>>
> >>> - The same issue happens even if I use an idle watermark.
> >>>
> >>>
> >>>
> >>> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <qu...@gmail.com>
> wrote:
> >>>
> >>> There are 12 Kafka partitions (to keep the structure similar to other
> low traffic environments).
> >>>
> >>>
> >>>
> >>> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <qu...@gmail.com>
> wrote:
> >>>
> >>> Hi.
> >>>
> >>>
> >>>
> >>> I'm running a backfill from a kafka topic with very few records spread
> across a few days.  I'm seeing a case where the records coming from a kafka
> source have a watermark that's more recent (by hours) than the event time.
> I haven't seen this before when running this.  This violates what I'd
> assume the kafka source would do.
> >>>
> >>>
> >>>
> >>> Example problem:
> >>>
> >>> 1. I have kafka records at ts=1000, 2000, ... 500000.  The actual
> times are separated by a longer time period.
> >>>
> >>> 2.  My first operator after the FlinkKafkaConsumer sees:
> >>>
> >>>   context.timestamp() = 1000
> >>>
> >>>   context.timerService().currentWatermark() = 500000
> >>>
> >>>
> >>>
> >>> Details about how I'm running this:
> >>>
> >>> - I'm on Flink 1.12.3 that's running on EKS and using MSK as the
> source.
> >>>
> >>> - I'm using FlinkKafkaConsumer
> >>>
> >>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No
> idleness settings.
> >>>
> >>> - I'm running similar code in all the environments.  The main
> difference is low traffic.  I have not been able to reproduce this out of
> the environment.
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> I put the following process function right after my kafka source.
> >>>
> >>>
> >>>
> >>> --------
> >>>
> >>>
> >>> AfterSource
> >>>
> >>> ts=1647274892728
> >>> watermark=1647575140007
> >>> record=...
> >>>
> >>>
> >>>
> >>>
> >>> public static class TextLog extends ProcessFunction<Record, Record> {
> >>>    private final String label;
> >>>    public TextLogDeliveryLog(String label) {
> >>>        this.label = label;
> >>>    }
> >>>    @Override
> >>>    public void processElement(Record record, Context context,
> Collector<Record> collector) throws Exception {
> >>>        LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
> >>>                label, context.timestamp(),
> context.timerService().currentWatermark(), record);
> >>>        collector.collect(deliveryLog);
> >>>    }
> >>> }
> >>>
> >>> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
> dieser Informationen ist streng verboten.
> >>>
> >>> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
> >>
> >
>
>

Re: Weird Flink Kafka source watermark behavior

Posted by Qingsheng Ren <re...@gmail.com>.
Another solution would be to set the source parallelism equal to the number of partitions, so that each parallel subtask is responsible for reading exactly one partition.

Qingsheng
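
To illustrate why one partition per subtask sidesteps the problem, here is a toy model in plain Java. It is not Flink code: the round-robin assignment (partition % parallelism), the single watermark per subtask, and the sample timestamps are all assumptions made for the sketch. In a real job the equivalent step would be calling setParallelism(30) on the stream returned by fromSource for a 30-partition topic.

```java
final class ParallelismSketch {
    // Hypothetical assignment rule: partition p is read by subtask (p % parallelism).
    static int ownerOf(int partition, int parallelism) {
        return partition % parallelism;
    }

    // Each subtask drains its partitions one batch after another and keeps a
    // single watermark (highest timestamp seen minus the tolerated delay).
    // Returns how many events arrive at or behind that watermark.
    static int countLate(long[][] partitions, int parallelism, long boundMs) {
        int late = 0;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            long maxTs = Long.MIN_VALUE;
            for (int p = 0; p < partitions.length; p++) {
                if (ownerOf(p, parallelism) != subtask) {
                    continue;
                }
                for (long ts : partitions[p]) {
                    if (maxTs != Long.MIN_VALUE && ts <= maxTs - boundMs) {
                        late++;
                    }
                    maxTs = Math.max(maxTs, ts);
                }
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // Two partitions whose event times are roughly 100 seconds apart (made-up values).
        long[][] partitions = {
            {267_000L, 268_000L, 269_000L},
            {171_000L, 172_000L, 173_000L},
        };
        System.out.println(countLate(partitions, 1, 5_000L)); // prints 3
        System.out.println(countLate(partitions, 2, 5_000L)); // prints 0
    }
}
```

With one subtask reading both partitions, the second batch lands behind the watermark set by the first; with one subtask per partition, each subtask only ever sees its own in-order timestamps.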

> On Apr 13, 2022, at 17:52, Qingsheng Ren <re...@gmail.com> wrote:
> 
> Hi Jin, 
> 
> Unfortunately I don’t have any quick workaround in mind other than increasing the out-of-orderness tolerance.
> 
> Best regards, 
> 
> Qingsheng
> 
>> On Apr 8, 2022, at 18:12, Jin Yi <ji...@promoted.ai> wrote:
>> 
>> confirmed that moving back to FlinkKafkaConsumer fixes things.
>> 
>> is there some notification channel/medium that highlights critical bugs/issues on the intended features like this pretty readily?
>> 
>> On Fri, Apr 8, 2022 at 2:18 AM Jin Yi <ji...@promoted.ai> wrote:
>> based on symptoms/observations on the first operator (LogRequestFilter) watermark and event timestamps, it does seem like it's the bug.  things track fine (timestamp > watermark) for the first batch of events, then the event timestamps go back into the past and are "late".
>> 
>> looks like the 1.14 backport just got in 11 days ago (https://github.com/apache/flink/pull/19128).  is there a way to easily test this fix locally?  based on the threads, should i just move back to FlinkKafkaConsumer until 1.14.5?
>> 
>> On Fri, Apr 8, 2022 at 1:34 AM Qingsheng Ren <re...@gmail.com> wrote:
>> Hi Jin,
>> 
>> If you are using the new FLIP-27 sources like KafkaSource, per-partition watermarking (or per-split watermarking) is a default feature integrated in SourceOperator. You might be hitting the bug described in FLINK-26018 [1], which occurs during the source's first fetch: records from the first split push the watermark far ahead, so records from the other splits are then treated as late events.
>> 
>> [1] https://issues.apache.org/jira/browse/FLINK-26018
>> 
>> Best regards,
>> 
>> Qingsheng
>> 
>> 
>>> On Apr 8, 2022, at 15:54, Jin Yi <ji...@promoted.ai> wrote:
>>> 
>>> what should the code look like to verify we're using per-partition watermarks if we moved away from FlinkKafkaConsumer to KafkaSource in 1.14.4?
>>> 
>>> we currently have it looking like:
>>> 
>>> streamExecutionEnvironment.fromSource(
>>>   KafkaSource.<T>builder().....build(),
>>>   watermarkStrategy,
>>>   "whatever",
>>>   typeInfo);
>>> 
>>> when running this job with the streamExecutionEnvironment parallelism set to 1, and the kafka source having 30 partitions, i'm seeing weird behavior where the first operator after this source consumes events out of order (and therefore violates watermarks).  the operator simply checks what "type" of event something is and uses side outputs to emit the type-specific messages.  here's a snippet of the event timestamp going back before the current watermark (the first instance of going backwards in time is the line with ts: 1649284171037):
>>> 
>>> 2022-04-08 05:47:06,315 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
>>> 2022-04-08 05:47:06,315 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
>>> 2022-04-08 05:47:06,315 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
>>> 2022-04-08 05:47:06,315 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
>>> 2022-04-08 05:47:06,315 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284271139 watermark: 1649284187140
>>> 2022-04-08 05:47:06,315 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284171037 watermark: 1649284187140
>>> 2022-04-08 05:47:06,316 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
>>> 2022-04-08 05:47:06,316 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
>>> 2022-04-08 05:47:06,316 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
>>> 2022-04-08 05:47:06,316 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
>>> 2022-04-08 05:47:06,317 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172666 watermark: 1649284187140
>>> 
>>> 
>>> 
>>> On Sat, Mar 19, 2022 at 10:51 AM Dan Hill <qu...@gmail.com> wrote:
>>> I dove deeper.  I wasn't actually using per-partition watermarks.  Thank you for the help!
>>> 
>>> On Fri, Mar 18, 2022 at 12:11 PM Dan Hill <qu...@gmail.com> wrote:
>>> Thanks, Thias and Dongwon.
>>> 
>>> I'll keep debugging this with the idle watermark turned off.
>>> 
>>> Next TODOs:
>>> - Verify that we’re using per-partition watermarks.  Our code matches the example but maybe something is disabling it.
>>> - Enable logging of partition-consumer assignment, to see if that is the cause of the problem.
>>> - Look at adding flags to set the source parallelism to see if that fixes the issue.
>>> 
>>> Yes, I've seen Flink talks on creating our own watermarks through Kafka.  Sounds like a good idea.
>>> 
>>> On Fri, Mar 18, 2022 at 1:17 AM Dongwon Kim <ea...@gmail.com> wrote:
>>> I totally agree with Schwalbe that per-partition watermarking allows # source tasks < # kafka partitions. 
>>> 
>>> Otherwise, Dan, you should suspect other possibilities like what Schwalbe said.
>>> 
>>> Best,
>>> 
>>> Dongwon
>>> 
>>> On Fri, Mar 18, 2022 at 5:01 PM Schwalbe Matthias <Ma...@viseca.ch> wrote:
>>> Hi Dan, Dongwon,
>>> 
>>> 
>>> 
>>> I share the opinion that when per-partition watermarking is enabled, you should observe correct behavior … would be interesting to see why it does not work for you.
>>> 
>>> 
>>> 
>>> I’d like to clear one tiny misconception here when you write:
>>> 
>>> 
>>> 
>>>>> - The same issue happens even if I use an idle watermark.
>>> 
>>> 
>>> 
>>> You would expect to see glitches with watermarking when you enable idleness.
>>> 
>>> Idleness sort of trades watermark correctness for reduced latency when processing timers (much simplified).
>>> 
>>> With idleness enabled you have no guarantees whatsoever as to the quality of watermarks (which might be ok in some cases).
>>> 
>>> BTW we predominantly use a mix of fast and slow sources (the slow ones only update once a day) with hand-crafted watermarking and late-event processing, and enabling idleness would break everything.
>>> 
>>> 
>>> 
>>> Oversights aside, things should work the way you implemented them.
>>> 
>>> 
>>> 
>>> One thing I could imagine to be a cause is
>>> 
>>>      • that over time the kafka partitions get reassigned to different consumer subtasks, which would probably stress correct recalculation of watermarks. Hence # partitions == # subtasks might reduce the problem
>>>      • can you enable logging of partition-consumer assignment, to see if that is the cause of the problem
>>>      • also involuntary restarts of the job can cause havoc as this resets watermarking
>>> 
>>> 
>>> I’ll be off next week, unable to take part in the active discussion …
>>> 
>>> 
>>> 
>>> Sincere greetings
>>> 
>>> 
>>> 
>>> Thias
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> From: Dan Hill <qu...@gmail.com> 
>>> Sent: Friday, March 18, 2022 08:23
>>> To: Dongwon Kim <ea...@gmail.com>
>>> Cc: user <us...@flink.apache.org>
>>> Subject: Re: Weird Flink Kafka source watermark behavior
>>> 
>>> 
>>> 
>>> 
>>> I'll try forcing # source tasks = # partitions tomorrow.
>>> 
>>> 
>>> 
>>> Thank you, Dongwon, for all of your help!
>>> 
>>> 
>>> 
>>> On Fri, Mar 18, 2022 at 12:20 AM Dongwon Kim <ea...@gmail.com> wrote:
>>> 
>>> I believe your job with per-partition watermarking should be working okay even in a backfill scenario. 
>>> 
>>> 
>>> 
>>> BTW, is the problem still observed even with # source tasks = # partitions?
>>> 
>>> 
>>> 
>>> For committers:
>>> 
>>> Is there a way to confirm that per-partition watermarking is used in TM log?
>>> 
>>> 
>>> 
>>> On Fri, Mar 18, 2022 at 4:14 PM Dan Hill <qu...@gmail.com> wrote:
>>> 
>>> I hit this using event-time processing and no idleness detection.  The same issue happens if I enable idleness.
>>> 
>>> 
>>> 
>>> My code matches the code example for per-partition watermarking.
>>> 
>>> 
>>> 
>>> On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <ea...@gmail.com> wrote:
>>> 
>>> Hi Dan,
>>> 
>>> 
>>> 
>>> I'm quite confused as you already use per-partition watermarking.
>>> 
>>> 
>>> 
>>> What I meant in the reply is
>>> 
>>> - If you don't use per-partition watermarking, # tasks < # partitions can cause the problem for backfill jobs.
>>> 
>>> - If you don't use per-partition watermarking, # tasks = # partitions is going to be okay even for backfill jobs.
>>> 
>>> - If you use per-partition watermarking, # tasks < # partitions shouldn't cause any problems unless you turn on the idleness detection.
>>> 
>>> 
>>> 
>>> Regarding the idleness detection, which is based on processing time, what is your setting? If you set the value to 10 seconds, for example, you'll face the same problem unless the watermark of your backfill job catches up to real time within 10 seconds. If you increase the value to 1 minute, your backfill job should catch up to real time within 1 minute.
>>> 
>>> 
>>> 
>>> Best,
>>> 
>>> 
>>> 
>>> Dongwon
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <qu...@gmail.com> wrote:
>>> 
>>> Thanks Dongwon!
>>> 
>>> 
>>> 
>>> Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source tasks < # kafka partitions.  This should be called out in the docs or the bug should be fixed.
>>> 
>>> 
>>> 
>>> On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <ea...@gmail.com> wrote:
>>> 
>>> Hi Dan,
>>> 
>>> 
>>> 
>>> Do you use the per-partition watermarking explained in [1]?
>>> 
>>> I've also experienced a similar problem when running backfill jobs specifically when # source tasks < # kafka partitions. 
>>> 
>>> - When # source tasks = # kafka partitions, the backfill job works as expected.
>>> 
>>> - When # source tasks < # kafka partitions, a Kafka consumer consumes multiple partitions. This case can destroy the per-partition patterns as explained in [2].
>>> 
>>> 
>>> 
>>> Hope this helps.
>>> 
>>> 
>>> 
>>> p.s. If you plan to use per-partition watermarking, be aware that idleness detection [3] can cause another problem when you run a backfill job. Kafka source tasks in a backfill job seem to read a batch of records from Kafka and then wait for downstream tasks to catch up, which can be counted as idleness.
>>> 
>>> 
>>> 
>>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
>>> 
>>> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>> 
>>> [3] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>>> 
>>> 
>>> 
>>> Best,
>>> 
>>> 
>>> 
>>> Dongwon
>>> 
>>> 
>>> 
>>> On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <qu...@gmail.com> wrote:
>>> 
>>> I'm following the example from this section:
>>> 
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>> 
>>> 
>>> 
>>> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <qu...@gmail.com> wrote:
>>> 
>>> Other points
>>> 
>>> - I'm using the kafka timestamp as event time.
>>> 
>>> - The same issue happens even if I use an idle watermark.
>>> 
>>> 
>>> 
>>> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <qu...@gmail.com> wrote:
>>> 
>>> There are 12 Kafka partitions (to keep the structure similar to other low traffic environments).
>>> 
>>> 
>>> 
>>> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <qu...@gmail.com> wrote:
>>> 
>>> Hi.
>>> 
>>> 
>>> 
>>> I'm running a backfill from a kafka topic with very few records spread across a few days.  I'm seeing a case where the records coming from a kafka source have a watermark that's more recent (by hours) than the event time.  I haven't seen this before when running this.  This violates what I'd assume the kafka source would do.
>>> 
>>> 
>>> 
>>> Example problem:
>>> 
>>> 1. I have kafka records at ts=1000, 2000, ... 500000.  The actual times are separated by a longer time period.
>>> 
>>> 2.  My first operator after the FlinkKafkaConsumer sees:
>>> 
>>>   context.timestamp() = 1000
>>> 
>>>   context.timerService().currentWatermark() = 500000
>>> 
>>> 
>>> 
>>> Details about how I'm running this:
>>> 
>>> - I'm on Flink 1.12.3 that's running on EKS and using MSK as the source.
>>> 
>>> - I'm using FlinkKafkaConsumer
>>> 
>>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No idleness settings.
>>> 
>>> - I'm running similar code in all the environments.  The main difference is low traffic.  I have not been able to reproduce this outside of this environment.
>>> 
>>> 
>>> 
>>> 
>>> 
>>> I put the following process function right after my kafka source.
>>> 
>>> 
>>> 
>>> --------
>>> 
>>> 
>>> AfterSource
>>> 
>>> ts=1647274892728
>>> watermark=1647575140007
>>> record=...
>>> 
>>> 
>>> 
>>> 
>>> // Logs each record's event timestamp and the current watermark, then forwards the record unchanged.
>>> // LOGGER is assumed to be a logger defined in the enclosing class.
>>> public static class TextLog extends ProcessFunction<Record, Record> {
>>>    private final String label;
>>>    public TextLog(String label) {
>>>        this.label = label;
>>>    }
>>>    @Override
>>>    public void processElement(Record record, Context context, Collector<Record> collector) throws Exception {
>>>        LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>>>                label, context.timestamp(), context.timerService().currentWatermark(), record);
>>>        // Pass the record through unchanged.
>>>        collector.collect(record);
>>>    }
>>> }
>>> 
>> 
> 


Re: Weird Flink Kafka source watermark behavior

Posted by Qingsheng Ren <re...@gmail.com>.
Hi Jin, 

Unfortunately I don’t have any quick workaround in mind other than increasing the out-of-orderness tolerance.

Best regards, 

Qingsheng
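
As a rough illustration of what a larger tolerance buys, here is a plain-Java sketch (not Flink code) that applies the bounded-out-of-orderness rule as I understand it: watermark = highest timestamp seen - bound - 1 ms. The two timestamps come from the log excerpt in this thread; the 5 s and 120 s bounds are assumptions picked for the example, and "late" here simply means the timestamp is not ahead of the watermark. In a real job the bound would be passed as WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(...)).

```java
final class OutOfOrdernessSketch {
    // Highest timestamp seen so far, minus the tolerated delay, minus 1 ms.
    static long watermark(long maxTimestampSeen, long boundMs) {
        return maxTimestampSeen - boundMs - 1;
    }

    // In this sketch an event counts as late when it is not ahead of the watermark.
    static boolean isLate(long eventTs, long maxTimestampSeen, long boundMs) {
        return eventTs <= watermark(maxTimestampSeen, boundMs);
    }

    public static void main(String[] args) {
        long maxSeen = 1649284271139L;   // last timestamp of the first batch in the log
        long backwards = 1649284171037L; // first timestamp that jumps back in time

        System.out.println(maxSeen - backwards);                  // prints 100102 (ms)
        System.out.println(isLate(backwards, maxSeen, 5_000L));   // prints true
        System.out.println(isLate(backwards, maxSeen, 120_000L)); // prints false
    }
}
```

The backwards jump in the log is about 100 seconds, so under this model the bound has to exceed that gap before those records stop being late, at the price of holding back every timer and window by the same amount.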

> On Apr 8, 2022, at 18:12, Jin Yi <ji...@promoted.ai> wrote:
> 
> confirmed that moving back to FlinkKafkaConsumer fixes things.
> 
> is there some notification channel/medium that highlights critical bugs/issues on the intended features like this pretty readily?
> 
> On Fri, Apr 8, 2022 at 2:18 AM Jin Yi <ji...@promoted.ai> wrote:
> based on symptoms/observations on the first operator (LogRequestFilter) watermark and event timestamps, it does seem like it's the bug.  things track fine (timestamp > watermark) for the first batch of events, then the event timestamps go back into the past and are "late".
> 
> looks like the 1.14 backport just got in 11 days ago (https://github.com/apache/flink/pull/19128).  is there a way to easily test this fix locally?  based on the threads, should i just move back to FlinkKafkaConsumer until 1.14.5?
> 
> On Fri, Apr 8, 2022 at 1:34 AM Qingsheng Ren <re...@gmail.com> wrote:
> Hi Jin,
> 
> If you are using the new FLIP-27 sources like KafkaSource, per-partition watermarking (or per-split watermarking) is a default feature integrated in SourceOperator. You might be hitting the bug described in FLINK-26018 [1], which occurs during the source's first fetch: records from the first split push the watermark far ahead, so records from the other splits are then treated as late events.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-26018
> 
> Best regards,
> 
> Qingsheng
> 
> 
> > On Apr 8, 2022, at 15:54, Jin Yi <ji...@promoted.ai> wrote:
> > 
> > what should the code look like to verify we're using per-partition watermarks if we moved away from FlinkKafkaConsumer to KafkaSource in 1.14.4?
> > 
> > we currently have it looking like:
> > 
> > streamExecutionEnvironment.fromSource(
> >    KafkaSource.<T>builder().....build(),
> >    watermarkStrategy,
> >    "whatever",
> >    typeInfo);
> > 
> > when running this job with the streamExecutionEnvironment parallelism set to 1, and the kafka source having 30 partitions, i'm seeing weird behavior where the first operator after this source consumes events out of order (and therefore violates watermarks).  the operator simply checks what "type" of event something is and uses side outputs to emit the type-specific messages.  here's a snippet of the event timestamp going back before the current watermark (the first instance of going backwards in time is the line with ts: 1649284171037):
> > 
> > 2022-04-08 05:47:06,315 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284271139 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284171037 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
> > 2022-04-08 05:47:06,317 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172666 watermark: 1649284187140
> > 
> > 
> > 
> > On Sat, Mar 19, 2022 at 10:51 AM Dan Hill <qu...@gmail.com> wrote:
> > I dove deeper.  I wasn't actually using per-partition watermarks.  Thank you for the help!
> > 
> > On Fri, Mar 18, 2022 at 12:11 PM Dan Hill <qu...@gmail.com> wrote:
> > Thanks, Thias and Dongwon.
> > 
> > I'll keep debugging this with the idle watermark turned off.
> > 
> > Next TODOs:
> > - Verify that we’re using per-partition watermarks.  Our code matches the example but maybe something is disabling it.
> > - Enable logging of partition-consumer assignment, to see if that is the cause of the problem.
> > - Look at adding flags to set the source parallelism to see if that fixes the issue.
> > 
> > Yes, I've seen Flink talks on creating our own watermarks through Kafka.  Sounds like a good idea.
> > 
> > On Fri, Mar 18, 2022 at 1:17 AM Dongwon Kim <ea...@gmail.com> wrote:
> > I totally agree with Schwalbe that per-partition watermarking allows # source tasks < # kafka partitions. 
> > 
> > Otherwise, Dan, you should suspect other possibilities like what Schwalbe said.
> > 
> > Best,
> > 
> > Dongwon
> > 
> > On Fri, Mar 18, 2022 at 5:01 PM Schwalbe Matthias <Ma...@viseca.ch> wrote:
> > Hi Dan, Dongwon,
> > 
> >  
> > 
> > I share the opinion that when per-partition watermarking is enabled, you should observe correct behavior … would be interesting to see why it does not work for you.
> > 
> >  
> > 
> > I’d like to clear one tiny misconception here when you write:
> > 
> >  
> > 
> > >> - The same issue happens even if I use an idle watermark.
> > 
> >  
> > 
> > You would expect to see glitches with watermarking when you enable idleness.
> > 
> > Idleness sort of trades watermark correctness for reduced latency when processing timers (much simplified).
> > 
> > With idleness enabled you have no guarantees whatsoever as to the quality of watermarks (which might be ok in some cases).
> > 
> > BTW we predominantly use a mix of fast and slow sources (the slow ones only update once a day) with hand-tuned watermarking and late event processing, and enabling idleness would break everything.
> > 
> >  
> > 
> > Oversights aside, things should work the way you implemented them.
> > 
> >  
> > 
> > One thing I could imagine to be a cause is
> > 
> >       • that over time the kafka partitions get reassigned to different consumer subtasks, which would probably stress the correct recalculation of watermarks. Hence # partitions == # subtasks might reduce the problem
> >       • can you enable logging of partition-consumer assignment, to see if that is the cause of the problem
> >       • also involuntary restarts of the job can cause havoc as this resets watermarking
> >  
> > 
> > I’ll be off next week, unable to take part in the active discussion …
> > 
> >  
> > 
> > Sincere greetings
> > 
> >  
> > 
> > Thias
> > 
> >  
> > 
> >  
> > 
> >  
> > 
> >  
> > 
> > From: Dan Hill <qu...@gmail.com> 
> > Sent: Friday, 18 March 2022 08:23
> > To: Dongwon Kim <ea...@gmail.com>
> > Cc: user <us...@flink.apache.org>
> > Subject: Re: Weird Flink Kafka source watermark behavior
> > 
> >  
> > 
> >  
> > 
> > I'll try forcing # source tasks = # partitions tomorrow.
> > 
> >  
> > 
> > Thank you, Dongwon, for all of your help!
> > 
> >  
> > 
> > On Fri, Mar 18, 2022 at 12:20 AM Dongwon Kim <ea...@gmail.com> wrote:
> > 
> > I believe your job with per-partition watermarking should be working okay even in a backfill scenario. 
> > 
> >  
> > 
> > BTW, is the problem still observed even with # source tasks = # partitions?
> > 
> >  
> > 
> > For committers:
> > 
> > Is there a way to confirm that per-partition watermarking is used in TM log?
> > 
> >  
> > 
> > On Fri, Mar 18, 2022 at 4:14 PM Dan Hill <qu...@gmail.com> wrote:
> > 
> > I hit this using event processing and no idleness detection.  The same issue happens if I enable idleness.
> > 
> >  
> > 
> > My code matches the code example for per-partition watermarking.
> > 
> >  
> > 
> > On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <ea...@gmail.com> wrote:
> > 
> > Hi Dan,
> > 
> >  
> > 
> > I'm quite confused as you already use per-partition watermarking.
> > 
> >  
> > 
> > What I meant in the reply is
> > 
> > - If you don't use per-partition watermarking, # tasks < # partitions can cause the problem for backfill jobs.
> > 
> > - If you don't use per-partition watermarking, # tasks = # partitions is going to be okay even for backfill jobs.
> > 
> > - If you use per-partition watermarking, # tasks < # partitions shouldn't cause any problems unless you turn on the idleness detection.
> > 
> >  
> > 
> > Regarding the idleness detection which is based on processing time, what is your setting? If you set the value to 10 seconds for example, you'll face the same problem unless the watermark of your backfill job catches up real-time within 10 seconds. If you increase the value to 1 minute, your backfill job should catch up real-time within 1 minute.
> > 
> >  
> > 
> > Best,
> > 
> >  
> > 
> > Dongwon
> > 
> >  
> > 
> >  
> > 
> > On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <qu...@gmail.com> wrote:
> > 
> > Thanks Dongwon!
> > 
> >  
> > 
> > Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source tasks < # kafka partitions.  This should be called out in the docs or the bug should be fixed.
> > 
> >  
> > 
> > On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <ea...@gmail.com> wrote:
> > 
> > Hi Dan,
> > 
> >  
> > 
> > Do you use the per-partition watermarking explained in [1]?
> > 
> > I've also experienced a similar problem when running backfill jobs specifically when # source tasks < # kafka partitions. 
> > 
> > - When # source tasks = # kafka partitions, the backfill job works as expected.
> > 
> > - When # source tasks < # kafka partitions, a Kafka consumer consumes multiple partitions. This case can destroy the per-partition patterns as explained in [2].
> > 
> >  
> > 
> > Hope this helps.
> > 
> >  
> > 
> > p.s. If you plan to use the per-partition watermarking, be aware that idleness detection [3] can cause another problem when you run a backfill job. Kafka source tasks in a backfill job seem to read a batch of records from Kafka and then wait for downstream tasks to catch up, which can be counted as idleness.
> > 
> >  
> > 
> > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
> > 
> > [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
> > 
> > [3] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
> > 
> >  
> > 
> > Best,
> > 
> >  
> > 
> > Dongwon
> > 
> >  
> > 
> > On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <qu...@gmail.com> wrote:
> > 
> > I'm following the example from this section:
> > 
> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
> > 
> >  
> > 
> > On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <qu...@gmail.com> wrote:
> > 
> > Other points
> > 
> > - I'm using the kafka timestamp as event time.
> > 
> > - The same issue happens even if I use an idle watermark.
> > 
> >  
> > 
> > On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <qu...@gmail.com> wrote:
> > 
> > There are 12 Kafka partitions (to keep the structure similar to other low traffic environments).
> > 
> >  
> > 
> > On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <qu...@gmail.com> wrote:
> > 
> > Hi.
> > 
> >  
> > 
> > I'm running a backfill from a kafka topic with very few records spread across a few days.  I'm seeing a case where the records coming from a kafka source have a watermark that's more recent (by hours) than the event time.  I haven't seen this before when running this.  This violates what I'd assume the kafka source would do.
> > 
> >  
> > 
> > Example problem:
> > 
> > 1. I have kafka records at ts=1000, 2000, ... 500000.  The actual times are separated by a longer time period.
> > 
> > 2.  My first operator after the FlinkKafkaConsumer sees:
> > 
> >    context.timestamp() = 1000
> > 
> >    context.timerService().currentWatermark() = 500000
> > 
> >  
> > 
> > Details about how I'm running this:
> > 
> > - I'm on Flink 1.12.3 that's running on EKS and using MSK as the source.
> > 
> > - I'm using FlinkKafkaConsumer
> > 
> > - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No idleness settings.
> > 
> > - I'm running similar code in all the environments.  The main difference is low traffic.  I have not been able to reproduce this outside of this environment.
> > 
> >  
> > 
> >  
> > 
> > I put the following process function right after my kafka source.
> > 
> >  
> > 
> > --------
> > 
> > 
> > AfterSource
> > 
> > ts=1647274892728
> > watermark=1647575140007
> > record=...
> > 
> >  
> > 
> > 
> > // Logs each record's event timestamp next to the operator's current watermark, then forwards the record unchanged.
> > public static class TextLog extends ProcessFunction<Record, Record> {
> >     private final String label;
> >     public TextLog(String label) {
> >         this.label = label;
> >     }
> >     @Override
> >     public void processElement(Record record, Context context, Collector<Record> collector) throws Exception {
> >         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
> >                 label, context.timestamp(), context.timerService().currentWatermark(), record);
> >         collector.collect(record);
> >     }
> > }
> > 
> 
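
For anyone reading the archive later, the three cases Dongwon lists above (a shared watermark with # tasks < # partitions, a shared watermark with # tasks = # partitions, and per-partition watermarks) can be illustrated with a toy model in plain Java. This is only a sketch and not Flink code: the class name WatermarkSim, the method countLate, and the choice to update the watermark after every record (Flink emits watermarks periodically) are all assumptions made for illustration. A record counts as late when its timestamp is less than or equal to the current watermark.

```java
import java.util.Arrays;

/** Toy model of bounded-out-of-orderness watermarking. Hypothetical names, not Flink code. */
final class WatermarkSim {

    /**
     * Counts late records for one source task that reads events in the given order.
     * Each event is {partition, timestamp}. With perPartition=false a single generator
     * tracks the max timestamp over all partitions; with perPartition=true there is one
     * generator per partition and the emitted watermark is the minimum over all of them.
     */
    static int countLate(long[][] events, int numPartitions, long delay, boolean perPartition) {
        int generators = perPartition ? numPartitions : 1;
        long[] maxTs = new long[generators];
        // Start so that (maxTs - delay - 1) equals Long.MIN_VALUE without overflowing.
        Arrays.fill(maxTs, Long.MIN_VALUE + delay + 1);
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (long[] e : events) {
            int partition = (int) e[0];
            long ts = e[1];
            if (ts <= watermark) {
                late++; // the watermark already passed this timestamp
            }
            int g = perPartition ? partition : 0;
            maxTs[g] = Math.max(maxTs[g], ts);
            long min = Long.MAX_VALUE;
            for (long m : maxTs) {
                min = Math.min(min, m - delay - 1);
            }
            watermark = Math.max(watermark, min); // watermarks never go backwards
        }
        return late;
    }

    public static void main(String[] args) {
        // Backfill: the task drains partition 0 first, then partition 1.
        long[][] events = {{0, 1000}, {0, 2000}, {0, 3000}, {1, 1000}, {1, 2000}, {1, 3000}};
        System.out.println("shared: " + countLate(events, 2, 5, false));       // prints "shared: 2"
        System.out.println("per-partition: " + countLate(events, 2, 5, true)); // prints "per-partition: 0"
    }
}
```

In the shared case the watermark follows partition 0 up to 2994, so the first two records of partition 1 arrive behind it. In the per-partition case the partition that has not produced anything yet holds the combined watermark back.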


Re: Weird Flink Kafka source watermark behavior

Posted by Jin Yi <ji...@promoted.ai>.
confirmed that moving back to FlinkKafkaConsumer fixes things.

is there some notification channel/medium that highlights critical
bugs/issues on the intended features like this pretty readily?
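
For context, here is a toy model of the failure mode as Qingsheng describes it for FLINK-26018: records from the first split push the watermark ahead before the other splits have contributed anything. It is a sketch in plain Java under a loud assumption, namely that the combined watermark only takes the minimum over splits that have already emitted a record. That is one reading of the description in this thread, not the actual SourceOperator code, and the names LazySplitSim and countLate are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model: per-split watermarks where a split only counts once it has emitted a record. */
final class LazySplitSim {

    /** Each event is {split, timestamp}. Returns how many records arrive at or behind the watermark. */
    static int countLate(long[][] events, long delay) {
        // ASSUMPTION: only splits seen so far take part in the minimum.
        Map<Integer, Long> maxTsBySplit = new HashMap<>();
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (long[] e : events) {
            int split = (int) e[0];
            long ts = e[1];
            if (ts <= watermark) {
                late++;
            }
            maxTsBySplit.merge(split, ts, Math::max);
            long min = Long.MAX_VALUE;
            for (long m : maxTsBySplit.values()) {
                min = Math.min(min, m - delay - 1);
            }
            watermark = Math.max(watermark, min); // watermarks never go backwards
        }
        return late;
    }

    public static void main(String[] args) {
        // First fetch: all records of split 0 are emitted before any record of split 1.
        long[][] events = {{0, 1000}, {0, 2000}, {0, 3000}, {1, 1000}, {1, 2000}, {1, 3000}};
        System.out.println("late: " + countLate(events, 5)); // prints "late: 2"
    }
}
```

Under that assumption, split 0 alone drives the watermark to 2994, so the records of split 1 at 1000 and 2000 are late even though watermarks are nominally tracked per split.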


Re: Weird Flink Kafka source watermark behavior

Posted by Jin Yi <ji...@promoted.ai>.
based on symptoms/observations on the first operator (LogRequestFilter)
watermark and event timestamps, it does seem like it's the bug.  things
track fine (timestamp > watermark) for the first batch of events, then the
event timestamps go back into the past and are "late".

looks like the 1.14 backport just got in 11 days ago (
https://github.com/apache/flink/pull/19128).  is there a way to easily test
this fix locally?  based on the threads, should i just move back to
FlinkKafkaConsumer until 1.14.5?
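
A small way to find the point where the timestamps drop behind the watermark is to scan the operator's log lines. The sketch below is plain Java and assumes the "ts: <millis> watermark: <millis>" format that LogRequestFilter prints in this thread; the class name LateEventScan and the method firstLateIndex are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Finds the first log line whose event timestamp is not ahead of the logged watermark. */
final class LateEventScan {

    // Matches the "ts: <millis> watermark: <millis>" part of a LogRequestFilter line.
    private static final Pattern TS_AND_WATERMARK = Pattern.compile("ts: (\\d+) watermark: (\\d+)");

    /** Returns the index of the first late line, or -1 if every parsed line is ahead of its watermark. */
    static int firstLateIndex(List<String> lines) {
        for (int i = 0; i < lines.size(); i++) {
            Matcher m = TS_AND_WATERMARK.matcher(lines.get(i));
            if (m.find()) {
                long ts = Long.parseLong(m.group(1));
                long watermark = Long.parseLong(m.group(2));
                if (ts <= watermark) {
                    return i;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "LogRequestFilter ts: 1649284270139 watermark: 1649284187140",
                "LogRequestFilter ts: 1649284271139 watermark: 1649284187140",
                "LogRequestFilter ts: 1649284171037 watermark: 1649284187140");
        System.out.println(firstLateIndex(lines)); // prints 2
    }
}
```

Lines that do not contain the pattern are skipped, so the scan can be pointed at a whole TaskManager log as long as each entry is on a single line.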

On Fri, Apr 8, 2022 at 1:34 AM Qingsheng Ren <re...@gmail.com> wrote:

> Hi Jin,
>
> If you are using new FLIP-27 sources like KafkaSource, per-partition
> watermarking (or per-split watermarking) is a default feature integrated
> into SourceOperator. You might be hitting the bug described in FLINK-26018
> [1]: during the first fetch of the source, records from the first split
> push the watermark far ahead, and records from the other splits are then
> treated as late events.
>
> [1] https://issues.apache.org/jira/browse/FLINK-26018
>
> Best regards,
>
> Qingsheng
>
>
> > On Apr 8, 2022, at 15:54, Jin Yi <ji...@promoted.ai> wrote:
> >
> > what should the code look like to verify we're using per-partition
> watermarks if we moved away from FlinkKafkaConsumer to KafkaSource in
> 1.14.4?
> >
> > we currently have it looking like:
> >
> > streamExecutionEnvironment.fromSource(
> >    KafkaSource.<T>builder().....build(),
> >    watermarkStrategy,
> >    "whatever",
> >    typeInfo);
> >
> > when running this job with the streamExecutionEnvironment parallelism
> set to 1, and the kafka source having 30 partitions, i'm seeing weird
> behaviors where the first operator after this source consumes events out of
> order (and therefore, violates watermarks).  the operator simply checks to
> see what "type" of event something is and uses side outputs to output the
> type-specific messages.  here's a snippet of the event timestamp going back
> before the current watermark (the first instance of going backwards in time
> is the line with ts: 1649284171037):
> >
> > 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284271139 watermark: 1649284187140
> > 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284171037 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
> > 2022-04-08 05:47:06,316 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
> > 2022-04-08 05:47:06,317 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172666 watermark: 1649284187140
> >
> >
> >
> > On Sat, Mar 19, 2022 at 10:51 AM Dan Hill <qu...@gmail.com> wrote:
> > I dove deeper.  I wasn't actually using per-partition watermarks.  Thank
> you for the help!
> >
> > On Fri, Mar 18, 2022 at 12:11 PM Dan Hill <qu...@gmail.com> wrote:
> > Thanks, Thias and Dongwon.
> >
> > I'll keep debugging this with the idle watermark turned off.
> >
> > Next TODOs:
> > - Verify that we’re using per-partition watermarks.  Our code matches
> the example but maybe something is disabling it.
> > - Enable logging of partition-consumer assignment, to see if that is the
> cause of the problem.
> > - Look at adding flags to set the source parallelism to see if that
> fixes the issue.
> >
> > Yes, I've seen Flink talks on creating our own watermarks through
> Kafka.  Sounds like a good idea.
> >
> > On Fri, Mar 18, 2022 at 1:17 AM Dongwon Kim <ea...@gmail.com>
> wrote:
> > I totally agree with Schwalbe that per-partition watermarking allows #
> source tasks < # kafka partitions.
> >
> > Otherwise, Dan, you should suspect other possibilities like what
> Schwalbe said.
> >
> > Best,
> >
> > Dongwon
> >
> > On Fri, Mar 18, 2022 at 5:01 PM Schwalbe Matthias <
> Matthias.Schwalbe@viseca.ch> wrote:
> > Hi Dan, Dongwon,
> >
> >
> >
> > I share the opinion that when per-partition watermarking is enabled, you
> should observe correct behavior … would be interesting to see why it does
> not work for you.
> >
> >
> >
> > I’d like to clear one tiny misconception here when you write:
> >
> >
> >
> > >> - The same issue happens even if I use an idle watermark.
> >
> >
> >
> > You would expect to see glitches with watermarking when you enable
> idleness.
> >
> > Idleness sort of trades watermark correctness for reduced latency when
> processing timers (much simplified).
> >
> > With idleness enabled you have no guarantees whatsoever as to the
> quality of watermarks (which might be ok in some cases).
> >
> > BTW we predominantly use a mix of fast and slow sources (that only update
> once a day) with hand-tuned watermarking and late event processing, and
> enabling idleness would break everything.
> >
> >
> >
> > Oversights aside, things should work the way you implemented them.
> >
> >
> >
> > One thing I could imagine to be a cause is
> >
> >       • that over time the Kafka partitions get reassigned to different
> consumer subtasks, which would probably stress correct recalculation of
> watermarks. Hence # partitions == # subtasks might reduce the problem
> >       • can you enable logging of partition-consumer assignment, to see
> if that is the cause of the problem?
> >       • also involuntary restarts of the job can cause havoc, as this
> resets watermarking
> >
> >
> > I’ll be off next week, unable to take part in the active discussion …
> >
> >
> >
> > Sincere greetings
> >
> >
> >
> > Thias
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > From: Dan Hill <qu...@gmail.com>
> > Sent: Friday, 18 March 2022 08:23
> > To: Dongwon Kim <ea...@gmail.com>
> > Cc: user <us...@flink.apache.org>
> > Subject: Re: Weird Flink Kafka source watermark behavior
> >
> >
> >
> > I'll try forcing # source tasks = # partitions tomorrow.
> >
> >
> >
> > Thank you, Dongwon, for all of your help!
> >
> >
> >
> > On Fri, Mar 18, 2022 at 12:20 AM Dongwon Kim <ea...@gmail.com>
> wrote:
> >
> > I believe your job with per-partition watermarking should be working
> okay even in a backfill scenario.
> >
> >
> >
> > BTW, is the problem still observed even with # source tasks = # partitions?
> >
> >
> >
> > For committers:
> >
> > Is there a way to confirm that per-partition watermarking is used in TM
> log?
> >
> >
> >
> > On Fri, Mar 18, 2022 at 4:14 PM Dan Hill <qu...@gmail.com> wrote:
> >
> > I hit this using event processing and no idleness detection.  The same
> issue happens if I enable idleness.
> >
> >
> >
> > My code matches the code example for per-partition watermarking.
> >
> >
> >
> > On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <ea...@gmail.com>
> wrote:
> >
> > Hi Dan,
> >
> >
> >
> > I'm quite confused as you already use per-partition watermarking.
> >
> >
> >
> > What I meant in the reply is
> >
> > - If you don't use per-partition watermarking, # tasks < # partitions
> can cause the problem for backfill jobs.
> >
> > - If you don't use per-partition watermarking, # tasks = # partitions is
> going to be okay even for backfill jobs.
> >
> > - If you use per-partition watermarking, # tasks < # partitions
> shouldn't cause any problems unless you turn on the idleness detection.
> >
> >
> >
> > Regarding the idleness detection, which is based on processing time, what
> is your setting? If you set the value to 10 seconds, for example, you'll
> face the same problem unless the watermark of your backfill job catches up
> to real time within 10 seconds. If you increase the value to 1 minute, your
> backfill job needs to catch up to real time within 1 minute.
> >
> >
> >
> > Best,
> >
> >
> >
> > Dongwon
> >
> >
> >
> >
> >
> > On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <qu...@gmail.com> wrote:
> >
> > Thanks Dongwon!
> >
> >
> >
> > Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source
> tasks < # kafka partitions.  This should be called out in the docs or the
> bug should be fixed.
> >
> >
> >
> > On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <ea...@gmail.com>
> wrote:
> >
> > Hi Dan,
> >
> >
> >
> > Do you use the per-partition watermarking explained in [1]?
> >
> > I've also experienced a similar problem when running backfill jobs
> specifically when # source tasks < # kafka partitions.
> >
> > - When # source tasks = # kafka partitions, the backfill job works as
> expected.
> >
> > - When # source tasks < # kafka partitions, a Kafka consumer consumes
> multiple partitions. This case can destroy the per-partition patterns as
> explained in [2].
> >
> >
> >
> > Hope this helps.
> >
> >
> >
> > p.s. If you plan to use the per-partition watermarking, be aware that
> idleness detection [3] can cause another problem when you run a backfill
> job. Kafka source tasks in a backfill job seem to read a batch of records
> from Kafka and then wait for downstream tasks to catch up,
> which can be counted as idleness.
> >
> >
> >
> > [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
> >
> > [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
> >
> > [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
> >
> >
> >
> > Best,
> >
> >
> >
> > Dongwon
> >
> >
> >
> > On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <qu...@gmail.com> wrote:
> >
> > I'm following the example from this section:
> >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
> >
> >
> >
> > On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <qu...@gmail.com> wrote:
> >
> > Other points
> >
> > - I'm using the kafka timestamp as event time.
> >
> > - The same issue happens even if I use an idle watermark.
> >
> >
> >
> > On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <qu...@gmail.com> wrote:
> >
> > There are 12 Kafka partitions (to keep the structure similar to other
> low traffic environments).
> >
> >
> >
> > On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <qu...@gmail.com> wrote:
> >
> > Hi.
> >
> >
> >
> > I'm running a backfill from a kafka topic with very few records spread
> across a few days.  I'm seeing a case where the records coming from a kafka
> source have a watermark that's more recent (by hours) than the event time.
> I haven't seen this before when running this.  This violates what I'd
> assume the kafka source would do.
> >
> >
> >
> > Example problem:
> >
> > 1. I have kafka records at ts=1000, 2000, ... 500000.  The actual times
> are separated by a longer time period.
> >
> > 2.  My first operator after the FlinkKafkaConsumer sees:
> >
> >    context.timestamp() = 1000
> >
> >    context.timerService().currentWatermark() = 500000
> >
> >
> >
> > Details about how I'm running this:
> >
> > - I'm on Flink 1.12.3 that's running on EKS and using MSK as the source.
> >
> > - I'm using FlinkKafkaConsumer
> >
> > - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No idleness
> settings.
> >
> > - I'm running similar code in all the environments.  The main difference
> is low traffic.  I have not been able to reproduce this outside of this
> environment.
> >
> >
> >
> >
> >
> > I put the following process function right after my kafka source.
> >
> >
> >
> > --------
> >
> >
> > AfterSource
> >
> > ts=1647274892728
> > watermark=1647575140007
> > record=...
> >
> >
> >
> >
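> > public static class TextLog extends ProcessFunction<Record, Record> {
> >     // Assumes an SLF4J logger; the original snippet did not show this field.
> >     private static final Logger LOGGER = LoggerFactory.getLogger(TextLog.class);
> >     private final String label;
> >     public TextLog(String label) {
> >         this.label = label;
> >     }
> >     @Override
> >     public void processElement(Record record, Context context, Collector<Record> collector) throws Exception {
> >         // Log the element's event timestamp next to the operator's current watermark.
> >         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
> >                 label, context.timestamp(), context.timerService().currentWatermark(), record);
> >         // Forward the record unchanged.
> >         collector.collect(record);
> >     }
> > }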
> >
>
>

Re: Weird Flink Kafka source watermark behavior

Posted by Qingsheng Ren <re...@gmail.com>.
Hi Jin,

If you are using the new FLIP-27 sources like KafkaSource, per-partition watermarking (or per-split watermarking) is a default feature integrated into SourceOperator. You might be hitting the bug described in FLINK-26018 [1]: during the first fetch of the source, records from the first split push the watermark far ahead, so records from the other splits are then treated as late events.

[1] https://issues.apache.org/jira/browse/FLINK-26018
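
To make the effect described above concrete, here is a toy model in plain Java. It is only a sketch and not Flink's SourceOperator code: the class PerSplitWatermarkModel and its methods are hypothetical names, and the assumption that a split only takes part in the combined watermark after its first record has been seen is a simplification of the description above. Each split tracks its highest timestamp, and the combined watermark is the minimum over the known splits, using the timestamps (1000 and 500000) and the 5s bounded out-of-orderness from the original report.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of per-split watermarking. Hypothetical; not Flink's SourceOperator code. */
public class PerSplitWatermarkModel {
    private final long maxOutOfOrdernessMs;
    // Highest timestamp per split; a split appears here only after its first record (assumption).
    private final Map<Integer, Long> maxTimestampPerSplit = new HashMap<>();
    private long watermark = Long.MIN_VALUE;

    public PerSplitWatermarkModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Feeds one record; returns true if it is late (timestamp <= current watermark). */
    public boolean onRecord(int split, long timestamp) {
        boolean late = timestamp <= watermark;
        maxTimestampPerSplit.merge(split, timestamp, Math::max);
        return late;
    }

    /** Periodic emit: minimum over the known splits, never moving backwards. */
    public long emitWatermark() {
        if (maxTimestampPerSplit.isEmpty()) {
            return watermark;
        }
        long min = Long.MAX_VALUE;
        for (long maxTs : maxTimestampPerSplit.values()) {
            // Same shape as a bounded-out-of-orderness generator: max timestamp - bound - 1.
            min = Math.min(min, maxTs - maxOutOfOrdernessMs - 1);
        }
        watermark = Math.max(watermark, min);
        return watermark;
    }

    public static void main(String[] args) {
        // Both splits have produced a record before the first emit: the slow split holds the watermark back.
        PerSplitWatermarkModel bothSeen = new PerSplitWatermarkModel(5000);
        bothSeen.onRecord(0, 500000);
        bothSeen.onRecord(1, 1000);
        System.out.println("both seen:  watermark=" + bothSeen.emitWatermark()
                + " late=" + bothSeen.onRecord(1, 2000));

        // Only split 0 was read in the first fetch: the watermark jumps ahead and split 1 arrives late.
        PerSplitWatermarkModel firstOnly = new PerSplitWatermarkModel(5000);
        firstOnly.onRecord(0, 500000);
        System.out.println("first only: watermark=" + firstOnly.emitWatermark()
                + " late=" + firstOnly.onRecord(1, 1000));
    }
}
```

In the second scenario the model reproduces the symptom from the original report: a record with ts=1000 arrives while the watermark is already close to 500000.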

Best regards,

Qingsheng


> On Apr 8, 2022, at 15:54, Jin Yi <ji...@promoted.ai> wrote:
> 
> what should the code look like to verify we're using per-partition watermarks if we moved away from FlinkKafkaConsumer to KafkaSource in 1.14.4?
> 
> we currently have it looking like:
> 
> streamExecutionEnvironment.fromSource(
>    KafkaSource.<T>builder().....build(),
>    watermarkStrategy,
>    "whatever",
>    typeInfo);
> 
> when running this job with the streamExecutionEnvironment parallelism set to 1, and the kafka source having 30 partitions, i'm seeing weird behaviors where the first operator after this source consumes events out of order (and therefore, violates watermarks).  the operator simply checks to see what "type" of event something is and uses side outputs to output the type-specific messages.  here's a snippet of the event timestamp going back before the current watermark (the first instance of going backwards in time is the line with ts: 1649284171037):
> 
> 2022-04-08 05:47:06,315 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284271139 watermark: 1649284187140
> 2022-04-08 05:47:06,315 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284171037 watermark: 1649284187140
> 2022-04-08 05:47:06,316 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
> 2022-04-08 05:47:06,316 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
> 2022-04-08 05:47:06,316 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
> 2022-04-08 05:47:06,316 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
> 2022-04-08 05:47:06,317 WARN  ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172666 watermark: 1649284187140
> 
> 
> 
> On Sat, Mar 19, 2022 at 10:51 AM Dan Hill <qu...@gmail.com> wrote:
> I dove deeper.  I wasn't actually using per-partition watermarks.  Thank you for the help!
> 
> On Fri, Mar 18, 2022 at 12:11 PM Dan Hill <qu...@gmail.com> wrote:
> Thanks, Thias and Dongwon.
> 
> I'll keep debugging this with the idle watermark turned off.
> 
> Next TODOs:
> - Verify that we’re using per-partition watermarks.  Our code matches the example but maybe something is disabling it.
> - Enable logging of partition-consumer assignment, to see if that is the cause of the problem.
> - Look at adding flags to set the source parallelism to see if that fixes the issue.
> 
> Yes, I've seen Flink talks on creating our own watermarks through Kafka.  Sounds like a good idea.
> 
> On Fri, Mar 18, 2022 at 1:17 AM Dongwon Kim <ea...@gmail.com> wrote:
> I totally agree with Schwalbe that per-partition watermarking allows # source tasks < # kafka partitions. 
> 
> Otherwise, Dan, you should suspect other possibilities like what Schwalbe said.
> 
> Best,
> 
> Dongwon
> 
> On Fri, Mar 18, 2022 at 5:01 PM Schwalbe Matthias <Ma...@viseca.ch> wrote:
> Hi Dan, Dongwon,
> 
>  
> 
> I share the opinion that when per-partition watermarking is enabled, you should observe correct behavior … would be interesting to see why it does not work for you.
> 
>  
> 
> I’d like to clear one tiny misconception here when you write:
> 
>  
> 
> >> - The same issue happens even if I use an idle watermark.
> 
>  
> 
> You would expect to see glitches with watermarking when you enable idleness.
> 
> Idleness sort of trades watermark correctness for reduced latency when processing timers (much simplified).
> 
> With idleness enabled you have no guarantees whatsoever as to the quality of watermarks (which might be ok in some cases).
> 
> BTW we predominantly use a mix of fast and slow sources (that only update once a day) with hand-tuned watermarking and late event processing, and enabling idleness would break everything.
> 
>  
> 
> Oversights aside, things should work the way you implemented them.
> 
>  
> 
> One thing I could imagine to be a cause is
> 
> 	• that over time the Kafka partitions get reassigned to different consumer subtasks, which would probably stress correct recalculation of watermarks. Hence # partitions == # subtasks might reduce the problem
> 	• can you enable logging of partition-consumer assignment, to see if that is the cause of the problem?
> 	• also involuntary restarts of the job can cause havoc, as this resets watermarking
>  
> 
> I’ll be off next week, unable to take part in the active discussion …
> 
>  
> 
> Sincere greetings
> 
>  
> 
> Thias
> 
>  
> 
>  
> 
>  
> 
>  
> 
> From: Dan Hill <qu...@gmail.com> 
> Sent: Friday, 18 March 2022 08:23
> To: Dongwon Kim <ea...@gmail.com>
> Cc: user <us...@flink.apache.org>
> Subject: Re: Weird Flink Kafka source watermark behavior
> 
>  
> 
> I'll try forcing # source tasks = # partitions tomorrow.
> 
>  
> 
> Thank you, Dongwon, for all of your help!
> 
>  
> 
> On Fri, Mar 18, 2022 at 12:20 AM Dongwon Kim <ea...@gmail.com> wrote:
> 
> I believe your job with per-partition watermarking should be working okay even in a backfill scenario. 
> 
>  
> 
> BTW, is the problem still observed even with # source tasks = # partitions?
> 
>  
> 
> For committers:
> 
> Is there a way to confirm that per-partition watermarking is used in TM log?
> 
>  
> 
> On Fri, Mar 18, 2022 at 4:14 PM Dan Hill <qu...@gmail.com> wrote:
> 
> I hit this using event processing and no idleness detection.  The same issue happens if I enable idleness.
> 
>  
> 
> My code matches the code example for per-partition watermarking.
> 
>  
> 
> On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <ea...@gmail.com> wrote:
> 
> Hi Dan,
> 
>  
> 
> I'm quite confused as you already use per-partition watermarking.
> 
>  
> 
> What I meant in the reply is
> 
> - If you don't use per-partition watermarking, # tasks < # partitions can cause the problem for backfill jobs.
> 
> - If you don't use per-partition watermarking, # tasks = # partitions is going to be okay even for backfill jobs.
> 
> - If you use per-partition watermarking, # tasks < # partitions shouldn't cause any problems unless you turn on the idleness detection.
> 
>  
> 
> Regarding the idleness detection, which is based on processing time, what is your setting? If you set the value to 10 seconds, for example, you'll face the same problem unless the watermark of your backfill job catches up to real time within 10 seconds. If you increase the value to 1 minute, your backfill job needs to catch up to real time within 1 minute.
> 
>  
> 
> Best,
> 
>  
> 
> Dongwon
> 
>  
> 
>  
> 
> On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <qu...@gmail.com> wrote:
> 
> Thanks Dongwon!
> 
>  
> 
> Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source tasks < # kafka partitions.  This should be called out in the docs or the bug should be fixed.
> 
>  
> 
> On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <ea...@gmail.com> wrote:
> 
> Hi Dan,
> 
>  
> 
> Do you use the per-partition watermarking explained in [1]?
> 
> I've also experienced a similar problem when running backfill jobs specifically when # source tasks < # kafka partitions. 
> 
> - When # source tasks = # kafka partitions, the backfill job works as expected.
> 
> - When # source tasks < # kafka partitions, a Kafka consumer consumes multiple partitions. This case can destroy the per-partition patterns as explained in [2].
> 
>  
> 
> Hope this helps.
> 
>  
> 
> p.s. If you plan to use the per-partition watermarking, be aware that idleness detection [3] can cause another problem when you run a backfill job. Kafka source tasks in a backfill job seem to read a batch of records from Kafka and then wait for downstream tasks to catch up, which can be counted as idleness.
> 
>  
> 
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
> 
> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
> 
> [3] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
> 
>  
> 
> Best,
> 
>  
> 
> Dongwon
> 
>  
> 
> On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <qu...@gmail.com> wrote:
> 
> I'm following the example from this section:
> 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
> 
>  
> 
> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <qu...@gmail.com> wrote:
> 
> Other points
> 
> - I'm using the kafka timestamp as event time.
> 
> - The same issue happens even if I use an idle watermark.
> 
>  
> 
> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <qu...@gmail.com> wrote:
> 
> There are 12 Kafka partitions (to keep the structure similar to other low traffic environments).
> 
>  
> 
> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <qu...@gmail.com> wrote:
> 
> Hi.
> 
>  
> 
> I'm running a backfill from a kafka topic with very few records spread across a few days.  I'm seeing a case where the records coming from a kafka source have a watermark that's more recent (by hours) than the event time.  I haven't seen this before when running this.  This violates what I'd assume the kafka source would do.
> 
>  
> 
> Example problem:
> 
> 1. I have kafka records at ts=1000, 2000, ... 500000.  The actual times are separated by a longer time period.
> 
> 2.  My first operator after the FlinkKafkaConsumer sees:
> 
>    context.timestamp() = 1000
> 
>    context.timerService().currentWatermark() = 500000
> 
>  
> 
> Details about how I'm running this:
> 
> - I'm on Flink 1.12.3 that's running on EKS and using MSK as the source.
> 
> - I'm using FlinkKafkaConsumer
> 
> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No idleness settings.
> 
> - I'm running similar code in all the environments.  The main difference is low traffic.  I have not been able to reproduce this outside of this environment.
> 
>  
> 
>  
> 
> I put the following process function right after my kafka source.
> 
>  
> 
> --------
> 
> 
> AfterSource
> 
> ts=1647274892728
> watermark=1647575140007
> record=...
> 
>  
> 
> 
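> public static class TextLog extends ProcessFunction<Record, Record> {
>     // Assumes an SLF4J logger; the original snippet did not show this field.
>     private static final Logger LOGGER = LoggerFactory.getLogger(TextLog.class);
>     private final String label;
>     public TextLog(String label) {
>         this.label = label;
>     }
>     @Override
>     public void processElement(Record record, Context context, Collector<Record> collector) throws Exception {
>         // Log the element's event timestamp next to the operator's current watermark.
>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>                 label, context.timestamp(), context.timerService().currentWatermark(), record);
>         // Forward the record unchanged.
>         collector.collect(record);
>     }
> }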
> 