Posted to dev@kafka.apache.org by Yash Mayya <ya...@gmail.com> on 2023/07/03 10:54:02 UTC

Re: [DISCUSS] KIP-793: Sink Connectors: Support topic-mutating SMTs for async connectors (preCommit users)

Hi Chris,

Thanks for pointing that out, I hadn't realized that the SubmittedRecords
class has almost exactly the same semantics needed for handling offset
commits in the per-sink record ack API case. However, I agree that it isn't
worth the tradeoff and we've already discussed the backward compatibility
concerns imposed on connector developers if we were to consider deprecating
/ removing the preCommit hook in favor of a new ack-based API.
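For readers unfamiliar with it, the SubmittedRecords-style bookkeeping mentioned above can be sketched roughly as follows. This is a minimal, self-contained model, not the runtime's actual SubmittedRecords class. The names AckTracker, submit, ack and committableOffset are hypothetical, topic partitions are plain strings, and records are assumed to be submitted in consumption order per partition. The tracker only advances past acknowledged records at the head of each partition's queue, so it never assumes that consecutive offsets differ by one. Gaps left by log compaction or transaction control records are therefore harmless.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical sketch of per-record ack tracking for sink offset commits.
// This is NOT the runtime's SubmittedRecords class; all names are illustrative.
class AckTracker {

    // One submitted record: its physical (pre-transform) offset and ack state.
    static final class Submitted {
        final long offset;
        boolean acked;

        Submitted(long offset) {
            this.offset = offset;
        }
    }

    // Submitted records per physical topic partition, in consumption order.
    private final Map<String, Deque<Submitted>> pending = new HashMap<>();
    // Next offset to consume per partition, i.e. what is safe to commit.
    private final Map<String, Long> committable = new HashMap<>();

    // Registers a record; records must be submitted in consumption order.
    Submitted submit(String topicPartition, long offset) {
        Submitted s = new Submitted(offset);
        pending.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>()).addLast(s);
        return s;
    }

    // Marks a record as written to the sink system and advances the commit point.
    void ack(String topicPartition, Submitted s) {
        s.acked = true;
        Deque<Submitted> queue = pending.get(topicPartition);
        // Only acked records at the head are released; an unacked record blocks
        // everything after it. Offsets are never assumed to be consecutive.
        while (queue != null && !queue.isEmpty() && queue.peekFirst().acked) {
            Submitted head = queue.pollFirst();
            committable.put(topicPartition, head.offset + 1);
        }
    }

    OptionalLong committableOffset(String topicPartition) {
        Long offset = committable.get(topicPartition);
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }
}
```

A task could report values like these from SinkTask::preCommit, keyed by the original topic partition. The sketch leaves out partition revocation and any bound on memory use.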

Thanks,
Yash

On Thu, Jun 29, 2023 at 7:31 PM Chris Egerton <ch...@aiven.io.invalid>
wrote:

> Hi Yash,
>
> Thanks for your continued work on this tricky feature. I have no further
> comments or suggestions on the KIP and am ready to vote in favor of it.
>
> That said, I did want to quickly respond to this comment:
>
> > On a side note, this also means that the per sink record ack API
> that was proposed earlier wouldn't really work for this case since Kafka
> consumers themselves don't support per message acknowledgement semantics
> (and any sort of manual book-keeping based on offset linearity in a topic
> partition would be affected by things like log compaction, control records
> for transactional use cases etc.) right?
>
> I believe we could still use the SubmittedRecords class [1] (with some
> small tweaks) to track ack'd messages and the latest-committable offsets
> per topic partition, without relying on assumptions about offsets for
> consecutive records consumed from Kafka always differing by one. But at
> this point I think that, although this approach does come with the
> advantage of also enabling fine-grained metrics on record delivery to the
> sink system, it's not worth the tradeoff in intuition since it's less clear
> why users should prefer that API instead of using SinkTask::preCommit.
>
> [1] -
>
> https://github.com/apache/kafka/blob/12be344fdd3b20f338ccab87933b89049ce202a4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
>
> Cheers,
>
> Chris
>
> On Wed, Jun 21, 2023 at 9:46 AM Yash Mayya <ya...@gmail.com> wrote:
>
> > Hi Chris,
> >
> > Firstly, thanks for sharing your detailed thoughts on this thorny issue!
> > Point taken on Kafka Connect being a brownfield project and I guess we
> > might just need to trade off elegant / "clean" interfaces for fixing this
> > gap in functionality. Also, thanks for calling out all the existing
> > cross-plugin interactions and also the fact that connectors are not and
> > should not be developed in silos ignoring the rest of the ecosystem. That
> > said, here are my thoughts:
> >
> > > we could replace these methods with headers that the
> > > Connect runtime automatically injects into records directly
> > > before dispatching them to SinkTask::put.
> >
> > Hm, that's an interesting idea to get around the need for connectors to
> > handle potential 'NoSuchMethodError's in calls to
> > SinkRecord::originalTopic/originalKafkaPartition/originalKafkaOffset.
> > However, I'm inclined to agree that retrieving these values from the record
> > headers seems even less intuitive and I'm okay with adding this to the
> > rejected alternatives list.
> >
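On older runtimes the new getters do not exist, so a call to SinkRecord::originalTopic fails at run time with NoSuchMethodError. One hedged way for a connector to guard such calls is sketched below. To keep the sketch runnable without connect-api, the getter call is passed in as a Supplier. The class and method names (OriginalCoordinates.orFallback) are hypothetical. In a real connector the supplier would presumably be a lambda such as `() -> record.originalTopic()`, with `record.topic()` as the fallback.

```java
import java.util.function.Supplier;

// Hypothetical helper: guards a call to a getter that may be missing on older runtimes.
final class OriginalCoordinates {

    private OriginalCoordinates() {
    }

    // Returns the value from the newer getter, or the fallback if the running
    // API version lacks that method (NoSuchMethodError is thrown at call time).
    static <T> T orFallback(Supplier<T> newerGetter, T fallback) {
        try {
            return newerGetter.get();
        } catch (NoSuchMethodError e) {
            // Older runtime: pre- and post-transform coordinates can't be told
            // apart, so fall back to what the record itself reports.
            return fallback;
        }
    }
}
```

Falling back silently means a topic-mutating SMT on an old runtime still produces the original bug, so a connector may prefer to fail fast instead. The sketch leaves that choice open.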
> > > we can consider eliminating the overridden
> > > SinkTask::open/close methods
> >
> > I tried to further explore the idea of keeping just the existing
> > SinkTask::open / SinkTask::close methods but only calling them with
> > post-transform topic partitions and ended up coming to the same conclusion
> > that you did earlier in this thread :)
> >
> > The overloaded SinkTask::open / SinkTask::close are currently the biggest
> > sticking points with the latest iteration of this KIP and I'd prefer this
> > elimination for now. The primary reasoning is that the information from
> > open / close on pre-transform topic partitions can be combined with the per
> > record information of both pre-transform and post-transform topic
> > partitions to handle most practical use cases without significantly
> > muddying the sink connector related public interfaces. The argument that
> > this makes it harder for sink connectors to deal with post-transform topic
> > partitions (i.e. in terms of grouping together or batching records for
> > writing to the sink system) can be countered with the fact that it'll be
> > similarly challenging even with the overloaded method approach of calling
> > open / close with both pre-transform and post-transform topic partitions
> > since the batching would be done on post-transform topic partitions whereas
> > offset tracking and reporting for commits would be done on pre-transform
> > topic partitions (and the two won't necessarily serially advance in
> > lockstep). On a side note, this also means that the per sink record ack API
> > that was proposed earlier wouldn't really work for this case since Kafka
> > consumers themselves don't support per message acknowledgement semantics
> > (and any sort of manual book-keeping based on offset linearity in a topic
> > partition would be affected by things like log compaction, control records
> > for transactional use cases etc.) right? Overall, I think that the only
> > benefit of the overloaded open / close methods approach is that the
> > framework can enable the eventual closure of any post-transform topic
> > partition based writers created by sink tasks using the heuristics we
> > discussed earlier (via a cache with a time-based eviction policy) which
> > doesn't seem worth it at this point.
> >
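The split described in the paragraph above (write batches keyed by post-transform topic partition, offsets reported by pre-transform topic partition) can be modelled roughly as follows. This is a self-contained sketch, not connector code. The Rec class is a hypothetical stand-in for SinkRecord whose fields loosely mirror topic()/kafkaPartition() and the proposed originalTopic()/originalKafkaPartition()/originalKafkaOffset(). Partitions are plain "topic-partition" strings, and all batches are assumed to be flushed together.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a sink task that batches by post-transform (virtual)
// partition but reports offsets by pre-transform (physical) partition.
class SplitCoordinatesTask {

    // Stand-in for SinkRecord; fields loosely mirror the getters discussed in the KIP.
    static final class Rec {
        final String topic;          // post-transform topic
        final int partition;         // post-transform partition
        final String originalTopic;  // pre-transform topic
        final int originalPartition; // pre-transform partition
        final long originalOffset;   // pre-transform offset

        Rec(String topic, int partition, String originalTopic, int originalPartition, long originalOffset) {
            this.topic = topic;
            this.partition = partition;
            this.originalTopic = originalTopic;
            this.originalPartition = originalPartition;
            this.originalOffset = originalOffset;
        }
    }

    // Pending writes grouped by destination (post-transform) partition.
    final Map<String, List<Rec>> batches = new HashMap<>();
    // Highest pre-transform offset per physical partition in unflushed batches.
    private final Map<String, Long> unflushedMax = new HashMap<>();
    // Offsets safe to commit (next offset to consume) per physical partition.
    private final Map<String, Long> committable = new HashMap<>();

    void put(List<Rec> records) {
        for (Rec r : records) {
            batches.computeIfAbsent(r.topic + "-" + r.partition, k -> new ArrayList<>()).add(r);
            unflushedMax.merge(r.originalTopic + "-" + r.originalPartition, r.originalOffset, Long::max);
        }
    }

    // Writes all batches at once, so every tracked offset becomes committable.
    void flushAll() {
        // Writing to the external system is omitted in this sketch.
        batches.clear();
        unflushedMax.forEach((tp, max) -> committable.put(tp, max + 1));
        unflushedMax.clear();
    }

    // Roughly what preCommit would report, keyed by physical partition.
    Map<String, Long> committableOffsets() {
        return new HashMap<>(committable);
    }
}
```

Because everything is flushed together, `max + 1` is safe here. A task that flushes batches independently would need per-record tracking instead, since pre-transform offsets from different batches interleave.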
> > Thanks,
> > Yash
> >
> > On Mon, May 22, 2023 at 7:30 PM Chris Egerton <ch...@aiven.io.invalid>
> > wrote:
> >
> > > Hi Yash,
> > >
> > > I've been following the discussion and have some thoughts. Ultimately I'm
> > > still in favor of this KIP and would hate to see it go dormant, though we
> > > may end up settling for a less-invasive option.
> > >
> > >
> > > On the topic of abstraction and inter-plugin interactions:
> > >
> > > First, there already are instances of cross-plugin interactions. Logical
> > > type handling is probably the biggest example: a source connector embeds
> > > metadata in the schema for record keys/values it emits that notifies
> > > downstream converters about how to handle them. We provide support for some
> > > logical types in Connect out of the box, but there's nothing stopping
> > > connector and converter developers from implementing their own logical type
> > > support using the exact same mechanism and different logical type names,
> > > which is already done by Debezium, to name one example.
> > >
> > > Second, although it's been a goal of Connect to abstract away parts of
> > > building a data pipeline so that, e.g., connector developers don't have to
> > > be concerned with converters or consumers, in reality, this layer of
> > > abstraction has already been eroded. The example that most readily comes to
> > > mind is how source tasks are notified of the offsets of records that
> > > they've emitted after they've been published to Kafka via
> > > SourceTask::commitRecord [1].
> > >
> > > But, more importantly, it's unlikely that connectors are being developed in
> > > complete isolation. Nobody's going to implement the SinkConnector /
> > > SinkTask interfaces and then throw that code off to someone else to figure
> > > out all the details of deployment, configuration, testing, etc. Developers
> > > will probably have to be aware of at least the converter interface, some of
> > > the available implementations of it, and some details of Kafka clients
> > > (e.g., consumer groups for sink connectors). And this isn't a bad
> > > thing--it's unlikely that someone will write a Kafka connector without
> > > having or benefitting from some understanding of Kafka and the steps of the
> > > data pipeline that it will be a part of.
> > >
> > > Bringing this to the practical topic of discussion--transformations--I
> > > think it's actually in everyone's best interests for connector developers
> > > to be aware of transformations. This isn't just because of the specific
> > > problem that the KIP is trying to address. It's because there's plenty of
> > > logic that can be implemented via SMT that a naive connector developer will
> > > think that they have to implement on their own, which will ultimately lead
> > > to a sub-par experience for people who end up using those connectors due to
> > > inconsistent semantics (especially lack of predicates), inconsistent
> > > configuration syntax, increased chances for bugs, and FUD ("why wasn't this
> > > implemented as an SMT?").
> > >
> > > Finally, although preserving clean, composable interfaces that can be
> > > understood in isolation is a great principle to start with, we are now in
> > > what Anna McDonald recently referred to as "brownfield" space for Connect.
> > > We can't go back in time and redesign the SMT interface/contracts to make
> > > things cleaner. And I don't think it's fair to anyone to suddenly drop
> > > support for SMTs that mutate t/p/o information for sink records, especially
> > > since these can be used gainfully with plenty of existing sink connectors.
> > >
> > > Ultimately I still think the path forward that's best for the users is to
> > > make the impossible possible by addressing this long-standing API gap in
> > > Connect. Yes, it adds to the cognitive burden for connector developers, but
> > > if they can tolerate it, the end result is better for everyone involved,
> > > and if they can't, it's likely that the end result will be a preservation
> > > of existing behavior, which leaves us no worse than before.
> > >
> > >
> > > With all that said, I've thought about how to minimize or at least hide the
> > > API changes as much as possible. I've had two thoughts:
> > >
> > > 1. On the SinkRecord::originalTopic/originalKafkaPartition/originalKafkaOffset
> > > front, we could replace these methods with headers that the Connect runtime
> > > automatically injects into records directly before dispatching them to
> > > SinkTask::put. The names can be the proposed method names (e.g.,
> > > "originalTopic"). I believe this is inferior to the current proposal and
> > > should be a rejected alternative, but it at least seemed worth floating in
> > > the name of compromise. I dislike this approach for two reasons: first, it
> > > seems even less intuitive, and second, it doesn't come with the benefit of
> > > encouraging connector developers to understand the SMT interface and take
> > > it into account when designing connectors.
> > >
> > > 2. Although I'd hate to see the same bookkeeping logic implemented in
> > > multiple connectors, we can consider eliminating the overridden
> > > SinkTask::open/close methods. A note should be added to both methods
> > > clarifying that they are only invoked with the original, pre-transform
> > > topic partitions, and developers will be on their own if they want to deal
> > > with post-transform topic partitions instead. I'm on the fence with this
> > > one, but if it's a choice between passing this KIP without modifying
> > > SinkTask::open/close, or letting the KIP go dormant, I'd happily choose the
> > > former.
> > >
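If SinkTask::open/close are only ever invoked with pre-transform topic partitions, a connector that keeps one writer per post-transform topic partition has to decide for itself when to close those writers. The time-based eviction heuristic mentioned elsewhere in this thread might look roughly like the sketch below. The WriterCache class, its idle timeout and the explicit clock argument are all hypothetical. A StringBuilder stands in for a real writer so the sketch runs on its own.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical writer cache keyed by post-transform topic partition, with idle eviction.
class WriterCache {

    private static final class Slot {
        final StringBuilder writer = new StringBuilder(); // stand-in for a real sink writer
        long lastUsedMs;
    }

    private final long idleTimeoutMs;
    private final Map<String, Slot> writers = new HashMap<>();

    WriterCache(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    // Returns (creating lazily) the writer for a post-transform topic partition.
    StringBuilder writerFor(String virtualTopicPartition, long nowMs) {
        Slot slot = writers.computeIfAbsent(virtualTopicPartition, k -> new Slot());
        slot.lastUsedMs = nowMs;
        return slot.writer;
    }

    // Evicts writers idle for longer than the timeout and returns their keys.
    // A real task would flush and close each writer here before dropping it.
    List<String> evictIdle(long nowMs) {
        List<String> evicted = new ArrayList<>();
        Iterator<Map.Entry<String, Slot>> it = writers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Slot> entry = it.next();
            if (nowMs - entry.getValue().lastUsedMs > idleTimeoutMs) {
                evicted.add(entry.getKey());
                it.remove();
            }
        }
        return evicted;
    }

    int openWriters() {
        return writers.size();
    }
}
```

A task might call evictIdle from put or preCommit. Picking the timeout is exactly the kind of heuristic the thread considered not worth baking into the framework.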
> > > Thanks Yash and Greg for the discussion so far, and apologies for the wall
> > > of text. Looking forward to your thoughts.
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > [1] -
> > > https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceTask.html#commitRecord(org.apache.kafka.connect.source.SourceRecord,org.apache.kafka.clients.producer.RecordMetadata)
> > >
> > > On Sun, Apr 23, 2023 at 11:20 AM Yash Mayya <ya...@gmail.com>
> > > wrote:
> > >
> > > > Hi Greg,
> > > >
> > > > Thanks for the response and sorry for the late reply.
> > > >
> > > > > Currently the AK tests have a lot of calls to, for example, new
> > > > > SinkRecord(String topic, int partition, Schema keySchema,
> > > > > Object key, Schema valueSchema, Object value, long kafkaOffset),
> > > > > a constructor without the original T/P/O values. I assumed that for
> > > > > backwards compatibility these constructors would still be usable in
> > > > > new runtimes. I imagine that there are also tests in downstream projects
> > > > > which make use of these constructors, whenever a Transform, Predicate,
> > > > > or Task is tested without a corresponding Converter. My question was
> > > > > about what values are chosen for the original T/P/O methods when these
> > > > > constructors are used after an upgrade to the latest connect-api.
> > > >
> > > > That's a good question - since this should primarily affect testing, I
> > > > think it should be acceptable to simply use the topic, partition and
> > > > kafkaOffset values as the originalTopic, originalKafkaPartition
> > > > and originalKafkaOffset?
> > > >
> > > > > If you inject the original T/P/O only before and after the chain, SMTs
> > > > > after an SMT which changes the original T/P/O will see whatever the earlier
> > > > > SMT emitted. Is this intentional, or should this be avoided?
> > > >
> > > > Hmm, this sounds like a misbehaving / badly implemented SMT since there
> > > > doesn't seem to be any reasonable situation where an SMT should modify a
> > > > sink record's original topic / partition / offset data, so I'm not in favor
> > > > of introducing checks and guards in the framework for this.
> > > >
> > > > Another point that I've been pondering is the one you raised about
> > > > the composability of Connect's plugin ecosystem and the special case
> > > > handling we're adding to sink connector plugins to work with certain
> > > > transformation plugin types. This really doesn't seem like a good precedent
> > > > to be setting / starting (since there don't seem to be any other such
> > > > "snowflake" inter-plugin interactions) in my opinion. The alternative of
> > > > completely managing this in the framework (and only exposing the virtual
> > > > coordinates to the sink tasks) doesn't seem too appealing either due to the
> > > > backward compatibility concerns while maintaining existing support and
> > > > functionality such as the possibility of implementing exactly-once
> > > > semantics, ability for tasks to rewind consumer offsets arbitrarily (which
> > > > might require the introduction of some form of persistence for the physical
> > > > <-> virtual coordinate mapping) etc. Unfortunately, even though this is a
> > > > long-standing problem that all of us want to fix, I'm considering moving
> > > > this KIP into a dormant / inactive state since there doesn't seem to be a
> > > > design that satisfies all the general principles that the Kafka Connect
> > > > framework has striven to uphold.
> > > >
> > > > Thanks,
> > > > Yash
> > > >
> > > > On Tue, Mar 14, 2023 at 3:31 AM Greg Harris <greg.harris@aiven.io.invalid>
> > > > wrote:
> > > >
> > > > > Yash,
> > > > >
> > > > > > I'm not sure I follow - are you asking about how the tests will be
> > > > > > updated post this change or about what upgrades will look like for
> > > > > > clusters in production?
> > > > >
> > > > > Currently the AK tests have a lot of calls to, for example, new
> > > > > SinkRecord(String topic, int partition, Schema keySchema, Object key,
> > > > > Schema valueSchema, Object value, long kafkaOffset), a constructor without
> > > > > the original T/P/O values. I assumed that for backwards compatibility these
> > > > > constructors would still be usable in new runtimes.
> > > > > I imagine that there are also tests in downstream projects which make use
> > > > > of these constructors, whenever a Transform, Predicate, or Task is tested
> > > > > without a corresponding Converter. My question was about what values are
> > > > > chosen for the original T/P/O methods when these constructors are used
> > > > > after an upgrade to the latest connect-api.
> > > > >
> > > > > > There shouldn't be any difference in behavior here - the framework will
> > > > > > add the original T/P/O metadata to the record after the entire
> > > > > > transformation chain has been applied and just before sending the record
> > > > > > to the task for processing. The KIP doesn't propose that transformations
> > > > > > themselves should also be able to retrieve original T/P/O information
> > > > > > for a sink record.
> > > > >
> > > > > The KIP includes this: "Note that while the record's offset can't be
> > > > > modified via the standard SinkRecord::newRecord methods that SMTs are
> > > > > expected to use, SinkRecord has public constructors that would allow SMTs
> > > > > to return records with modified offsets. This is why the proposed changes
> > > > > include a new SinkRecord::originalKafkaOffset method as well."
> > > > > In order to use the new or old SinkRecord constructors outside of the
> > > > > newRecord methods, SMTs will downcast the previous record and may access
> > > > > the original T/P/O methods. They may or may not forward this to the next
> > > > > SMT, and they may or may not use it in their own computation.
> > > > > Since this is acknowledged as a possible implementation, I was just asking:
> > > > > when one SMT changes the original T/P/O, what should later SMTs and
> > > > > predicates see from the original T/P/O methods?
> > > > > If you inject the original T/P/O only before and after the chain, SMTs
> > > > > after an SMT which changes the original T/P/O will see whatever the earlier
> > > > > SMT emitted. Is this intentional, or should this be avoided?
> > > > > Existing SMTs that use the SinkRecord constructor, either directly or via
> > > > > subclasses of ConnectRecord, will drop the original T/P/O and fall back to
> > > > > the logic from question (1).
> > > > >
> > > > > > The rejected alternative basically says that we can't do a
> > > > > > deterministic mapping from virtual coordinates to physical coordinates
> > > > > > without doing a lot of book-keeping.
> > > > >
> > > > > I suppose there is a possible implementation of metadata book-keeping which
> > > > > provides a reasonable system of virtual coordinates; it just ended up
> > > > > equivalent to hydrating intermediate topics to compute a consistent record
> > > > > ordering. I wasn't convinced by calling it "book-keeping" since I've seen
> > > > > that phrase used to disregard much less complicated state management, and
> > > > > had to see exactly where that solution becomes unreasonable.
> > > > >
> > > > > Thanks,
> > > > > Greg
> > > > >
> > > > > On Sun, Mar 12, 2023 at 6:30 AM Yash Mayya <ya...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Greg,
> > > > > >
> > > > > > Thanks for the detailed review!
> > > > > >
> > > > > > > What is the expected state/behavior for SinkRecords
> > > > > > > which do not have original T/P/O information after the
> > > > > > > upgrade? Just browsing, it appears that tests make
> > > > > > > extensive use of the existing public SinkRecord
> > > > > > > constructors for both Transformations and Connectors.
> > > > > >
> > > > > > I'm not sure I follow - are you asking about how the tests will be
> > > > > > updated post this change or about what upgrades will look like for
> > > > > > clusters in production? For the latter, we won't have to worry about sink
> > > > > > records without original T/P/O information at all once a cluster is fully
> > > > > > rolled and we will make it (hopefully) abundantly clear that connectors
> > > > > > need to account for missing original T/P/O getter methods if they expect
> > > > > > to be deployed on older Connect runtimes.
> > > > > >
> > > > > > > What is the expected behavior for Transformation
> > > > > > > implementations which do not use the newRecord
> > > > > > > methods and instead use public SinkRecord constructors?
> > > > > > > The KIP mentions this as a justification for the
> > > > > > > originalKafkaOffset method, but if existing implementations
> > > > > > > are using the existing constructors, those constructors won't
> > > > > > > forward the original T/P/O information to later transforms or
> > > > > > > the task.
> > > > > >
> > > > > > There shouldn't be any difference in behavior here - the framework will
> > > > > > add the original T/P/O metadata to the record after the entire
> > > > > > transformation chain has been applied and just before sending the record
> > > > > > to the task for processing. The KIP doesn't propose that transformations
> > > > > > themselves should also be able to retrieve original T/P/O information
> > > > > > for a sink record.
> > > > > >
> > > > > > > This reasoning and the KIP design seem to imply that the
> > > > > > > connector is better equipped to solve this problem than the
> > > > > > > framework, but the stated reasons are not convincing for me.
> > > > > >
> > > > > > This was added to the KIP by the original author, but I don't think the
> > > > > > intention was to imply that the connector is better equipped to solve this
> > > > > > problem than the framework. The intention is to provide complete
> > > > > > information to the connector ("physical" and "virtual coordinates" instead
> > > > > > of the currently incomplete "virtual coordinates" as you've termed it) so
> > > > > > that connectors can use the virtual coordinates for writing data to the
> > > > > > sink system and physical coordinates for offset reporting back to the
> > > > > > framework. The rejected alternative basically says that we can't do a
> > > > > > deterministic mapping from virtual coordinates to physical coordinates
> > > > > > without doing a lot of book-keeping.
> > > > > >
> > > > > > I agree with the rest of your analysis on the tradeoffs between the
> > > > > > proposed approach versus the seemingly more attractive approach of handling
> > > > > > everything purely in the framework and only exposing "virtual coordinates"
> > > > > > to the connectors. I think the biggest thorn here is maintaining backward
> > > > > > compatibility with the considerable ecosystem of existing connectors, which
> > > > > > is something Connect has always been burdened by.
> > > > > >
> > > > > > Thanks,
> > > > > > Yash
> > > > > >
> > > > > > On Wed, Mar 8, 2023 at 6:54 AM Greg Harris <greg.harris@aiven.io.invalid>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Yash,
> > > > > > >
> > > > > > > I always use this issue as an example of a bug being caused by design
> > > > > > > rather than by implementation error, and once it's fixed I'll need to find
> > > > > > > something else to talk about :)
> > > > > > > So glad to see this get fixed!
> > > > > > >
> > > > > > > I'll chime in to support some of the earlier discussions that seem to have
> > > > > > > been resolved:
> > > > > > >
> > > > > > > 1. With respect to SinkRecord methods vs an overloaded put(): I agree with
> > > > > > > the current design but I justify it a little bit differently than has
> > > > > > > already been discussed.
> > > > > > > If we were designing this interface on day 1 without backwards
> > > > > > > compatibility in mind, which design would make more sense? Or for a
> > > > > > > different framing: In the future when old runtimes and connectors are
> > > > > > > retired and the old interfaces are removed, which design is going to look
> > > > > > > more strange and unmotivated?
> > > > > > > Applied to this design decision, I would say that the original T/P/O are
> > > > > > > properties of a single SinkRecord and make sense as getters, and it would
> > > > > > > be strange to store them in an auxiliary map.
> > > > > > >
> > > > > > > 2. Following up this change with a compatibility library to make the
> > > > > > > interface easier to use is the right choice to make here. This change
> > > > > > > should be focused on correctness in allowing developers to fix the
> > > > > > > incompatibility, and we can be concerned with coming up with a more
> > > > > > > ergonomic solution in the compatibility library.
> > > > > > > The API should be focused on generality, correctness, and performance
> > > > > > > because those cannot be worked around after the fact. Connector
> > > > > > > implementations and/or libraries can be concerned with trading off some
> > > > > > > generality and/or performance for ease of use.
> > > > > > >
> > > > > > > 3. I think that the difference in behavior of the new open/close methods as
> > > > > > > compared to the old methods is significant, and requires good documentation
> > > > > > > to help connector developers avoid lazy and incorrect migrations. I am
> > > > > > > happy to have that addressed in code review after the KIP is approved.
> > > > > > >
> > > > > > > I had some questions:
> > > > > > >
> > > > > > > 4. What is the expected state/behavior for SinkRecords which do not have
> > > > > > > original T/P/O information after the upgrade? Just browsing, it appears
> > > > > > > that tests make extensive use of the existing public SinkRecord
> > > > > > > constructors for both Transformations and Connectors.
> > > > > > >
> > > > > > > 5. What is the expected behavior for Transformation implementations which
> > > > > > > do not use the newRecord methods and instead use public SinkRecord
> > > > > > > constructors? The KIP mentions this as a justification for the
> > > > > > > originalKafkaOffset method, but if existing implementations are using the
> > > > > > > existing constructors, those constructors won't forward the original T/P/O
> > > > > > > information to later transforms or the task.
> > > > > > >
> > > > > > > For the last few points, I want to discuss this rejected alternative:
> > > > > > >
> > > > > > > > Address the offsets problem entirely within the framework, doing some
> > > > > > > > kind of mapping from the transformed topic back to the original topic.
> > > > > > > > * This would only work in the cases where there’s no overlap between the
> > > > > > > > transformed topic names, but would break for the rest of the
> > > > > > > > transformations (e.g. static transformation, topic = “a”).
> > > > > > > > * Even if we wanted to limit the support to those cases, it would require
> > > > > > > > considerable bookkeeping to add a validation to verify that the
> > > > > > > > transformation chain adheres to that expectation (and fail fast if it
> > > > > > > > doesn’t).
> > > > > > >
> > > > > > > 6. This reasoning and the KIP design seem to imply that the connector is
> > > > > > > better equipped to solve this problem than the framework, but the stated
> > > > > > > reasons are not convincing for me.
> > > > > > > * A static transformation still causes an offset collision in the connector
> > > > > > > * The connector is not permitted to see the transformation chain to do any
> > > > > > > fail-fast assertions
> > > > > > >
> > > > > > > Suppose we were to think of the records at the end of the transformation
> > > > > > > chain as being in "virtual partitions" with "virtual offsets".
> > > > > > > For example, with identity-routing SMTs, the virtual coordinates are
> > > > > > > exactly the same as the underlying physical coordinates. For 1-1 renames,
> > > > > > > each virtual topic would be the renamed topic corresponding to the
> > > > > > > underlying topic. For fan-out from one topic to multiple virtual topics,
> > > > > > > virtual offsets would use the underlying Kafka offsets with gaps for
> > > > > > > records going to other virtual partitions. Virtual topics with dropped
> > > > > > > records have similar gaps in the offsets.
> > > > > > > Currently, these virtual coordinates are passed into the connector via
> > > > > > > SinkTask::put, but SinkTask::open/close/preCommit and
> > > > > > > SinkTaskContext::assignment/offsets/pause/resume all use physical
> > > > > > > coordinates.
> > > > > > > This proposal patches put, open, and close to have both physical and
> > > > > > > virtual coordinates, but leaves the other methods with physical
> > > > > > > coordinates. After this proposal, connectors would be intentionally made
> > > > > > > aware of the distinction between physical and virtual coordinates, and
> > > > > > > manage their own bookkeeping for the two systems.
> > > > > > >
> > > > > > > To avoid that connector logic, we could use virtual coordinates in all
> > > > > > > connector calls, never revealing that they are different from the physical
> > > > > > > coordinates. There's a whole design shopping list that we'd need:
> > > > > > > * Renumbering mechanism for disambiguating and making virtual offsets
> > > > > > > monotonic in the case of topic/partition collisions
> > > > > > > * Data structure and strategy for translating virtual offsets back to
> > > > > > > physical offsets
> > > > > > > * New limits on SinkTaskContext::offsets() calls to prevent rewinding
> > > > > > > before the latest commit
> > > > > > > * Backwards compatibility and upgrade design
> > > > > > >
> > > > > > > 7. This alternative was very appealing to me, because the strength of a
> > > > > > > plugin framework is the composability of different components. Among a
> > > > > > > collection of N connectors and M transforms, it should ideally only take
> > > > > > > N + M work to understand how the components combine to build the whole.
> > > > > > > However, once you start adding special cases to some plugins to support
> > > > > > > interactions with others, the whole system can take N * M work to
> > > > > > > understand. From a complexity standpoint, it would be very good for the
> > > > > > > framework to solve this in a way which was connector-agnostic.
> > > > > > > The current design compromises the logical isolation of the plugins
> > > > > > > slightly, but connectors can collapse offsets very memory-efficiently,
> > > > > > > re-use the existing raw coordinate functions, and keep everything else
> > > > > > > backwards compatible. After deriving all of the above, I think that's a
> > > > > > > reasonable tradeoff to make.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Greg
> > > > > > >
> > > > > > > On Tue, Feb 21, 2023 at 10:17 AM Chris Egerton <chrise@aiven.io.invalid>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Yash,
> > > > > > > >
> > > > > > > > We'll probably want to make a few tweaks to the Javadocs for the new
> > > > > > > > methods (I'm imagining that notes on compatibility with older versions
> > > > > > > > will be required), but I believe what's proposed in the KIP is good
> > > > > > > > enough to approve with the understanding that it may not exactly match
> > > > > > > > what gets implemented/merged.
> > > > > > > >
> > > > > > > > LGTM, thanks again for the KIP!
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > >
> > > > > > > > Chris
> > > > > > > >
> > > > > > > > On Tue, Feb 21, 2023 at 12:18 PM Yash Mayya <yash.mayya@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Chris,
> > > > > > > > >
> > > > > > > > > > we might try to introduce a framework-level configuration property to dictate which of the pre-transform and post-transform topic partitions are used for the fallback call to the single-arg variant if a task class has not overridden the multi-arg variant
> > > > > > > > >
> > > > > > > > > Thanks for the explanation; I agree that this would be a tad too convoluted. :)
> > > > > > > > >
> > > > > > > > > Please do let me know if you'd like any further amendments to the KIP!
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Yash
> > > > > > > > >
> > > > > > > > > On Tue, Feb 21, 2023 at 8:42 PM Chris Egerton <chrise@aiven.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Yash,
> > > > > > > > > >
> > > > > > > > > > I think the use case for pre-transform TPO (topic/partition/offset) coordinates (and topic partition writers created/destroyed in close/open) tends to boil down to exactly-once semantics, where it's desirable to preserve the guarantees that Kafka provides (every record has a unique TPO trio, and records are ordered by offset within a topic partition).
> > > > > > > > > >
> > > > > > > > > > It's my understanding that this approach is utilized in several connectors out there today, and it might break these connectors to start using the post-transform topic partitions automatically in their open/close methods.
> > > > > > > > > >
> > > > > > > > > > If we want to get really fancy with this and try to obviate or at least reduce the need for per-connector code changes, we might try to introduce a framework-level configuration property to dictate which of the pre-transform and post-transform topic partitions are used for the fallback call to the single-arg variant if a task class has not overridden the multi-arg variant. But I think this is going a bit too far and would prefer to keep things simple(r) for now.
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > >
> > > > > > > > > > Chris
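To illustrate why exactly-once sinks lean on pre-transform coordinates, here is a minimal sketch of a sink-side filter. It relies only on the two guarantees Chris lists: TPO trios are unique, and offsets increase within an original topic partition. The filter does not assume that offsets are consecutive. `OriginalTpo`, `OriginalPartition` and `ExactlyOnceFilter` are hypothetical names for illustration, not Kafka Connect API. The sketch also assumes the task can see each record's original topic, partition and offset. For simplicity it keeps state in memory; a real exactly-once sink would need to persist that state atomically with the written data.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical pre-transform coordinates of a record: the unique
// (topic, partition, offset) "TPO trio" that Kafka assigns.
record OriginalTpo(String topic, int partition, long offset) {}

// Hypothetical key for an original (pre-transform) topic partition.
record OriginalPartition(String topic, int partition) {}

// Hypothetical helper, not part of the Kafka Connect API: skips records that
// were already written, relying only on offsets increasing within a partition.
class ExactlyOnceFilter {
    private final Map<OriginalPartition, Long> lastWritten = new HashMap<>();

    // Returns true (and remembers the offset) if the record has not been written yet.
    boolean shouldWrite(OriginalTpo tpo) {
        OriginalPartition key = new OriginalPartition(tpo.topic(), tpo.partition());
        Long last = lastWritten.get(key);
        if (last != null && tpo.offset() <= last) {
            return false; // already written, e.g. redelivered after a rewind
        }
        lastWritten.put(key, tpo.offset());
        return true;
    }
}
```

Keying on original coordinates is what makes this safe. Suppose a topic-renaming SMT routed two source partitions into the same transformed partition. Their offsets would interleave, and a monotonicity check keyed on the transformed partition would start rejecting valid records.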
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Sun, Feb 19, 2023 at 2:34 AM Yash Mayya <yash.mayya@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Chris,
> > > > > > > > > > >
> > > > > > > > > > > > I was actually envisioning something like `void
> > > > > > > > > > > > open(Collection<TopicPartition> originalPartitions,
> > > > > > > > > > > > Collection<TopicPartition> transformedPartitions)`
> > > > > > > > > > >
> > > > > > > > > > > Ah okay, this does make a lot more sense. Sorry, I think I misunderstood you earlier. I do agree with you that, from a complexity standpoint, this seems better than splitting it off into two new sets of open / close methods.
> > > > > > > > > > > > Plus, if a connector is intentionally designed to use pre-transformation topic partitions in its open/close methods, wouldn't we just be trading one form of the problem for another by making this switch?
> > > > > > > > > > >
> > > > > > > > > > > On thinking about this a bit more, I'm not so convinced that we need to expose the pre-transform / original topic partitions in the new open / close methods. The purpose of the open / close methods is to allow sink tasks to allocate and deallocate resources for each topic partition assigned to the task, and the purpose of topic-mutating SMTs is essentially to modify the source topic name from the point of view of the sink connector. Why would a sink connector ever need or want to allocate resources for pre-transform topic partitions? Is the argument here that since we'll be exposing both the pre-transform and post-transform topic partitions per record, we should also expose the same info via open / close and allow sink connector implementations to disregard topic-mutating SMTs completely if they wanted to?
> > > > > > > > > > >
> > > > > > > > > > > Either way, I've gone ahead and updated the KIP to reflect all of our previous discussion here since it had become quite outdated. I've also updated the KIP title from "Sink Connectors: Support topic-mutating SMTs for async connectors (preCommit users)" to "Allow sink connectors to be used with topic-mutating SMTs" since the improvements to the open / close mechanism don't pertain only to asynchronous sink connectors. The new KIP URL is:
> > > > > > > > > > >
> > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-793%3A+Allow+sink+connectors+to+be+used+with+topic-mutating+SMTs
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Yash
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Feb 14, 2023 at 11:39 PM Chris Egerton <chrise@aiven.io.invalid> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Yash,
> > > > > > > > > > > >
> > > > > > > > > > > > I was actually envisioning something like `void open(Collection<TopicPartition> originalPartitions, Collection<TopicPartition> transformedPartitions)`, since we already convert and transform each batch of records that we poll from the sink task's consumer en masse, meaning we could discover several new transformed partitions in between consecutive calls to SinkTask::put.
> > > > > > > > > > > >
> > > > > > > > > > > > It's also worth noting that we'll probably want to deprecate the existing open/close methods, at which point keeping one non-deprecated variant of each seems more appealing and less complex than keeping two.
> > > > > > > > > > > >
> > > > > > > > > > > > Honestly though, I think we're both on the same page enough that I wouldn't object to either approach. We've probably reached the saturation point for ROI here, and as long as we provide developers a way to get the information they need from the runtime and take care to add Javadocs and update our docs page (possibly including the connector development quickstart), it should be fine.
> > > > > > > > > > > >
> > > > > > > > > > > > At this point, it might be worth updating the KIP based on recent discussion so that others can see the latest proposal, and we can both take a look and make sure everything looks good enough before opening a vote thread.
> > > > > > > > > > > >
> > > > > > > > > > > > Finally, I think you make a convincing case for a time-based eviction policy. I wasn't thinking about the fairly common SMT pattern of deriving a topic name from, e.g., a record field or header.
> > > > > > > > > > > >
> > > > > > > > > > > > Cheers,
> > > > > > > > > > > >
> > > > > > > > > > > > Chris
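Chris suggests a two-argument `open`, and in his Feb 21 reply he describes falling back to the single-argument variant when a task has not overridden the new one. A minimal sketch of that combination uses an overload whose default body delegates to the existing method. `SinkTaskSketch`, `LegacyTask` and `TransformAwareTask` are hypothetical, and the `TopicPartition` record is a stand-in for Kafka's class. The sketch also assumes that the fallback passes the original (pre-transform) partitions, which matches the backward-compatibility concern in the thread. The `openOriginal`/`openTransformed` split discussed elsewhere in the thread would use the same default-delegation idea.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Stand-in for org.apache.kafka.common.TopicPartition so the sketch compiles on its own.
record TopicPartition(String topic, int partition) {}

// Hypothetical stand-in for SinkTask, showing only the open() hooks under discussion.
abstract class SinkTaskSketch {
    // Existing single-argument hook; today it receives pre-transform partitions.
    public void open(Collection<TopicPartition> partitions) {}

    // Proposed two-argument hook. The default keeps older tasks working by
    // falling back to the single-argument variant with the original partitions.
    public void open(Collection<TopicPartition> originalPartitions,
                     Collection<TopicPartition> transformedPartitions) {
        open(originalPartitions);
    }
}

// A legacy task that only overrides the single-argument variant.
class LegacyTask extends SinkTaskSketch {
    final List<TopicPartition> opened = new ArrayList<>();

    @Override
    public void open(Collection<TopicPartition> partitions) {
        opened.addAll(partitions);
    }
}

// A newer task that allocates resources per transformed partition.
class TransformAwareTask extends SinkTaskSketch {
    final List<TopicPartition> opened = new ArrayList<>();

    @Override
    public void open(Collection<TopicPartition> originalPartitions,
                     Collection<TopicPartition> transformedPartitions) {
        opened.addAll(transformedPartitions);
    }
}
```

In this sketch the runtime would always call the two-argument variant. Older tasks still see only the partitions they saw before, and newer tasks opt in by overriding the overload.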
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, Feb 14, 2023 at 11:42 AM Yash Mayya <yash.mayya@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Chris,
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Plus, if a connector is intentionally designed to use pre-transformation topic partitions in its open/close methods, wouldn't we just be trading one form of the problem for another by making this switch?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks, this makes sense, and given that the KIP already proposes a way for sink connector implementations to distinguish between pre-transform and post-transform topics per record, I think I'm convinced that going with new `open()` / `close()` methods is the right approach. However, I still feel like having overloaded methods will make it a lot less intuitive, given that the two sets of methods would be different in terms of when they're called and what arguments they are passed (also, I'm presuming that the overloaded methods you're proposing will only have a single `TopicPartition` rather than a `Collection<TopicPartition>` as their parameter). I guess my concern is largely around the fact that it won't be possible to distinguish between the overloaded methods' use cases just from the method signatures. I agree that naming is going to be difficult here, but I think that having two sets of `SinkTask::openXyz` / `SinkTask::closeXyz` methods will be less complicated to understand from a connector developer's perspective (as compared to overloaded methods with only differing documentation). Of your suggested options, I think `openPreTransform` / `openPostTransform` are the most comprehensible ones.
> > > > > > > > > > > > >
> > > > > > > > > > > > > > BTW, I wouldn't say that we can't make assumptions about the relationships between pre- and post-transformation topic partitions.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I meant that the framework wouldn't be able to deterministically know when to close a post-transform topic partition, given that SMTs could use per-record data / metadata to manipulate the topic names as and how required (which supports the suggestion to use an eviction-policy-based mechanism to call SinkTask::close for post-transform topic partitions).
> > > > > > > > > > > > >
> > > > > > > > > > > > > > We might utilize a policy that assumes a deterministic mapping from the former to the latter, for example.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Wouldn't this be making the assumption that SMTs only use the topic name itself and no other data / metadata while computing the new topic name? Are you suggesting that since this assumption could work for a majority of SMTs, it might be more efficient overall in terms of reducing the number of "false-positive" calls to `SinkTask::closePostTransform` (and we'd also be able to call `SinkTask::closePostTransform` immediately after topic partitions are revoked from the consumer)? I was thinking of something more generic along the lines of a simple time-based eviction policy that wouldn't make any assumptions about the SMT implementations. Either way, I do like your earlier suggestion of keeping this logic internal and not painting ourselves into a corner by promising any particular behavior in the KIP.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Yash
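A minimal sketch of the "simple time based eviction policy" Yash describes appears below. It makes no assumptions about how SMTs derive topic names. Instead, it remembers when each post-transform partition was last seen and reports the partitions that have gone idle, which the runtime could then pass to `SinkTask::close`. `IdleEvictionCache`, `markSeen` and `evictIdle` are hypothetical names, and the sketch assumes the caller supplies the current time in milliseconds so that the policy stays deterministic and testable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Stand-in for org.apache.kafka.common.TopicPartition.
record TopicPartition(String topic, int partition) {}

// Hypothetical runtime-internal helper: remembers when each post-transform
// partition was last seen and reports the ones that have gone idle.
class IdleEvictionCache {
    private final long idleTimeoutMs;
    private final Map<TopicPartition, Long> lastSeenMs = new HashMap<>();

    IdleEvictionCache(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    // Records that a transformed partition appeared in a batch; returns true if it is new.
    boolean markSeen(TopicPartition tp, long nowMs) {
        return lastSeenMs.put(tp, nowMs) == null;
    }

    // Removes and returns partitions not seen for at least idleTimeoutMs;
    // the caller would then pass them to SinkTask::close.
    List<TopicPartition> evictIdle(long nowMs) {
        List<TopicPartition> evicted = new ArrayList<>();
        Iterator<Map.Entry<TopicPartition, Long>> it = lastSeenMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TopicPartition, Long> e = it.next();
            if (nowMs - e.getValue() >= idleTimeoutMs) {
                evicted.add(e.getKey());
                it.remove();
            }
        }
        return evicted;
    }
}
```

The trade-off is the one Yash alludes to with "false-positive" closes: a partition that goes quiet for longer than the timeout is closed and must be reopened if records for it show up again.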
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Tue, Feb 14, 2023 at 1:08 AM Chris Egerton <chrise@aiven.io.invalid> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Yash,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I think the key difference between adding methods/overloads related to SinkTask::open/SinkTask::close and SinkTask::put is that this isn't auxiliary information that may or may not be useful to connector developers. It's actually critical for them to understand the difference between the two concepts here, even if they look very similar. And yes, I do believe that switching from pre-transform to post-transform topic partitions is too big a change in behavior here. Plus, if a connector is intentionally designed to use pre-transformation topic partitions in its open/close methods, wouldn't we just be trading one form of the problem for another by making this switch?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > One possible alternative to overloading the existing methods is to split SinkTask::open into openOriginal (or possibly openPhysical or openPreTransform) and openTransformed (or openLogical or openPostTransform), with a similar change for SinkTask::close. The default implementation for SinkTask::openOriginal can be to call SinkTask::open, and SinkTask::closeOriginal can likewise default to calling SinkTask::close. However, I prefer overloading the existing methods since this alternative increases complexity and none of the names are very informative.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > BTW, I wouldn't say that we can't make assumptions about the relationships between pre- and post-transformation topic partitions. We might utilize a policy that assumes a deterministic mapping from the former to the latter, for example. The distinction I'd draw is that the assumptions we make can and probably should favor some cases in terms of performance (i.e., reducing the number of unnecessary calls to close/open over a given sink task's lifetime), but should not lead to guaranteed resource leaks or failure to obey the API contract in any case.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Chris
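One way to follow Chris's rule (assumptions may favor performance, but must not leak resources) is to avoid assuming a fixed pre- to post-transform mapping at all. Instead, the runtime records which transformed partitions each original partition has actually produced. When originals are revoked, any transformed partition that no still-assigned original has produced becomes a candidate for `SinkTask::close`. This is a variation on the deterministic-mapping idea and was not proposed in the thread. `ObservedMapping`, `record` and `onRevoked` are hypothetical names.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Stand-in for org.apache.kafka.common.TopicPartition.
record TopicPartition(String topic, int partition) {}

// Hypothetical helper: tracks which transformed partitions each original
// partition has actually produced, so revocation can close the right ones.
class ObservedMapping {
    private final Map<TopicPartition, Set<TopicPartition>> transformedByOriginal = new HashMap<>();

    // Called per record with its pre- and post-transform partitions.
    void record(TopicPartition original, TopicPartition transformed) {
        transformedByOriginal.computeIfAbsent(original, k -> new HashSet<>()).add(transformed);
    }

    // Forgets the revoked originals and returns the transformed partitions that
    // no remaining original has produced; these are candidates for SinkTask::close.
    Set<TopicPartition> onRevoked(Collection<TopicPartition> revoked) {
        Set<TopicPartition> candidates = new HashSet<>();
        for (TopicPartition original : revoked) {
            Set<TopicPartition> produced = transformedByOriginal.remove(original);
            if (produced != null) {
                candidates.addAll(produced);
            }
        }
        for (Set<TopicPartition> stillProduced : transformedByOriginal.values()) {
            candidates.removeAll(stillProduced);
        }
        return candidates;
    }
}
```

With this approach, a transformed partition stays open as long as any remaining original keeps producing it. It would therefore likely complement an idle-time policy rather than replace it.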
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Mon, Feb 13, 2023 at 10:54 AM Yash Mayya <yash.mayya@gmail.com> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi Chris,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > especially if connectors are intentionally designed around original topic partitions instead of transformed ones.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Ha, that's a good point and reminds me of Hyrum's Law [1] :)
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I think we have to provide connector developers with some way to differentiate between the two, but maybe there's a way to do this that I haven't thought of yet
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I can't think of a better way to do this either; would invoking the existing `SinkTask::open` and `SinkTask::close` methods with post-transform topic partitions instead of pre-transform topic partitions not be acceptable even in a minor / major AK release? I feel like the proposed approach of adding overloaded `SinkTask::open` / `SinkTask::close` methods to differentiate between pre-transform and post-transform topic partitions has similar pitfalls to the idea of the overloaded `SinkTask::put` method we discarded earlier.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Either way, I'm glad that the general idea of a cache and eviction policy for SinkTask::close seems reasonable; if we decide to go this route, it might make sense for the KIP to include an outline of one or more high-level strategies we might take, but without promising any particular behavior beyond occasionally calling SinkTask::close for post-transform topic partitions. I'm hoping that this logic can stay internal, and by not painting ourselves into a corner with the KIP, we give ourselves leeway to tweak it in the future if necessary without filing another KIP or introducing a pluggable interface.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks, that's a good idea. Given the flexibility of SMTs, the framework can't really make any assumptions about topic partitions post-transformation, nor does it have any way to definitively get such information from transformations, which is why the idea of a cache with an eviction policy makes perfect sense!
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > [1] - https://www.hyrumslaw.com/
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > Yash
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Thu, Feb 9, 2023 at 9:38 PM Chris Egerton <chrise@aiven.io.invalid> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi Yash,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > So it looks like with the current state of affairs, sink tasks that only instantiate writers in the SinkTask::open method (and don't do the lazy instantiation in SinkTask::put that you mentioned) might fail when used with topic/partition-mutating SMTs even if they don't do any asynchronous processing?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Yep, exactly 👍
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > What do you think about retaining just the existing methods but changing when they're called in the Connect runtime? For instance, instead of calling SinkTask::open after partition assignment following a consumer group rebalance, we could cache the currently "seen" topic partitions (post transformation) and before each call to SinkTask::put check whether there are any new "unseen" topic partitions, and if so call SinkTask::open (and also update the cache of course).
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > IMO the issue here is that it's a drastic change in behavior to start invoking SinkTask::open and SinkTask::close with post-transform topic partitions instead of pre-transform, especially if connectors are intentionally designed around original topic partitions instead of transformed ones. I think we have to provide connector developers with some way to differentiate between the two, but maybe there's a way to do this that I haven't thought of yet. Interested to hear your thoughts.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Either way, I'm glad that the general idea of a cache and eviction policy for SinkTask::close seems reasonable; if we decide to go this route, it might make sense for the KIP to include an outline of one or more high-level strategies we might take, but without promising any particular behavior beyond occasionally calling SinkTask::close for post-transform topic partitions. I'm hoping that this logic can stay internal, and by not painting ourselves into a corner with the KIP, we give ourselves leeway to tweak it in the future if necessary without filing another KIP or introducing a pluggable interface.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Chris
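The runtime change Yash proposes in the quoted text can be sketched as follows: cache the "seen" post-transform partitions and, before each call to `put`, open any partitions the batch introduces. As Chris points out in his Feb 14 message, a single converted and transformed batch can introduce several new partitions at once, so the sketch issues one `open` call per batch. `OpenBeforePut` and `TransformedRecord` are hypothetical, the `TopicPartition` record is a stand-in for Kafka's class, and the `open`/`put` hooks are modelled as plain callbacks. The sketch covers only the mechanics and does not address Chris's concern about the behavior change.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Stand-in for org.apache.kafka.common.TopicPartition.
record TopicPartition(String topic, int partition) {}

// Hypothetical stand-in for a record after SMTs have run: its post-transform partition and a value.
record TransformedRecord(TopicPartition partition, String value) {}

// Hypothetical runtime step (not actual Connect code): keeps a cache of
// post-transform partitions already opened and, before each put(), opens
// any partitions in the batch that have not been seen yet.
class OpenBeforePut {
    private final Set<TopicPartition> seen = new HashSet<>();
    private final Consumer<Collection<TopicPartition>> openHook;

    OpenBeforePut(Consumer<Collection<TopicPartition>> openHook) {
        this.openHook = openHook;
    }

    void deliver(List<TransformedRecord> batch, Consumer<List<TransformedRecord>> put) {
        // LinkedHashSet keeps first-seen order and collapses repeats within the batch.
        Set<TopicPartition> unseen = new LinkedHashSet<>();
        for (TransformedRecord r : batch) {
            if (!seen.contains(r.partition())) {
                unseen.add(r.partition());
            }
        }
        if (!unseen.isEmpty()) {
            openHook.accept(unseen); // a single open() call covering every new partition in the batch
            seen.addAll(unseen);
        }
        put.accept(batch);
    }
}
```

The cache only ever grows here. Deciding when to remove entries and call `close` is the eviction question discussed elsewhere in the thread.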
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Thu, Feb 9, 2023 at 7:39 AM Yash Mayya <yash.mayya@gmail.com> wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hi Chris,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks for the feedback.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > 1) That's a fair point; while I did scan everything publicly available on GitHub, you're right that it won't cover all possible SMTs out there. Thanks for the example use case as well; I've updated the KIP to add the two new proposed methods.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > 2) So it looks like, with the current state of affairs, sink tasks that only
> > > > > > > > > > > > > > > > > instantiate writers in the SinkTask::open method (and don't do the lazy
> > > > > > > > > > > > > > > > > instantiation in SinkTask::put that you mentioned) might fail when used
> > > > > > > > > > > > > > > > > with topic/partition-mutating SMTs, even if they don't do any asynchronous
> > > > > > > > > > > > > > > > > processing, since they could encounter records in SinkTask::put with
> > > > > > > > > > > > > > > > > topics/partitions that they haven't created writers for? Thanks for
> > > > > > > > > > > > > > > > > pointing this out; it's definitely another incompatibility that needs to be
> > > > > > > > > > > > > > > > > called out and fixed. The overloaded method approach is interesting, but it
> > > > > > > > > > > > > > > > > comes with the caveat of yet more new methods that existing connectors will
> > > > > > > > > > > > > > > > > need to implement if they want to make use of this new functionality. What
> > > > > > > > > > > > > > > > > do you think about retaining just the existing methods but changing when
> > > > > > > > > > > > > > > > > they're called in the Connect runtime? For instance, instead of calling
> > > > > > > > > > > > > > > > > SinkTask::open after partition assignment following a consumer group
> > > > > > > > > > > > > > > > > rebalance, we could cache the currently "seen" topic partitions
> > > > > > > > > > > > > > > > > (post-transformation) and, before each call to SinkTask::put, check whether
> > > > > > > > > > > > > > > > > there are any new "unseen" topic partitions; if so, call SinkTask::open (and
> > > > > > > > > > > > > > > > > also update the cache, of course). I don't think this would break the
> > > > > > > > > > > > > > > > > existing contract with sink tasks, where SinkTask::open is expected to be
> > > > > > > > > > > > > > > > > called for a topic partition before any records from that topic partition
> > > > > > > > > > > > > > > > > are sent via SinkTask::put, would it? The SinkTask::close case is a lot
> > > > > > > > > > > > > > > > > trickier, however, and would require some sort of cache eviction policy that
> > > > > > > > > > > > > > > > > would be deemed appropriate, as you pointed out too.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > Yash
> > > > > > > > > > > > > > > > >
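The runtime-side bookkeeping proposed in point 2) above (cache the post-transformation topic partitions already passed to SinkTask::open, and open any unseen ones before the next SinkTask::put) could look roughly like the minimal sketch below. It is not Connect runtime code: `TransformedRecord`, `TopicPartitionKey` and `SeenPartitionTracker` are hypothetical stand-ins for SinkRecord, TopicPartition and worker internals, and the sketch uses Java records (Java 16+).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a post-transformation sink record: only the
// (possibly SMT-rewritten) topic and partition matter for this sketch.
record TransformedRecord(String topic, int partition, Object value) {}

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record TopicPartitionKey(String topic, int partition) {}

// Remembers which post-transformation topic partitions the task has already
// been told about, and reports new ones so they can be passed to
// SinkTask::open before the batch is handed to SinkTask::put.
final class SeenPartitionTracker {
    private final Set<TopicPartitionKey> seen = new LinkedHashSet<>();

    /** Returns partitions in this batch not seen before (first-encounter order) and marks them as seen. */
    List<TopicPartitionKey> newlySeen(Collection<TransformedRecord> batch) {
        List<TopicPartitionKey> unseen = new ArrayList<>();
        for (TransformedRecord r : batch) {
            TopicPartitionKey tp = new TopicPartitionKey(r.topic(), r.partition());
            if (seen.add(tp)) { // add() returns false if it was already present
                unseen.add(tp);
            }
        }
        return unseen;
    }

    /** Forgets a partition, e.g. after it has been passed to SinkTask::close. */
    void forget(TopicPartitionKey tp) {
        seen.remove(tp);
    }

    Set<TopicPartitionKey> seen() {
        return Set.copyOf(seen);
    }
}
```

A worker loop would call `newlySeen(batch)` before each `put`, pass any non-empty result to `open`, and call `forget` for partitions it later closes; deciding when to close is the eviction question raised above.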
> > > > > > > > > > > > > > > > > On Mon, Feb 6, 2023 at 11:27 PM Chris Egerton <chrise@aiven.io.invalid>
> > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Hi Yash,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I've had some time to think on this KIP and I think I'm in agreement about
> > > > > > > > > > > > > > > > > > not blocking it on an official compatibility library or adding the "ack"
> > > > > > > > > > > > > > > > > > API for sink records.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I only have two more thoughts:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 1. Because it is possible to manipulate sink record partitions and offsets
> > > > > > > > > > > > > > > > > > with the current API we provide for transformations, I still believe
> > > > > > > > > > > > > > > > > > methods should be added to the SinkRecord class to expose the original
> > > > > > > > > > > > > > > > > > partition and offset, not just the original topic. The additional cognitive
> > > > > > > > > > > > > > > > > > burden from these two methods is going to be minimal anyway; once users
> > > > > > > > > > > > > > > > > > understand the difference between the transformed topic name and the
> > > > > > > > > > > > > > > > > > original one, it's going to be trivial for them to understand how that same
> > > > > > > > > > > > > > > > > > difference applies to partitions and offsets. It's not enough to scan the
> > > > > > > > > > > > > > > > > > set of SMTs provided out of the box with Connect, ones developed by
> > > > > > > > > > > > > > > > > > Confluent, or even everything available on GitHub, since there may be
> > > > > > > > > > > > > > > > > > closed-source projects out there that rely on this ability. One potential
> > > > > > > > > > > > > > > > > > use case could be re-routing partitions between Kafka and some other
> > > > > > > > > > > > > > > > > > sharded system.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 2. We still have to address the SinkTask::open [1] and SinkTask::close [2]
> > > > > > > > > > > > > > > > > > methods. If a connector writes to the external system using the transformed
> > > > > > > > > > > > > > > > > > topic partitions it reads from Kafka, then it's possible for the connector
> > > > > > > > > > > > > > > > > > to lazily instantiate writers for topic partitions as it encounters them
> > > > > > > > > > > > > > > > > > in records provided to SinkTask::put. However, connectors also need a way
> > > > > > > > > > > > > > > > > > to de-allocate those writers (and the resources used by them) over time,
> > > > > > > > > > > > > > > > > > which they can't do as easily. One possible approach here is to overload
> > > > > > > > > > > > > > > > > > SinkTask::open and SinkTask::close with variants that distinguish between
> > > > > > > > > > > > > > > > > > transformed and original topic partitions, and default to invoking the
> > > > > > > > > > > > > > > > > > existing methods with just the original topic partitions. We would then
> > > > > > > > > > > > > > > > > > have several options for how the Connect runtime can invoke these methods,
> > > > > > > > > > > > > > > > > > but in general, an approach that guarantees that tasks are notified of
> > > > > > > > > > > > > > > > > > transformed topic partitions in SinkTask::open before any records for that
> > > > > > > > > > > > > > > > > > partition are given to them in SinkTask::put, and makes a best-effort
> > > > > > > > > > > > > > > > > > attempt to close transformed topic partitions that appear to no longer be
> > > > > > > > > > > > > > > > > > in use based on some eviction policy, would probably be sufficient.
> > > > > > > > > > > > > > > > > >
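One possible eviction policy for the best-effort close described in point 2 is an idle timeout. The minimal sketch below assumes the caller supplies a monotonic millisecond clock; `IdlePartitionEvictor` and `TopicPartitionKey` are hypothetical names, not Connect APIs, and the timeout is an arbitrary tuning knob.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record TopicPartitionKey(String topic, int partition) {}

// Idle-timeout eviction: a transformed topic partition that hasn't appeared in
// any SinkTask::put batch for longer than idleTimeoutMs becomes a candidate
// for SinkTask::close.
final class IdlePartitionEvictor {
    private final long idleTimeoutMs;
    // Iteration order is least-recently-active first, because activity re-inserts the key.
    private final LinkedHashMap<TopicPartitionKey, Long> lastActiveMs = new LinkedHashMap<>();

    IdlePartitionEvictor(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Records that a batch containing this partition was just delivered (nowMs must not go backwards). */
    void recordActivity(TopicPartitionKey tp, long nowMs) {
        lastActiveMs.remove(tp); // move the key to the end of the iteration order
        lastActiveMs.put(tp, nowMs);
    }

    /** Removes and returns partitions idle for longer than the timeout, oldest first. */
    List<TopicPartitionKey> evictIdle(long nowMs) {
        List<TopicPartitionKey> evicted = new ArrayList<>();
        Iterator<Map.Entry<TopicPartitionKey, Long>> it = lastActiveMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TopicPartitionKey, Long> e = it.next();
            if (nowMs - e.getValue() <= idleTimeoutMs) {
                break; // every later entry was active even more recently
            }
            evicted.add(e.getKey());
            it.remove();
        }
        return evicted;
    }
}
```

Partitions returned by `evictIdle` would be passed to SinkTask::close; if one of them reappears in a later batch, the runtime would simply open it again. A size-capped LRU would be an equally plausible policy; an idle timeout just bounds how long unused writers stay allocated.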
> > > > > > > > > > > > > > > > > > [1] - https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open(java.util.Collection)
> > > > > > > > > > > > > > > > > > [2] - https://kafka.apache.org/33/javadoc/org/apache/kafka/connect/sink/SinkTask.html#close(java.util.Collection)
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Chris
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On Sat, Nov 5, 2022 at 5:46 AM Yash Mayya <yash.mayya@gmail.com> wrote:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Hi Chris,
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Thanks a lot for your inputs!
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > would provide a simple, clean interface for developers to determine
> > > > > > > > > > > > > > > > > > > > which features are supported by the version of the Connect runtime
> > > > > > > > > > > > > > > > > > > > that their plugin has been deployed onto
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > I do like the idea of having such a public compatibility library - I think
> > > > > > > > > > > > > > > > > > > it would remove a lot of restrictions from framework development if it
> > > > > > > > > > > > > > > > > > > were to be widely adopted.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > we might consider adding an API to "ack" sink records
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > I agree that this does seem like a more intuitive and clean API, but I'm
> > > > > > > > > > > > > > > > > > > concerned about the backward compatibility headache we'd be imposing on all
> > > > > > > > > > > > > > > > > > > existing sink connectors. Connector developers will have to maintain two
> > > > > > > > > > > > > > > > > > > separate ways of doing offset management if they want to use the new API
> > > > > > > > > > > > > > > > > > > but continue supporting older versions of Kafka Connect.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > For now, I've reverted the KIP to the previous iteration, which proposed
> > > > > > > > > > > > > > > > > > > the addition of a new `SinkRecord` method to obtain the original Kafka
> > > > > > > > > > > > > > > > > > > topic pre-transformation. One thing to note is that I've removed the method
> > > > > > > > > > > > > > > > > > > for obtaining the original Kafka partition after a cursory search showed
> > > > > > > > > > > > > > > > > > > that use cases for partition-modifying SMTs are primarily on the source
> > > > > > > > > > > > > > > > > > > connector side.
> > > > > > > > > > > > > > > > > > >
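To illustrate why a sink task that tracks its own offsets needs pre-transformation coordinates, here is a minimal sketch of the bookkeeping behind SinkTask::preCommit, keyed by the original topic partition. `SinkRecordView`, `FlushedOffsetTracker` and `TopicPartitionKey` are hypothetical stand-ins (Java 16+ records), and the sketch assumes the original partition and offset are available too, which this iteration of the KIP does not propose. It also assumes each partition's records are flushed in order.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record TopicPartitionKey(String topic, int partition) {}

// Hypothetical stand-in for a SinkRecord exposing both the post-transformation
// topic and the original (pre-transformation) Kafka coordinates.
record SinkRecordView(String topic, String originalTopic, int originalPartition, long originalOffset) {}

// Offsets a task could return from SinkTask::preCommit. They are keyed by the
// ORIGINAL topic partition, because that is what the consumer is assigned.
final class FlushedOffsetTracker {
    private final Map<TopicPartitionKey, Long> nextOffsets = new HashMap<>();

    /** Call once the record has been durably written to the external system. */
    void markFlushed(SinkRecordView r) {
        TopicPartitionKey original = new TopicPartitionKey(r.originalTopic(), r.originalPartition());
        // Kafka convention: the committed offset is the next offset to consume.
        // Long::max is only safe if a partition's records are flushed in order.
        nextOffsets.merge(original, r.originalOffset() + 1, Long::max);
    }

    /** Snapshot of committable offsets, keyed by original topic partition. */
    Map<TopicPartitionKey, Long> committableOffsets() {
        return Map.copyOf(nextOffsets);
    }
}
```

With a RegexRouter-style SMT renaming `orders` to `orders-eu`, keying by the transformed topic would produce offsets for `orders-eu`, a partition the task's consumer is not assigned, so they could not be committed.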
> > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > Yash
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > On Tue, Nov 1, 2022 at 9:22 PM Chris Egerton <chrise@aiven.io.invalid>
> > > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > I have more comments I'd like to make on this KIP when I have time (sorry
> > > > > > > > > > > > > > > > > > > > for the delay, Yash, and thanks for your patience!), but I did want to
> > > > > > > > > > > > > > > > > > > > chime in and say that I'm also not sure about overloading SinkTask::put. I
> > > > > > > > > > > > > > > > > > > > share the concerns Yash has raised about creating an intuitive, simple
> > > > > > > > > > > > > > > > > > > > API. In addition, this approach doesn't seem very sustainable: what do we
> > > > > > > > > > > > > > > > > > > > do if we encounter another case in the future that would warrant a similar
> > > > > > > > > > > > > > > > > > > > solution? We probably don't want to create three, four, etc. overloaded
> > > > > > > > > > > > > > > > > > > > variants of the method, each of which would have to be implemented by
> > > > > > > > > > > > > > > > > > > > connector developers who want to both leverage the latest and greatest
> > > > > > > > > > > > > > > > > > > > connector APIs and maintain compatibility with Connect clusters running
> > > > > > > > > > > > > > > > > > > > older versions.
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > I haven't been able to flesh this out into a design worth publishing in its
> > > > > > > > > > > > > > > > > > > > own KIP yet, but one alternative I've pitched to a few people with
> > > > > > > > > > > > > > > > > > > > generally positive interest has been to develop an official compatibility
> > > > > > > > > > > > > > > > > > > > library for Connect developers. This library would be released as its own
> > > > > > > > > > > > > > > > > > > > Maven artifact (separate from connect-api, connect-runtime, etc.) and would
> > > > > > > > > > > > > > > > > > > > provide a simple, clean interface for developers to determine which
> > > > > > > > > > > > > > > > > > > > features are supported by the version of the Connect runtime that their
> > > > > > > > > > > > > > > > > > > > plugin has been deployed onto. Under the hood, this library might use
> > > > > > > > > > > > > > > > > > > > reflection to determine whether classes, methods, etc. are available, but
> > > > > > > > > > > > > > > > > > > > the developer wouldn't have to do anything more than check (for example)
> > > > > > > > > > > > > > > > > > > > `Features.SINK_TASK_ERRANT_RECORD_REPORTER.enabled()` to know at any point
> > > > > > > > > > > > > > > > > > > > in the lifetime of their connector/task whether that feature is provided by
> > > > > > > > > > > > > > > > > > > > the runtime.
> > > > > > > > > > > > > > > > > > > >
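The reflection-based feature check described above might be sketched as follows. This only illustrates the idea: no such library exists, `Feature` is a hypothetical class, and the example constant probes for `SinkTaskContext::errantRecordReporter`, which was added to Connect by KIP-610 (Kafka 2.6).

```java
// Hypothetical sketch of a reflection-based feature probe; not a real library.
final class Feature {
    // Errant record reporting (KIP-610, Kafka 2.6) added SinkTaskContext::errantRecordReporter.
    static final Feature SINK_TASK_ERRANT_RECORD_REPORTER =
            new Feature("org.apache.kafka.connect.sink.SinkTaskContext", "errantRecordReporter");

    private final String className;
    private final String methodName; // null means "the class existing is enough"
    private final Class<?>[] paramTypes;
    private volatile Boolean cached; // null until the first probe

    Feature(String className, String methodName, Class<?>... paramTypes) {
        this.className = className;
        this.methodName = methodName;
        this.paramTypes = paramTypes.clone();
    }

    /** True if the runtime this plugin is running on provides the feature. */
    boolean enabled() {
        Boolean result = cached;
        if (result == null) {
            result = probe();
            cached = result; // benign race: every thread computes the same answer
        }
        return result;
    }

    private boolean probe() {
        try {
            // initialize=false: only check the class is present, don't run static initializers
            Class<?> clazz = Class.forName(className, false, Feature.class.getClassLoader());
            if (methodName != null) {
                clazz.getMethod(methodName, paramTypes); // public methods, including inherited ones
            }
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }
}
```

A task could then guard optional behaviour with `if (Feature.SINK_TASK_ERRANT_RECORD_REPORTER.enabled()) { ... }`. Caching limits the reflection cost to one lookup per feature; under Connect's plugin isolation the class loader used for the lookup matters, and this sketch simply uses the loader that loaded `Feature`.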
> > > > > > > > > > > > > > > > > > > > One other high-level comment: this doesn't address every case, but we might
> > > > > > > > > > > > > > > > > > > > consider adding an API to "ack" sink records. This could use the
> > > > > > > > > > > > > > > > > > > > SubmittedRecords class [1] (with some slight tweaks) under the hood to
> > > > > > > > > > > > > > > > > > > > track the latest-acked offset for each topic partition. This way, connector
> > > > > > > > > > > > > > > > > > > > developers won't be responsible for tracking offsets at all in their sink
> > > > > > > > > > > > > > > > > > > > tasks (eliminating issues with the accuracy of post-transformation
> > > > > > > > > > > > > > > > > > > > topic/partition/offset (T/P/O) sink record information), and they'll only
> > > > > > > > > > > > > > > > > > > > have to notify the Connect framework when a record has been successfully
> > > > > > > > > > > > > > > > > > > > dispatched to the external system. This provides a cleaner, friendlier API,
> > > > > > > > > > > > > > > > > > > > and also enables more fine-grained metrics like the ones proposed in
> > > > > > > > > > > > > > > > > > > > KIP-767 [2].
> > > > > > > > > > > > > > > > > > > >
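An "ack"-style API backed by SubmittedRecords-like bookkeeping could work roughly as in this minimal, single-threaded sketch. It is not the actual SubmittedRecords class: `AckTracker`, `Submitted` and `TopicPartitionKey` are hypothetical names, and a real implementation would need synchronization, since acks would typically arrive on the external client's callback threads.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
record TopicPartitionKey(String topic, int partition) {}

// Records are registered in consumption order, acked in any order, and the
// committable offset only advances over a contiguous prefix of acked records.
// Not thread-safe: a real version would synchronize submit/ack/commit.
final class AckTracker {
    /** Handle given to the task; ack() marks the record as delivered externally. */
    static final class Submitted {
        final long offset;
        private volatile boolean acked;

        Submitted(long offset) { this.offset = offset; }

        void ack() { acked = true; }
    }

    private final Map<TopicPartitionKey, Deque<Submitted>> pending = new HashMap<>();
    private final Map<TopicPartitionKey, Long> committable = new HashMap<>();

    /** Registers a consumed record; must be called in consumption order per partition. */
    Submitted submit(TopicPartitionKey tp, long offset) {
        Submitted s = new Submitted(offset);
        pending.computeIfAbsent(tp, k -> new ArrayDeque<>()).addLast(s);
        return s;
    }

    /** Drops the acked prefix for this partition and returns the next offset to commit, if any. */
    OptionalLong committableOffset(TopicPartitionKey tp) {
        Deque<Submitted> queue = pending.get(tp);
        while (queue != null && !queue.isEmpty() && queue.peekFirst().acked) {
            committable.put(tp, queue.pollFirst().offset + 1);
        }
        Long next = committable.get(tp);
        return next == null ? OptionalLong.empty() : OptionalLong.of(next);
    }
}
```

Because progress is measured over the sequence of submitted records rather than by offset arithmetic, gaps between consecutive offsets (for example after log compaction or transaction markers) do not affect correctness.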
> > > > > > > > > > > > > > > > > > > > [1] - https://github.com/apache/kafka/blob/9ab140d5419d735baae45aff56ffce7f5622744f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
> > > > > > > > > > > > > > > > > > > > [2] - https://cwiki.apache.org/confluence/display/KAFKA/KIP-767%3A+Connect+Latency+Metrics
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Chris
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > On Tue, Nov 1, 2022 at 11:21 AM Yash Mayya <yash.mayya@gmail.com>
> > > > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Hi Randall,
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > It's been a while for this one, but the more I think about it, the more I
> > > > > > > > > > > > > > > > > > > > > feel like the current approach with a new overloaded `SinkTask::put` method
> > > > > > > > > > > > > > > > > > > > > might not be optimal. We're trying to fix a fairly niche corner-case bug
> > > > > > > > > > > > > > > > > > > > > here (usage of topic-mutating SMTs with sink connectors that do their own
> > > > > > > > > > > > > > > > > > > > > offset tracking) and I'm not sure that warrants a change to such a central
> > > > > > > > > > > > > > > > > > > > > interface method. The new `SinkTask::put` method just seems somewhat odd,
> > > > > > > > > > > > > > > > > > > > > and it may not be very understandable for a new reader; I don't think this
> > > > > > > > > > > > > > > > > > > > > should be the case for a public interface method. Furthermore, even with
> > > > > > > > > > > > > > > > > > > > > elaborate documentation in place, I'm not sure it'll be very obvious to
> > > > > > > > > > > > > > > > > > > > > most people what the purpose of having these two `put` methods is and how
> > > > > > > > > > > > > > > > > > > > > they should be used by sink task implementations. What do you think?
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > Yash
> > > > > > > > > > > > > > > > > > > > >
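For readers following the thread, the overloaded `SinkTask::put` approach being questioned here boils down to a new method whose default implementation delegates to the existing one. The minimal sketch below uses hypothetical stand-in types (`SinkTaskSketch`, `Rec`, `TopicPartitionKey`) rather than the real Connect API, and shows only the delegation, not the runtime side.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for TopicPartition and SinkRecord.
record TopicPartitionKey(String topic, int partition) {}
record Rec(String topic, int partition, Object value) {}

// Sketch of the overloaded-put idea discussed in this thread (not an adopted API):
// the runtime would call the two-argument variant, whose default implementation
// delegates to the existing one-argument put, so existing tasks keep working.
abstract class SinkTaskSketch {
    /** Existing method: records carry post-transformation topic partitions. */
    abstract void put(Collection<Rec> records);

    /** New overload: also receives each record's original (pre-transformation) topic partition. */
    void put(Collection<Rec> records, Map<Rec, TopicPartitionKey> originalTopicPartitions) {
        put(records); // default: ignore the extra information
    }
}
```

The delegation is what keeps existing tasks compatible, but it also illustrates the confusion raised above: a task author has to work out which of the two `put` methods the runtime actually calls, and still has to implement the old one even if only the new one is needed.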
> > > > > > > > > > > > > > > > > > > > > On Mon, Oct 10, 2022 at 9:33 PM Yash Mayya <yash.mayya@gmail.com>
> > > > > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > Hi Randall,
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > Thanks a lot for your valuable feedback so far! I've updated the KIP based
> > > > > > > > > > > > > > > > > > > > > > on our discussion above. Could you please take another look?
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > > Yash
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > On Tue, Oct 4, 2022 at 12:40 AM Randall Hauch <rhauch@gmail.com>
> > > > > > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > >> On Mon, Oct 3, 2022 at 11:45 AM Yash Mayya <yash.mayya@gmail.com>
> > > > > > > > > > > > > > > > > > > > > > >> wrote:
> > > > > > > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > > > > > >> > Hi Randall,
> > > > > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > > > > > >> > Thanks for elaborating. I think these are all very good points and I
> > > > > > > > > > > > > > > > > > > > > > >> > see why the overloaded `SinkTask::put` method is a cleaner solution
> > > > > > > > > > > > > > > > > > > > > > >> > overall.
> > > > > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > > > > > >> > > public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> updatedTopicPartitions)
> > > > > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > > > > >> > I think this should be
> > > > > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > > > > > >> > `public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> originalTopicPartitions)`
> > > > > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > > > > > >> > instead, because the sink records themselves have the updated topic
> > > > > > > > > > > > > > > > > > > > > > >> > partitions (i.e. after all transformations have been applied) and the KIP
> > > > > > > > > > > > > > > > > > > > > > >> > is proposing a way for the tasks to be able to access the original topic
> > > > > > > > > > > > > > > > > > > > > > >> > partition (i.e. before transformations have been applied).
> > > > > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > > > > > >> Sounds good.
> > > > > > > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > > > > > >>
>> >
>> > > Of course, if the developer does not need separate methods, they can
>> > > easily have the older `put` method simply delegate to the newer
>> > > method.
>> >
>> > If the developer does not need separate methods (i.e. they don't need
>> > to use this new addition), they can simply continue implementing just
>> > the older `put` method right?
>> >
>>
>> Correct. We should update the JavaDoc of both methods to make this
>> clear, and in general to explain how the two methods should be used and
>> implemented. That can be part of the PR, and the KIP doesn't need this
>> wording.
>>
>> >
>> > > Finally, this gives us a roadmap for *eventually* deprecating the
>> > > older method, once the Connect runtime versions without this change
>> > > are old enough.
>> >
>> > I'm not sure we'd ever want to deprecate the older method. Most common
>> > sink connector implementations do not do their own offset tracking
>> > with asynchronous processing and will probably never have a need for
>> > the additional parameter
>> > `Map<SinkRecord, TopicPartition> originalTopicPartitions` in the
>> > proposed new `put` method. These connectors can continue implementing
>> > only the existing `SinkTask::put` method which will be called by the
>> > default implementation of the newer overloaded `put` method.
>> >
>>
>> +1
>>
>>
>> >
>> > > the pre-commit methods use the same
>> > > `Map<TopicPartition, OffsetAndMetadata> currentOffsets` data structure
>> > > I'm suggesting be used.
>> >
>> > The data structure you're suggesting be used is a
>> > `Map<SinkRecord, TopicPartition>` which will map `SinkRecord` objects
>> > to the original topic partition of the corresponding `ConsumerRecord`
>> > right? To clarify, this is a new data structure that will need to be
>> > managed in the `WorkerSinkTask`.
>> >
>>
>> Ah, you're right. Thanks for the correction.
>>
>> Best regards,
>> Randall
>>
>>
>> > Thanks,
>> > Yash
>>
>>
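For illustration only: a minimal, dependency-free sketch of the runtime-side book-keeping discussed above, i.e. a map from each transformed record to the topic partition it was actually consumed from. `SinkRecord`, `TopicPartition` and `transformAndTrack` below are hypothetical stand-ins, not the Connect API and not the actual `WorkerSinkTask` code. Following Randall's suggested convention, records whose topic and partition the SMTs left unchanged get no entry. The sketch assumes an `IdentityHashMap` is preferable, because two distinct records can become field-for-field equal once an SMT rewrites their topic and partition, and a value-keyed map would silently merge them.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical sketch; these are NOT the Kafka Connect classes.
final class OriginalPartitionTracking {

    record TopicPartition(String topic, int partition) {}

    record SinkRecord(String topic, int partition, long offset) {}

    // Applies a (hypothetical) SMT chain to consumed records, appends the survivors
    // to `out`, and returns transformed record -> original partition for records
    // whose topic or partition changed. Absence of an entry means "unchanged".
    static Map<SinkRecord, TopicPartition> transformAndTrack(
            List<SinkRecord> consumed, UnaryOperator<SinkRecord> transforms, List<SinkRecord> out) {
        Map<SinkRecord, TopicPartition> originals = new IdentityHashMap<>();
        for (SinkRecord original : consumed) {
            SinkRecord transformed = transforms.apply(original);
            if (transformed == null) {
                continue; // dropped by a filtering transformation
            }
            out.add(transformed);
            boolean moved = !transformed.topic().equals(original.topic())
                    || transformed.partition() != original.partition();
            if (moved) {
                originals.put(transformed, new TopicPartition(original.topic(), original.partition()));
            }
        }
        return originals;
    }

    public static void main(String[] args) {
        List<SinkRecord> consumed = List.of(
                new SinkRecord("orders", 1, 5L),
                new SinkRecord("orders", 2, 5L),
                new SinkRecord("audit", 0, 9L));
        // Route every "orders" record to partition 0 of "routed-orders"; leave others alone.
        UnaryOperator<SinkRecord> route = r -> r.topic().equals("orders")
                ? new SinkRecord("routed-orders", 0, r.offset())
                : r;
        List<SinkRecord> out = new ArrayList<>();
        Map<SinkRecord, TopicPartition> originals = transformAndTrack(consumed, route, out);
        System.out.println(out.get(0).equals(out.get(1)));     // true: equal values, distinct objects
        System.out.println(originals.size());                  // 2
        System.out.println(originals.get(out.get(0)));         // TopicPartition[topic=orders, partition=1]
        System.out.println(originals.containsKey(out.get(2))); // false
    }
}
```

With the sample input, both `orders` records end up as equal values on `routed-orders` partition 0, yet the identity-keyed map still tracks two entries (partitions 1 and 2), while the unchanged `audit` record has none.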
>> > On Mon, Oct 3, 2022 at 1:20 AM Randall Hauch <rhauch@gmail.com> wrote:
>> >
>> > > Hi, Yash.
>> > >
>> > > > I'm not sure I quite understand why it would be "easier" for
>> > > > connector developers to account for implementing two different
>> > > > overloaded `put` methods (assuming that they want to use this new
>> > > > feature) versus using a try-catch block around `SinkRecord` access
>> > > > methods?
>> > >
>> > >
>> > > Using a try-catch around an API method that *might* be there is a
>> > > very unusual thing for most developers. Unfortunately, we've had to
>> > > resort to this atypical approach with Connect in places when there
>> > > was no good alternative. We seem to be relying upon this pattern
>> > > because it's easier for us, not because it offers a better
>> > > experience for Connector developers. IMO, if there's a practical
>> > > alternative that uses normal development practices and techniques,
>> > > then we should use that alternative. IIUC, there is at least one
>> > > practical alternative for this KIP that would not require developers
>> > > to use the unusual try-catch to handle the case where methods are not
>> > > found.
>> > >
>> > > I also think having two `put` methods is easier when the Connector
>> > > has to do different things for different Connect runtimes, too. One
>> > > of those methods is called by newer Connect runtimes with the new
>> > > behavior, and the other method is called by an older Connect runtime.
>> > > Of course, if the developer does not need separate methods, they can
>> > > easily have the older `put` method simply delegate to the newer
>> > > method.
>> > >
>> > > Finally, this gives us a roadmap for *eventually* deprecating the
>> > > older method, once the Connect runtime versions without this change
>> > > are old enough.
>> > >
>> > > > I think the advantage of going with the proposed approach in the
>> > > > KIP is that it wouldn't require extra book-keeping (the
>> > > > Map<SinkRecord, TopicPartition> in `WorkerSinkTask` in your proposed
>> > > > approach)
>> > > >
>> > >
>> > > The connector does have to do some of this bookkeeping in how they
>> > > track the topic partition offsets used in the `preCommit`, and the
>> > > pre-commit methods use the same
>> > > `Map<TopicPartition, OffsetAndMetadata> currentOffsets` data
>> > > structure I'm suggesting be used.
>> > >
>> > > I hope that helps.
>> > >
>> > > Best regards,
>> > >
>> > > Randall
>> > >
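A hedged sketch of the `preCommit` point: an asynchronous sink task that reports committable offsets keyed by the *original* (pre-SMT) topic partition, looked up from a map like the one debated in this thread. All types here are hypothetical, dependency-free stand-ins that only mirror the shapes of `TopicPartition`, `OffsetAndMetadata` and `SinkTask::preCommit`. The sketch assumes the external system acknowledges the records of any given partition in order; out-of-order acknowledgements would need contiguous-range tracking, which is not attempted here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; these are NOT the Kafka Connect / Kafka client classes.
final class PreCommitByOriginalPartition {

    record TopicPartition(String topic, int partition) {}

    record OffsetAndMetadata(long offset) {}

    record SinkRecord(String topic, int partition, long offset) {}

    private final Map<TopicPartition, OffsetAndMetadata> committable = new HashMap<>();

    // Called when the external system confirms `record` was written.
    // Assumes in-order acknowledgement per partition.
    void onAcknowledged(SinkRecord record, Map<SinkRecord, TopicPartition> originals) {
        // No entry means the SMTs left topic and partition unchanged.
        TopicPartition tp = originals.getOrDefault(
                record, new TopicPartition(record.topic(), record.partition()));
        // Kafka convention: the committed offset is the next offset to consume.
        committable.put(tp, new OffsetAndMetadata(record.offset() + 1));
    }

    // Same shape as SinkTask::preCommit. This sketch ignores currentOffsets and returns
    // only acknowledged offsets, keyed by the partitions the consumer actually reads.
    Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        return new HashMap<>(committable);
    }

    public static void main(String[] args) {
        PreCommitByOriginalPartition task = new PreCommitByOriginalPartition();
        SinkRecord routed = new SinkRecord("routed-orders", 0, 5L);
        task.onAcknowledged(routed, Map.of(routed, new TopicPartition("orders", 1)));
        task.onAcknowledged(new SinkRecord("audit", 0, 9L), Map.of());
        Map<TopicPartition, OffsetAndMetadata> offsets = task.preCommit(Map.of());
        System.out.println(offsets.get(new TopicPartition("orders", 1)));                // OffsetAndMetadata[offset=6]
        System.out.println(offsets.get(new TopicPartition("audit", 0)));                 // OffsetAndMetadata[offset=10]
        System.out.println(offsets.containsKey(new TopicPartition("routed-orders", 0))); // false
    }
}
```

Keying by the transformed record's own topic and partition instead would report `routed-orders` partition 0, which is not a partition the consumer is reading.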
>> > > On Mon, Sep 26, 2022 at 9:38 AM Yash Mayya <yash.mayya@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi Randall,
>> > > >
>> > > > Thanks for reviewing the KIP!
>> > > >
>> > > > > That latter logic can get quite ugly.
>> > > >
>> > > > I'm not sure I quite understand why it would be "easier" for
>> > > > connector developers to account for implementing two different
>> > > > overloaded `put` methods (assuming that they want to use this new
>> > > > feature) versus using a try-catch block around `SinkRecord` access
>> > > > methods? In both cases, a connector developer would need to write
>> > > > additional code in order to ensure that their connector continues
>> > > > working with older Connect runtimes. Furthermore, we would probably
>> > > > need to carefully document what the implementation for the older
>> > > > `put` method should look like for connectors that want to use this
>> > > > new feature. I think the advantage of going with the proposed
>> > > > approach in the KIP is that it wouldn't require extra book-keeping
>> > > > (the Map<SinkRecord, TopicPartition> in `WorkerSinkTask` in your
>> > > > proposed approach) and also the fact that the try-catch based logic
>> > > > is an already established pattern through
>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors
>> > > > and other KIPs which added methods to source/sink connector/task
>> > > > contexts.
>> > > >
>> > > > Let me know if you still feel that having a new overloaded put
>> > > > method is a cleaner solution and I'd be happy to reconsider!
>> > > >
>> > > > Thanks,
>> > > > Yash
>> > > >
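For comparison, a hedged, self-contained sketch of the try-catch style Yash describes as an established pattern (KIP-610 describes guarding the newer `errantRecordReporter()` call with such a try-catch). A real `NoSuchMethodError` only occurs when a connector compiled against a newer API runs on an older runtime, so the sketch simulates it: `RecordView`, `originalTopic()` and both record classes are hypothetical, and the "old runtime" class throws the error explicitly. A real connector would more plausibly probe once (for example in `start`) and cache the outcome rather than catching per record; the per-call form is kept only for brevity.

```java
// Hypothetical sketch; RecordView and originalTopic() are NOT the Kafka Connect API.
final class TryCatchFeatureProbe {

    interface RecordView {
        String topic();

        String originalTopic(); // stands in for a newer accessor
    }

    // Record as seen on a runtime that has the newer accessor.
    record NewRuntimeRecord(String topic, String originalTopic) implements RecordView {}

    // Record as seen on an older runtime. There the JVM itself would raise
    // NoSuchMethodError at the call site; we throw it explicitly to simulate that.
    record OldRuntimeRecord(String topic) implements RecordView {
        @Override
        public String originalTopic() {
            throw new NoSuchMethodError("originalTopic");
        }
    }

    // The try-catch style debated in the thread: fall back to the (possibly
    // SMT-modified) topic when the newer accessor is unavailable.
    static String originalTopicOrFallback(RecordView record) {
        try {
            return record.originalTopic();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            return record.topic();
        }
    }

    public static void main(String[] args) {
        System.out.println(originalTopicOrFallback(new NewRuntimeRecord("routed-orders", "orders"))); // orders
        System.out.println(originalTopicOrFallback(new OldRuntimeRecord("routed-orders")));           // routed-orders
    }
}
```

On the older runtime the fallback can only see the transformed topic, which is exactly the information gap the KIP is trying to close.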
>> > > > On Thu, Sep 22, 2022 at 11:18 PM Randall Hauch <rhauch@gmail.com>
>> > > > wrote:
>> > > >
>> > > > > Hi, Yash. Thanks for picking up this KIP and discussion.
>> > > > >
>> > > > > The KIP includes this rejected alternative:
>> > > > >
>> > > > > > 4. Update SinkTask.put in any way to pass the new information
>> > > > > > outside SinkRecord (e.g. a Map or a derived class)
>> > > > > >
>> > > > > >    - Much more disruptive change without considerable pros
>> > > > > >
>> > > > > >
>> > > > > One advantage of doing this is that sink connector
>> > > > > implementations can more easily implement two different
>> > > > > "put(...)" methods to handle running in a variety of runtimes,
>> > > > > without having to use try-catch logic around the newer SinkRecord
>> > > > > access methods. That latter logic can get quite ugly.
>> > > > >
>> > > > > For example, the existing `put` method has this signature:
>> > > > >
>> > > > >     public abstract void put(Collection<SinkRecord> records);
>> > > > >
>> > > > > If we added an overloaded method that passed in a map of the old
>> > > > > topic+partition for each record (and defined the absence of an
>> > > > > entry as having an unchanged topic and partition):
>> > > > >
>> > > > >     public void put(Collection<SinkRecord> records,
>> > > > >                     Map<SinkRecord, TopicPartition> updatedTopicPartitions) {
>> > > > >         put(records);
>> > > > >     }
>> > > > >
>> > > > > then a `SinkTask` implementation that wants to use this new feature
>> > > > > could simply implement both methods:
>> > > > >
>> > > > >     public void put(Collection<SinkRecord> records) {
>> > > > >         // Running in an older runtime, so no tracking of SMT-modified
>> > > > >         // topic names or partitions
>> > > > >         put(records, Map.of());
>> > > > >     }
>> > > > >
>> > > > >     public void put(Collection<SinkRecord> records,
>> > > > >                     Map<SinkRecord, TopicPartition> updatedTopicPartitions) {
>> > > > >         // real logic here
>> > > > >     }
>> > > > >
>> > > > > This seems a lot easier than having to use try-catch logic, yet
>> > > > > still allows sink connectors to utilize the new functionality and
>> > > > > still work with older Connect runtimes.
>> > > > >
>> > > > > WDYT?
>> > > > >
>> > > > > Randall
>> > > > >
>> > > > >
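Randall's fragments can be fleshed out into a runnable, dependency-free sketch of the overload-with-default-delegation idea. `SinkTaskLike`, `LegacyTask`, `AwareTask` and the nested records are hypothetical stand-ins for `SinkTask`, `SinkRecord` and `TopicPartition`, not the real Connect classes. The new parameter is named `originalTopicPartitions`, as Yash later proposed, and an absent entry is assumed to mean the topic and partition are unchanged.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; these are NOT the Kafka Connect classes.
final class OverloadedPutSketch {

    record TopicPartition(String topic, int partition) {}

    record SinkRecord(String topic, int partition, long offset) {}

    // Stand-in for SinkTask: the existing abstract put plus a new overload
    // whose default implementation delegates to the old one.
    abstract static class SinkTaskLike {
        abstract void put(Collection<SinkRecord> records);

        void put(Collection<SinkRecord> records,
                 Map<SinkRecord, TopicPartition> originalTopicPartitions) {
            put(records);
        }
    }

    // A task that ignores the new feature and implements only the old method.
    static final class LegacyTask extends SinkTaskLike {
        final List<String> seen = new ArrayList<>();

        @Override
        void put(Collection<SinkRecord> records) {
            for (SinkRecord r : records) {
                seen.add(r.topic());
            }
        }
    }

    // A task that opts in by overriding both methods.
    static final class AwareTask extends SinkTaskLike {
        final List<String> originalTopics = new ArrayList<>();

        @Override
        void put(Collection<SinkRecord> records) {
            // Older runtime: no original topic/partition information is available.
            put(records, Map.of());
        }

        @Override
        void put(Collection<SinkRecord> records,
                 Map<SinkRecord, TopicPartition> originalTopicPartitions) {
            for (SinkRecord r : records) {
                // No entry means the SMTs left topic and partition unchanged.
                TopicPartition original = originalTopicPartitions.getOrDefault(
                        r, new TopicPartition(r.topic(), r.partition()));
                originalTopics.add(original.topic());
            }
        }
    }

    public static void main(String[] args) {
        SinkRecord renamed = new SinkRecord("routed-orders", 0, 42L);
        Map<SinkRecord, TopicPartition> originals =
                Map.of(renamed, new TopicPartition("orders", 3));

        // A newer runtime calls the two-argument overload on both tasks.
        LegacyTask legacy = new LegacyTask();
        legacy.put(List.of(renamed), originals);
        AwareTask aware = new AwareTask();
        aware.put(List.of(renamed), originals);

        // An older runtime only knows the one-argument method.
        AwareTask onOldRuntime = new AwareTask();
        onOldRuntime.put(List.of(renamed));

        System.out.println(legacy.seen);                 // [routed-orders]
        System.out.println(aware.originalTopics);        // [orders]
        System.out.println(onOldRuntime.originalTopics); // [routed-orders]
    }
}
```

A task that only overrides the old `put` keeps working when a newer runtime calls the two-argument overload, while a task that overrides both falls back to the transformed topic when an older runtime calls only the one-argument method.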
> > > > > > > > > > > > > > > > > > > > > >> > > > > On Thu, Sep 8, 2022 at
> > 7:03
> > > AM
> > > > > > Yash
> > > > > > > > > Mayya
> > > > > > > > > > <
> > > > > > > > > > > > > > > > > > > > yash.mayya@gmail.com
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > wrote:
> > > > > > > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > > Hi all,
> > > > > > > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > > I would like to (re)start a new discussion thread on KIP-793 (Kafka Connect), which proposes some additions to the public SinkRecord interface in order to support topic-mutating SMTs for sink connectors that do their own offset tracking.
> > > > > > > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > > Links:
> > > > > > > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > > KIP: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336830
> > > > > > > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > > Older discussion thread:
> > > > > > > > > > > > > > > > > > > > > >> > > > > > https://lists.apache.org/thread/00kcth6057jdcsyzgy1x8nb2s1cymy8h
> > > > > > > > > > > > > > > > > > > > > >> > > > > > https://lists.apache.org/thread/rzqkm0q5y5v3vdjhg8wqppxbkw7nyopj
> > > > > > > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > > Jira: https://issues.apache.org/jira/browse/KAFKA-13431
> > > > > > > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > > > > > > >> > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > >> > > > > > Yash
> > > > > > > > > > > > > > > > > > > > > >> > > > > >