You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Chen Wang <ch...@gmail.com> on 2014/08/14 19:39:15 UTC

How many threads should I use per topic

Hey, Guys,
I am using the high level consumer. I have a daemon process that checks the
lag for a topic.
Suppose I have a topic with 5 partitions, and partition 0, 1 has lag of 0,
while the other 3 all have lags. In this case, should I best start 3
threads, or 5 threads to read from this topic again to achieve best
performance?
I am currently using 3 threads in this case, but it seems that each thread
still first try to get hold of partition0, 1 first.(which seems unnecessary
in my case)

Another question is that I am currently using a signal thread to spawn
different thread to read from kafka. So if a topic  has 5 partitions, 5
signals will be sent, and 5 different threads will start polling the topic
in the following manner:

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

topicCountMap.put(kafkaTopic, new* Integer(1))*;

 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer

  .createMessageStreams(topicCountMap);

 List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(kafkaTopic);

KafkaStream<byte[], byte[]> stream = streams.*get(0);*

 ConsumerIterator<byte[], byte[]> it = stream.iterator();

As you can see, I specify *Integer(1) to ensure there is only one stream in
the polling thread.*

But in my testing, I am using:

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

 topicCountMap.put(topic, *new Integer(numOfPartitions));*

 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer

  .createMessageStreams(topicCountMap);

 List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

 executor = Executors.newFixedThreadPool(*numOfPartitions*);

 for (final KafkaStream stream : streams) {

 executor.submit(new ConsumerTest(stream, threadNumber,this.targetTopic));

 threadNumber++;

 }

Are these  two  methods fundamentally the same, or the later one is
preferred?

Thanks much!

Chen

Re: How many threads should I use per topic

Posted by Joel Koshy <jj...@gmail.com>.
On Thu, Aug 14, 2014 at 10:39:15AM -0700, Chen Wang wrote:
> Suppose I have a topic with 5 partitions, and partition 0, 1 has lag of 0,
> while the other 3 all have lags. In this case, should I best start 3
> threads, or 5 threads to read from this topic again to achieve best
> performance?
> I am currently using 3 threads in this case, but it seems that each thread
> still first try to get hold of partition0, 1 first.(which seems unnecessary
> in my case)

The default partition assignment is described here:
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-CanIpredicttheresultsoftheconsumerrebalance?

I'm not sure what you mean by each thread tries to get hold of
partition 0, 1 first - by threads you mean stream count on a single
consumer instance right? Since these threads are in the same consumer
group, there should be a uniform spread across the streams of that
instance. i.e., if there are five partitions and five threads on one
consumer instance that would give you the most parallelism (provided
you have enough CPU cores to support that).


Also, wrt your question below:
> Are these  two  methods fundamentally the same, or the later one is
> preferred?

You mean 'n' consumer instances (under the same group) with one stream
each vs one consumer instance with 'n' streams. If so, the latter is
preferred and easier to work with. You would use the former approach
when the consumer instances are on different machines.

Joel

> 
> Another question is that I am currently using a signal thread to spawn
> different thread to read from kafka. So if a topic  has 5 partitions, 5
> signals will be sent, and 5 different threads will start polling the topic
> in the following manner:
> 
> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> 
> topicCountMap.put(kafkaTopic, new* Integer(1))*;
> 
>  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
> 
>   .createMessageStreams(topicCountMap);
> 
>  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(kafkaTopic);
> 
> KafkaStream<byte[], byte[]> stream = streams.*get(0);*
> 
>  ConsumerIterator<byte[], byte[]> it = stream.iterator();
> 
> As you can see, I specify *Integer(1) to ensure there is only one stream in
> the polling thread.*
> 
> But in my testing, I am using:
> 
> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> 
>  topicCountMap.put(topic, *new Integer(numOfPartitions));*
> 
>  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
> 
>   .createMessageStreams(topicCountMap);
> 
>  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
> 
>  executor = Executors.newFixedThreadPool(*numOfPartitions*);
> 
>  for (final KafkaStream stream : streams) {
> 
>  executor.submit(new ConsumerTest(stream, threadNumber,this.targetTopic));
> 
>  threadNumber++;
> 
>  }
> 
> Are these  two  methods fundamentally the same, or the later one is
> preferred?
> 
> Thanks much!
> 
> Chen