Posted to dev@flink.apache.org by Alexander Sorokoumov <as...@confluent.io.INVALID> on 2023/04/05 01:39:01 UTC

[DISCUSS] EXACTLY_ONCE delivery semantics for upsert-kafka connector

Hello Flink community,

I would like to discuss if it is worth adding EXACTLY_ONCE delivery
semantics to the upsert-kafka connector. According to upsert-kafka docs[1] and
ReducingUpsertSink javadoc[2], the connector is correct even with duplicate
records under AT_LEAST_ONCE because the records are idempotent, and the
read path de-duplicates them. However, there are at least 2 reasons to
configure the connector with EXACTLY_ONCE:

1. There might be other non-Flink topic consumers that would rather not
have duplicated records.
2. Multiple upsert-kafka producers might cause keys to roll back to
previous values. Consider a scenario where 2 producing jobs A and B write
to the same topic with AT_LEAST_ONCE, and a consuming job reads from the
topic. Both producers write unique, monotonically increasing sequences to
the same key. Job A writes x=a1,a2,a3,a4,a5… Job B writes
x=b1,b2,b3,b4,b5,.... With this setup, we can have the following sequence:

   1. Job A produces x=a5.
   2. Job B produces x=b5.
   3. Job A produces the duplicate write x=a5.

The consuming job would observe x going to a5, then to b5, and then back to a5.
EXACTLY_ONCE would prevent this behavior.

I created https://issues.apache.org/jira/browse/FLINK-31408 and a WIP patch
to add EXACTLY_ONCE to upsert-kafka, but would like to know what the
community thinks about it before moving forward with it.
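
For concreteness, here is a sketch of the table definition I have in mind.
The two sink options mirror the plain kafka connector and are exactly what
the WIP patch would add to upsert-kafka, so treat the option names as
tentative:

CREATE TABLE sequence_sink (
  k STRING,
  seq STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'sequences',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json',
  -- proposed options, mirroring the plain kafka connector:
  'sink.delivery-guarantee' = 'exactly-once',
  'sink.transactional-id-prefix' = 'sequence-sink'
);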

Thanks,
Alexander

1.
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/#consistency-guarantees
2.
https://github.com/apache/flink-connector-kafka/blob/40cf9994dd847c13602acf1f90895cf9f89b2ce6/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L31-L37

Re: [DISCUSS] EXACTLY_ONCE delivery semantics for upsert-kafka connector

Posted by Alexander Sorokoumov <as...@confluent.io.INVALID>.
Hi Jark, John,

Thank you for the discussion! I will proceed with completing the patch that
adds exactly-once to the upsert-kafka connector.

Best,
Alexander

On Wed, Apr 12, 2023 at 12:21 AM Jark Wu <im...@gmail.com> wrote:

> Hi John,
>
> Thank you for your valuable input. It sounds reasonable to me.
>
> From this point of view, exactly-once is used to guarantee transaction
> semantics rather than to avoid duplication/upserts.
> This is similar to the JDBC connectors that already support eventual
> consistency with idempotent updates, but we still added support for
> 2PC[1].
>
> Best,
> Jark
>
> [1]:
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/jdbc/#jdbcsinkexactlyoncesink
>
> On Wed, 12 Apr 2023 at 10:36, John Roesler <vv...@apache.org> wrote:
>
> > Hi Jark,
> >
> > I hope you don’t mind if I chime in.
> >
> > You have a good point that the sequence of upserts will eventually
> > converge to the correct value under the at-least-once delivery guarantee,
> > but it can still be important to avoid passing on uncommitted results.
> > Some thoughts, numbered for reference:
> >
> > 1. Most generally, if some result R is written to the sink topic, but
> > then the job fails before a checkpoint, rolls back, and reprocesses,
> > producing R’, then it is incorrect to call R an “upsert”. In fact, as far
> > as the system is concerned, R never happened at all (because it was part
> > of a rolled-back batch of processing).
> >
> > 2. Readers may reasonably wish to impose some meaning on the sequence of
> > upserts itself, so including aborted results can lead to wrong semantics
> > downstream. Eg: “how many times has ‘x’ been updated today”?
> >
> > 3. Note that processing may not be deterministic over failures, and,
> > building on (2), readers may have an expectation that every record in
> > the topic corresponds to a real value that was associated with that key
> > at some point. Eg, if we start with x=1, checkpoint, then produce x=99,
> > crash, restart and produce x=2. Under at-least-once, the history of x is
> > [1,99,2], while exactly-once would give the correct history of [1,2]. If
> > we set up an alert that fires if the value of x is ever greater than 10,
> > then at-least-once will erroneously alert us, while exactly-once does not.
> >
> > 4. Sending results for failed processing can also cause operational
> > problems: if you’re processing a high volume of data, and you get into a
> > crash loop, you can create a flood of repeated results. I’ve seen this
> > case cause real world pain for people, and it’s nice to have a way to
> > avoid it.
> >
> > I hope some of these examples show why a user might reasonably want to
> > configure the connector with the exactly-once guarantee.
> >
> > Thanks!
> > -John
> >
> > On Sat, Apr 8, 2023, at 10:03, Jark Wu wrote:
> > > Hi Alexander,
> > >
> > > Yes, Kafka’s exactly-once semantics are used to avoid duplicated
> > > records in case of producer retries or failovers. But as I explained
> > > above, it can’t avoid intentionally duplicated records. Actually, I
> > > would like to call them "upsert records" instead of "duplicates";
> > > that's why the connector is named "upsert-kafka", to make Kafka work
> > > like a database that supports updating and deleting by key.
> > >
> > > For example, consider the following SQL query:
> > >
> > > SELECT URL, COUNT(*) page_views
> > > FROM access_logs
> > > GROUP BY URL;
> > >
> > > This is a continuous query[1] that continuously emits a new <url,
> > > page_views> record once a new URL access entry is received. The same
> > > URLs may be far apart in the log and processed in different
> > > checkpoints.
> > >
> > > It's easy to make upsert-kafka support an exactly-once delivery
> > > guarantee, but as we discussed above, it's unnecessary to support it
> > > and we intend to expose as few configurations to users as possible.
> > >
> > >
> > > Best,
> > > Jark
> > >
> > > [1]
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/group-agg/
> > >
> > >
> > >
> > > On Sat, 8 Apr 2023 at 02:42, Alexander Sorokoumov
> > > <as...@confluent.io.invalid> wrote:
> > >
> > >> Hi Jark,
> > >>
> > >> To my knowledge, Kafka's EXACTLY_ONCE transactions together with
> > >> idempotent producers prevent duplicated records[1], at least in the
> > >> cases when the upstream does not produce them intentionally and across
> > >> checkpoints.
> > >>
> > >> Could you please elaborate or point me to the docs that explain the
> > >> reason for duplicated records upstream and across checkpoints? I am
> > >> relatively new to Flink and not aware of it. According to the kafka
> > >> connector documentation, it does support exactly-once semantics by
> > >> configuring 'sink.delivery-guarantee'='exactly-once'[2]. It is not
> > >> clear to me why we can't make upsert-kafka configurable in the same
> > >> way to support this delivery guarantee.
> > >>
> > >> Thank you,
> > >> Alexander
> > >>
> > >> 1.
> > >> https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
> > >> 2.
> > >> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/#consistency-guarantees
> > >>
> > >>
> > >> On Fri, Apr 7, 2023 at 3:44 AM Jark Wu <im...@gmail.com> wrote:
> > >>
> > >> > Hi Alexander,
> > >> >
> > >> > I’m not sure I fully understand the reasons. I left my comments
> > >> > inline.
> > >> >
> > >> > > 1. There might be other non-Flink topic consumers that would
> > >> > > rather not have duplicated records.
> > >> >
> > >> > Exactly once can’t avoid producing duplicated records, because the
> > >> > upstream produces duplicated records intentionally and across
> > >> > checkpoints. Exactly once can’t recognize duplicated records and
> > >> > drop the duplicates. That means duplicated records are written into
> > >> > topics even if exactly-once mode is enabled.
> > >> >
> > >> >
> > >> > > 2. Multiple upsert-kafka producers might cause keys to roll back
> > >> > > to previous values.
> > >> >
> > >> > Sorry, I don’t understand how exactly once can prevent this rollback
> > >> > behavior. Even in your example with EXACTLY_ONCE enabled, x will go
> > >> > to a5, then b5, and then back to a5 if the jobs perform checkpoints
> > >> > after producing records.
> > >> >
> > >> >
> > >> > Best,
> > >> > Jark
> > >> >
> > >> >
> > >> > > On Apr 5, 2023, at 09:39, Alexander Sorokoumov <asorokoumov@confluent.io.INVALID> wrote:
> > >> > >
> > >> > > Hello Flink community,
> > >> > >
> > >> > > I would like to discuss if it is worth adding EXACTLY_ONCE
> > >> > > delivery semantics to the upsert-kafka connector. According to
> > >> > > upsert-kafka docs[1] and ReducingUpsertSink javadoc[2], the
> > >> > > connector is correct even with duplicate records under
> > >> > > AT_LEAST_ONCE because the records are idempotent, and the read
> > >> > > path de-duplicates them. However, there are at least 2 reasons to
> > >> > > configure the connector with EXACTLY_ONCE:
> > >> > >
> > >> > > 1. There might be other non-Flink topic consumers that would
> > >> > > rather not have duplicated records.
> > >> > > 2. Multiple upsert-kafka producers might cause keys to roll back
> > >> > > to previous values. Consider a scenario where 2 producing jobs A
> > >> > > and B write to the same topic with AT_LEAST_ONCE, and a consuming
> > >> > > job reads from the topic. Both producers write unique,
> > >> > > monotonically increasing sequences to the same key. Job A writes
> > >> > > x=a1,a2,a3,a4,a5… Job B writes x=b1,b2,b3,b4,b5,.... With this
> > >> > > setup, we can have the following sequence:
> > >> > >
> > >> > >   1. Job A produces x=a5.
> > >> > >   2. Job B produces x=b5.
> > >> > >   3. Job A produces the duplicate write x=a5.
> > >> > >
> > >> > > The consuming job would observe x going to a5, then to b5, and
> > >> > > then back to a5. EXACTLY_ONCE would prevent this behavior.
> > >> > >
> > >> > > I created https://issues.apache.org/jira/browse/FLINK-31408 and a
> > >> > > WIP patch to add EXACTLY_ONCE to upsert-kafka, but would like to
> > >> > > know what the community thinks about it before moving forward
> > >> > > with it.
> > >> > >
> > >> > > Thanks,
> > >> > > Alexander
> > >> > >
> > >> > > 1.
> > >> > > https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/#consistency-guarantees
> > >> > > 2.
> > >> > > https://github.com/apache/flink-connector-kafka/blob/40cf9994dd847c13602acf1f90895cf9f89b2ce6/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L31-L37
> > >> >
> > >> >
> > >>
> >
>

Re: [DISCUSS] EXACTLY_ONCE delivery semantics for upsert-kafka connector

Posted by Jark Wu <im...@gmail.com>.
Hi John,

Thank you for your valuable input. It sounds reasonable to me.

From this point of view, exactly-once is used to guarantee transaction
semantics rather than to avoid duplication/upserts.
This is similar to the JDBC connectors that already support eventual
consistency with idempotent updates, but we still added support for 2PC[1].

Best,
Jark

[1]:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/jdbc/#jdbcsinkexactlyoncesink

On Wed, 12 Apr 2023 at 10:36, John Roesler <vv...@apache.org> wrote:

> Hi Jark,
>
> I hope you don’t mind if I chime in.
>
> You have a good point that the sequence of upserts will eventually
> converge to the correct value under the at-least-once delivery guarantee,
> but it can still be important to avoid passing on uncommitted results. Some
> thoughts, numbered for reference:
>
> 1. Most generally, if some result R is written to the sink topic, but then
> the job fails before a checkpoint, rolls back, and reprocesses, producing
> R’, then it is incorrect to call R an “upsert”. In fact, as far as the
> system is concerned, R never happened at all (because it was part of a
> rolled-back batch of processing).
>
> 2. Readers may reasonably wish to impose some meaning on the sequence of
> upserts itself, so including aborted results can lead to wrong semantics
> downstream. Eg: “how many times has ‘x’ been updated today”?
>
> 3. Note that processing may not be deterministic over failures, and,
> building on (2), readers may have an expectation that every record in the
> topic corresponds to a real value that was associated with that key at some
> point. Eg, if we start with x=1, checkpoint, then produce x=99, crash,
> restart and produce x=2. Under at-least-once, the history of x is [1,99,2],
> while exactly-once would give the correct history of [1,2]. If we set up an
> alert that fires if the value of x is ever greater than 10, then
> at-least-once will erroneously alert us, while exactly-once does not.
>
> 4. Sending results for failed processing can also cause operational
> problems: if you’re processing a high volume of data, and you get into a
> crash loop, you can create a flood of repeated results. I’ve seen this case
> cause real world pain for people, and it’s nice to have a way to avoid it.
>
> I hope some of these examples show why a user might reasonably want to
> configure the connector with the exactly-once guarantee.
>
> Thanks!
> -John
>
> On Sat, Apr 8, 2023, at 10:03, Jark Wu wrote:
> > Hi Alexander,
> >
> > Yes, Kafka’s exactly-once semantics are used to avoid duplicated records
> > in case of producer retries or failovers. But as I explained above, it
> > can’t avoid intentionally duplicated records. Actually, I would like to
> > call them "upsert records" instead of "duplicates"; that's why the
> > connector is named "upsert-kafka", to make Kafka work like a database
> > that supports updating and deleting by key.
> >
> > For example, consider the following SQL query:
> >
> > SELECT URL, COUNT(*) page_views
> > FROM access_logs
> > GROUP BY URL;
> >
> > This is a continuous query[1] that continuously emits a new <url,
> > page_views> record once a new URL access entry is received. The same
> > URLs may be far apart in the log and processed in different checkpoints.
> >
> > It's easy to make upsert-kafka support an exactly-once delivery
> > guarantee, but as we discussed above, it's unnecessary to support it and
> > we intend to expose as few configurations to users as possible.
> >
> >
> > Best,
> > Jark
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/group-agg/
> >
> >
> >
> > On Sat, 8 Apr 2023 at 02:42, Alexander Sorokoumov
> > <as...@confluent.io.invalid> wrote:
> >
> >> Hi Jark,
> >>
> >> To my knowledge, Kafka's EXACTLY_ONCE transactions together with
> >> idempotent producers prevent duplicated records[1], at least in the
> >> cases when the upstream does not produce them intentionally and across
> >> checkpoints.
> >>
> >> Could you please elaborate or point me to the docs that explain the
> >> reason for duplicated records upstream and across checkpoints? I am
> >> relatively new to Flink and not aware of it. According to the kafka
> >> connector documentation, it does support exactly-once semantics by
> >> configuring 'sink.delivery-guarantee'='exactly-once'[2]. It is not clear
> >> to me why we can't make upsert-kafka configurable in the same way to
> >> support this delivery guarantee.
> >>
> >> Thank you,
> >> Alexander
> >>
> >> 1.
> >> https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
> >> 2.
> >> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/#consistency-guarantees
> >>
> >>
> >> On Fri, Apr 7, 2023 at 3:44 AM Jark Wu <im...@gmail.com> wrote:
> >>
> >> > Hi Alexander,
> >> >
> >> > I’m not sure I fully understand the reasons. I left my comments
> >> > inline.
> >> >
> >> > > 1. There might be other non-Flink topic consumers that would rather
> >> > > not have duplicated records.
> >> >
> >> > Exactly once can’t avoid producing duplicated records, because the
> >> > upstream produces duplicated records intentionally and across
> >> > checkpoints. Exactly once can’t recognize duplicated records and drop
> >> > the duplicates. That means duplicated records are written into topics
> >> > even if exactly-once mode is enabled.
> >> >
> >> >
> >> > > 2. Multiple upsert-kafka producers might cause keys to roll back to
> >> > > previous values.
> >> >
> >> > Sorry, I don’t understand how exactly once can prevent this rollback
> >> > behavior. Even in your example with EXACTLY_ONCE enabled, x will go to
> >> > a5, then b5, and then back to a5 if the jobs perform checkpoints after
> >> > producing records.
> >> >
> >> >
> >> > Best,
> >> > Jark
> >> >
> >> >
> >> > > On Apr 5, 2023, at 09:39, Alexander Sorokoumov <asorokoumov@confluent.io.INVALID> wrote:
> >> > >
> >> > > Hello Flink community,
> >> > >
> >> > > I would like to discuss if it is worth adding EXACTLY_ONCE delivery
> >> > > semantics to the upsert-kafka connector. According to upsert-kafka
> >> > > docs[1] and ReducingUpsertSink javadoc[2], the connector is correct
> >> > > even with duplicate records under AT_LEAST_ONCE because the records
> >> > > are idempotent, and the read path de-duplicates them. However, there
> >> > > are at least 2 reasons to configure the connector with EXACTLY_ONCE:
> >> > >
> >> > > 1. There might be other non-Flink topic consumers that would rather
> >> > > not have duplicated records.
> >> > > 2. Multiple upsert-kafka producers might cause keys to roll back to
> >> > > previous values. Consider a scenario where 2 producing jobs A and B
> >> > > write to the same topic with AT_LEAST_ONCE, and a consuming job
> >> > > reads from the topic. Both producers write unique, monotonically
> >> > > increasing sequences to the same key. Job A writes x=a1,a2,a3,a4,a5…
> >> > > Job B writes x=b1,b2,b3,b4,b5,.... With this setup, we can have the
> >> > > following sequence:
> >> > >
> >> > >   1. Job A produces x=a5.
> >> > >   2. Job B produces x=b5.
> >> > >   3. Job A produces the duplicate write x=a5.
> >> > >
> >> > > The consuming job would observe x going to a5, then to b5, and then
> >> > > back to a5. EXACTLY_ONCE would prevent this behavior.
> >> > >
> >> > > I created https://issues.apache.org/jira/browse/FLINK-31408 and a
> >> > > WIP patch to add EXACTLY_ONCE to upsert-kafka, but would like to
> >> > > know what the community thinks about it before moving forward with
> >> > > it.
> >> > >
> >> > > Thanks,
> >> > > Alexander
> >> > >
> >> > > 1.
> >> > > https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/#consistency-guarantees
> >> > > 2.
> >> > > https://github.com/apache/flink-connector-kafka/blob/40cf9994dd847c13602acf1f90895cf9f89b2ce6/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L31-L37
> >> >
> >> >
> >>
>

Re: [DISCUSS] EXACTLY_ONCE delivery semantics for upsert-kafka connector

Posted by John Roesler <vv...@apache.org>.
Hi Jark,

I hope you don’t mind if I chime in.

You have a good point that the sequence of upserts will eventually converge to the correct value under the at-least-once delivery guarantee, but it can still be important to avoid passing on uncommitted results. Some thoughts, numbered for reference:

1. Most generally, if some result R is written to the sink topic, but then the job fails before a checkpoint, rolls back, and reprocesses, producing R’, then it is incorrect to call R an “upsert”. In fact, as far as the system is concerned, R never happened at all (because it was part of a rolled-back batch of processing).

2. Readers may reasonably wish to impose some meaning on the sequence of upserts itself, so including aborted results can lead to wrong semantics downstream. Eg: “how many times has ‘x’ been updated today”?

3. Note that processing may not be deterministic over failures, and, building on (2), readers may have an expectation that every record in the topic corresponds to a real value that was associated with that key at some point. Eg, if we start with x=1, checkpoint, then produce x=99, crash, restart and produce x=2. Under at-least-once, the history of x is [1,99,2], while exactly-once would give the correct history of [1,2]. If we set up an alert that fires if the value of x is ever greater than 10, then at-least-once will erroneously alert us, while exactly-once does not.
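
To make the example in (3) concrete, the alert could be a query along these lines over the sink topic (the table and column names are made up):

-- Hypothetical alert over the sink topic: fires for any observed value of x
-- above 10. Under at-least-once the aborted x=99 update would trip it;
-- under exactly-once it never would.
SELECT k, v
FROM sink_topic
WHERE k = 'x' AND v > 10;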

4. Sending results for failed processing can also cause operational problems: if you’re processing a high volume of data, and you get into a crash loop, you can create a flood of repeated results. I’ve seen this case cause real world pain for people, and it’s nice to have a way to avoid it.

I hope some of these examples show why a user might reasonably want to configure the connector with the exactly-once guarantee. 

Thanks!
-John

On Sat, Apr 8, 2023, at 10:03, Jark Wu wrote:
> Hi Alexander,
>
> Yes, Kafka’s exactly-once semantics are used to avoid duplicated records in
> case of producer retries
> or failovers. But as I explained above, it can’t avoid intentionally
> duplicated records. Actually, I would
> like to call them "upsert records" instead of "duplicates"; that's why the
> connector is named "upsert-kafka",
> to make Kafka work like a database that supports updating and deleting by
> key.
>
> For example, consider the following SQL query:
>
> SELECT URL, COUNT(*) page_views
> FROM access_logs
> GROUP BY URL;
>
> This is a continuous query[1] that continuously emits a new <url,
> page_views> record once a new URL
> access entry is received. The same URLs may be far apart in the log and
> processed in different checkpoints.
>
> It's easy to make upsert-kafka support an exactly-once delivery guarantee,
> but as we discussed above,
> it's unnecessary to support it and we intend to expose as few
> configurations to users as possible.
>
>
> Best,
> Jark
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/group-agg/
>
>
>
> On Sat, 8 Apr 2023 at 02:42, Alexander Sorokoumov
> <as...@confluent.io.invalid> wrote:
>
>> Hi Jark,
>>
>> To my knowledge, Kafka's EXACTLY_ONCE transactions together with idempotent
>> producers prevent duplicated records[1], at least in the cases when the
>> upstream does not produce them intentionally and across checkpoints.
>>
>> Could you please elaborate or point me to the docs that explain the reason
>> for duplicated records upstream and across checkpoints? I am relatively new
>> to Flink and not aware of it. According to the kafka connector
>> documentation, it does support exactly-once semantics by configuring
>> 'sink.delivery-guarantee'='exactly-once'[2]. It is not clear to me why we
>> can't make upsert-kafka configurable in the same way to support this
>> delivery guarantee.
>>
>> Thank you,
>> Alexander
>>
>> 1.
>> https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
>> 2.
>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/#consistency-guarantees
>>
>>
>> On Fri, Apr 7, 2023 at 3:44 AM Jark Wu <im...@gmail.com> wrote:
>>
>> > Hi Alexander,
>> >
>> > I’m not sure I fully understand the reasons. I left my comments inline.
>> >
>> > > 1. There might be other non-Flink topic consumers that would rather
>> > > not have duplicated records.
>> >
>> > Exactly once can’t avoid producing duplicated records, because the
>> > upstream produces duplicated records intentionally and across
>> > checkpoints. Exactly once can’t recognize duplicated records and drop
>> > the duplicates. That means duplicated records are written into topics
>> > even if exactly-once mode is enabled.
>> >
>> >
>> > > 2. Multiple upsert-kafka producers might cause keys to roll back to
>> > > previous values.
>> >
>> > Sorry, I don’t understand how exactly once can prevent this rollback
>> > behavior. Even in your example with EXACTLY_ONCE enabled, x will go to
>> > a5, then b5, and then back to a5 if the jobs perform checkpoints after
>> > producing records.
>> >
>> >
>> > Best,
>> > Jark
>> >
>> >
>> > > On Apr 5, 2023, at 09:39, Alexander Sorokoumov <asorokoumov@confluent.io.INVALID> wrote:
>> > >
>> > > Hello Flink community,
>> > >
>> > > I would like to discuss if it is worth adding EXACTLY_ONCE delivery
>> > > semantics to the upsert-kafka connector. According to upsert-kafka
>> > > docs[1] and ReducingUpsertSink javadoc[2], the connector is correct
>> > > even with duplicate records under AT_LEAST_ONCE because the records
>> > > are idempotent, and the read path de-duplicates them. However, there
>> > > are at least 2 reasons to configure the connector with EXACTLY_ONCE:
>> > >
>> > > 1. There might be other non-Flink topic consumers that would rather
>> > > not have duplicated records.
>> > > 2. Multiple upsert-kafka producers might cause keys to roll back to
>> > > previous values. Consider a scenario where 2 producing jobs A and B
>> > > write to the same topic with AT_LEAST_ONCE, and a consuming job reads
>> > > from the topic. Both producers write unique, monotonically increasing
>> > > sequences to the same key. Job A writes x=a1,a2,a3,a4,a5… Job B
>> > > writes x=b1,b2,b3,b4,b5,.... With this setup, we can have the
>> > > following sequence:
>> > >
>> > >   1. Job A produces x=a5.
>> > >   2. Job B produces x=b5.
>> > >   3. Job A produces the duplicate write x=a5.
>> > >
>> > > The consuming job would observe x going to a5, then to b5, and then
>> > > back to a5. EXACTLY_ONCE would prevent this behavior.
>> > >
>> > > I created https://issues.apache.org/jira/browse/FLINK-31408 and a WIP
>> > > patch to add EXACTLY_ONCE to upsert-kafka, but would like to know
>> > > what the community thinks about it before moving forward with it.
>> > >
>> > > Thanks,
>> > > Alexander
>> > >
>> > > 1.
>> > > https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/#consistency-guarantees
>> > > 2.
>> > > https://github.com/apache/flink-connector-kafka/blob/40cf9994dd847c13602acf1f90895cf9f89b2ce6/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L31-L37
>> >
>> >
>>

Re: [DISCUSS] EXACTLY_ONCE delivery semantics for upsert-kafka connector

Posted by Jark Wu <im...@gmail.com>.
Hi Alexander,

Yes, Kafka’s exactly-once semantics are used to avoid duplicated records in
case of producer retries
or failovers. But as I explained above, it can’t avoid intentionally
duplicated records. Actually, I would
like to call them "upsert records" instead of "duplicates"; that's why the
connector is named "upsert-kafka",
to make Kafka work like a database that supports updating and deleting by
key.

For example, consider the following SQL query:

SELECT URL, COUNT(*) page_views
FROM access_logs
GROUP BY URL;

This is a continuous query[1] that continuously emits a new <url,
page_views> record once a new URL
access entry is received. The same URLs may be far apart in the log and
processed in different checkpoints.
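
For example, the result of that query could be written out with an
upsert-kafka table along these lines (a minimal sketch; the topic, formats,
and broker address are made up):

CREATE TABLE page_views_sink (
  URL STRING,
  page_views BIGINT,
  PRIMARY KEY (URL) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'page-views',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

INSERT INTO page_views_sink
SELECT URL, COUNT(*) AS page_views
FROM access_logs
GROUP BY URL;

Every new access_logs entry for an already-seen URL emits another record for
the same key, which is exactly the intentional "duplication" described above.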

It's easy to make upsert-kafka support an exactly-once delivery guarantee,
but as we discussed above,
it's unnecessary to support it and we intend to expose as few
configurations to users as possible.


Best,
Jark

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/group-agg/



On Sat, 8 Apr 2023 at 02:42, Alexander Sorokoumov
<as...@confluent.io.invalid> wrote:

> Hi Jark,
>
> To my knowledge, Kafka's EXACTLY_ONCE transactions together with idempotent
> producers prevent duplicated records[1], at least in the cases when the
> upstream does not produce them intentionally and across checkpoints.
>
> Could you please elaborate or point me to the docs that explain the reason
> for duplicated records upstream and across checkpoints? I am relatively new
> to Flink and not aware of it. According to the kafka connector
> documentation, it does support exactly-once semantics by configuring
> 'sink.delivery-guarantee'='exactly-once'[2]. It is not clear to me why we
> can't make upsert-kafka configurable in the same way to support this
> delivery guarantee.
>
> Thank you,
> Alexander
>
> 1.
> https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
> 2.
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/#consistency-guarantees
>
>
> On Fri, Apr 7, 2023 at 3:44 AM Jark Wu <im...@gmail.com> wrote:
>
> > Hi Alexander,
> >
> > I’m not sure I fully understand the reasons. I left my comments inline.
> >
> > > 1. There might be other non-Flink topic consumers that would rather not
> > > have duplicated records.
> >
> > Exactly once can’t avoid producing duplicated records, because the
> > upstream produces duplicated records intentionally and across
> > checkpoints. Exactly once can’t recognize duplicated records and drop
> > the duplicates. That means duplicated records are written into topics
> > even if exactly-once mode is enabled.
> >
> >
> > > 2. Multiple upsert-kafka producers might cause keys to roll back to
> > > previous values.
> >
> > Sorry, I don’t understand how exactly once can prevent this rollback
> > behavior. Even in your example with EXACTLY_ONCE enabled, x will go to
> > a5, then b5, and then back to a5 if the jobs perform checkpoints after
> > producing records.
> >
> >
> > Best,
> > Jark
> >
> >
> > > On Apr 5, 2023, at 09:39, Alexander Sorokoumov <asorokoumov@confluent.io.INVALID> wrote:
> > >
> > > Hello Flink community,
> > >
> > > I would like to discuss if it is worth adding EXACTLY_ONCE delivery
> > > semantics to the upsert-kafka connector. According to upsert-kafka
> > > docs[1] and ReducingUpsertSink javadoc[2], the connector is correct
> > > even with duplicate records under AT_LEAST_ONCE because the records
> > > are idempotent, and the read path de-duplicates them. However, there
> > > are at least 2 reasons to configure the connector with EXACTLY_ONCE:
> > >
> > > 1. There might be other non-Flink topic consumers that would rather
> > > not have duplicated records.
> > > 2. Multiple upsert-kafka producers might cause keys to roll back to
> > > previous values. Consider a scenario where 2 producing jobs A and B
> > > write to the same topic with AT_LEAST_ONCE, and a consuming job reads
> > > from the topic. Both producers write unique, monotonically increasing
> > > sequences to the same key. Job A writes x=a1,a2,a3,a4,a5… Job B writes
> > > x=b1,b2,b3,b4,b5,.... With this setup, we can have the following
> > > sequence:
> > >
> > >   1. Job A produces x=a5.
> > >   2. Job B produces x=b5.
> > >   3. Job A produces the duplicate write x=a5.
> > >
> > > The consuming job would observe x going to a5, then to b5, and then
> > > back to a5. EXACTLY_ONCE would prevent this behavior.
> > >
> > > I created https://issues.apache.org/jira/browse/FLINK-31408 and a WIP
> > > patch to add EXACTLY_ONCE to upsert-kafka, but would like to know what
> > > the community thinks about it before moving forward with it.
> > >
> > > Thanks,
> > > Alexander
> > >
> > > 1.
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/#consistency-guarantees
> > > 2.
> > > https://github.com/apache/flink-connector-kafka/blob/40cf9994dd847c13602acf1f90895cf9f89b2ce6/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L31-L37
> >
> >
>

Re: [DISCUSS] EXACTLY_ONCE delivery semantics for upsert-kafka connector

Posted by Alexander Sorokoumov <as...@confluent.io.INVALID>.
Hi Jark,

To my knowledge, Kafka's EXACTLY_ONCE transactions together with idempotent
producers prevent duplicated records[1], at least in the cases when the
upstream does not produce them intentionally and across checkpoints.

Could you please elaborate or point me to the docs that explain the reason
for duplicated records upstream and across checkpoints? I am relatively new
to Flink and not aware of it. According to the kafka connector
documentation, it does support exactly-once semantics by configuring
'sink.delivery-guarantee'='exactly-once'[2]. It is not clear to me why we
can't make upsert-kafka configurable in the same way to support this
delivery guarantee.
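
For reference, the plain kafka connector accepts the option like this (a
minimal sketch; the table, topic, and broker address are made up, and
exactly-once additionally requires 'sink.transactional-id-prefix' plus
enabled checkpointing):

CREATE TABLE events_sink (
  k STRING,
  v BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'sink.delivery-guarantee' = 'exactly-once',
  'sink.transactional-id-prefix' = 'events-sink'
);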

Thank you,
Alexander

1.
https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
2.
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/#consistency-guarantees


On Fri, Apr 7, 2023 at 3:44 AM Jark Wu <im...@gmail.com> wrote:

> Hi Alexander,
>
> I’m not sure I fully understand the reasons. I left my comments inline.
>
> > 1. There might be other non-Flink topic consumers that would rather not
> > have duplicated records.
>
> Exactly once can’t avoid producing duplicated records, because the upstream
> produces duplicated records intentionally and across checkpoints. Exactly
> once can’t recognize duplicated records and drop the duplicates. That means
> duplicated records are written into topics even if exactly-once mode is
> enabled.
>
>
> > 2. Multiple upsert-kafka producers might cause keys to roll back to
> > previous values.
>
> Sorry, I don’t understand how exactly once can prevent this rollback
> behavior. Even in your example with EXACTLY_ONCE enabled, x will go to a5,
> then b5, and then back to a5 if the jobs perform checkpoints after
> producing records.
>
>
> Best,
> Jark
>
>
> > On Apr 5, 2023, at 09:39, Alexander Sorokoumov <as...@confluent.io.INVALID> wrote:
> >
> > Hello Flink community,
> >
> > I would like to discuss if it is worth adding EXACTLY_ONCE delivery
> > semantics to the upsert-kafka connector. According to upsert-kafka
> > docs[1] and ReducingUpsertSink javadoc[2], the connector is correct even
> > with duplicate records under AT_LEAST_ONCE because the records are
> > idempotent, and the read path de-duplicates them. However, there are at
> > least 2 reasons to configure the connector with EXACTLY_ONCE:
> >
> > 1. There might be other non-Flink topic consumers that would rather not
> > have duplicated records.
> > 2. Multiple upsert-kafka producers might cause keys to roll back to
> > previous values. Consider a scenario where 2 producing jobs A and B write
> > to the same topic with AT_LEAST_ONCE, and a consuming job reads from the
> > topic. Both producers write unique, monotonically increasing sequences to
> > the same key. Job A writes x=a1,a2,a3,a4,a5… Job B writes
> > x=b1,b2,b3,b4,b5,.... With this setup, we can have the following
> > sequence:
> >
> >   1. Job A produces x=a5.
> >   2. Job B produces x=b5.
> >   3. Job A produces the duplicate write x=a5.
> >
> > The consuming job would observe x going to a5, then to b5, and then back to a5.
> > EXACTLY_ONCE would prevent this behavior.
> >
> > I created https://issues.apache.org/jira/browse/FLINK-31408 and a WIP
> > patch
> > to add EXACTLY_ONCE to upsert-kafka, but would like to know what the
> > community thinks about it before moving forward with it.
> >
> > Thanks,
> > Alexander
> >
> > 1.
> > https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/#consistency-guarantees
> > 2.
> > https://github.com/apache/flink-connector-kafka/blob/40cf9994dd847c13602acf1f90895cf9f89b2ce6/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L31-L37
>
>

Re: [DISCUSS] EXACTLY_ONCE delivery semantics for upsert-kafka connector

Posted by Jark Wu <im...@gmail.com>.
Hi Alexander,

I’m not sure I fully understand the reasons. I left my comments inline. 

> 1. There might be other non-Flink topic consumers that would rather not
> have duplicated records.

Exactly once can’t avoid producing duplicated records, because the upstream
produces duplicated records intentionally and across checkpoints. Exactly once
can’t recognize duplicated records and drop the duplicates. That means duplicated
records are written into topics even if exactly-once mode is enabled.


> 2. Multiple upsert-kafka producers might cause keys to roll back to
> previous values.

Sorry, I don’t understand how exactly once can prevent this rollback behavior.
Even in your example with EXACTLY_ONCE enabled, x will go to a5, then b5, and
then back to a5 if the jobs perform checkpoints after producing records.


Best,
Jark


> On Apr 5, 2023, at 09:39, Alexander Sorokoumov <as...@confluent.io.INVALID> wrote:
> 
> Hello Flink community,
> 
> I would like to discuss if it is worth adding EXACTLY_ONCE delivery
> semantics to the upsert-kafka connector. According to upsert-kafka docs[1] and
> ReducingUpsertSink javadoc[2], the connector is correct even with duplicate
> records under AT_LEAST_ONCE because the records are idempotent, and the
> read path de-duplicates them. However, there are at least 2 reasons to
> configure the connector with EXACTLY_ONCE:
> 
> 1. There might be other non-Flink topic consumers that would rather not
> have duplicated records.
> 2. Multiple upsert-kafka producers might cause keys to roll back to
> previous values. Consider a scenario where 2 producing jobs A and B write
> to the same topic with AT_LEAST_ONCE, and a consuming job reads from the
> topic. Both producers write unique, monotonically increasing sequences to
> the same key. Job A writes x=a1,a2,a3,a4,a5… Job B writes
> x=b1,b2,b3,b4,b5,.... With this setup, we can have the following sequence:
> 
>   1. Job A produces x=a5.
>   2. Job B produces x=b5.
>   3. Job A produces the duplicate write x=a5.
> 
> The consuming job would observe x going to a5, then to b5, and then back to a5.
> EXACTLY_ONCE would prevent this behavior.
> 
> I created https://issues.apache.org/jira/browse/FLINK-31408 and a WIP patch
> to add EXACTLY_ONCE to upsert-kafka, but would like to know what the
> community thinks about it before moving forward with it.
> 
> Thanks,
> Alexander
> 
> 1.
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/#consistency-guarantees
> 2.
> https://github.com/apache/flink-connector-kafka/blob/40cf9994dd847c13602acf1f90895cf9f89b2ce6/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java#L31-L37