Posted to dev@kafka.apache.org by Chris Egerton <ch...@confluent.io.INVALID> on 2021/06/01 15:42:19 UTC

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

Hi Gunnar,

Thanks for taking a look! I've addressed the low-hanging fruit in the KIP;
responses to other comments inline here:

> * TransactionContext: What's the use case for the methods accepting a
source record (commitTransaction(SourceRecord record),
abortTransaction(SourceRecord record))?

This allows developers to decouple transaction boundaries from record
batches. If a connector has a configuration that dictates how often it
returns from "SourceTask::poll", for example, it may be easier to define
multiple transactions within a single batch or a single transaction across
several batches than to retrofit the connector's poll logic to work with
transaction boundaries.
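
To make this concrete, here's a rough sketch of a task that commits on a
fixed record count rather than per batch. It assumes the
SourceTaskContext::transactionContext accessor and the TransactionContext
methods proposed in the KIP; the class itself and fetchRecords() are
hypothetical stand-ins:

    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;
    import org.apache.kafka.connect.source.TransactionContext;

    public class FixedCountTransactionTask extends SourceTask {
        private static final int RECORDS_PER_TRANSACTION = 500;
        private int recordsSinceCommit = 0;

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            // Accessor proposed in the KIP for connector-defined boundaries
            TransactionContext txn = context.transactionContext();
            List<SourceRecord> batch = fetchRecords();
            for (SourceRecord record : batch) {
                if (++recordsSinceCommit >= RECORDS_PER_TRANSACTION) {
                    // Commit once this record (and everything before it) has
                    // been dispatched; the boundary can fall mid-batch
                    txn.commitTransaction(record);
                    recordsSinceCommit = 0;
                }
            }
            return batch;
        }

        private List<SourceRecord> fetchRecords() {
            // Hypothetical: read the next chunk of data from upstream
            return List.of();
        }

        @Override public void start(Map<String, String> props) { }
        @Override public void stop() { }
        @Override public String version() { return "sketch"; }
    }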

> * SourceTaskContext: Instead of guarding against NSME, is there a way for a
connector to query the KC version and thus derive its capabilities? Going
forward, a generic API for querying capabilities could be nice, so a
connector can query for capabilities of the runtime in a safe and
compatible way.

This would be a great quality-of-life improvement for connector and
framework developers alike, but I think it may be best left for a separate
KIP. The current approach, clunky though it may be, seems like a nuisance
at worst. It's definitely worth addressing, but I'm not sure we can think
through all the details thoroughly enough before the upcoming KIP freeze.

> * SourceConnector: Would it make sense to merge the two methods perhaps and
return one enum of { SUPPORTED, NOT_SUPPORTED, SUPPORTED_WITH_BOUNDARIES }?

Hmm... at first glance I like the idea of merging the two methods a lot.
The one thing that gives me pause is that there may be connectors that
would like to define their own transaction boundaries without providing
exactly-once guarantees. We could add UNSUPPORTED_WITH_BOUNDARIES to
accommodate that, but then, it might actually be simpler to keep the two
methods separate in case we add some third variable to the mix that would
also have to be reflected in the possible ExactlyOnceSupport enum values.

> Or, alternatively return an enum from canDefineTransactionBoundaries(),
too; even if it only has two values now, that'd allow for extension in the
future

This is fine by me; we just have to figure out exactly which enum values
would be suitable. It's a little clunky but right now I'm toying with
something like "ConnectorDefinedTransactionBoundaries" with values of
"SUPPORTED" and "NOT_SUPPORTED" and a default of "NOT_SUPPORTED". If we
need more granularity in the future then we can deprecate one or both of
them and add new values. Thoughts?
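
To make that concrete, here's how a connector might opt in under the naming
being floated here. Nothing below is finalized; the enum name and values are
taken straight from the paragraph above, the method signatures are sketched,
and the connector class is hypothetical:

    // Floated above; the default would be NOT_SUPPORTED
    public enum ConnectorDefinedTransactionBoundaries {
        SUPPORTED,
        NOT_SUPPORTED
    }

    // A hypothetical connector that defines its own transaction boundaries:
    public class MyDbConnector extends SourceConnector {
        @Override
        public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
            return ExactlyOnceSupport.SUPPORTED;
        }

        @Override
        public ConnectorDefinedTransactionBoundaries canDefineTransactionBoundaries(
                Map<String, String> props) {
            return ConnectorDefinedTransactionBoundaries.SUPPORTED;
        }

        // taskClass(), taskConfigs(), config(), start(), stop(), version()
        // omitted for brevity
    }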

> And one general question: in Debezium, we have some connectors that
produce
records "out-of-bands" to a schema history topic via their own custom
producer. Is there any way envisionable where such a producer would
participate in the transaction managed by the KC runtime environment?

To answer the question exactly as asked: no; transactions cannot be shared
across producers, and until/unless that is changed (which seems unlikely),
this won't be possible. However, I'm curious why a source connector would
spin up its own producer instead of using "SourceTask::poll" to provide
records to Connect. Is it easier to consume from that topic when the
connector can define its own (de)serialization format? I'm optimistic that
if we understand the use case for the separate producer we may still be
able to help bridge the gap here, one way or another.

> One follow-up question after thinking some more about this; is there any
limit in terms of duration or size of in-flight, connector-controlled
transactions? In case of Debezium for instance, there may be cases where we
tail the TX log from an upstream source database, not knowing whether the
events we receive belong to a committed or aborted transaction. Would it be
valid to emit all these events via a transactional task, and in case we
receive a ROLLBACK event eventually, to abort the pending Kafka
transaction? Such source transactions could be running for a long time
potentially, e.g. hours or days (at least in theory). Or would this sort of
usage not be considered a reasonable one?

I think the distinction between reasonable and unreasonable usage here
likely depends on the use cases that people are trying to satisfy with
their connector, but if I had to guess, I'd say that a different approach
is probably warranted in most cases if a transaction spans entire days at a
time. That said, it would technically be possible, as long as there's no
concern about data not being visible to downstream consumers until its
transaction is committed, and the number of records in the transaction
isn't so large that the amount of memory required to buffer them all
locally on a consumer before delivering them to the downstream application
becomes unreasonable. Connect users would have to be mindful of the
following (an illustrative sketch of these settings follows the list):

- A separate offsets topic for the connector would be highly recommended in
order to avoid crippling other connectors with hanging transactions
- The producer-level transaction.timeout.ms property (
https://kafka.apache.org/28/documentation.html#producerconfigs_transaction.timeout.ms),
which can be configured in connectors either via the worker-level
producer.transaction.timeout.ms or connector-level
producer.override.transaction.timeout.ms property, would have to be high
enough to allow for transactions that stay open for long periods of time
(the default is 1 minute, so this would almost certainly have to be
adjusted)
- The broker-level transaction.max.timeout.ms property (
https://kafka.apache.org/28/documentation.html#brokerconfigs_transaction.max.timeout.ms)
would have to be at least as high as the transaction timeout necessary for
the task (default is 15 minutes, so this would probably need to be adjusted)
- The broker-level transactional.id.expiration.ms property (
https://kafka.apache.org/28/documentation.html#brokerconfigs_transactional.id.expiration.ms)
would have to be high enough to not automatically expire the task's
producer if there was a long enough period without new records; default is
7 days, so this would probably be fine in most scenarios
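
For illustration, a connector whose transactions may stay open for up to a
day could combine settings along these lines (example values only; the
first property is the per-connector producer override mentioned above, the
other two are the broker properties linked above):

    # Connector config: raise this connector's producer transaction timeout
    # to 24 hours (and, per the first bullet, give the connector its own
    # offsets topic)
    producer.override.transaction.timeout.ms=86400000

    # Broker config: must be at least as high as any producer's
    # transaction.timeout.ms
    transaction.max.timeout.ms=86400000

    # Broker config: the 7-day default is usually already sufficient
    transactional.id.expiration.ms=604800000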

Thanks again for taking a look; insight from connector developers is
tremendously valuable here!

Cheers,

Chris

On Thu, May 27, 2021 at 6:35 PM Gunnar Morling
<gu...@googlemail.com.invalid> wrote:

> Chris,
>
> One follow-up question after thinking some more about this; is there any
> limit in terms of duration or size of in-flight, connector-controlled
> transactions? In case of Debezium for instance, there may be cases where we
> tail the TX log from an upstream source database, not knowing whether the
> events we receive belong to a committed or aborted transaction. Would it be
> valid to emit all these events via a transactional task, and in case we
> receive a ROLLBACK event eventually, to abort the pending Kafka
> transaction? Such source transactions could be running for a long time
> potentially, e.g. hours or days (at least in theory). Or would this sort of
> usage not be considered a reasonable one?
>
> Thanks,
>
> --Gunnar
>
>
> Am Do., 27. Mai 2021 um 23:15 Uhr schrieb Gunnar Morling <
> gunnar.morling@googlemail.com>:
>
> > Chris, all,
> >
> > I've just read KIP-618, and let me congratulate you first of all for this
> > impressive piece of work! Here are a few small suggestions and questions I
> > had while reading:
> >
> > * TransactionContext: What's the use case for the methods accepting a
> > source record (commitTransaction(SourceRecord
> > record), abortTransaction(SourceRecord record))?
> > * SourceTaskContext: Typo in "when the sink connector is deployed" ->
> > source task
> > * SourceTaskContext: Instead of guarding against NSME, is there a way for
> > a connector to query the KC version and thus derive its capabilities?
> Going
> > forward, a generic API for querying capabilities could be nice, so a
> > connector can query for capabilities of the runtime in a safe and
> > compatible way.
> > * SourceConnector: exactlyOnceSupport() -> false return value doesn't
> match
> > * SourceConnector: Would it make sense to merge the two methods perhaps
> > and return one enum of { SUPPORTED, NOT_SUPPORTED,
> > SUPPORTED_WITH_BOUNDARIES }? Or, alternatively return an enum
> > from canDefineTransactionBoundaries(), too; even if it only has two
> values
> > now, that'd allow for extension in the future
> >
> > And one general question: in Debezium, we have some connectors that
> > produce records "out-of-band" to a schema history topic via their own
> > custom producer. Is there any way envisionable where such a producer
> would
> > participate in the transaction managed by the KC runtime environment?
> >
> > Thanks a lot,
> >
> > --Gunnar
> >
> >
> > Am Mo., 24. Mai 2021 um 18:45 Uhr schrieb Chris Egerton
> > <ch...@confluent.io.invalid>:
> >
> >> Hi all,
> >>
> >> Wanted to note here that I've updated the KIP document to include the
> >> changes discussed recently. They're mostly located in the "Public
> >> Interfaces" section. I suspect discussion hasn't concluded yet and there
> >> will probably be a few more changes to come, but wanted to take the
> >> opportunity to provide a snapshot of what the current design looks like.
> >>
> >> Cheers,
> >>
> >> Chris
> >>
> >> On Fri, May 21, 2021 at 4:32 PM Chris Egerton <ch...@confluent.io>
> >> wrote:
> >>
> >> > Hi Tom,
> >> >
> >> > Wow, I was way off base! I was thinking that the intent of the
> fencible
> >> > producer was to employ it by default with 3.0, as opposed to only
> after
> >> the
> >> > worker-level
> >> > "exactly.once.source.enabled" property was flipped on. You are correct
> >> > that with the case you were actually describing, there would be no
> >> > heightened ACL requirements, and that it would leave room in the
> future
> >> for
> >> > exactly-once to be disabled on a per-connector basis (as long as all
> the
> >> > workers in the cluster already had "exactly.once.source.enabled" set
> to
> >> > "true") with no worries about breaking changes.
> >> >
> >> > I agree that this is something for another KIP; even if we could
> squeeze
> >> > it in in time for this release, it might be a bit much for new users
> to
> >> > take in all at once. But I can add it to the doc as "future work"
> since
> >> > it's a promising idea that could prove valuable to someone who might
> >> need
> >> > per-connector granularity in the future.
> >> >
> >> > Thanks for clearing things up; in retrospect your comments make a lot
> >> more
> >> > sense now, and I hope I've sufficiently addressed them by now.
> >> >
> >> > PSA for you and everyone else--I plan on updating the doc next week
> with
> >> > the new APIs for connector-defined transaction boundaries,
> >> > user-configurable transaction boundaries (i.e., poll vs. interval vs.
> >> > connectors), and preflight checks for exactly-once validation
> (required
> >> vs.
> >> > requested).
> >> >
> >> > Cheers,
> >> >
> >> > Chris
> >> >
> >> > On Fri, May 21, 2021 at 7:14 AM Tom Bentley <tb...@redhat.com>
> >> wrote:
> >> >
> >> >> Hi Chris,
> >> >>
> >> >> Thanks for continuing to entertain some of these ideas.
> >> >>
> >> >> On Fri, May 14, 2021 at 5:06 PM Chris Egerton
> >> <chrise@confluent.io.invalid
> >> >> >
> >> >> wrote:
> >> >>
> >> >> > [...]
> >> >> >
> >> >> That's true, but we do go from three static ACLs (write/describe on a
> >> >> fixed
> >> >> > transactional ID, and idempotent write on a fixed cluster) to a
> >> dynamic
> >> >> > collection of ACLs.
> >> >> >
> >> >>
> >> >> I'm not quite sure I follow, maybe I've lost track. To be clear, I
> was
> >> >> suggesting the use of a 'fencing producer' only in clusters with
> >> >> exactly.once.source.enabled=true where I imagined the key difference
> >> >> between the exactly once and fencing cases was how the producer was
> >> >> configured/used (transactional vs this new fencing semantic). I think
> >> the
> >> >> ACL requirements for connector producer principals would therefore be
> >> the
> >> >> same as currently described in the KIP. The same is true for the
> worker
> >> >> principals (which is the only breaking change you give in the KIP).
> So
> >> I
> >> >> don't think the fencing idea changes the backwards compatibility
> story
> >> >> that's already in the KIP, just allows a safe per-connector
> >> >> exactly.once=disabled option to be supported (with required as
> >> requested
> >> >> as
> >> >> we already discussed).
> >> >>
> >> >> But I'm wondering whether I've overlooked something.
> >> >>
> >> >> Ultimately I think it may behoove us to err on the side of reducing
> the
> >> >> > breaking changes here for now and saving them for 4.0 (or some
> later
> >> >> major
> >> >> > release), but would be interested in thoughts from you and others.
> >> >> >
> >> >>
> >> >> Difficult to answer (given I think I might be missing something).
> >> >> If there are breaking changes then I don't disagree. It's difficult
> to
> >> >> reason about big changes like this without some practical experience.
> >> >> If there are not, then I think we could also implement the whole
> >> >> exactly.once=disabled thing in a later KIP without additional
> breaking
> >> >> changes (i.e. some time in 3.x), right?
> >> >>
> >> >>
> >> >> > > Guozhang also has a (possible) use case for a fencing-only
> >> producer (
> >> >> > https://issues.apache.org/jira/browse/KAFKA-12693), and as he
> points
> >> >> out
> >> >> > there, you should be able to get these semantics today by calling
> >> >> > initTransactions() and then just using the producer as normal (no
> >> >> > beginTransaction()/abortTransaction()/endTransaction()).
> >> >> >
> >> >> > I tested this locally and was not met with success; transactional
> >> >> producers
> >> >> > do a check right now to ensure that any calls to
> >> "KafkaProducer::send"
> >> >> > occur within a transaction (see
> >> >> >
> >> >> >
> >> >>
> >>
> https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L957-L959
> >> >> > and
> >> >> >
> >> >> >
> >> >>
> >>
> https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L450-L451
> >> >> > ).
> >> >> > Not a blocker, just noting that we'd have to do some legwork to
> make
> >> >> this
> >> >> > workable with the producer API.
> >> >> >
> >> >>
> >> >> Ah, sorry, I should have actually tried it rather than just taking a
> >> quick
> >> >> look at the code.
> >> >>
> >> >> Rather than remove those safety checks I suppose we'd need a way of
> >> >> distinguishing, in the config, the difference in semantics. E.g.
> >> Something
> >> >> like a fencing.id config, which was mutually exclusive with
> >> >> transactional.id.
> >> >> Likewise perhaps initFencing() alongside initTransactions() in the
> API.
> >> >> But
> >> >> I think at this point it's something for another KIP.
> >> >>
> >> >> Kind regards,
> >> >>
> >> >> Tom
> >> >>
> >> >
> >>
> >
>

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

Posted by Chris Egerton <ch...@confluent.io.INVALID>.
Hi Randall,

That's a fair assessment; if a user upgrades their cluster to 3.0 with no
changes to worker or connector configs, it's possible that their cluster
will break if their worker principal(s) lack the necessary ACLs on the
Kafka cluster that hosts the config topic.

If we wanted to take a more conservative approach, we could allow users to
opt in to the use of a transactional producer by their cluster's leader
through some worker configuration property. The rolling upgrade process
from some pre-3.0 cluster to a 3.0+ cluster with exactly-once source
support enabled would become:

1. Upgrade cluster to 3.0 (or a later version, if one is available)
2. Enable the use of a transactional producer by the cluster's leader
3. Enable exactly-once source support

Since steps 1 and 2 could take place within the same rolling upgrade, the
number of rolling upgrades for this new approach would be the same as the
current approach: two. The only downside would be additional configuration
complexity for the worker, and the upgrade process itself would be a little
trickier for users (and potentially more error-prone).

In order to reduce the added configuration complexity as much as possible,
we could expose this intermediate state (workers are on 3.0 and the leader
uses a transactional producer, but exactly-once source support is not
enabled) by renaming the "exactly.once.source.enabled" property to
"exactly.once.source.support", and permitting values of "disabled"
(default), "preparing", and "enabled". The "preparing" and "enabled" values
would provide the same behavior as the current proposal with
"exactly.once.source.enabled" set to "false" and "true", respectively, and
"disabled" would have the same behavior as the current proposal, except
without the use of a transactional producer by the leader.
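
Concretely, the worker configuration at each stage of that upgrade might
look like this (using the renamed property described above):

    # After the first rolling upgrade (workers on 3.0+): the leader starts
    # using a transactional producer, but exactly-once is not yet in effect
    exactly.once.source.support=preparing

    # After the second rolling upgrade: exactly-once source support enabled
    exactly.once.source.support=enabled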

I'll update the proposal with this new behavior shortly. Thanks for the
review!

Cheers,

Chris

On Wed, Jun 9, 2021 at 1:02 PM Randall Hauch <rh...@gmail.com> wrote:

> Chris,
>
> Sorry for the late question/comment. But the "Breaking Changes" section
> concerns me. IIUC, when a user upgrades their 1.x or 2.x Connect cluster, then when
> they restart their 3.0 worker(s) the workers will fail due to this producer
> requirement even if they make no changes to their worker configs or
> connector configs. Is this correct?
>
> If so, I'm concerned about this. Even though the additional producer ACLs
> are seemingly minor and easy to change, it is likely that users will not
> read the docs before they upgrade, causing their simple upgrade to fail.
> And even though in 3.0 we could allow ourselves to cause breaking changes
> with a major release, I personally would prefer we not have any such
> breaking changes.
>
> Given that, what would be required for us to eliminate that breaking
> change, or change it from a breaking change to a prerequisite for enabling
> EOS support in their cluster?
>
> Thanks,
>
> Randall
>
> [...]

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

Posted by Randall Hauch <rh...@gmail.com>.
Chris,

Sorry for the late question/comment. But the "Breaking Changes" section
concerns me. IIUC, when a user upgrades their 1.x or 2.x Connect cluster, then when
they restart their 3.0 worker(s) the workers will fail due to this producer
requirement even if they make no changes to their worker configs or
connector configs. Is this correct?

If so, I'm concerned about this. Even though the additional producer ACLs
are seemingly minor and easy to change, it is likely that users will not
read the docs before they upgrade, causing their simple upgrade to fail.
And even though in 3.0 we could allow ourselves to cause breaking changes
with a major release, I personally would prefer we not have any such
breaking changes.

Given that, what would be required for us to eliminate that breaking
change, or change it from a breaking change to a prerequisite for enabling
EOS support in their cluster?

Thanks,

Randall

On Wed, Jun 2, 2021 at 8:42 AM Chris Egerton <ch...@confluent.io.invalid>
wrote:

> [...]

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

Posted by Chris Egerton <ch...@confluent.io.INVALID>.
Hi Tom,

I do agree that it'd be safer to default to "required", but since at the
time of the 3.0 release no existing connectors will have implemented the
"SourceConnector::exactlyOnceSupport" method, it'd require all Connect
users to downgrade to "requested" anyway in order to enable exactly-once
support on their workers. The friction there seems a little excessive; we
might consider changing the default from "requested" to "required" later,
after connector developers have had enough time to put out new connector
versions that implement the new API. Thoughts?
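
To make that concrete: under the API sketched in the current KIP draft (so
names and signatures may still change), a connector that has been updated
for exactly-once support would advertise it with something like this:

    import java.util.Map;
    import org.apache.kafka.connect.source.SourceConnector;
    // ExactlyOnceSupport is the enum proposed in the KIP; its exact package
    // and nesting may differ in the final version.

    public abstract class MySourceConnector extends SourceConnector {
        // Preflight hook: workers (and the exactly.once.support=required
        // check) would use this to verify the connector before starting it.
        // Existing connectors don't override it, which is why defaulting to
        // "required" would force everyone down to "requested" on day one.
        @Override
        public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
            return ExactlyOnceSupport.SUPPORTED;
        }

        // version(), start(), taskClass(), taskConfigs(), stop(), config()
        // omitted here for brevity.
    }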

Cheers,

Chris

On Wed, Jun 2, 2021 at 8:49 AM Tom Bentley <tb...@redhat.com> wrote:

> Hi Chris,
>
> Just a minor question: I can see why the default for exactly.once.support
> is requested (you want a good first-run experience, I assume), but it's a
> little like engineering a safety catch and then not enabling it. Wouldn't
> it be safer to default to required, so that no one can mistakenly end up
> without EoS unless they've explicitly configured it?
>
> Thanks,
>
> Tom
>
> On Tue, Jun 1, 2021 at 4:48 PM Chris Egerton <ch...@confluent.io.invalid>
> wrote:
>
> > Hi Gunnar,
> >
> > Thanks for taking a look! I've addressed the low-hanging fruit in the
> > KIP; responses to other comments inline here:
> >
> > > * TransactionContext: What's the use case for the methods accepting a
> > > source record (commitTransaction(SourceRecord record),
> > > abortTransaction(SourceRecord record))?
> >
> > This allows developers to decouple transaction boundaries from record
> > batches. If a connector has a configuration that dictates how often it
> > returns from "SourceTask::poll", for example, it may be easier to define
> > multiple transactions within a single batch or a single transaction
> > across several batches than to retrofit the connector's poll logic to
> > work with transaction boundaries.
> >
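To illustrate with the draft API (method names may still change, and the
transaction context accessor on SourceTaskContext is as proposed in the
KIP): a task could commit a transaction every N records, regardless of how
many records any single call to poll() happens to return.

    import java.util.List;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;
    import org.apache.kafka.connect.source.TransactionContext;

    public abstract class ChunkedSourceTask extends SourceTask {
        private static final int RECORDS_PER_TRANSACTION = 100;
        private int pending = 0;

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            // Null unless the connector has opted into defining its own
            // transaction boundaries.
            TransactionContext transactionContext = context.transactionContext();
            List<SourceRecord> batch = readFromSource();
            for (SourceRecord record : batch) {
                if (transactionContext != null
                        && ++pending >= RECORDS_PER_TRANSACTION) {
                    // Ends the current transaction as soon as this record is
                    // dispatched, wherever it falls relative to the batch.
                    transactionContext.commitTransaction(record);
                    pending = 0;
                }
            }
            return batch;
        }

        // Source-system-specific read; the details don't matter here.
        protected abstract List<SourceRecord> readFromSource();
    }
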
> > > * SourceTaskContext: Instead of guarding against NSME, is there a way
> > > for a connector to query the KC version and thus derive its
> > > capabilities? Going forward, a generic API for querying capabilities
> > > could be nice, so a connector can query for capabilities of the runtime
> > > in a safe and compatible way.
> >
> > This would be a great quality-of-life improvement for connector and
> > framework developers alike, but I think it may be best left for a
> > separate KIP. The current approach, clunky though it may be, seems like
> > a nuisance
> > at worst. It's definitely worth addressing but I'm not sure we have the
> > time to think through all the details thoroughly enough in time for the
> > upcoming KIP freeze.
> >
> > > * SourceConnector: Would it make sense to perhaps merge the two methods
> > > and return one enum of { SUPPORTED, NOT_SUPPORTED,
> > > SUPPORTED_WITH_BOUNDARIES }?
> >
> > Hmm... at first glance I like the idea of merging the two methods a lot.
> > The one thing that gives me pause is that there may be connectors that
> > would like to define their own transaction boundaries without providing
> > exactly-once guarantees. We could add UNSUPPORTED_WITH_BOUNDARIES to
> > accommodate that, but then, it might actually be simpler to keep the two
> > methods separate in case we add some third variable to the mix that would
> > also have to be reflected in the possible ExactlyOnceSupport enum values.
> >
> > > Or, alternatively return an enum from canDefineTransactionBoundaries(),
> > > too; even if it only has two values now, that'd allow for extension in
> > > the future
> >
> > This is fine by me; we just have to figure out exactly which enum values
> > would be suitable. It's a little clunky but right now I'm toying with
> > something like "ConnectorDefinedTransactionBoundaries" with values of
> > "SUPPORTED" and "NOT_SUPPORTED" and a default of "NOT_SUPPORTED". If we
> > need more granularity in the future then we can deprecate one or both of
> > them and add new values. Thoughts?
> >
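(Sketched out, the enum I'm toying with would just be this -- purely
illustrative, of course:

    public enum ConnectorDefinedTransactionBoundaries {
        SUPPORTED,
        NOT_SUPPORTED
        // more granular values could be added here later without breaking
        // existing connectors
    }
)
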
> > > And one general question: in Debezium, we have some connectors that
> > > produce records "out-of-band" to a schema history topic via their own
> > > custom producer. Is there any conceivable way for such a producer to
> > > participate in the transaction managed by the KC runtime environment?
> >
> > To answer the question exactly as asked: no; transactions cannot be
> > shared across producers and until/unless that is changed (which seems
> > unlikely)
> > this won't be possible. However, I'm curious why a source connector would
> > spin up its own producer instead of using "SourceTask::poll" to provide
> > records to Connect. Is it easier to consume from that topic when the
> > connector can define its own (de)serialization format? I'm optimistic
> > that if we understand the use case for the separate producer we may
> > still be able to help bridge the gap here, one way or another.
> >
> > > One follow-up question after thinking some more about this; is there
> > > any limit in terms of duration or size of in-flight,
> > > connector-controlled transactions? In case of Debezium for instance,
> > > there may be cases where we tail the TX log from an upstream source
> > > database, not knowing whether the events we receive belong to a
> > > committed or aborted transaction. Would it be valid to emit all these
> > > events via a transactional task, and in case we receive a ROLLBACK
> > > event eventually, to abort the pending Kafka transaction? Such source
> > > transactions could be running for a long time potentially, e.g. hours
> > > or days (at least in theory). Or would this sort of usage not be
> > > considered a reasonable one?
> >
> > I think the distinction between reasonable and unreasonable usage here is
> > likely dependent on the use cases that people are trying to satisfy with
> > their connector, but if I had to guess, I'd say that a different approach
> > is probably warranted in most cases if the transaction spans entire days
> > at a time. If there's no concern about data not being visible to
> > downstream consumers until its transaction is committed, and the number
> > of records in the transaction isn't so large that the memory required to
> > buffer them all locally on a consumer before delivering them to the
> > downstream application becomes unreasonable, it would technically be
> > possible. Connect users would have to be mindful of the following:
> >
> > - A separate offsets topic for the connector would be highly recommended
> > in order to avoid crippling other connectors with hanging transactions
> > - The producer-level transaction.timeout.ms property (
> > https://kafka.apache.org/28/documentation.html#producerconfigs_transaction.timeout.ms
> > ), which can be configured in connectors either via the worker-level
> > producer.transaction.timeout.ms or the connector-level
> > producer.override.transaction.timeout.ms property, would have to be high
> > enough to allow for transactions that stay open for long periods of time
> > (the default is 1 minute, so this would almost certainly have to be
> > adjusted)
> > - The broker-level transaction.max.timeout.ms property (
> > https://kafka.apache.org/28/documentation.html#brokerconfigs_transaction.max.timeout.ms
> > ) would have to be at least as high as the transaction timeout necessary
> > for the task (the default is 15 minutes, so this would probably need to
> > be adjusted)
> > - The broker-level transactional.id.expiration.ms property (
> > https://kafka.apache.org/28/documentation.html#brokerconfigs_transactional.id.expiration.ms
> > ) would have to be high enough to not automatically expire the task's
> > producer after a long enough period without new records; the default is
> > 7 days, so this would probably be fine in most scenarios
> >
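As a rough illustration of the list above (the values are arbitrary, and
the connector-level offsets topic property is the one proposed in this
KIP, so the exact names may still change):

    # Connector-level settings for a source connector with long-running
    # transactions
    offsets.storage.topic=my-connector-offsets
    producer.override.transaction.timeout.ms=3600000

    # Broker-side, transaction.max.timeout.ms would have to be raised to at
    # least the same value:
    transaction.max.timeout.ms=3600000
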
> > Thanks again for taking a look; insight from connector developers is
> > tremendously valuable here!
> >
> > Cheers,
> >
> > Chris
> >
> > On Thu, May 27, 2021 at 6:35 PM Gunnar Morling
> > <gu...@googlemail.com.invalid> wrote:
> >
> > > Chris,
> > >
> > > One follow-up question after thinking some more about this; is there
> > > any limit in terms of duration or size of in-flight,
> > > connector-controlled transactions? In case of Debezium for instance,
> > > there may be cases where we tail the TX log from an upstream source
> > > database, not knowing whether the events we receive belong to a
> > > committed or aborted transaction. Would it be valid to emit all these
> > > events via a transactional task, and in case we receive a ROLLBACK
> > > event eventually, to abort the pending Kafka transaction? Such source
> > > transactions could be running for a long time potentially, e.g. hours
> > > or days (at least in theory). Or would this sort of usage not be
> > > considered a reasonable one?
> > >
> > > Thanks,
> > >
> > > --Gunnar
> > >
> > >
> > > On Thu, May 27, 2021 at 23:15, Gunnar Morling <
> > > gunnar.morling@googlemail.com> wrote:
> > >
> > > > Chris, all,
> > > >
> > > > I've just read KIP-618, and let me congratulate you first of all for
> > > > this impressive piece of work! Here are a few small suggestions and
> > > > questions I had while reading:
> > > >
> > > > * TransactionContext: What's the use case for the methods accepting a
> > > > source record (commitTransaction(SourceRecord record),
> > > > abortTransaction(SourceRecord record))?
> > > > * SourceTaskContext: Typo in "when the sink connector is deployed" ->
> > > > source task
> > > > * SourceTaskContext: Instead of guarding against NSME, is there a way
> > > > for a connector to query the KC version and thus derive its
> > > > capabilities? Going forward, a generic API for querying capabilities
> > > > could be nice, so a connector can query for capabilities of the
> > > > runtime in a safe and compatible way.
> > > > * SourceConnector: exactlyOnceSupport() -> false return value doesn't
> > > > match
> > > > * SourceConnector: Would it make sense to perhaps merge the two
> > > > methods and return one enum of { SUPPORTED, NOT_SUPPORTED,
> > > > SUPPORTED_WITH_BOUNDARIES }? Or, alternatively return an enum from
> > > > canDefineTransactionBoundaries(), too; even if it only has two values
> > > > now, that'd allow for extension in the future
> > > > now, that'd allow for extension in the future
> > > >
> > > > And one general question: in Debezium, we have some connectors that
> > > > produce records "out-of-band" to a schema history topic via their own
> > > > custom producer. Is there any conceivable way for such a producer to
> > > > participate in the transaction managed by the KC runtime environment?
> > > >
> > > > Thanks a lot,
> > > >
> > > > --Gunnar
> > > >
> > > >
> > > > On Mon, May 24, 2021 at 18:45, Chris Egerton
> > > > <ch...@confluent.io.invalid> wrote:
> > > >
> > > >> Hi all,
> > > >>
> > > >> Wanted to note here that I've updated the KIP document to include
> > > >> the changes discussed recently. They're mostly located in the
> > > >> "Public Interfaces" section. I suspect discussion hasn't concluded
> > > >> yet and there will probably be a few more changes to come, but wanted
> > > >> to take the opportunity to provide a snapshot of what the current
> > > >> design looks like.
> > > >>
> > > >> Cheers,
> > > >>
> > > >> Chris
> > > >>
> > > >> On Fri, May 21, 2021 at 4:32 PM Chris Egerton <ch...@confluent.io>
> > > >> wrote:
> > > >>
> > > >> > Hi Tom,
> > > >> >
> > > >> > Wow, I was way off base! I was thinking that the intent of the
> > > >> > fencible producer was to employ it by default with 3.0, as opposed
> > > >> > to only after the worker-level "exactly.once.source.enabled"
> > > >> > property was flipped on. You are correct that with the case you
> > > >> > were actually describing, there would be no heightened ACL
> > > >> > requirements, and that it would leave room in the future for
> > > >> > exactly-once to be disabled on a per-connector basis (as long as
> > > >> > all the workers in the cluster already had
> > > >> > "exactly.once.source.enabled" set to "true") with no worries about
> > > >> > breaking changes.
> > > >> >
> > > >> > I agree that this is something for another KIP; even if we could
> > > >> > squeeze it in in time for this release, it might be a bit much for
> > > >> > new users to take in all at once. But I can add it to the doc as
> > > >> > "future work" since it's a promising idea that could prove valuable
> > > >> > to someone who might need per-connector granularity in the future.
> > > >> >
> > > >> > Thanks for clearing things up; in retrospect your comments make a
> > > >> > lot more sense now, and I hope I've sufficiently addressed them.
> > > >> >
> > > >> > PSA for you and everyone else--I plan on updating the doc next
> > > >> > week with the new APIs for connector-defined transaction
> > > >> > boundaries, user-configurable transaction boundaries (i.e., poll
> > > >> > vs. interval vs. connectors), and preflight checks for exactly-once
> > > >> > validation (required vs. requested).
> > > >> >
> > > >> > Cheers,
> > > >> >
> > > >> > Chris
> > > >> >
> > > >> > On Fri, May 21, 2021 at 7:14 AM Tom Bentley <tb...@redhat.com>
> > > >> wrote:
> > > >> >
> > > >> >> Hi Chris,
> > > >> >>
> > > >> >> Thanks for continuing to entertain some of these ideas.
> > > >> >>
> > > >> >> On Fri, May 14, 2021 at 5:06 PM Chris Egerton
> > > >> >> <chrise@confluent.io.invalid> wrote:
> > > >> >>
> > > >> >> > [...]
> > > >> >> >
> > > >> >> > That's true, but we do go from three static ACLs (write/describe
> > > >> >> > on a fixed transactional ID, and idempotent write on a fixed
> > > >> >> > cluster) to a dynamic collection of ACLs.
> > > >> >> >
> > > >> >>
> > > >> >> I'm not quite sure I follow, maybe I've lost track. To be clear,
> > > >> >> I was suggesting the use of a 'fencing producer' only in clusters
> > > >> >> with exactly.once.source.enabled=true where I imagined the key
> > > >> >> difference between the exactly once and fencing cases was how the
> > > >> >> producer was configured/used (transactional vs this new fencing
> > > >> >> semantic). I think the ACL requirements for connector producer
> > > >> >> principals would therefore be the same as currently described in
> > > >> >> the KIP. The same is true for the worker principals (which is the
> > > >> >> only breaking change you give in the KIP). So I don't think the
> > > >> >> fencing idea changes the backwards compatibility story that's
> > > >> >> already in the KIP, just allows a safe per-connector
> > > >> >> exactly.once=disabled option to be supported (with required vs.
> > > >> >> requested, as we already discussed).
> > > >> >>
> > > >> >> But I'm wondering whether I've overlooked something.
> > > >> >>
> > > >> >> > Ultimately I think it may behoove us to err on the side of
> > > >> >> > reducing the breaking changes here for now and saving them for
> > > >> >> > 4.0 (or some later major release), but would be interested in
> > > >> >> > thoughts from you and others.
> > > >> >> >
> > > >> >>
> > > >> >> Difficult to answer (given I think I might be missing something).
> > > >> >> If there are breaking changes then I don't disagree. It's
> > > >> >> difficult to reason about big changes like this without some
> > > >> >> practical experience. If there are not, then I think we could also
> > > >> >> implement the whole exactly.once=disabled thing in a later KIP
> > > >> >> without additional breaking changes (i.e. some time in 3.x),
> > > >> >> right?
> > > >> >>
> > > >> >>
> > > >> >> > > Guozhang also has a (possible) use case for a fencing-only
> > > >> >> > > producer (
> > > >> >> > > https://issues.apache.org/jira/browse/KAFKA-12693 ), and as he
> > > >> >> > > points out there, you should be able to get these semantics
> > > >> >> > > today by calling initTransactions() and then just using the
> > > >> >> > > producer as normal (no
> > > >> >> > > beginTransaction()/commitTransaction()/abortTransaction()).
> > > >> >> >
> > > >> >> > I tested this locally and was not met with success; transactional
> > > >> >> > producers do a check right now to ensure that any calls to
> > > >> >> > "KafkaProducer::send" occur within a transaction (see
> > > >> >> > https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L957-L959
> > > >> >> > and
> > > >> >> > https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L450-L451
> > > >> >> > ).
> > > >> >> > Not a blocker, just noting that we'd have to do some legwork to
> > > >> >> > make this workable with the producer API.
> > > >> >> >
> > > >> >>
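For anyone who wants to reproduce this, a minimal example (the bootstrap
address and topic are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class FencingOnlyRepro {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("transactional.id", "fencing-test");
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            try (KafkaProducer<String, String> producer =
                    new KafkaProducer<>(props)) {
                producer.initTransactions();
                // No beginTransaction(): the transaction manager is still in
                // the READY state, so this send() throws IllegalStateException
                // instead of producing the record.
                producer.send(new ProducerRecord<>("test", "key", "value"));
            }
        }
    }
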
> > > >> >> Ah, sorry, I should have actually tried it rather than just
> > > >> >> taking a quick look at the code.
> > > >> >>
> > > >> >> Rather than remove those safety checks I suppose we'd need a way
> > > >> >> of distinguishing, in the config, the difference in semantics,
> > > >> >> e.g. something like a fencing.id config, which would be mutually
> > > >> >> exclusive with transactional.id. Likewise perhaps initFencing()
> > > >> >> alongside initTransactions() in the API. But I think at this point
> > > >> >> it's something for another KIP.
> > > >> >>
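(Just to sketch the shape of that idea -- to be clear, neither fencing.id
nor initFencing() exists today; this is purely hypothetical API drawn from
the paragraph above, with serializer configs omitted:

    Properties props = new Properties();
    props.put("fencing.id", "my-connector");  // hypothetical config;
                                              // mutually exclusive with
                                              // transactional.id
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        producer.initFencing();  // hypothetical: fence out older producers
                                 // with the same fencing.id by bumping the
                                 // epoch, without starting transactions
        // ... then send() as normal, with no per-transaction bookkeeping
    }
)
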
> > > >> >> Kind regards,
> > > >> >>
> > > >> >> Tom
> > > >> >>
> > > >> >
> > > >>
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-618: Atomic commit of source connector records and offsets

Posted by Tom Bentley <tb...@redhat.com>.
Hi Chris,

Just a minor question: I can see why the default for exactly.once.support
is requested (you want a good first-run experience, I assume), but it's a
little like engineering a safety catch and then not enabling it. Wouldn't
it be safer to default to required, so that no one can mistakenly end up
without EoS unless they've explicitly configured it?

Thanks,

Tom

On Tue, Jun 1, 2021 at 4:48 PM Chris Egerton <ch...@confluent.io.invalid>
wrote:

> Hi Gunnar,
>
> [...]