Posted to users@kafka.apache.org by Ian Friedman <ia...@flurry.com> on 2013/09/04 22:43:38 UTC

Sometimes get no message streams in consumer

Hey gang, I have a strange intermittent issue with our 0.7.2 consumers that is perplexing me:

Very infrequently, when we restart our consumer fleet, some consumer threads just fail to start. The consumer connector itself starts fine and seems to take part in subsequent rebalances, but based on the logging I'm seeing, it looks like our one call to ConsumerConnector.createMessageStreams is returning an empty map.

Is that something that can happen normally, and should we be retrying in our calling code? Our consumer code looks like this:

Map<String, List<KafkaStream<Message>>> topicMessageStreams =
    fConsumerConnector.createMessageStreams( getTopicMap() );

for( Map.Entry<String, List<KafkaStream<Message>>> entry : topicMessageStreams.entrySet() )
{
    final String topicName = entry.getKey();
    log.info( "starting stream for topic: " + topicName );
    final KafkaStream<Message> stream = entry.getValue().get(0);

    ….

Are we doing anything dumb here? In the case I described, we're never getting the "starting stream for topic: " message in our log output, so my only guess is that createMessageStreams is returning an empty map.  
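
One way to turn that guess into something the logs can confirm is to record how many topics came back before entering the loop, and to guard the get(0) call. The following is only a minimal sketch under stated assumptions: the class name StreamStartupLog and the method describe are hypothetical, the type parameter S stands in for KafkaStream<Message>, and no Kafka API is called. It only shows the shape of the logging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka. S stands in for KafkaStream<Message>.
public class StreamStartupLog {

    /**
     * Returns the log lines a startup routine could emit for the map
     * handed back by createMessageStreams. The first line is always
     * present, so an empty map is visible in the log instead of silent.
     */
    public static <S> List<String> describe(Map<String, List<S>> topicMessageStreams) {
        List<String> lines = new ArrayList<String>();
        lines.add("createMessageStreams returned " + topicMessageStreams.size() + " topic(s)");

        for (Map.Entry<String, List<S>> entry : topicMessageStreams.entrySet()) {
            List<S> streams = entry.getValue();
            if (streams == null || streams.isEmpty()) {
                // get(0) on an empty list would throw IndexOutOfBoundsException
                lines.add("no streams for topic: " + entry.getKey());
            } else {
                lines.add("starting stream for topic: " + entry.getKey());
            }
        }
        return lines;
    }
}
```

With a line like the first one in place, a missing "starting stream for topic: " message can be told apart from the case where the code never reached the loop at all, for example because of an exception thrown earlier.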

Thanks in advance guys!

--  
Ian Friedman


Re: Sometimes get no message streams in consumer

Posted by Ian Friedman <ia...@flurry.com>.
Sadly it's not easily reproducible; it seems to happen at random while we're starting large numbers of consumer processes. Out of the 300 or so we restarted during our last release, I saw this happen to 3 of them, and we stagger our restarts in groups of 100.

--  
Ian Friedman


On Thursday, September 5, 2013 at 11:27 PM, Jun Rao wrote:

> Could you reproduce this easily?
>  
> Thanks,
>  
> Jun



Re: Sometimes get no message streams in consumer

Posted by Jun Rao <ju...@gmail.com>.
Could you reproduce this easily?

Thanks,

Jun


On Thu, Sep 5, 2013 at 10:52 AM, Ian Friedman <ia...@flurry.com> wrote:

> It shouldn't be, no… it returns an ImmutableMap:
>
> setTopicMap(ImmutableMap.of(KafkaLogQueuer.sKafkaDataLogPathsTopic, 1));
>
> --
> Ian Friedman

Re: Sometimes get no message streams in consumer

Posted by Ian Friedman <ia...@flurry.com>.
It shouldn't be, no… it returns an ImmutableMap:

setTopicMap(ImmutableMap.of(KafkaLogQueuer.sKafkaDataLogPathsTopic, 1));  

--  
Ian Friedman


On Wednesday, September 4, 2013 at 11:40 PM, Jun Rao wrote:

> What createMessageStreams() returns should match what's in the input. Is it
> possible for getTopicMap() to return an empty map?
>  
> Thanks,
>  
> Jun



Re: Sometimes get no message streams in consumer

Posted by Jun Rao <ju...@gmail.com>.
What createMessageStreams() returns should match what's in the input. Is it
possible for getTopicMap() to return an empty map?
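
A check along those lines could compare the requested topic map with the returned one and report any topic that came back with fewer streams than requested. This is a minimal sketch, not Kafka code: the names StreamMapCheck and missingTopics are hypothetical, S stands in for KafkaStream<Message>, and the requested map is assumed to contain no null counts (which holds for an ImmutableMap).

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Kafka. S stands in for KafkaStream<Message>.
public class StreamMapCheck {

    /**
     * Returns the topics that were requested but came back with fewer
     * streams than asked for (including topics missing entirely).
     * An empty result means the output matches the input.
     */
    public static <S> Set<String> missingTopics(Map<String, Integer> requested,
                                                Map<String, List<S>> returned) {
        Set<String> missing = new TreeSet<String>();
        for (Map.Entry<String, Integer> e : requested.entrySet()) {
            List<S> streams = returned.get(e.getKey());
            int got = (streams == null) ? 0 : streams.size();
            if (got < e.getValue()) {
                missing.add(e.getKey());
            }
        }
        return missing;
    }
}
```

Calling it with the result of getTopicMap() and the map from createMessageStreams, then logging a non-empty result at error level, would show whether the input or the output was the empty one.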

Thanks,

Jun

