You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Cj <cj...@gmail.com> on 2015/02/08 16:39:36 UTC

Poor performance consuming multiple topics


Hi Kafka team,

We have a use case where we need to consume from ~20 topics (each with 24 partitions), we have a potential max message size of 20MB so we've set our consumer fetch.size to 20MB but that's causing very poor performance on our consumer (most of our messages are in the 10-100k range). Is it possible to set the fetch size to a lower number than the max message size and gracefully handle larger messages (as a trapped exception for example) in order to improve our throughput?

Thank you in advance for your help
CJ Woolard

Re: Poor performance consuming multiple topics

Posted by CJ Woolard <cw...@channeliq.com>.
Thanks again for the help. As a final follow-up we added a  fetch.wait.max.ms of 10ms and that got us to a throughput of about 800kb/sec reading from the 20 topics, which is still a bit lower than I'd originally hoped, but should be sufficient for our use case in the near term. Thank you for all of your input, it's been very helpful for us.

- CJ

> On Feb 10, 2015, at 12:10 PM, "Guozhang Wang" <wa...@gmail.com> wrote:
> 
> CJ,
> 
> On the consumer side there is another two configs named "fetch.min.bytes"
> and "fetch.wait.max.ms":
> 
> http://kafka.apache.org/documentation.html#consumerconfigs
> 
> They controls how long the fetch request will wait on data.
> 
> What are the values on your consumers? You may want to try tuning these two
> configs as well.
> 
> Guozhang
> 
>> On Mon, Feb 9, 2015 at 9:48 AM, CJ Woolard <cw...@channeliq.com> wrote:
>> 
>> Thank you for your help, and I apologize for not adding sufficient detail
>> in my original question. To elaborate on our use case we are trying to
>> create a system tracing/monitoring app (which is of high importance to our
>> business) where we are trying to read messages from all of our Kafka
>> topics. We've been profiling a handful of runs with varying permutations of
>> Kafka config settings, and are struggling to find a combination that gives
>> us decent throughput. We're specifically trying to have one application
>> consume from ~20 topics (which may increase over time), some of the topics
>> have several million messages while some are currently empty. The behavior
>> we're seeing (regardless of the various config settings we've tried) is
>> that the consumer will take about 5 to 10 minutes creating it's streams,
>> after which point it appears to pull around ~20,000 messages at a decent
>> rate, and then it starts to throw consumer timeout exceptions for several
>> minutes. (Depending on the settings we choose it then may repeat that loop
>> where it again will pull a batch of messages at a decent rate and then
>> stall again on more timeouts). Our max message size is 20MB, so we set our
>> "fetch.message.max.bytes" to 20MB. We then set our "consumer.timeout.ms"
>> to "60000", so that reading the 20MB doesn't timeout, however that appears
>> to block threads for those topics which are empty (If we set it too high it
>> appears to block threads on the empty topics for a long time, if we set it
>> too low it times out reading large messages). We've tried increasing the
>> number of streams (to 20 for example), tried increasing the number of
>> thread pool consumers (to 20 for example) and tried increasing the number
>> of consumer fetchers (to 4 for example, although in profiling we appear to
>> get x2 the number of threads that we specify in config for what it's
>> worth), but have yet to find a combination of settings that works for us.
>> Again any direction here would be greatly appreciated.
>> 
>> Here is an example of a consumer config we've tried (again we've tried
>> several combinations in testing):
>> 
>> "group.id" -> Settings.Kafka.ConsumerGroupId,
>> "zookeeper.connect" -> Settings.Kafka.ZkConnectionString,
>> "num.consumer.fetchers" -> "4",
>> "consumer.timeout.ms" -> "60000",
>> "auto.offset.reset" -> "smallest",
>> "fetch.message.max.bytes" -> "20000000"
>> 
>> In terms of our code we've tried both the whitelist overload:
>> 
>> val numberOfStreams = 4    // (we've varied this number)
>> val filter = topics.mkString("|")
>> val topicFilter = new Whitelist(filter)
>> connector.createMessageStreamsByFilter(topicFilter, numberOfStreams,
>> decoder, decoder)
>> 
>> And the topic map overload:
>> 
>> private def createMapOfStreams(topics:Seq[String], numberOfPartitions:Int)
>> = {
>>    val connector = createKafkaConnector()
>>    val decoder = new StringDecoder()
>>    val topicsMap = topics.map(topic=>topic->numberOfPartitions).toMap
>>    connector.createMessageStreams(topicsMap, decoder, decoder)
>>  }
>> 
>> And our broker settings:
>> 
>> ############################# Socket Server Settings
>> #############################
>> 
>> # The port the socket server listens on
>> port=9092
>> 
>> # The number of threads handling network requests
>> num.network.threads=2
>> 
>> # The number of threads doing disk I/O
>> num.io.threads=8
>> 
>> # The send buffer (SO_SNDBUF) used by the socket server
>> socket.send.buffer.bytes=1048576
>> 
>> # The receive buffer (SO_RCVBUF) used by the socket server
>> socket.receive.buffer.bytes=1048576
>> 
>> # The maximum size of a request that the socket server will accept
>> (protection against OOM)
>> socket.request.max.bytes=104857600
>> 
>> 
>> # Hostname the broker will bind to. If not set, the server will bind to
>> all interfaces
>> #host.name=kafka-1
>> 
>> # Hostname the broker will advertise to producers and consumers. If not
>> set, it uses the
>> # value for "host.name" if configured.  Otherwise, it will use the value
>> returned from
>> # java.net.InetAddress.getCanonicalHostName().
>> advertised.host.name=kafka-1.qa.ciq-internal.net
>> 
>> # The port to publish to ZooKeeper for clients to use. If this is not set,
>> # it will publish the same port that the broker binds to.
>> advertised.port=9092
>> 
>> 
>> Thank you again for your help.
>> CJ
>> 
>> 
>> 
>> 
>> ____________________________________________________________________________________________________________________________________________________________________
>> 
>> 
>> ________________________________________
>> From: Guozhang Wang <wa...@gmail.com>
>> Sent: Monday, February 9, 2015 10:38 AM
>> To: users@kafka.apache.org
>> Subject: Re: Poor performance consuming multiple topics
>> 
>> Hello CJ,
>> 
>> You have to set the fetch size to be >=  the maximum message size possible,
>> otherwise the consumption will block upon encountering these large
>> messages.
>> 
>> I am wondering by saying "poor performance" what do you mean exactly? Are
>> you seeing low throughput, and can you share your consumer config values?
>> 
>> Guozhang
>> 
>> 
>>> On Sun, Feb 8, 2015 at 7:39 AM, Cj <cj...@gmail.com> wrote:
>>> 
>>> 
>>> 
>>> Hi Kafka team,
>>> 
>>> We have a use case where we need to consume from ~20 topics (each with 24
>>> partitions), we have a potential max message size of 20MB so we've set
>> our
>>> consumer fetch.size to 20MB but that's causing very poor performance on
>> our
>>> consumer (most of our messages are in the 10-100k range). Is it possible
>> to
>>> set the fetch size to a lower number than the max message size and
>>> gracefully handle larger messages (as a trapped exception for example) in
>>> order to improve our throughput?
>>> 
>>> Thank you in advance for your help
>>> CJ Woolard
>> 
>> 
>> 
>> 
>> --
>> -- Guozhang
> 
> 
> 
> -- 
> -- Guozhang

