Posted to users@kafka.apache.org by Dean Arnold <re...@gmail.com> on 2016/09/09 16:55:37 UTC

Sink Connector feature request: SinkTask.putAndReport()

I need volume-based commits in a few sink connectors, and the current
interval-only commit strategy creates some headaches. After skimming the
code, it appears that an alternate put() method that returns a
Map<TopicPartition, Long> could let a sink connector keep Kafka up to date
on the offsets committed in the sink system, so that Kafka could defer or
reset its commit interval for those topics/partitions (at least for the
consumer used for SinkTasks). It wouldn't replace interval-based flush(),
but hopefully flush() would be invoked much less frequently, and the flush
interval could be increased, so the sink connector can better optimize its
commit batches. E.g., the sink could almost always commit 5000 records,
rather than whatever happened to be buffered when flush() was called, which
might be very small or very large.

I'm thinking of something like

Map<TopicPartition, Long> putAndReport(Collection<SinkRecord> records)

Put the records in the sink, and, if the operation results in committing
records to the sink system, report the largest committed offset of each
committed topic/partition. Returns null if no commit occurs.
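
A minimal sketch of how a task might implement the proposed method, assuming a simple record-count threshold. Everything here is illustrative: putAndReport() is the method proposed in this message and not an existing Kafka Connect API, VolumeCommitSink and commitThreshold are hypothetical names, and the two record types are stand-ins for the real TopicPartition and SinkRecord classes so that the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-ins for org.apache.kafka.common.TopicPartition and
// org.apache.kafka.connect.sink.SinkRecord, reduced to the fields used here.
record TopicPartition(String topic, int partition) {}
record SinkRecord(String topic, int partition, long offset, String value) {}

// Hypothetical task; putAndReport() is the proposal, not an existing API.
class VolumeCommitSink {
    private final int commitThreshold;
    private final List<SinkRecord> buffer = new ArrayList<>();

    VolumeCommitSink(int commitThreshold) {
        this.commitThreshold = commitThreshold;
    }

    // Buffers the records; once at least commitThreshold records are buffered,
    // "commits" the whole buffer and reports the largest offset per partition.
    Map<TopicPartition, Long> putAndReport(Collection<SinkRecord> records) {
        buffer.addAll(records);
        if (buffer.size() < commitThreshold) {
            return null; // no commit happened in the sink system
        }
        Map<TopicPartition, Long> committed = new HashMap<>();
        for (SinkRecord r : buffer) {
            TopicPartition tp = new TopicPartition(r.topic(), r.partition());
            committed.merge(tp, r.offset(), Math::max);
        }
        buffer.clear(); // placeholder for the real write and commit to the sink
        return committed;
    }
}
```

In this sketch a commit takes everything buffered so far, so a batch can exceed the threshold when put batches are large; a real sink could split the buffer to commit exactly the threshold.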

I'm not certain how a rebalance might affect the processing; since a sink
would still need to support existing interval-based commits, the rebalance
(and any sink recovery) would presumably work the same.

Am I overlooking any systemic issues that would preclude such a feature?

Re: Sink Connector feature request: SinkTask.putAndReport()

Posted by Dean Arnold <re...@gmail.com>.
Great, thanks.

On Tue, Sep 13, 2016 at 5:35 PM, Shikhar Bhushan <sh...@confluent.io>
wrote:

> Good point about size-based.
>
> I created a JIRA to track this feature:
> https://issues.apache.org/jira/browse/KAFKA-4161

Re: Sink Connector feature request: SinkTask.putAndReport()

Posted by Shikhar Bhushan <sh...@confluent.io>.
Good point about size-based.

I created a JIRA to track this feature:
https://issues.apache.org/jira/browse/KAFKA-4161

On Tue, Sep 13, 2016 at 4:19 PM Dean Arnold <re...@gmail.com> wrote:

> Yes, using the SinkTaskContext as a notification channel works as well, so
> that's fine.
>
> While a config value might be useful, it's probably not safe to assume that
> a sink would always want the same number of msgs/records for each commit,
> since the commit volume might be defined in bytes; e.g., accumulating
> enough data to fill a 128 MB HDFS chunk could take any number of
> msgs/records.
>
> In fact, this mechanism is really more general than just volume-based
> commits; it's really about providing sinks a flexible commit capability
> (e.g., some sink event requires premature commit, or otherwise requires
> modification of the commit interval).

Re: Sink Connector feature request: SinkTask.putAndReport()

Posted by Dean Arnold <re...@gmail.com>.
Yes, using the SinkTaskContext as a notification channel works as well, so
that's fine.

While a config value might be useful, it's probably not safe to assume that
a sink would always want the same number of msgs/records for each commit,
since the commit volume might be defined in bytes; e.g., accumulating
enough data to fill a 128 MB HDFS chunk could take any number of msgs/records.
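
As a rough illustration of a byte-based trigger, the sketch below counts buffered bytes and notifies the framework through a context callback once a target size is reached. The names are assumptions made for the example: FlushContext and its requestFlush() method stand in for the notification channel suggested earlier in this thread, and ByteVolumeSink and targetBytes are hypothetical. Record values are plain strings to keep the example self-contained.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical stand-in for the "requestFlush()" idea discussed in this
// thread; it is not an existing Kafka Connect interface.
interface FlushContext {
    void requestFlush();
}

// Hypothetical sink that asks for a flush by accumulated bytes, not by
// record count or elapsed time.
class ByteVolumeSink {
    private final long targetBytes;
    private final FlushContext context;
    private long bufferedBytes = 0;

    ByteVolumeSink(long targetBytes, FlushContext context) {
        this.targetBytes = targetBytes;
        this.context = context;
    }

    // Accounts for the UTF-8 size of each value and requests a flush once
    // the running total reaches targetBytes.
    void put(List<String> values) {
        for (String v : values) {
            bufferedBytes += v.getBytes(StandardCharsets.UTF_8).length;
        }
        if (bufferedBytes >= targetBytes) {
            context.requestFlush();
            bufferedBytes = 0; // start counting toward the next chunk
        }
    }
}
```

With targetBytes set to 128 * 1024 * 1024 this approximates the HDFS chunk case: the number of records per commit varies with record size, which is why a fixed record-count setting would not fit.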

In fact, this mechanism is really more general than just volume-based
commits; it's really about providing sinks a flexible commit capability
(e.g., some sink event requires premature commit, or otherwise requires
modification of the commit interval).

On Tue, Sep 13, 2016 at 11:37 AM, Shikhar Bhushan <sh...@confluent.io>
wrote:

> Hi Dean,
>
> I agree, it would be good to support volume-based offset commits.
>
> To give a sink connector more control over flushes, rather than adding
> a new task.put() variant, I think it may be better to add an API like
> "requestFlush()" to the `SinkTaskContext` (and perhaps also
> `SourceTaskContext`).
>
> Another option could be to add something like "offset.flush.record.count"
> in addition to the existing "offset.flush.interval.ms". Both options could
> be configured but whichever happens first would reset the other.
>
> What do you think?
>
> Best,
>
> Shikhar

Re: Sink Connector feature request: SinkTask.putAndReport()

Posted by Shikhar Bhushan <sh...@confluent.io>.
Hi Dean,

I agree, it would be good to support volume-based offset commits.

To give a sink connector more control over flushes, rather than adding
a new task.put() variant, I think it may be better to add an API like
"requestFlush()" to the `SinkTaskContext` (and perhaps also
`SourceTaskContext`).

Another option could be to add something like "offset.flush.record.count"
in addition to the existing "offset.flush.interval.ms". Both options could
be configured but whichever happens first would reset the other.
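
The "whichever happens first would reset the other" behavior could look roughly like the sketch below. It is only an illustration: FlushTrigger and its method names are hypothetical, "offset.flush.record.count" is the setting proposed above and does not exist today, and time is passed in explicitly so the logic can be exercised without a real clock.

```java
// Hypothetical helper combining a record-count trigger with a time trigger.
// intervalMs mirrors "offset.flush.interval.ms"; recordCount mirrors the
// proposed "offset.flush.record.count".
class FlushTrigger {
    private final long intervalMs;
    private final long recordCount;
    private long lastFlushMs;
    private long recordsSinceFlush = 0;

    FlushTrigger(long intervalMs, long recordCount, long startMs) {
        this.intervalMs = intervalMs;
        this.recordCount = recordCount;
        this.lastFlushMs = startMs;
    }

    // Called after n records are delivered; returns true if a flush is due.
    boolean onRecords(long n, long nowMs) {
        recordsSinceFlush += n;
        return checkAndReset(nowMs);
    }

    // Called periodically; returns true if the interval has elapsed.
    boolean onTick(long nowMs) {
        return checkAndReset(nowMs);
    }

    // Whichever condition fires first resets both the counter and the timer.
    private boolean checkAndReset(long nowMs) {
        boolean due = recordsSinceFlush >= recordCount
                || nowMs - lastFlushMs >= intervalMs;
        if (due) {
            recordsSinceFlush = 0;
            lastFlushMs = nowMs;
        }
        return due;
    }
}
```

For example, with an interval of 60000 ms and a count of 5000, a flush triggered by the 5000th record at time 2000 moves the next time-based flush to 62000.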

What do you think?

Best,

Shikhar
