Posted to users@kafka.apache.org by Jason Rosenberg <jb...@squareup.com> on 2013/10/03 22:13:14 UTC

Re: is it possible to commit offsets on a per stream basis?

I added a comment/suggestion to:
https://issues.apache.org/jira/browse/KAFKA-966

Basically, to expose an API for marking an offset for commit, such that the
auto-commit would only commit offsets up to the last message
'markedForCommit', and not the last 'consumed' offset, whose processing may
or may not have succeeded.  This way, consumer code can just call
'markForCommit()' after successfully processing each message.

Does that make sense?
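To make the idea concrete, here is a minimal sketch of the proposed semantics. The MarkedOffsetTracker class, its method names, and the partition-keyed maps are all hypothetical illustration, not the actual KAFKA-966 patch: the point is only that the auto-commit pass persists what has been explicitly marked, never what has merely been consumed.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch: track, per partition, both the last consumed offset
// and the last offset explicitly marked for commit. The auto-commit thread
// would persist only the marked offset, so a crash never commits past a
// message the application has not finished processing.
class MarkedOffsetTracker {
    private final ConcurrentMap<Integer, Long> consumed = new ConcurrentHashMap<>();
    private final ConcurrentMap<Integer, Long> marked = new ConcurrentHashMap<>();

    // Called as each message is handed to application code.
    void onConsumed(int partition, long offset) {
        consumed.put(partition, offset);
    }

    // The hypothetical markForCommit(): called only after the application
    // has fully processed the message at this offset.
    void markForCommit(int partition, long offset) {
        marked.put(partition, offset);
    }

    // What the auto-commit pass would persist for a partition: the next
    // offset to read, derived from marked (not consumed) progress.
    long offsetToCommit(int partition) {
        Long m = marked.get(partition);
        return m == null ? 0L : m + 1;
    }
}
```

So if the message at offset 42 has been consumed but not yet marked, a crash replays it instead of silently losing it.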


On Mon, Sep 9, 2013 at 5:21 PM, Yu, Libo <li...@citi.com> wrote:

> Thanks, Neha. That number of connections formula is very helpful.
>
> Regards,
>
> Libo
>
>
> -----Original Message-----
> From: Neha Narkhede [mailto:neha.narkhede@gmail.com]
> Sent: Monday, September 09, 2013 12:17 PM
> To: users@kafka.apache.org
> Subject: Re: is it possible to commit offsets on a per stream basis?
>
> Memory might become an issue if all the connectors are part of the same
> process. But this is easily solvable by distributing the connectors over
> several machines.
> Number of connections would be (# of connectors) * (# of brokers) and will
> proportionately increase with the # of connectors.
>
> Thanks,
> Neha
>
>
> On Mon, Sep 9, 2013 at 9:08 AM, Yu, Libo <li...@citi.com> wrote:
>
> > If one connector is used for a single stream, when there are many
> > topics/streams, will that cause any performance issue, e.g. too many
> > connections or too much memory or big latency?
> >
> > Regards,
> >
> > Libo
> >
> >
> > -----Original Message-----
> > From: Neha Narkhede [mailto:neha.narkhede@gmail.com]
> > Sent: Sunday, September 08, 2013 12:46 PM
> > To: users@kafka.apache.org
> > Subject: Re: is it possible to commit offsets on a per stream basis?
> >
> > That should be fine too.
> >
> >
> >
> >
> > On Sat, Sep 7, 2013 at 8:33 PM, Jason Rosenberg <jb...@squareup.com>
> wrote:
> >
> > > To be clear, it looks like I forgot to add to my question, that I am
> > > asking about creating multiple connectors, within the same consumer
> > > process (as I realize I can obviously have multiple connectors
> > > running on multiple hosts, etc.).  But I'm guessing that should be
> fine too?
> > >
> > > Jason
> > >
> > >
> > >
> > >
> > > On Sat, Sep 7, 2013 at 3:09 PM, Neha Narkhede
> > > <neha.narkhede@gmail.com
> > > >wrote:
> > >
> > > > >> Can I create multiple connectors, and have each use the same
> > > > >> Regex
> > > > for the TopicFilter?  Will each connector share the set of
> > > > available topics?  Is this safe to do?
> > > >
> > > > >> Or is it necessary to create mutually non-intersecting regexes
> > > > >> for
> > > each
> > > > connector?
> > > >
> > > > As long as each of those consumer connectors share the same group
> > > > id,
> > > Kafka
> > > > consumer rebalancing should automatically re-distribute the
> > > > topic/partitions amongst the consumer connectors/streams evenly.
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > >
> > > > On Mon, Sep 2, 2013 at 1:35 PM, Jason Rosenberg <jb...@squareup.com>
> > > wrote:
> > > >
> > > > > Will this work if we are using a TopicFilter, that can map to
> > > > > multiple topics.  Can I create multiple connectors, and have
> > > > > each use the same
> > > > Regex
> > > > > for the TopicFilter?  Will each connector share the set of
> > > > > available topics?  Is this safe to do?
> > > > >
> > > > > Or is it necessary to create mutually non-intersecting regexes
> > > > > for each connector?
> > > > >
> > > > > It seems I have a similar issue.  I have been using auto commit
> > > > > mode,
> > > but
> > > > > it doesn't guarantee that all messages committed have been
> > > > > successfully processed (seems a change to the connector itself
> > > > > might expose a way to
> > > > use
> > > > > auto offset commit, and have it never commit a message until it
> > > > > is processed).  But that would be a change to the
> > > > > ZookeeperConsumerConnector....Essentially, it would be great if
> > > > > after processing each message, we could mark the message as
> > > > > 'processed', and
> > > > thus
> > > > > use that status as the max offset to commit when the auto offset
> > > > > commit background thread wakes up each time.
> > > > >
> > > > > Jason
> > > > >
> > > > >
> > > > > On Thu, Aug 29, 2013 at 11:58 AM, Yu, Libo <li...@citi.com>
> wrote:
> > > > >
> > > > > > Thanks, Neha. That is a great answer.
> > > > > >
> > > > > > Regards,
> > > > > >
> > > > > > Libo
> > > > > >
> > > > > >
> > > > > > -----Original Message-----
> > > > > > From: Neha Narkhede [mailto:neha.narkhede@gmail.com]
> > > > > > Sent: Thursday, August 29, 2013 1:55 PM
> > > > > > To: users@kafka.apache.org
> > > > > > Subject: Re: is it possible to commit offsets on a per stream
> > basis?
> > > > > >
> > > > > > 1 We can create multiple connectors. From each connector
> > > > > > create only
> > > > one
> > > > > > stream.
> > > > > > 2 Use a single thread for a stream. In this case, the
> > > > > > connector in
> > > each
> > > > > > thread can commit freely without any dependence on the other
> > threads.
> > > >  Is
> > > > > > this the right way to go? Will it introduce any deadlock when
> > > multiple
> > > > > > connectors commit at the same time?
> > > > > >
> > > > > > This is a better approach as there is no complex locking
> involved.
> > > > > >
> > > > > > Thanks,
> > > > > > Neha
> > > > > >
> > > > > >
> > > > > > On Thu, Aug 29, 2013 at 10:28 AM, Yu, Libo <li...@citi.com>
> > wrote:
> > > > > >
> > > > > > > Hi team,
> > > > > > >
> > > > > > > This is our current use case:
> > > > > > > Assume there is a topic with multiple partitions.
> > > > > > > 1 Create a connector first and create multiple streams from
> > > > > > > the connector for a topic.
> > > > > > > 2 Create multiple threads, one for each stream. You can
> > > > > > > assume the thread's job is to save the message into the
> database.
> > > > > > > 3 When it is time to commit offsets, all threads have to
> > > synchronize
> > > > > > > on a barrier before committing the offsets. This is to
> > > > > > > ensure no message loss in case of process crash.
> > > > > > >
> > > > > > > As all threads need to synchronize before committing, it is
> > > > > > > not
> > > > > > efficient.
> > > > > > > This is a workaround:
> > > > > > >
> > > > > > > 1 We can create multiple connectors. From each connector
> > > > > > > create
> > > only
> > > > > > > one stream.
> > > > > > > 2 Use a single thread for a stream. In this case, the
> > > > > > > connector in each thread can commit freely without any
> > > > > > > dependence on the other threads.  Is this the right way to go?
> > > > > > > Will it introduce any deadlock
> > > > > > > when multiple connectors commit at the same time?
> > > > > > >
> > > > > > > It would be great to allow committing on a per stream basis.
> > > > > > >
> > > > > > > Regards,
> > > > > > >
> > > > > > > Libo
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
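The workaround Neha endorses above (one connector per stream, one worker thread per stream, each committing independently) can be sketched with plain threads. The class below is an illustrative stand-in, not the real Kafka 0.8 consumer API: in actual code each worker would hold its own ConsumerConnector and call commitOffsets() where the map write appears.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative model of the workaround: each worker stands in for one
// connector that owns exactly one stream. Because each worker has its own
// committed-offset slot, it commits whenever it likes -- no barrier and no
// coordination with the other workers.
class SingleStreamWorker extends Thread {
    private final String streamId;
    private final long messageCount;
    private final Map<String, Long> committed; // stand-in for per-connector commitOffsets()

    SingleStreamWorker(String streamId, long messageCount, Map<String, Long> committed) {
        this.streamId = streamId;
        this.messageCount = messageCount;
        this.committed = committed;
    }

    @Override
    public void run() {
        for (long offset = 0; offset < messageCount; offset++) {
            // "Process" the message (e.g. save it to the database), then
            // commit periodically -- independently of every other stream.
            if (offset % 100 == 99) {
                committed.put(streamId, offset);
            }
        }
        committed.put(streamId, messageCount - 1); // final commit on shutdown
    }
}
```

Since each stream's committed offset is owned by a single thread, there is nothing to deadlock on when several connectors commit at the same time.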

RE: is it possible to commit offsets on a per stream basis?

Posted by "Yu, Libo " <li...@citi.com>.
This will greatly improve efficiency on the client side, and multiple threads won't have to
synchronize before committing offsets. Thanks, Jason.

Regards,

Libo


-----Original Message-----
From: Jason Rosenberg [mailto:jbr@squareup.com] 
Sent: Thursday, October 03, 2013 4:13 PM
To: users@kafka.apache.org
Subject: Re: is it possible to commit offsets on a per stream basis?

I added a comment/suggestion to:
https://issues.apache.org/jira/browse/KAFKA-966

Basically, to expose an API for marking an offset for commit, such that the auto-commit would only commit offsets up to the last message 'markedForCommit', and not the last 'consumed' offset, whose processing may or may not have succeeded.  This way, consumer code can just call 'markForCommit()'
after successfully processing each message.

Does that make sense?

