Posted to users@kafka.apache.org by "Bateman, Matt" <ma...@ebay.com> on 2011/09/30 02:14:10 UTC

Managing KafkaMessageStreams

Hi All,

Apologies if I've missed something obvious.

First, is there any reason not to call ConsumerConnector.createMessageStreams() more than once to create streams for the same topic? Or should all streams be created at a single point in time?

Second, is there a "safe" way to stop reading from a KafkaMessageStream? Currently I'm just interrupting the thread when I need to stop reading from the stream. There is no "close" or "shutdown" method, though. (Also, the Scala class doesn't annotate that InterruptedException can be thrown, so some gymnastics are required to handle that exception correctly.)

Third (depending on the answer to the second question), is it better to create more than one ConsumerConnector to manage sets of streams? The ConsumerConnector seems to have a clear "shutdown" method, and calling it would shut down all the streams associated with it.

Fourth, I see the "consumerTimeoutMs" attribute for timing out a consumer. Is there a cleaner way to achieve this at a more granular level? For instance, could a timeout duration be passed in the call to ConsumerIterator.next(), which would then simply return null?

Thanks,

Matt
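The per-call timeout asked about in the fourth question can be sketched as a thin wrapper over a blocking buffer. This is a minimal sketch under stated assumptions: TimedStream and next(timeout_s) are hypothetical names, not part of Kafka's consumer API, and a plain queue.Queue stands in for the consumer's internal message buffer.

```python
import queue
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class TimedStream(Generic[T]):
    """Hypothetical stream with a per-call timeout (not a Kafka API)."""

    def __init__(self) -> None:
        # Stand-in for the consumer's internal buffer of fetched messages.
        self._buffer: "queue.Queue[T]" = queue.Queue()

    def put(self, message: T) -> None:
        # In a real consumer, a fetcher thread would enqueue messages here.
        self._buffer.put(message)

    def next(self, timeout_s: float) -> Optional[T]:
        """Return the next message, or None if none arrives within timeout_s."""
        try:
            return self._buffer.get(timeout=timeout_s)
        except queue.Empty:
            return None


if __name__ == "__main__":
    stream: TimedStream[str] = TimedStream()
    stream.put("m1")
    print(stream.next(0.1))  # m1
    print(stream.next(0.1))  # None (nothing arrived within 0.1 s)
```

Returning None doubles as "no message yet", so this only works if a real message can never be None; raising an exception on timeout, as consumerTimeoutMs does, avoids that ambiguity.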

Re: Managing KafkaMessageStreams

Posted by Jun Rao <ju...@gmail.com>.
Matt,

KafkaMessageStream is only cleaned up if you shut down the connector. So if
you want to be able to stop different streams independently, you can use
multiple connectors.

Jun
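A toy model of the point above, that cleanup happens per connector: shutting down one connector ends all of its streams and none of another connector's. Connector, consume, start_workers and the end-of-stream marker are hypothetical Python stand-ins, not Kafka classes, and modelling shutdown as a marker pushed into each stream is an assumption made purely for illustration.

```python
import queue
import threading
from typing import List

# Hypothetical end-of-stream marker; object() compares equal only to itself.
_END = object()


class Connector:
    """Hypothetical stand-in for a consumer connector that owns its streams."""

    def __init__(self, num_streams: int) -> None:
        self.streams: List[queue.Queue] = [queue.Queue() for _ in range(num_streams)]

    def shutdown(self) -> None:
        # Ends every stream owned by this connector, and only those.
        for stream in self.streams:
            stream.put(_END)


def consume(stream: queue.Queue, handled: list) -> None:
    # iter(callable, sentinel) calls stream.get() until it returns the marker,
    # so the loop blocks while the stream is idle and exits on shutdown.
    for message in iter(stream.get, _END):
        handled.append(message)


def start_workers(connector: Connector, handled: list) -> List[threading.Thread]:
    # One daemon thread per stream, each iterating over its stream in a loop.
    workers = [threading.Thread(target=consume, args=(s, handled), daemon=True)
               for s in connector.streams]
    for worker in workers:
        worker.start()
    return workers
```

For example, with group_a = Connector(2) and group_b = Connector(1), calling group_a.shutdown() lets both of group_a's worker threads finish while group_b's worker keeps blocking on its stream. The trade-off of this layout is simply managing more connector instances.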

On Fri, Sep 30, 2011 at 10:05 AM, Bateman, Matt <ma...@ebay.com> wrote:

RE: Managing KafkaMessageStreams

Posted by "Bateman, Matt" <ma...@ebay.com>.
Hi Jun,

Thanks for such a quick response.

For reference, our current plan is to over-partition a topic and start or stop consumers as necessary to match the load. Our processing of each message takes a bit of time.

As to stopping a stream, I'm aware of the consumerTimeoutMs variable and the procedure you describe. I was hoping there was some way to "close" a stream, causing the call to next() to return null to indicate the end of the stream. Then there would be no need to wake threads to check whether they should stop reading.

As for KafkaMessageStream, when is it cleaned up (and the partition redistributed)? When the ConsumerConnector is shut down? After some configurable time out?

Thanks,

Matt
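As a rough illustration of why over-partitioning helps with this plan: if partitions are split among consumer threads in contiguous ranges, every thread up to the partition count gets work and any extra thread gets nothing. The sketch assumes a simplified range-style split (sorted consumer ids, with the remainder going to the first consumers); range_assign is a hypothetical helper, not Kafka's rebalancing code.

```python
from typing import Dict, List


def range_assign(num_partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Hypothetical, simplified range-style split of partitions over consumers."""
    if not consumers:
        return {}
    ordered = sorted(consumers)
    # Every consumer gets `base` partitions; the first `extra` get one more.
    base, extra = divmod(num_partitions, len(ordered))
    assignment: Dict[str, List[int]] = {}
    start = 0
    for i, consumer in enumerate(ordered):
        count = base + (1 if i < extra else 0)
        assignment[consumer] = list(range(start, start + count))
        start += count
    return assignment
```

With 8 partitions and consumers ["c1", "c2", "c3"], this returns {"c1": [0, 1, 2], "c2": [3, 4, 5], "c3": [6, 7]}; with 10 consumers, two of them receive no partitions, so the partition count caps how far consumers can usefully be scaled out.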

-----Original Message-----
From: Jun Rao [mailto:junrao@gmail.com] 
Sent: Thursday, September 29, 2011 8:21 PM
To: kafka-users@incubator.apache.org
Subject: Re: Managing KafkaMessageStreams

Re: Managing KafkaMessageStreams

Posted by Jun Rao <ju...@gmail.com>.
Matt,

Typically, one would just create all streams in a single
ConsumerConnector.createMessageStreams() call.
There is no harm in calling createMessageStreams multiple times, even on the
same topic, although I can't think of a good reason why you would need to.

There is not much difference whether you use one connector or multiple ones.
Typically, you just need one connector.

To consume from a stream, you would just iterate through the messages in a
loop in a thread. If you want to stop, you can just check an isStopping
variable. The problem is that if there is no new message, the next() call on
the stream's iterator blocks and you don't get a chance to check the
variable. That's what consumerTimeoutMs is used for. If consumerTimeoutMs is
set, next() will throw an exception if no message is received within the
timeout period. Then you can catch the exception and check the isStopping
variable.

Thanks,

Jun
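The stop-flag loop described above can be sketched as follows, using hypothetical stand-ins: BlockingStream plays the role of a stream's iterator with consumerTimeoutMs set, ConsumerTimeout stands in for the exception it throws, and a threading.Event serves as the isStopping flag (in Java, a volatile boolean or AtomicBoolean would fill that role). None of these names come from Kafka's API.

```python
import queue
import threading
from typing import Any, Callable


class ConsumerTimeout(Exception):
    """Hypothetical stand-in for the exception thrown when consumerTimeoutMs expires."""


class BlockingStream:
    """Hypothetical stream iterator: next() blocks, but gives up after the timeout."""

    def __init__(self, consumer_timeout_ms: int) -> None:
        self._buffer: queue.Queue = queue.Queue()
        self._timeout_s = consumer_timeout_ms / 1000.0

    def put(self, message: Any) -> None:
        self._buffer.put(message)

    def next(self) -> Any:
        try:
            return self._buffer.get(timeout=self._timeout_s)
        except queue.Empty:
            raise ConsumerTimeout() from None


def consume_loop(stream: BlockingStream, is_stopping: threading.Event,
                 handle: Callable[[Any], None]) -> None:
    # The flag is re-checked after every message and after every timeout,
    # so an idle stream cannot block shutdown indefinitely.
    while not is_stopping.is_set():
        try:
            message = stream.next()
        except ConsumerTimeout:
            continue  # nothing arrived within the timeout; re-check the flag
        handle(message)
```

In this sketch a stop request takes effect within roughly consumer_timeout_ms when the stream is idle, or once the in-flight message has been handled otherwise, so a shorter timeout gives faster shutdown at the cost of more wake-ups.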
