Posted to users@kafka.apache.org by tao xiao <xi...@gmail.com> on 2015/03/06 10:23:07 UTC

How does num.consumer.fetchers get used

Hi team,

After reading the source code of AbstractFetcherManager, I found that
the usage of num.consumer.fetchers may not match what is described in the
Kafka documentation. My reading of the documentation is that the number of
fetcher threads is controlled by the value of the property
num.consumer.fetchers: if I set num.consumer.fetchers=4, there should be
4 fetcher threads in total once the consumer is initialized.
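For reference, the setting in question is an ordinary consumer property. The sketch below only builds the java.util.Properties object that would be handed to kafka.consumer.ConsumerConfig for the ZooKeeper-based high-level consumer. The ZooKeeper address and group id are placeholder values, and the object and helper names (ConsumerPropsSketch, consumerProps) are hypothetical.

```scala
import java.util.Properties

object ConsumerPropsSketch {
  // Hypothetical helper: builds properties for the old high-level consumer.
  def consumerProps(numFetchers: Int): Properties = {
    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181") // placeholder address
    props.put("group.id", "example-group")           // placeholder group id
    // The setting discussed in this thread. The fetcher-id code caps the
    // threads per source broker rather than fixing their total count.
    props.put("num.consumer.fetchers", numFetchers.toString)
    props
  }

  def main(args: Array[String]): Unit =
    println(consumerProps(4).getProperty("num.consumer.fetchers"))
}
```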

But the source code tells me something different. The code below is
copied from AbstractFetcherManager:

private def getFetcherId(topic: String, partitionId: Int): Int = {
  // Hash (topic, partition) into one of numFetchers slots
  Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
}

def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
  mapLock synchronized {
    // Group the partitions by (leader broker, fetcher id)
    val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicAndPartition, brokerAndInitialOffset) =>
      BrokerAndFetcherId(brokerAndInitialOffset.broker,
        getFetcherId(topicAndPartition.topic, topicAndPartition.partition))
    }

    for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
      var fetcherThread: AbstractFetcherThread = null
      fetcherThreadMap.get(brokerAndFetcherId) match {
        case Some(f) => fetcherThread = f
        case None =>
          // A thread is started only for a (broker, fetcher id) pair not seen before
          fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
          fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
          fetcherThread.start
      }

      fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map {
        case (topicAndPartition, brokerAndInitOffset) =>
          topicAndPartition -> brokerAndInitOffset.initOffset
      })
    }
  }
}
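A minimal, self-contained sketch of the logic quoted above, assuming simplified stand-in types: TopicAndPartition and BrokerAndFetcherId are plain case classes here, leaders are given as bare broker ids instead of BrokerAndInitialOffset, a mutable set of partitions stands in for a real AbstractFetcherThread, and nonNegative is a hypothetical stand-in for Utils.abs. It shows that a "thread" is created only when a new (broker, fetcherId) key appears, so the thread count equals the number of distinct keys. Because the broker is part of the key, the cap of numFetchers applies per source broker.

```scala
import scala.collection.mutable

object FetcherSketch {
  // Simplified stand-ins, not Kafka's real classes
  final case class TopicAndPartition(topic: String, partition: Int)
  final case class BrokerAndFetcherId(brokerId: Int, fetcherId: Int)

  // Hypothetical stand-in for Utils.abs: maps any Int to a non-negative Int
  private def nonNegative(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

  def getFetcherId(topic: String, partitionId: Int, numFetchers: Int): Int =
    nonNegative(31 * topic.hashCode + partitionId) % numFetchers

  final class Manager(numFetchers: Int) {
    // Each entry models one fetcher thread and the partitions it serves
    private val fetcherThreadMap =
      mutable.Map.empty[BrokerAndFetcherId, mutable.Set[TopicAndPartition]]

    // leaders: partition -> id of the broker it is fetched from
    def addFetcherForPartitions(leaders: Map[TopicAndPartition, Int]): Unit = {
      val partitionsPerFetcher = leaders.groupBy { case (tp, brokerId) =>
        BrokerAndFetcherId(brokerId, getFetcherId(tp.topic, tp.partition, numFetchers))
      }
      for ((key, partitions) <- partitionsPerFetcher) {
        // A new "thread" is created only if this key has not been seen before
        val thread = fetcherThreadMap.getOrElseUpdate(key, mutable.Set.empty[TopicAndPartition])
        thread ++= partitions.keySet
      }
    }

    def threadCount: Int = fetcherThreadMap.size
  }

  def main(args: Array[String]): Unit = {
    val manager = new Manager(numFetchers = 4)
    manager.addFetcherForPartitions(Map(TopicAndPartition("test", 0) -> 1))
    println(manager.threadCount)
  }
}
```

With numFetchers = 4 and a single partition, main prints 1. Spreading ten partitions of the same topic across two leader brokers (partitions 0 to 4 on one, 5 to 9 on the other) yields 8 threads, twice the configured value, because each broker gets its own set of fetcher ids.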

If I have one topic with one partition and set num.consumer.fetchers to 4,
only one fetcher thread is actually created, not 4. So
num.consumer.fetchers effectively sets the maximum number of fetcher
threads, not the actual number. The actual number of fetcher threads is
determined by this expression:
Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
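To make the arithmetic concrete, the sketch below evaluates the same expression for a topic named "test" (a hypothetical name chosen for illustration). Since "test".hashCode is 3556498, 31 * hashCode is 110251438, which leaves remainder 2 when divided by 4, so consecutive partitions cycle through fetcher ids 2, 3, 0, 1. One partition produces exactly one id, so only one thread per broker is started; more partitions never produce more than numFetchers ids. The nonNegative helper is an assumed stand-in for Utils.abs. For other topic names the sum can overflow or change sign, so the ids are not guaranteed to be consecutive in general; only the upper bound holds.

```scala
object FetcherIdSketch {
  // Hypothetical stand-in for Utils.abs: maps any Int to a non-negative Int
  private def nonNegative(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

  // Same arithmetic as getFetcherId, with numFetchers passed explicitly
  def fetcherId(topic: String, partitionId: Int, numFetchers: Int): Int =
    nonNegative(31 * topic.hashCode + partitionId) % numFetchers

  // Distinct fetcher ids, i.e. threads, if all partitions share one leader broker
  def distinctFetcherIds(topic: String, partitions: Int, numFetchers: Int): Int =
    (0 until partitions).map(p => fetcherId(topic, p, numFetchers)).distinct.size

  def main(args: Array[String]): Unit = {
    println((0 until 6).map(p => fetcherId("test", p, 4)).mkString(", ")) // 2, 3, 0, 1, 2, 3
    println(distinctFetcherIds("test", 1, 4))                             // 1
  }
}
```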

Is my assumption correct?

-- 
Regards,
Tao

Re: How does num.consumer.fetchers get used

Posted by tao xiao <xi...@gmail.com>.
Created https://issues.apache.org/jira/browse/KAFKA-2008

On Sat, Mar 7, 2015 at 1:17 AM, Jiangjie Qin <jq...@linkedin.com.invalid> wrote:

> Hi Tao,
>
> Yes, your understanding is correct. We should probably update the documentation
> to make this clearer. Could you open a ticket for it?
>
> Jiangjie (Becket) Qin


-- 
Regards,
Tao

Re: How does num.consumer.fetchers get used

Posted by Jiangjie Qin <jq...@linkedin.com.INVALID>.
Hi Tao,

Yes, your understanding is correct. We should probably update the documentation
to make this clearer. Could you open a ticket for it?

Jiangjie (Becket) Qin
