Posted to users@kafka.apache.org by tao xiao <xi...@gmail.com> on 2015/03/06 10:23:07 UTC
How does num.consumer.fetchers get used
Hi team,
After reading the source code of AbstractFetcherManager, I found that the way
num.consumer.fetchers is used may not match what the Kafka documentation
describes. My reading of the documentation is that the number of fetcher
threads is controlled by the property num.consumer.fetchers: if I set
num.consumer.fetchers=4, a total of 4 fetcher threads are created after the
consumer is initialized.
But the source code tells a different story. The code below is copied from
AbstractFetcherManager:
  private def getFetcherId(topic: String, partitionId: Int): Int = {
    // Hash (topic, partition) into one of numFetchers buckets
    Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
  }

  def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
    mapLock synchronized {
      // Group the partitions by (source broker, fetcher id)
      val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicAndPartition, brokerAndInitialOffset) =>
        BrokerAndFetcherId(brokerAndInitialOffset.broker,
          getFetcherId(topicAndPartition.topic, topicAndPartition.partition))
      }
      for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
        var fetcherThread: AbstractFetcherThread = null
        fetcherThreadMap.get(brokerAndFetcherId) match {
          // Reuse the existing thread for this (broker, fetcher id) key...
          case Some(f) => fetcherThread = f
          // ...or create and start one the first time the key is seen
          case None =>
            fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
            fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
            fetcherThread.start
        }
        // Hand the partitions and their initial offsets to that thread
        fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map {
          case (topicAndPartition, brokerAndInitOffset) =>
            topicAndPartition -> brokerAndInitOffset.initOffset
        })
      }
    }
  }
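The grouping-and-reuse logic above can be exercised in isolation with the simplified Scala model below. It is a sketch, not Kafka code: TopicAndPartition and BrokerAndFetcherId are redefined locally, brokers are plain Int ids, a "fetcher thread" is only a record of the partitions it would own, and nonNegative is a hypothetical stand-in for Utils.abs (assumed to return the absolute value, with Int.MinValue mapped to 0). FetcherManagerModel, threadsStarted and threadCount are names made up for illustration.

```scala
import scala.collection.mutable

// Local stand-ins for the Kafka case classes of the same names (simplified).
final case class TopicAndPartition(topic: String, partition: Int)
final case class BrokerAndFetcherId(broker: Int, fetcherId: Int)

// Hypothetical model of addFetcherForPartitions; no real threads are started.
class FetcherManagerModel(numFetchers: Int) {
  // Assumed behaviour of Utils.abs: absolute value, Int.MinValue mapped to 0.
  private def nonNegative(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

  // Same formula as getFetcherId in the quoted code.
  private def getFetcherId(topic: String, partitionId: Int): Int =
    nonNegative(31 * topic.hashCode() + partitionId) % numFetchers

  // Keyed by (broker, fetcher id), like fetcherThreadMap in the quoted code.
  private val fetcherThreadMap =
    mutable.HashMap.empty[BrokerAndFetcherId, mutable.Set[TopicAndPartition]]

  // Counts how many times a new "thread" would be created and started.
  var threadsStarted = 0

  // partitionToBroker: leader broker of each partition (assumed input shape).
  def addFetcherForPartitions(partitionToBroker: Map[TopicAndPartition, Int]): Unit =
    synchronized {
      val partitionsPerFetcher = partitionToBroker.groupBy { case (tp, broker) =>
        BrokerAndFetcherId(broker, getFetcherId(tp.topic, tp.partition))
      }
      for ((key, partitions) <- partitionsPerFetcher) {
        // The default is evaluated only when the key is missing, so a
        // "thread" is created at most once per (broker, fetcher id).
        val owned = fetcherThreadMap.getOrElseUpdate(key, {
          threadsStarted += 1
          mutable.Set.empty[TopicAndPartition]
        })
        owned ++= partitions.keys
      }
    }

  def threadCount: Int = fetcherThreadMap.size
}
```

With numFetchers = 4, adding a single partition leaves threadCount at 1, and adding the same partition again keeps threadsStarted at 1, because getOrElseUpdate only builds a new entry for an unseen key.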
If I have one topic with one partition and num.consumer.fetchers set to 4,
only one fetcher thread is actually created, not 4.
num.consumer.fetchers essentially sets the maximum number of fetcher threads,
not the actual number. The actual number of fetcher threads is determined by
this line of code:
  Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
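Reading the quoted code, the bound applies per source broker: fetcherThreadMap is keyed by BrokerAndFetcherId, so each broker can get up to num.consumer.fetchers threads, and a consumer fetching from several brokers can end up with more than num.consumer.fetchers threads in total. The sketch below counts the distinct (broker, fetcher id) pairs for one topic. The object and method names are hypothetical, leaders(p) is an assumed input giving the leader broker of partition p, and nonNegative is a stand-in for Utils.abs (assumed: absolute value, with Int.MinValue mapped to 0).

```scala
// Counts the fetcher threads the quoted addFetcherForPartitions would create:
// one per distinct (leader broker, fetcher id) pair. All names are hypothetical.
object FetcherIdMath {
  // Stand-in for Utils.abs (assumed: absolute value, Int.MinValue mapped to 0).
  def nonNegative(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

  // Same formula as getFetcherId in the quoted code.
  def fetcherId(topic: String, partitionId: Int, numFetchers: Int): Int =
    nonNegative(31 * topic.hashCode() + partitionId) % numFetchers

  // leaders(p) is the broker leading partition p of `topic`.
  def threadCount(topic: String, leaders: IndexedSeq[Int], numFetchers: Int): Int =
    leaders.indices
      .map(p => (leaders(p), fetcherId(topic, p, numFetchers)))
      .distinct
      .size

  def main(args: Array[String]): Unit = {
    // One topic, one partition, num.consumer.fetchers=4: a single thread.
    println(threadCount("my-topic", IndexedSeq(1), 4)) // 1
    // 8 partitions, each led by a different broker: every pair is distinct,
    // so 8 threads are created even though num.consumer.fetchers is 4.
    println(threadCount("my-topic", (0 until 8).toIndexedSeq, 4)) // 8
  }
}
```

For any single broker the count is at most min(num.consumer.fetchers, number of partitions it leads), since fetcherId always falls in [0, numFetchers).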
Is my assumption correct?
--
Regards,
Tao
Re: How does num.consumer.fetchers get used
Posted by tao xiao <xi...@gmail.com>.
Created https://issues.apache.org/jira/browse/KAFKA-2008
On Sat, Mar 7, 2015 at 1:17 AM, Jiangjie Qin <jq...@linkedin.com.invalid> wrote:
> Hi Tao,
>
> Yes, your understanding is correct. We probably should update the documentation
> to make this clearer. Could you open a ticket for it?
>
> Jiangjie (Becket) Qin
--
Regards,
Tao
Re: How does num.consumer.fetchers get used
Posted by Jiangjie Qin <jq...@linkedin.com.INVALID>.
Hi Tao,
Yes, your understanding is correct. We probably should update the documentation
to make this clearer. Could you open a ticket for it?
Jiangjie (Becket) Qin