Posted to dev@kafka.apache.org by Diego Erdody <er...@gmail.com> on 2021/11/03 22:17:06 UTC

[DISCUSS] KIP-793: Sink Connectors: Support topic-mutating SMTs for async connectors (preCommit users)

Hello,

I'd like to propose a small KIP that adds a new field to SinkRecord so that
topic-mutating SMTs (e.g. RegexRouter) can be used with asynchronous Sink
Connectors (the ones that override preCommit for internal offset tracking,
like the S3 connector
<https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkTask.java#L274>).
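
The failure mode can be pictured with a small, dependency-free Java model. It is a sketch, not the Kafka Connect API: Rec, route, onWriteAcknowledged and the "topic-partition" string keys are hypothetical. An async task reports, from a preCommit-style method, the next offset to consume for each partition. If it keys those offsets by the topic produced by a RegexRouter-style rename, the framework receives offsets for a partition the consumer never owned. Keying by the original topic partition, which this KIP would expose, avoids that.

```java
import java.util.HashMap;
import java.util.Map;

// Dependency-free model of an async sink task. NOT the Kafka Connect API:
// Rec, route() and the "topic-partition" string keys are hypothetical stand-ins.
final class AsyncOffsetModel {

    // Stand-in for a SinkRecord after the SMT chain has run.
    static final class Rec {
        final String topic;            // topic after SMTs (possibly renamed)
        final int partition;
        final long offset;
        final String originalTopic;    // topic the consumer actually read from
        final int originalPartition;

        Rec(String topic, int partition, long offset, String originalTopic, int originalPartition) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.originalTopic = originalTopic;
            this.originalPartition = originalPartition;
        }
    }

    // Simulates a RegexRouter-style rename such as "orders" -> "dw.orders".
    static Rec route(Rec r) {
        return new Rec("dw." + r.topic, r.partition, r.offset, r.originalTopic, r.originalPartition);
    }

    // Next offset to consume (last durably written offset + 1), keyed two ways.
    final Map<String, Long> byTransformedTopic = new HashMap<>();
    final Map<String, Long> byOriginalTopic = new HashMap<>();

    // Called once the external system has acknowledged an asynchronous write.
    void onWriteAcknowledged(Rec r) {
        // Problematic keying: the consumer never owned "dw.orders-0",
        // so the framework cannot use offsets reported for it.
        byTransformedTopic.put(r.topic + "-" + r.partition, r.offset + 1);
        // Keying the KIP would enable: the partition the consumer actually owns.
        byOriginalTopic.put(r.originalTopic + "-" + r.originalPartition, r.offset + 1);
    }

    // What a preCommit-style hook would hand back to the framework.
    Map<String, Long> preCommit() {
        return new HashMap<>(byOriginalTopic);
    }
}
```

The offsets are stored as "last written + 1" because a committed Kafka offset names the next record to read.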

Links:

- KIP-793: Sink Connectors: Support topic-mutating SMTs for async
connectors (preCommit users)
<https://cwiki.apache.org/confluence/x/fpFnCw>
- PR #11464 <https://github.com/apache/kafka/pull/11464>

Thanks,

Diego

Re: [DISCUSS] KIP-793: Sink Connectors: Support topic-mutating SMTs for async connectors (preCommit users)

Posted by Randall Hauch <rh...@gmail.com>.
Thanks for the proposed KIP, Diego. I plan to review this in more
detail in the coming weeks.

Best regards,

Randall

On Wed, Nov 3, 2021 at 5:42 PM Chris Egerton
<ch...@confluent.io.invalid> wrote:

Re: [DISCUSS] KIP-793: Sink Connectors: Support topic-mutating SMTs for async connectors (preCommit users)

Posted by Chris Egerton <ch...@confluent.io.INVALID>.
Hi Diego,

This is a long time coming and I'm glad to see someone's finally gotten
around to filling in this feature gap for Connect.

It looks like this KIP does not take the SinkTask::open and SinkTask::close
methods into account (
https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open(java.util.Collection
/
https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/sink/SinkTask.html#close(java.util.Collection)).
Is this intentional? If so, it'd be nice to see a rationale for leaving
this out in the rejected alternatives section; if not, I think we may want to
add this type of support to the KIP so that we can solve the mutating
SMT/asynchronous sink connector problem once and for all, instead of
narrowing but not closing the existing feature gap.

If we opt to add support for open/close, we may want to take the current
effort to add support for cooperative consumer groups (
https://issues.apache.org/jira/browse/KAFKA-12487 /
https://github.com/apache/kafka/pull/10563) into account. Connect's current
behavior (invoking SinkTask::close for every topic partition whenever a
consumer rebalance occurs, then invoking SinkTask::open for all
still-assigned partitions) may be easier to reason about, but it is likely
to change soon (although holding off on that work if this KIP is given
priority is definitely a valid option).
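
To illustrate why open/close matter here, consider a dependency-free sketch (not the Kafka Connect API; CloseModel, Buffered and the string partition keys are hypothetical). An async connector buffers writes by destination, but SinkTask::close is handed the consumer's pre-SMT partitions, so revoked data can only be located through the original coordinates. The sketch handles partial revocation, which is what a cooperative rebalance would produce.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Dependency-free model, NOT the Kafka Connect API. Partitions are plain
// "topic-partition" strings; Buffered is a hypothetical pending write.
final class CloseModel {

    static final class Buffered {
        final String destination;  // post-SMT topic (e.g. a BigQuery table)
        final String originalTp;   // consumer partition it came from, e.g. "orders-0"
        final String value;

        Buffered(String destination, String originalTp, String value) {
            this.destination = destination;
            this.originalTp = originalTp;
            this.value = value;
        }
    }

    // Pending writes grouped by destination, as an async connector might hold them.
    final Map<String, List<Buffered>> pendingByDestination = new HashMap<>();

    void put(Buffered b) {
        pendingByDestination.computeIfAbsent(b.destination, d -> new ArrayList<>()).add(b);
    }

    // The partitions passed to close are the consumer's own (pre-SMT) partitions,
    // so they can only be matched against each record's original coordinates.
    void close(Collection<String> revokedOriginalTps) {
        Set<String> revoked = new HashSet<>(revokedOriginalTps);
        for (List<Buffered> pending : pendingByDestination.values()) {
            // Assuming preCommit only reported acknowledged writes, dropped
            // records are re-delivered from the last committed offset.
            pending.removeIf(b -> revoked.contains(b.originalTp));
        }
        pendingByDestination.values().removeIf(List::isEmpty);
    }
}
```

Dropping is only one choice; a real connector might instead flush the affected buffers before returning from close.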

It also looks like we're only exposing the original topic partition to
connector developers. I agree with the rationale for not exposing more of
the original consumer record for the most part, but what about the record's
offset? Although it's not possible to override the Kafka offset for a sink
record via the standard SinkRecord::newRecord methods (
https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/sink/SinkRecord.html#newRecord(java.lang.String,java.lang.Integer,org.apache.kafka.connect.data.Schema,java.lang.Object,org.apache.kafka.connect.data.Schema,java.lang.Object,java.lang.Long)
/
https://kafka.apache.org/30/javadoc/org/apache/kafka/connect/sink/SinkRecord.html#newRecord(java.lang.String,java.lang.Integer,org.apache.kafka.connect.data.Schema,java.lang.Object,org.apache.kafka.connect.data.Schema,java.lang.Object,java.lang.Long,java.lang.Iterable)),
there are still public constructors available for the SinkRecord class that
SMTs can use to return new SinkRecord instances whose Kafka offset differs
from that of the record they mutated. Do you think it may be worth the
additional maintenance burden and API complexity to accommodate this case,
with something like a SinkRecord::originalKafkaOffset method?
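
One way to picture the offset concern: in the dependency-free model below (not the Kafka Connect API; Rec, Smt, Delivered and runChain are hypothetical), an SMT rebuilds the record with a different offset, and only coordinates captured before the chain runs still identify what was consumed. Capturing them at that point is an assumption about how a worker could do it, not a description of PR #11464.

```java
// Dependency-free model, NOT the Kafka Connect API: Rec, Smt and Delivered are
// hypothetical stand-ins for SinkRecord, a Transformation and the worker's output.
final class OffsetModel {

    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    interface Smt {
        Rec apply(Rec r);
    }

    // Post-SMT record plus the coordinates captured before the chain ran.
    static final class Delivered {
        final Rec record;
        final String originalTopic;
        final int originalPartition;
        final long originalOffset;

        Delivered(Rec record, String originalTopic, int originalPartition, long originalOffset) {
            this.record = record;
            this.originalTopic = originalTopic;
            this.originalPartition = originalPartition;
            this.originalOffset = originalOffset;
        }
    }

    static Delivered runChain(Rec consumed, Smt... chain) {
        Rec current = consumed;
        for (Smt smt : chain) {
            current = smt.apply(current);
        }
        // Original coordinates come from the consumed record, so no SMT can alter them.
        return new Delivered(current, consumed.topic, consumed.partition, consumed.offset);
    }
}
```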

I'm also wondering about how exactly this method will be implemented. Will
we automatically create a new SinkRecord instance at the end of the
transformation chain in order to provide the correct topic partition (and
possibly offset)? If so, this should be called out, since it means that
transformations that return custom subclasses of SinkRecord will no longer
be able to do so in any meaningful way: they will still be able to return
them, but these custom subclasses will never be visible to sink tasks.
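
The subclass concern can be sketched as follows, assuming (hypothetically) that the worker copies the chain's output into a new record type that carries the original coordinates. Rec, EnrichedRec, WorkerRec and attachOriginal are illustrative names, not Kafka Connect classes.

```java
// Dependency-free model, NOT the Kafka Connect API. WorkerRec stands in for a
// hypothetical record the framework might build at the end of the SMT chain.
final class WrapModel {

    static class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // A subclass that a custom transformation might return, carrying extra state.
    static final class EnrichedRec extends Rec {
        final String tenant;

        EnrichedRec(String topic, int partition, long offset, String tenant) {
            super(topic, partition, offset);
            this.tenant = tenant;
        }
    }

    // Framework-built copy that adds the original coordinates.
    static final class WorkerRec extends Rec {
        final String originalTopic;
        final int originalPartition;

        WorkerRec(Rec chainOutput, String originalTopic, int originalPartition) {
            super(chainOutput.topic, chainOutput.partition, chainOutput.offset);
            this.originalTopic = originalTopic;
            this.originalPartition = originalPartition;
        }
    }

    // Copying into a fresh instance is what drops the EnrichedRec type and its tenant field.
    static Rec attachOriginal(Rec chainOutput, String originalTopic, int originalPartition) {
        return new WorkerRec(chainOutput, originalTopic, originalPartition);
    }
}
```

Preserving SMT-returned subclasses would require attaching the original coordinates without re-instantiating the record; this sketch does not attempt that.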

Finally, a small nit: do you think it'd make sense to split the
newly-proposed SinkRecord::originalTopicPartition method into separate
SinkRecord::originalTopic and SinkRecord::originalKafkaPartition methods, to
stay in line with the convention loosely set by the existing, separate
SinkRecord::topic and SinkRecord::kafkaPartition methods?
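
For comparison, the two accessor shapes in this nit can be written as hypothetical Java interfaces. The names and signatures below are illustrative only, not a released API.

```java
// Hypothetical accessor shapes for the nit above; neither is the released API.
final class AccessorShapes {

    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Shape A: one combined accessor for the original coordinates.
    interface CombinedStyle {
        String topic();
        Integer kafkaPartition();
        TopicPartition originalTopicPartition();
    }

    // Shape B: separate accessors that mirror topic()/kafkaPartition().
    interface SplitStyle {
        String topic();
        Integer kafkaPartition();
        String originalTopic();
        Integer originalKafkaPartition();
    }

    // A routed record exposing shape B.
    static final class Routed implements SplitStyle {
        private final String topic;
        private final Integer partition;
        private final String originalTopic;
        private final Integer originalPartition;

        Routed(String topic, Integer partition, String originalTopic, Integer originalPartition) {
            this.topic = topic;
            this.partition = partition;
            this.originalTopic = originalTopic;
            this.originalPartition = originalPartition;
        }

        public String topic() { return topic; }
        public Integer kafkaPartition() { return partition; }
        public String originalTopic() { return originalTopic; }
        public Integer originalKafkaPartition() { return originalPartition; }
    }
}
```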

I'm personally looking forward to leveraging this improvement in the
BigQuery sink connector I help maintain. We recently added a new write mode
that uses asynchronous writes and SinkTask::preCommit, and we also
encourage users to use SMTs to redirect records to different
datasets/tables in BigQuery, but that is currently impossible in the new
write mode. Thanks for taking this on!

Cheers,

Chris

On Wed, Nov 3, 2021 at 6:17 PM Diego Erdody <er...@gmail.com> wrote:

> Hello,
>
> I'd like to propose a small KIP to add a new field to SinkRecord in order
> to add support for topic-mutating SMTs (e.g. RegexRouter) to asynchronous
> Sink Connectors (the ones that override preCommit for internal offset
> tracking, like S3
> <
> https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkTask.java#L274
> >
> ).
>
> Links:
>
> - KIP-793: Sink Connectors: Support topic-mutating SMTs for async
> connectors (preCommit users)
> <https://cwiki.apache.org/confluence/x/fpFnCw>
> - PR #11464 <https://github.com/apache/kafka/pull/11464>
>
> Thanks,
>
> Diego
>