Posted to users@kafka.apache.org by Mingtao Zhang <ma...@gmail.com> on 2014/08/11 23:33:21 UTC

Consumer Parallelism

Hi,

We are using the following method on ConsumerConnector to get multiple
streams per topic, and we have multiple partitions per topic. It looks like
only one of the runnables is active over a relatively long period of time. Is
there anything we could have missed?

public <K, V> Map<String, List<KafkaStream<K, V>>> createMessageStreams(
        Map<String, Integer> topicCountMap,
        Decoder<K> keyDecoder,
        Decoder<V> valueDecoder);

Then we loop through the streams and create one runnable per stream to
consume the data.

        for (KafkaStream<Object, Object> kafkaStream : streams) {
            ConsumerRunnable consumerRunnable = runnableBuilder.get()
                    .setMessageConsumer(consumer)
                    .setKafkaStream(kafkaStream)
                    .build();

            executor.submit(consumerRunnable);
        }

Best Regards,
Mingtao
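A minimal, self-contained sketch of the thread-per-stream pattern from the loop above, useful for reproducing the symptom. Plain BlockingQueues stand in for the KafkaStreams returned by createMessageStreams, and the class and method names are hypothetical. Each queue gets its own worker thread; if the producer writes every message to one partition, only the thread owning that stream does any work while the others block.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class StreamPerThreadSketch {
    // Sentinel that tells a worker its simulated stream has ended.
    static final String END = "__END__";

    /** Starts one worker per stream and returns how many messages each worker handled. */
    public static int[] consumeAll(List<BlockingQueue<String>> streams) {
        ExecutorService executor = Executors.newFixedThreadPool(streams.size());
        AtomicInteger[] handled = new AtomicInteger[streams.size()];
        for (int i = 0; i < streams.size(); i++) {
            BlockingQueue<String> stream = streams.get(i);
            AtomicInteger counter = new AtomicInteger();
            handled[i] = counter;
            // One runnable per stream, mirroring the loop in the post.
            executor.submit(() -> {
                try {
                    // take() blocks while the stream is empty, so a thread whose
                    // partition receives no writes just sits idle.
                    while (!stream.take().equals(END)) {
                        counter.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int[] result = new int[handled.length];
        for (int i = 0; i < handled.length; i++) {
            result[i] = handled[i].get();
        }
        return result;
    }

    /** Simulates a producer that sent every message to partition 0. */
    public static int[] skewedScenario(int partitions, int messages) {
        List<BlockingQueue<String>> streams = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            streams.add(new LinkedBlockingQueue<>());
        }
        for (int m = 0; m < messages; m++) {
            streams.get(0).add("msg-" + m);
        }
        for (BlockingQueue<String> s : streams) {
            s.add(END);
        }
        return consumeAll(streams);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(skewedScenario(4, 100))); // [100, 0, 0, 0]
    }
}
```

The thread pool is sized to the number of streams so that every stream always has a thread; the imbalance comes entirely from where the data was written, not from the executor.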

Re: Consumer Parallelism

Posted by Guozhang Wang <wa...@gmail.com>.
Mingtao,

We have also noticed this issue and are trying to fix it in the new
producer:

KAFKA-1586 <https://issues.apache.org/jira/browse/KAFKA-1586>

Guozhang


On Tue, Aug 12, 2014 at 9:41 AM, Mingtao Zhang <ma...@gmail.com>
wrote:

> Great! I am in the 10-minute category for sure. I do see that there is NO
> partition key provided in our code.
>
> I feel it's too much 'customization' when Kafka provides a 'random'
> default partitioning strategy but then has another layer applying the
> 10-minute trick to optimize socket usage.
>
> Anyway, thank you so much for the help!
>
> Mingtao Sent from iPhone



-- 
-- Guozhang

Re: Consumer Parallelism

Posted by Mingtao Zhang <ma...@gmail.com>.
Great! I am in the 10-minute category for sure. I do see that there is NO partition key provided in our code.

I feel it's too much 'customization' when Kafka provides a 'random' default partitioning strategy but then has another layer applying the 10-minute trick to optimize socket usage.

Anyway, thank you so much for the help!

Mingtao Sent from iPhone
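To see why key-less data lands on a single partition for long stretches, here is a small simulation of the 10-minute behavior discussed in this thread: a random partition is picked and then reused until a fixed interval has passed. The class, the seed, and the use of explicit timestamps are assumptions for illustration, not the producer's actual code.

```java
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;

public class StickyRandomSketch {
    private final long stickMillis;
    private final Random random;
    private int current = -1;
    private long chosenAt;

    public StickyRandomSketch(long stickMillis, long seed) {
        this.stickMillis = stickMillis;
        this.random = new Random(seed);
    }

    /** Partition for a key-less message sent at time nowMillis. */
    public int partition(long nowMillis, int numPartitions) {
        // Re-pick only when no choice exists yet or the current one has expired.
        if (current < 0 || nowMillis - chosenAt >= stickMillis) {
            current = random.nextInt(numPartitions);
            chosenAt = nowMillis;
        }
        return current;
    }

    /** Distinct partitions used when sending one message per second over the span. */
    public static Set<Integer> partitionsUsed(long spanMillis, long stickMillis, int numPartitions) {
        StickyRandomSketch p = new StickyRandomSketch(stickMillis, 42L);
        Set<Integer> used = new TreeSet<>();
        for (long t = 0; t < spanMillis; t += 1000) {
            used.add(p.partition(t, numPartitions));
        }
        return used;
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        // Nine minutes of traffic: every message goes to the same partition.
        System.out.println(partitionsUsed(9 * 60 * 1000L, tenMinutes, 4).size()); // 1
        // Two hours of traffic: the chosen partition can change every ten minutes.
        System.out.println(partitionsUsed(2 * 60 * 60 * 1000L, tenMinutes, 4));
    }
}
```

With one consumer thread per partition, this is exactly the "thread-1 works for a while, then thread-3, then thread-0" pattern described later in the thread.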

> On Aug 12, 2014, at 12:22 PM, Guozhang Wang <wa...@gmail.com> wrote:
> 
> I see your question now. You may want to read this FAQ:
> 
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyisdatanotevenlydistributedamongpartitionswhenapartitioningkeyisnotspecified?

Re: Consumer Parallelism

Posted by Guozhang Wang <wa...@gmail.com>.
I see your question now. You may want to read this FAQ:

https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyisdatanotevenlydistributedamongpartitionswhenapartitioningkeyisnotspecified?


On Tue, Aug 12, 2014 at 8:11 AM, Mingtao Zhang <ma...@gmail.com>
wrote:

> Hi Guozhang,
>
> I think what I am looking for is real 'randomness' when the producer writes
> to the partitions. Based on my logs, over a long period of time only one
> partition received the writes, and on the other side only one consumer was
> active. In my case the consumer is slow, so by the time it comes back for
> the next message, the whole pipeline has slowed down.
>
> The 'round robin' approach works for my case. Is this email thread a good
> one to follow?
>
> http://mail-archives.apache.org/mod_mbox/kafka-users/201312.mbox/%3CCANZY1Phu8_QZqHCw-ivGp=MXMZPZE9+2VJXRLewZzEbjoR0Ynw@mail.gmail.com%3E
>
> Best Regards,
> Mingtao



-- 
-- Guozhang
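The FAQ title points at the other lever: specifying a partitioning key. A minimal sketch of hash-modulo partitioning, assuming a partitioner that maps each key's hashCode onto the partition count (similar in spirit to a default hash partitioner, but this class is hypothetical): the same key always goes to the same partition, and many distinct keys spread across all partitions.

```java
import java.util.Arrays;

public class KeyHashSketch {
    /** Maps a message key to a partition in 0..numPartitions-1. */
    public static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit; Math.abs(Integer.MIN_VALUE) would still be negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    /** Counts how many of 'messages' distinct keys land on each partition. */
    public static int[] spread(int numPartitions, int messages) {
        int[] counts = new int[numPartitions];
        for (int i = 0; i < messages; i++) {
            counts[partitionFor("key-" + i, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("order-17", 4) == partitionFor("order-17", 4)); // true
        System.out.println(Arrays.toString(spread(4, 100)));
    }
}
```

Keys also give per-key ordering, but a few very hot keys would still concentrate load on their partitions.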

Re: Consumer Parallelism

Posted by Mingtao Zhang <ma...@gmail.com>.
Hi Guozhang,

I think what I am looking for is real 'randomness' when the producer writes
to the partitions. Based on my logs, over a long period of time only one
partition received the writes, and on the other side only one consumer was
active. In my case the consumer is slow, so by the time it comes back for the
next message, the whole pipeline has slowed down.

The 'round robin' approach works for my case. Is this email thread a good one to follow?
http://mail-archives.apache.org/mod_mbox/kafka-users/201312.mbox/%3CCANZY1Phu8_QZqHCw-ivGp=MXMZPZE9+2VJXRLewZzEbjoR0Ynw@mail.gmail.com%3E

Best Regards,
Mingtao
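A minimal sketch of the round-robin idea, assuming you control the choice of partition per message (for example through a custom partitioner). The class is hypothetical, not a Kafka API: a shared counter cycles through the partitions so every partition, and therefore every consumer stream, receives an even share.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinSketch {
    private final AtomicInteger counter = new AtomicInteger();

    /** Returns the next partition in 0..numPartitions-1, cycling through all of them. */
    public int nextPartition(int numPartitions) {
        // Masking the sign bit keeps the index non-negative after the counter overflows.
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }

    /** Counts how many of 'messages' sends land on each partition. */
    public static int[] distribute(int numPartitions, int messages) {
        RoundRobinSketch partitioner = new RoundRobinSketch();
        int[] counts = new int[numPartitions];
        for (int i = 0; i < messages; i++) {
            counts[partitioner.nextPartition(numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(distribute(4, 100))); // [25, 25, 25, 25]
    }
}
```

The AtomicInteger keeps the counter safe when several producer threads share one partitioner instance.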


On Tue, Aug 12, 2014 at 7:55 AM, Mingtao Zhang <ma...@gmail.com>
wrote:

> Hi Guozhang,
>
> Thank you!
>
> So can I say that the consumers 'taking turns to consume' is caused by the
> corresponding partition receiving the 'message writes'?
>
> The problem I am facing is that my 'enrichment' consumer (which fetches
> more data based on the raw data) takes too much time to process a single
> message. To get more parallelism, is my only choice to 'decouple
> consumption from enrichment'?
>
> Mingtao Sent from iPhone

Re: Consumer Parallelism

Posted by Mingtao Zhang <ma...@gmail.com>.
Hi Guozhang,

Thank you!

So can I say that the consumers 'taking turns to consume' is caused by the corresponding partition receiving the 'message writes'?

The problem I am facing is that my 'enrichment' consumer (which fetches more data based on the raw data) takes too much time to process a single message. To get more parallelism, is my only choice to 'decouple consumption from enrichment'?

Mingtao Sent from iPhone
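One common way to do the decoupling asked about here, sketched under assumptions: the consumer thread keeps reading messages in order and hands each one to a fixed pool of enrichment workers, so slow enrichment runs in parallel even when all the data sits in one partition. The list stands in for messages read from a KafkaStream, and the enrich function is a hypothetical placeholder for the slow lookup.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class DecoupledEnrichmentSketch {
    /**
     * Reads 'messages' on the calling thread (standing in for the consumer
     * thread) and runs 'enrich' on a pool of 'workers' threads.
     */
    public static List<String> consumeAndEnrich(List<String> messages,
                                                Function<String, String> enrich,
                                                int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (String m : messages) {
                // Hand off immediately; the consumer does not wait for enrichment here.
                pending.add(pool.submit(() -> enrich.apply(m)));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> f : pending) {
                // Collecting in submission order keeps results in consumption order.
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> raw = Arrays.asList("m1", "m2", "m3", "m4");
        List<String> enriched = consumeAndEnrich(raw, m -> m + ":enriched", 4);
        System.out.println(enriched); // [m1:enriched, m2:enriched, m3:enriched, m4:enriched]
    }
}
```

A trade-off to weigh: if offsets are committed automatically, messages that were handed off but not yet enriched could be lost on a crash, and collecting results in order means one slow message holds back the results queued behind it.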

> On Aug 12, 2014, at 1:10 AM, Guozhang Wang <wa...@gmail.com> wrote:
> 
> Hello Mingtao,
> 
> The partition will not be re-assigned to other consumers unless the current
> consumer fails, so the behavior you described is not expected.
> 
> Guozhang

Re: Consumer Parallelism

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Mingtao,

The partition will not be re-assigned to other consumers unless the current
consumer fails, so the behavior you described is not expected.

Guozhang


On Mon, Aug 11, 2014 at 6:27 PM, Mingtao Zhang <ma...@gmail.com>
wrote:

> Hi Guozhang,
>
> I had another email about partitions per topic; I've pasted it into this
> email below.
>
> I expect those consumers to work concurrently. The behavior I observed is
> that consumer thread-1 works for a while, then thread-3 works, then
> thread-0, and so on. Is that normal?
>
> The version is 2.2.0.
>
> Best Regards,
> Mingtao



-- 
-- Guozhang
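A toy model of what is described above: ownership is recomputed only when group membership changes (for example when a consumer fails), so a healthy consumer keeps its partitions indefinitely. The class is hypothetical and uses a simple modulo spread rather than Kafka's real assignment logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

public class StickyOwnershipSketch {
    private final int numPartitions;
    private final SortedSet<String> liveConsumers = new TreeSet<>();
    private Map<Integer, String> owners = new HashMap<>();

    public StickyOwnershipSketch(int numPartitions, Collection<String> consumers) {
        this.numPartitions = numPartitions;
        liveConsumers.addAll(consumers);
        rebalance();
    }

    /** The only place ownership changes; called on construction and on failure. */
    private void rebalance() {
        Map<Integer, String> next = new HashMap<>();
        List<String> ids = new ArrayList<>(liveConsumers);
        for (int p = 0; p < numPartitions; p++) {
            // Simple modulo spread, standing in for the real assignment strategy.
            if (!ids.isEmpty()) {
                next.put(p, ids.get(p % ids.size()));
            }
        }
        owners = next;
    }

    public String ownerOf(int partition) {
        return owners.get(partition);
    }

    /** A consumer died; only now do its partitions move to someone else. */
    public void consumerFailed(String id) {
        if (liveConsumers.remove(id)) {
            rebalance();
        }
    }

    public static void main(String[] args) {
        StickyOwnershipSketch group =
            new StickyOwnershipSketch(4, Arrays.asList("t0", "t1", "t2", "t3"));
        System.out.println(group.ownerOf(1)); // t1
        group.consumerFailed("t1");
        System.out.println(group.ownerOf(1)); // t2
    }
}
```

Because ownership stays fixed between failures, a thread that goes quiet for a while is a sign that its partition is not being written to, not that it lost the partition.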

Re: Consumer Parallelism

Posted by Mingtao Zhang <ma...@gmail.com>.
Hi Guozhang,

I had another email about partitions per topic; I've pasted it into this
email below.

I expect those consumers to work concurrently. The behavior I observed is
that consumer thread-1 works for a while, then thread-3 works, then
thread-0, and so on. Is that normal?

The version is 2.2.0.

Best Regards,
Mingtao

On Wed, Jul 23, 2014 at 7:57 PM, Guozhang Wang <wa...@gmail.com> wrote:

> num.partitions is only used as a default value when the createTopic command
> does not specify the number of partitions, or when the topic is created
> automatically. In your case, since you always use its value in createTopic,
> you will always get one partition. Try changing your code to something like:
>
>         String[] args = new String[]{
>             "--zookeeper", config.getString("zookeeper"),
>             "--topic", config.getString("topic"),
>             "--replica", config.getString("replicas"),
>             "--partition", "8"
>         };
>
>         CreateTopicCommand.main(args);
>
>
>
> On Wed, Jul 23, 2014 at 4:38 PM, Mingtao Zhang <ma...@gmail.com>
> wrote:
>
> > Hi All,
> >
> > In kafka.properties I have (I forgot to change it):
> >
> > num.partitions=1
> >
> > while I create topics programmatically:
> >
> >         String[] args = new String[]{
> >             "--zookeeper", config.getString("zookeeper"),
> >             "--topic", config.getString("topic"),
> >             "--replica", config.getString("replicas"),
> >             "--partition", config.getString("partitions")
> >         };
> >
> >         CreateTopicCommand.main(args);
> >
> > The performance engineer told me only one consumer thread is actively
> > working even though I have 4 consumer threads started (visible when
> > debugging or in a thread dump) and 4 partitions configured from the args.
> >
> > It seems that num.partitions is still controlling the parallelism. Do I
> > need to change num.partitions accordingly? Could I remove it? What if I
> > have different parallelism requirements for different topics?
> >
> > Thank you in advance!
> >
> > Best Regards,
> > Mingtao
> >
>
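On the per-topic question, the suggestion above amounts to always passing an explicit --partition value instead of relying on the broker default. A minimal sketch of that idea: the helper builds the same argument array used in this thread from a per-topic partition count. The helper name, the ZooKeeper address, and the ENRICHED topic are hypothetical; only the RAW topic and the flag names come from the thread, and the sketch does not call CreateTopicCommand itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TopicArgsSketch {
    /** Builds CreateTopicCommand-style arguments with an explicit partition count. */
    public static String[] createTopicArgs(String zookeeper, String topic,
                                           int replicas, int partitions) {
        if (partitions < 1) {
            throw new IllegalArgumentException("partitions must be >= 1");
        }
        return new String[] {
            "--zookeeper", zookeeper,
            "--topic", topic,
            "--replica", String.valueOf(replicas),
            // Explicit value, so the broker's num.partitions default never applies.
            "--partition", String.valueOf(partitions)
        };
    }

    public static void main(String[] args) {
        // Hypothetical per-topic parallelism requirements.
        Map<String, Integer> partitionsPerTopic = new LinkedHashMap<>();
        partitionsPerTopic.put("RAW", 8);
        partitionsPerTopic.put("ENRICHED", 4);
        for (Map.Entry<String, Integer> e : partitionsPerTopic.entrySet()) {
            String[] topicArgs = createTopicArgs("localhost:2181", e.getKey(), 1, e.getValue());
            // In the thread, these arguments are passed to CreateTopicCommand.main(...).
            System.out.println(String.join(" ", topicArgs));
        }
    }
}
```

Keeping the per-topic counts in one map makes the parallelism of each topic an explicit, reviewable decision rather than a side effect of kafka.properties.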


On Mon, Aug 11, 2014 at 7:37 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Mingtao,
>
> How many partitions does the consumed topic have? Basically the data is
> distributed per-partition, and hence if the number of consumers is larger
> than the number of partitions, some consumers will not get any data.
>
> Guozhang

Re: Consumer Parallelism

Posted by Guozhang Wang <wa...@gmail.com>.
Mingtao,

How many partitions does the consumed topic have? Basically the data is
distributed per-partition, and hence if the number of consumers is larger
than the number of partitions, some consumers will not get any data.

Guozhang


On Mon, Aug 11, 2014 at 3:29 PM, Mingtao Zhang <ma...@gmail.com>
wrote:

> Is this in any way related to the issue?
>
> WARN No previously checkpointed highwatermark value found for topic RAW
> partition 0. Returning 0 as the highwatermark
> (kafka.server.HighwaterMarkCheckpoint)
>
> Mingtao
>



-- 
-- Guozhang
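To make the partitions-versus-consumers point concrete, here is a small sketch of a range-style assignment, roughly how the 0.8 high-level consumer is usually described as spreading a topic's partitions over consumer threads. It is a simplified model, not Kafka's code, and the thread ids are hypothetical. With one partition and four threads, three threads get nothing to read.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeAssignmentSketch {
    /**
     * Assigns partitions 0..numPartitions-1 to consumer threads in contiguous
     * ranges. Threads beyond the partition count get an empty list.
     */
    public static Map<String, List<Integer>> assign(int numPartitions, List<String> consumerThreads) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (consumerThreads.isEmpty()) {
            return result;
        }
        List<String> sorted = new ArrayList<>(consumerThreads);
        Collections.sort(sorted);
        int n = sorted.size();
        int perThread = numPartitions / n;   // every thread gets at least this many
        int withExtra = numPartitions % n;   // the first 'withExtra' threads get one more
        for (int i = 0; i < n; i++) {
            int start = perThread * i + Math.min(i, withExtra);
            int count = perThread + (i < withExtra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                parts.add(p);
            }
            result.put(sorted.get(i), parts);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> threads = Arrays.asList("c-0", "c-1", "c-2", "c-3");
        System.out.println(assign(1, threads)); // {c-0=[0], c-1=[], c-2=[], c-3=[]}
        System.out.println(assign(4, threads)); // {c-0=[0], c-1=[1], c-2=[2], c-3=[3]}
    }
}
```

Having at least as many partitions as consumer threads is necessary for every thread to get data, but not sufficient: the producer also has to actually write to all of those partitions.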

Re: Consumer Parallelism

Posted by Mingtao Zhang <ma...@gmail.com>.
Is this in any way related to the issue?

WARN No previously checkpointed highwatermark value found for topic RAW
partition 0. Returning 0 as the highwatermark
(kafka.server.HighwaterMarkCheckpoint)

Mingtao