Re: Poor performance consuming multiple topics

Posted by Guozhang Wang <wa...@gmail.com>.
CJ,

On the consumer side there is another two configs named "fetch.min.bytes"
and "fetch.wait.max.ms":

http://kafka.apache.org/documentation.html#consumerconfigs

They controls how long the fetch request will wait on data.

What are the values on your consumers? You may want to try tuning these two
configs as well.

Guozhang

On Mon, Feb 9, 2015 at 9:48 AM, CJ Woolard <cw...@channeliq.com> wrote:

> Thank you for your help, and I apologize for not adding sufficient detail
> in my original question. To elaborate on our use case we are trying to
> create a system tracing/monitoring app (which is of high importance to our
> business) where we are trying to read messages from all of our Kafka
> topics. We've been profiling a handful of runs with varying permutations of
> Kafka config settings, and are struggling to find a combination that gives
> us decent throughput. We're specifically trying to have one application
> consume from ~20 topics (which may increase over time), some of the topics
> have several million messages while some are currently empty. The behavior
> we're seeing (regardless of the various config settings we've tried) is
> that the consumer will take about 5 to 10 minutes creating it's streams,
> after which point it appears to pull around ~20,000 messages at a decent
> rate, and then it starts to throw consumer timeout exceptions for several
> minutes. (Depending on the settings we choose it then may repeat that loop
> where it again will pull a batch of messages at a decent rate and then
> stall again on more timeouts). Our max message size is 20MB, so we set our
> "fetch.message.max.bytes" to 20MB. We then set our "consumer.timeout.ms"
> to "60000", so that reading the 20MB doesn't timeout, however that appears
> to block threads for those topics which are empty (If we set it too high it
> appears to block threads on the empty topics for a long time, if we set it
> too low it times out reading large messages). We've tried increasing the
> number of streams (to 20 for example), tried increasing the number of
> thread pool consumers (to 20 for example) and tried increasing the number
> of consumer fetchers (to 4 for example, although in profiling we appear to
> get x2 the number of threads that we specify in config for what it's
> worth), but have yet to find a combination of settings that works for us.
> Again any direction here would be greatly appreciated.
>
> Here is an example of a consumer config we've tried (again we've tried
> several combinations in testing):
>
>  "group.id" -> Settings.Kafka.ConsumerGroupId,
>  "zookeeper.connect" -> Settings.Kafka.ZkConnectionString,
> "num.consumer.fetchers" -> "4",
>  "consumer.timeout.ms" -> "60000",
>  "auto.offset.reset" -> "smallest",
>  "fetch.message.max.bytes" -> "20000000"
>
> In terms of our code we've tried both the whitelist overload:
>
> val numberOfStreams = 4    // (we've varied this number)
> val filter = topics.mkString("|")
> val topicFilter = new Whitelist(filter)
> connector.createMessageStreamsByFilter(topicFilter, numberOfStreams,
> decoder, decoder)
>
> And the topic map overload:
>
> private def createMapOfStreams(topics:Seq[String], numberOfPartitions:Int)
> = {
>     val connector = createKafkaConnector()
>     val decoder = new StringDecoder()
>     val topicsMap = topics.map(topic=>topic->numberOfPartitions).toMap
>     connector.createMessageStreams(topicsMap, decoder, decoder)
>   }
>
> And our broker settings:
>
> ############################# Socket Server Settings
> #############################
>
> # The port the socket server listens on
> port=9092
>
> # The number of threads handling network requests
> num.network.threads=2
>
> # The number of threads doing disk I/O
> num.io.threads=8
>
> # The send buffer (SO_SNDBUF) used by the socket server
> socket.send.buffer.bytes=1048576
>
> # The receive buffer (SO_RCVBUF) used by the socket server
> socket.receive.buffer.bytes=1048576
>
> # The maximum size of a request that the socket server will accept
> (protection against OOM)
> socket.request.max.bytes=104857600
>
>
> # Hostname the broker will bind to. If not set, the server will bind to
> all interfaces
> #host.name=kafka-1
>
> # Hostname the broker will advertise to producers and consumers. If not
> set, it uses the
> # value for "host.name" if configured.  Otherwise, it will use the value
> returned from
> # java.net.InetAddress.getCanonicalHostName().
> advertised.host.name=kafka-1.qa.ciq-internal.net
>
> # The port to publish to ZooKeeper for clients to use. If this is not set,
> # it will publish the same port that the broker binds to.
> advertised.port=9092
>
>
> Thank you again for your help.
> CJ
>
>
>
>
> ____________________________________________________________________________________________________________________________________________________________________
>
>
> ________________________________________
> From: Guozhang Wang <wa...@gmail.com>
> Sent: Monday, February 9, 2015 10:38 AM
> To: users@kafka.apache.org
> Subject: Re: Poor performance consuming multiple topics
>
> Hello CJ,
>
> You have to set the fetch size to be >=  the maximum message size possible,
> otherwise the consumption will block upon encountering these large
> messages.
>
> I am wondering by saying "poor performance" what do you mean exactly? Are
> you seeing low throughput, and can you share your consumer config values?
>
> Guozhang
>
>
> On Sun, Feb 8, 2015 at 7:39 AM, Cj <cj...@gmail.com> wrote:
>
> >
> >
> > Hi Kafka team,
> >
> > We have a use case where we need to consume from ~20 topics (each with 24
> > partitions), we have a potential max message size of 20MB so we've set
> our
> > consumer fetch.size to 20MB but that's causing very poor performance on
> our
> > consumer (most of our messages are in the 10-100k range). Is it possible
> to
> > set the fetch size to a lower number than the max message size and
> > gracefully handle larger messages (as a trapped exception for example) in
> > order to improve our throughput?
> >
> > Thank you in advance for your help
> > CJ Woolard
>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang

