Posted to dev@kafka.apache.org by Chris Egerton <ch...@confluent.io> on 2021/01/25 15:29:29 UTC

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

Hi Ning,

Apologies for the delayed response. I realized after publishing the KIP
that there were some finer points I hadn't considered in my design and that
it was far from providing exactly-once guarantees. In response to your
questions:

1) The goal of the KIP is to ensure the accuracy of the offsets that the
framework provides to source tasks; if tasks choose to manage offsets
outside of the framework, they're on their own. So, the source records and
their offsets will be written/committed to Kafka, and the task will be
provided with them on startup, but it (or really, its predecessor) may not
have had time to clean up resources associated with those records before
being killed.

2) I've cleaned up this section and removed the pseudocode as it seems too
low-level to be worth discussing in a KIP. I'll try to summarize here,
though: task.commit() is not what causes offsets provided to the framework
by tasks to be committed; it's simply a follow-up hook provided out of
convenience to tasks so that they can clean up resources associated with
the most recent batch of records (by ack'ing JMS messages, for example).
The Connect framework uses an internal Kafka topic to store source task
offsets.
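
To make that concrete, here's a rough sketch (not from the KIP; the JMS
wiring, class name, and message-conversion details are all made up for
illustration) of a task that uses commit() purely as an acknowledgment
hook:

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class JmsAckSketchTask extends SourceTask {

    private MessageConsumer consumer; // created in start() (omitted)
    private volatile Message lastPolled;

    @Override
    public String version() {
        return "sketch";
    }

    @Override
    public void start(Map<String, String> props) {
        // connect to the broker and create a CLIENT_ACKNOWLEDGE session
        // and consumer (omitted)
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> records = new ArrayList<>();
        try {
            Message msg = consumer.receive(1000);
            if (msg != null) {
                lastPolled = msg;
                records.add(toSourceRecord(msg));
            }
        } catch (JMSException e) {
            throw new ConnectException(e);
        }
        return records;
    }

    @Override
    public void commit() throws InterruptedException {
        // Runs after the framework has committed offsets; it does NOT
        // commit them itself. With CLIENT_ACKNOWLEDGE semantics, ack'ing
        // the most recent message acknowledges everything delivered on
        // the session so far.
        Message last = lastPolled;
        if (last == null) {
            return;
        }
        try {
            last.acknowledge();
        } catch (JMSException e) {
            // best-effort cleanup; on failure the broker may redeliver
        }
    }

    @Override
    public void stop() {
        // close the consumer/session/connection (omitted)
    }

    private SourceRecord toSourceRecord(Message msg) {
        // map the JMS message to a SourceRecord (omitted)
        throw new UnsupportedOperationException("illustrative only");
    }
}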

3) In order to benefit from the improvements proposed in this KIP, yes, the
single source of truth should be the OffsetStorageReader provided to the
task by the Connect framework, at least at startup. After startup, tasks
should ideally keep track of their own offsets in memory, as each request
to read offsets requires reading to the end of the offsets topic, which
can be expensive in some cases.
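
A rough sketch of that pattern (the partition and offset key names,
"table" and "position", are made up for illustration):

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class StartupOffsetSketchTask extends SourceTask {

    private static final Map<String, String> SOURCE_PARTITION =
            Collections.singletonMap("table", "some_table");

    private long position; // in-memory bookkeeping after startup

    @Override
    public String version() {
        return "sketch";
    }

    @Override
    public void start(Map<String, String> props) {
        // Read the committed offset exactly once, at startup. Each call
        // reads to the end of the offsets topic, so avoid doing this on
        // every poll.
        Map<String, Object> committed =
                context.offsetStorageReader().offset(SOURCE_PARTITION);
        position = committed == null ? 0L : (Long) committed.get("position");
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // Read from the source starting at `position`, advance it in
        // memory, and attach singletonMap("position", position) as the
        // source offset on each record so the framework can commit it
        // (details omitted). Returning null tells Connect there's no
        // data available right now.
        return null;
    }

    @Override
    public void stop() {
    }
}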

I've since expanded the KIP to include general exactly-once support for
source connectors that should cover the points I neglected in my initial
design, so it should be ready for review again.

Cheers,

Chris

On Mon, Jul 27, 2020 at 11:42 PM Ning Zhang <ni...@gmail.com> wrote:

> Hello Chris,
>
> That is an interesting KIP. I have a couple of questions:
>
> (1) In the pseudo-code section, what if a failure happens between 4(b)
> and 5(a), i.e. after the producer commits the transaction and before
> task.commitRecord() is called?
>
> (2) In the "source task life time" section, what is the difference between
> "commit offset" and "offsets to commit"? Given that the offset storage can
> be a Kafka topic (KafkaOffsetBackingStore.java) and a producer can only
> produce to a Kafka topic, are they the same topic (the topic the producer
> writes offsets to and the topic task.commit() commits to)?
>
> (3) The JDBC source task relies on `context.offsetStorageReader()` (
> https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java#L140)
> to retrieve the previously committed offset (whether starting fresh or
> resuming after a failure). So it seems the single source of truth for
> where to resume consuming is the last known/committed position stored in
> offset storage (e.g. a Kafka topic) and managed by the periodic
> task.commit()?
>
> On 2020/05/22 06:20:51, Chris Egerton <ch...@confluent.io> wrote:
> > Hi all,
> >
> > I know it's a busy time with the upcoming 2.6 release and I don't expect
> > this to get a lot of traction until that's done, but I've published a KIP
> > for allowing atomic commit of offsets and records for source connectors
> > and would appreciate your feedback:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets
> >
> > This feature should make it possible to implement source connectors with
> > exactly-once delivery guarantees, and even allow a wide range of existing
> > source connectors to provide exactly-once delivery guarantees with no
> > changes required.
> >
> > Cheers,
> >
> > Chris
> >
>

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

Posted by Chris Egerton <ch...@confluent.io>.
Hi Jason,

Fine by me; I wanted to be conservative with the return type, but the case
you've outlined sounds enticing enough that adding a little flexibility to
the API seems warranted. I've added your suggestion to the proposed admin
API expansions; let me know what you think.
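
For concreteness, the shape I have in mind looks roughly like this
(illustrative only; names and exact types aren't final):

import java.util.Map;

import org.apache.kafka.common.KafkaFuture;

// Sketch of the proposed result type for the fenceProducers admin API.
public interface FenceProducersResultSketch {

    // One future per transactional ID passed to fenceProducers().
    Map<String, KafkaFuture<Void>> fencedProducers();

    // Producer ID and epoch from the InitProducerId response, so a
    // caller could seed a new producer with them and skip the second
    // InitProducerId request.
    KafkaFuture<Long> producerId(String transactionalId);

    KafkaFuture<Short> epochId(String transactionalId);

    // Completes once every requested producer has been fenced.
    KafkaFuture<Void> all();
}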

Cheers,

Chris

On Mon, Feb 1, 2021 at 3:38 PM Jason Gustafson <ja...@confluent.io> wrote:

> Hi Chris,
>
> If we add the new `fenceProducers` admin API, can we return the information
> from the `InitProducerId` response (i.e. producer id and epoch)? We may not
> have a use case for it yet, but I don't see any harm exposing it for the
> future. For example, we could allow this state to be provided to the
> Producer instance on initialization, which would save the need for the
> second `InitProducerId` request in the current proposal. Also, the `Void`
> type doesn't give us much room for extension.
>
> -Jason

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

Posted by Jason Gustafson <ja...@confluent.io>.
Hi Chris,

If we add the new `fenceProducers` admin API, can we return the information
from the `InitProducerId` response (i.e. producer id and epoch)? We may not
have a use case for it yet, but I don't see any harm exposing it for the
future. For example, we could allow this state to be provided to the
Producer instance on initialization, which would save the need for the
second `InitProducerId` request in the current proposal. Also, the `Void`
type doesn't give us much room for extension.

-Jason