Re: Poor performance consuming multiple topics

Posted by CJ Woolard <cw...@channeliq.com>.
Thank you for your help, and I apologize for not adding sufficient detail in my original question. To elaborate on our use case we are trying to create a system tracing/monitoring app (which is of high importance to our business) where we are trying to read messages from all of our Kafka topics. We've been profiling a handful of runs with varying permutations of Kafka config settings, and are struggling to find a combination that gives us decent throughput. We're specifically trying to have one application consume from ~20 topics (which may increase over time), some of the topics have several million messages while some are currently empty. The behavior we're seeing (regardless of the various config settings we've tried) is that the consumer will take about 5 to 10 minutes creating it's streams, after which point it appears to pull around ~20,000 messages at a decent rate, and then it starts to throw consumer timeout exceptions for several minutes. (Depending on the settings we choose it then may repeat that loop where it again will pull a batch of messages at a decent rate and then stall again on more timeouts). Our max message size is 20MB, so we set our "fetch.message.max.bytes" to 20MB. We then set our "consumer.timeout.ms" to "60000", so that reading the 20MB doesn't timeout, however that appears to block threads for those topics which are empty (If we set it too high it appears to block threads on the empty topics for a long time, if we set it too low it times out reading large messages). We've tried increasing the number of streams (to 20 for example), tried increasing the number of thread pool consumers (to 20 for example) and tried increasing the number of consumer fetchers (to 4 for example, although in profiling we appear to get x2 the number of threads that we specify in config for what it's worth), but have yet to find a combination of settings that works for us. Again any direction here would be greatly appreciated. 

Here is an example of a consumer config we've tried (again we've tried several combinations in testing):

 "group.id" -> Settings.Kafka.ConsumerGroupId,
 "zookeeper.connect" -> Settings.Kafka.ZkConnectionString,
"num.consumer.fetchers" -> "4",
 "consumer.timeout.ms" -> "60000",
 "auto.offset.reset" -> "smallest",
 "fetch.message.max.bytes" -> "20000000"

In terms of our code we've tried both the whitelist overload:

val numberOfStreams = 4    // (we've varied this number)
val filter = topics.mkString("|")
val topicFilter = new Whitelist(filter)
connector.createMessageStreamsByFilter(topicFilter, numberOfStreams, decoder, decoder)

And the topic map overload:

private def createMapOfStreams(topics:Seq[String], numberOfPartitions:Int) = {
    val connector = createKafkaConnector()
    val decoder = new StringDecoder()
    val topicsMap = topics.map(topic=>topic->numberOfPartitions).toMap
    connector.createMessageStreams(topicsMap, decoder, decoder)
  }

And our broker settings:

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# The number of threads handling network requests
num.network.threads=2
 
# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=kafka-1

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=kafka-1.qa.ciq-internal.net

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
advertised.port=9092


Thank you again for your help.
CJ



____________________________________________________________________________________________________________________________________________________________________


________________________________________
From: Guozhang Wang <wa...@gmail.com>
Sent: Monday, February 9, 2015 10:38 AM
To: users@kafka.apache.org
Subject: Re: Poor performance consuming multiple topics

Hello CJ,

You have to set the fetch size to be >=  the maximum message size possible,
otherwise the consumption will block upon encountering these large messages.

I am wondering by saying "poor performance" what do you mean exactly? Are
you seeing low throughput, and can you share your consumer config values?

Guozhang


On Sun, Feb 8, 2015 at 7:39 AM, Cj <cj...@gmail.com> wrote:

>
>
> Hi Kafka team,
>
> We have a use case where we need to consume from ~20 topics (each with 24
> partitions), we have a potential max message size of 20MB so we've set our
> consumer fetch.size to 20MB but that's causing very poor performance on our
> consumer (most of our messages are in the 10-100k range). Is it possible to
> set the fetch size to a lower number than the max message size and
> gracefully handle larger messages (as a trapped exception for example) in
> order to improve our throughput?
>
> Thank you in advance for your help
> CJ Woolard




--
-- Guozhang

Re: Poor performance consuming multiple topics

Posted by Guozhang Wang <wa...@gmail.com>.
Hello CJ,

You have to set the fetch size to be >=  the maximum message size possible,
otherwise the consumption will block upon encountering these large messages.

I am wondering by saying "poor performance" what do you mean exactly? Are
you seeing low throughput, and can you share your consumer config values?

Guozhang


On Sun, Feb 8, 2015 at 7:39 AM, Cj <cj...@gmail.com> wrote:

>
>
> Hi Kafka team,
>
> We have a use case where we need to consume from ~20 topics (each with 24
> partitions), we have a potential max message size of 20MB so we've set our
> consumer fetch.size to 20MB but that's causing very poor performance on our
> consumer (most of our messages are in the 10-100k range). Is it possible to
> set the fetch size to a lower number than the max message size and
> gracefully handle larger messages (as a trapped exception for example) in
> order to improve our throughput?
>
> Thank you in advance for your help
> CJ Woolard




-- 
-- Guozhang

Re: Poor performance consuming multiple topics

Posted by dinesh kumar <di...@gmail.com>.
Hi CJ,
I recently ran into some kafka message size related issue and did some
digging around to understand the system. I will put those details in brief
and hope it will help you.

Each consumer connector has fetcher threads and fetcher manager threads
associated with it. The Fetcher thread talks to the Kafka brokers  and get
the data for the consumer. The fetcher thread get the partition information
after the re-balance operation. So say each consumer owns N partitions in a
topic, and M (M< N) partitions are in Broker i (Broker i is the leader of
these partitions) , the fetcher thread sends a request to Broker i for the
data. Kafka Protocol is in designed such that the maximum amount of data
transferred to one client in a single request should be less that 2GB (2GB
also includes the protocol overhead but they are only a few bytes and can
be ignored for now).

The data requested by fetcher thread is in unit of chunks per partition.
Each chunk is of the size of *fetch.message.max.bytes* a parameter in the
consumer configuration. Each chunk can have many messages in them. Also if
there is a very large message of say 200 MB that needs to be consumed, then
the fetch.message.max.bytes should be at least 200MB as in-complete
messages are not allowed (ie., one large message cannot be broken into
multiple pieces and transferred to the client)


The request for data made by the fetcher threads are in chunks of
fetch.message.max.bytes
and since they are per partition, it is very easy to run into a situation
where the total amount of data requested by the fetcher thread crosses 2GB.
This results in a situation where the consumer gets no data from the broker.

The data transferred from the broker is put in a Blocking Queue
<http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html>.
The consumer thread will be blocked on the queue till the fetcher thread
puts some data in the queue. If you specify a timeout (consumer.timeout.ms)
during initialization of the client,  the consumer thread will wait for a
maximum of  consumer.timeout.ms for the data and will throw a Timeout
Exception. If consumer.timeout.ms is -1 (default value) then the consumer
thread will be blocked till the fetcher queues some data.

Kafka supports compression. Compression happens in the producer end and it
is decompressed by the consumer. The decompression happens only when the
message is processed by the consumer thread and not while getting added to
the Blocking queue (ie., decompression is done by the consumer thread and
not by fetcher thread).

So the *fetch.message.max.bytes *should be the maximum message size after
compression. So to circumvent the limitation in Kafka protocol of 2GB per
request, we can use kafka compression.


So to summarize and to answer your question, there is no way to get a large
message with a small *fetch.message.max.bytes.*


Thanks,

Dinesh

On 8 February 2015 at 21:09, Cj <cj...@gmail.com> wrote:

>
>
> Hi Kafka team,
>
> We have a use case where we need to consume from ~20 topics (each with 24
> partitions), we have a potential max message size of 20MB so we've set our
> consumer fetch.size to 20MB but that's causing very poor performance on our
> consumer (most of our messages are in the 10-100k range). Is it possible to
> set the fetch size to a lower number than the max message size and
> gracefully handle larger messages (as a trapped exception for example) in
> order to improve our throughput?
>
> Thank you in advance for your help
> CJ Woolard