Posted to users@kafka.apache.org by Rajiv Kurian <ra...@signalfx.com> on 2016/01/26 06:20:14 UTC

Getting very poor performance from the new Kafka consumer

We are using the new Kafka consumer with the following config (as logged by
Kafka):

        metric.reporters = []
        metadata.max.age.ms = 300000
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = myGroup.id
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 2097152
        bootstrap.servers = [myBrokerList]
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.keystore.type = JKS
        ssl.trustmanager.algorithm = PKIX
        enable.auto.commit = false
        ssl.key.password = null
        fetch.max.wait.ms = 1000
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        ssl.truststore.password = null
        session.timeout.ms = 30000
        metrics.num.samples = 2
        client.id =
        ssl.endpoint.identification.algorithm = null
        key.deserializer = class sf.kafka.VoidDeserializer
        ssl.protocol = TLS
        check.crcs = true
        request.timeout.ms = 40000
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 32768
        ssl.cipher.suites = null
        ssl.truststore.type = JKS
        security.protocol = PLAINTEXT
        ssl.truststore.location = null
        ssl.keystore.password = null
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        fetch.min.bytes = 512
        send.buffer.bytes = 131072
        auto.offset.reset = earliest
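
For anyone trying to reproduce this, the fetch-related settings from the log could be written in code roughly as follows. This is only a sketch: the class and method names are made up for illustration, the bootstrap server value is the same placeholder that appears in the log, and only a subset of the logged keys is shown.

```java
import java.util.Properties;

// Hypothetical helper (not taken from the application): collects the
// fetch-related settings from the logged config into java.util.Properties.
final class ConsumerConfigSketch {

    static Properties fetchSettings() {
        Properties props = new Properties();
        // Placeholder exactly as it appears in the log; substitute real brokers.
        props.setProperty("bootstrap.servers", "myBrokerList");
        props.setProperty("group.id", "myGroup.id");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("fetch.min.bytes", "512");
        props.setProperty("fetch.max.wait.ms", "1000");
        props.setProperty("max.partition.fetch.bytes", "2097152");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // The key deserializer is the application's own class, named as in the log.
        props.setProperty("key.deserializer", "sf.kafka.VoidDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Print each setting in "key = value" form, similar to the log above.
        fetchSettings().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

A Properties object like this is the kind of argument the KafkaConsumer constructor accepts.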


We use consumer.assign() to assign a list of partitions and call poll() in
a loop. We have the following setup:

1. The messages have no key, and we use the byte array deserializer from the
config to get the values as byte arrays.

2. The messages average about 75 bytes each. We get this number by dividing
the Kafka broker bytes-in metric by the messages-in metric.

3. Each consumer is assigned about 64 partitions of the same topic, spread
across three brokers.

4. We get very few messages per second, around 1-2 messages across all
partitions on a client right now.

5. We have no compression on the topic.
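
The average message size in item 2 is a simple ratio of two broker metrics. A minimal sketch of that arithmetic is below. The metric readings in main() are made-up numbers chosen to reproduce the roughly 75 bytes reported, and the class and method names are hypothetical. The sketch does not account for whether the bytes-in metric includes per-message framing overhead.

```java
// Illustrative arithmetic only; the readings used in main() are invented.
final class MessageSizeSketch {

    // Average message size = broker bytes-in rate / broker messages-in rate.
    static double averageMessageBytes(double bytesInPerSec, double messagesInPerSec) {
        if (messagesInPerSec <= 0) {
            throw new IllegalArgumentException("messagesInPerSec must be positive");
        }
        return bytesInPerSec / messagesInPerSec;
    }

    public static void main(String[] args) {
        // Hypothetical readings: 150 bytes/sec in, 2 messages/sec in.
        System.out.println(averageMessageBytes(150.0, 2.0)); // prints 75.0
    }
}
```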

Our run loop looks something like this:

    while (isRunning()) {
        ConsumerRecords<Void, byte[]> records = null;
        try {
            // Here timeout is about 10 seconds, so it is pretty big.
            records = consumer.poll(timeout);
        } catch (Exception e) {
            logger.error("Exception polling Kafka ", e);
            records = null;
        }
        if (records != null) {
            for (ConsumerRecord<Void, byte[]> record : records) {
                // The handler puts the byte array on a very fast ring
                // buffer so it barely takes any time.
                handler.handleMessage(ByteBuffer.wrap(record.value()));
            }
        }
    }
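
One detail of the loop worth making explicit is that ByteBuffer.wrap() does not copy the array it is given, so the hand-off to the handler adds no per-message copy. The sketch below demonstrates this with plain java.nio. The three-byte array is a stand-in for record.value(), and the class name is hypothetical.

```java
import java.nio.ByteBuffer;

// Demonstrates that ByteBuffer.wrap() shares the backing array instead of copying it.
final class WrapSketch {

    // Returns true if a write through the wrapped buffer is visible in the array.
    static boolean wrapSharesArray() {
        byte[] value = {1, 2, 3};                 // stand-in for record.value()
        ByteBuffer buf = ByteBuffer.wrap(value);  // no copy is made here
        buf.put(0, (byte) 42);                    // absolute put; position stays at 0
        return value[0] == 42 && buf.remaining() == 3;
    }

    public static void main(String[] args) {
        System.out.println(wrapSharesArray()); // prints true
    }
}
```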



With this setup our performance took a horrendous hit as soon as we
started this one thread that just polls Kafka in a loop.

I profiled the application using Java Mission Control and have a few
insights.

1. There doesn't seem to be a single hotspot. The consumer just ends up
using a lot of CPU for handling such a low number of messages. Our process
was using 16% CPU before we added a single consumer, and it went to 25% and
above after. That's an increase of over 50% from a single consumer getting
a single-digit number of small messages per second. Here is a screenshot
of the CPU usage breakdown in the consumer (the namespace is different
because we shade the Kafka jar before using it): http://imgur.com/tHjdVnM
We've used bigger timeouts (around 100 seconds) and that doesn't seem to
make much of a difference either.

2. It also seems like Kafka throws a ton of EOFExceptions. I am not sure
whether this is expected, but it seems like it would completely kill
performance. Here is the exception tab of Java Mission Control:
http://imgur.com/X3KSn37 That is 1.8 million exceptions over a period of 3
minutes, which is about 10 thousand exceptions per second! The exception
stack trace shows that they originate from the poll call. I don't understand
how it can throw so many exceptions given that I call poll with a timeout of
10 seconds and get messages at about 1 per second.

3. The single thread seems to allocate a lot too. It is responsible for
17.87% of our entire JVM allocation rate. Most of what it allocates seems
to be those same EOFExceptions. Here is a chart showing the single thread's
allocation proportion: http://imgur.com/GNUJQsz Here is a chart that shows
a breakdown of the allocations: http://imgur.com/YjCXljE About 20% of the
allocations are for the EOFExceptions. This seems kind of crazy, especially
given that this happens about 10 thousand times a second. The rest of the
allocations seem to be spread all over, but again seem excessive given how
few messages we are getting.
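
The figures quoted in items 1 and 2 can be checked with a few lines of arithmetic. The sketch below uses only the numbers stated above (16% and 25% CPU, 1.8 million exceptions in 3 minutes). The class and method names are hypothetical. At the 1-2 messages per second reported earlier, the resulting rate works out to several thousand exceptions per delivered message.

```java
// Back-of-the-envelope checks for the profiling numbers in this post.
final class ProfileMathSketch {

    // Relative increase from 'before' to 'after', as a percentage of 'before'.
    static double relativeIncreasePercent(double before, double after) {
        return (after - before) / before * 100.0;
    }

    // Average events per second over an observation window.
    static double perSecond(long count, long seconds) {
        return (double) count / seconds;
    }

    public static void main(String[] args) {
        // CPU went from 16% to 25% of the process.
        System.out.println(relativeIncreasePercent(16.0, 25.0)); // prints 56.25
        // 1.8 million exceptions over 3 minutes (180 seconds).
        System.out.println(perSecond(1_800_000L, 180L));         // prints 10000.0
    }
}
```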

As a comparison, we also run a wrapper over the old SimpleConsumer that
gets a lot more data (10-15 thousand 70-byte messages/sec on a different
topic) and it is able to handle that load without much trouble. At this
moment we are completely puzzled by this performance. Most of it does seem
to be due to the crazy volume of exceptions. Note: our messages all seem to
be making it through. The exceptions are caught inside Kafka's stack and
never bubble through to us.
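
To illustrate how exceptions can be thrown at a high rate without ever reaching the caller, here is a generic sketch that is not Kafka's code. It uses only java.io and java.nio to show a reader that discovers end-of-data through an EOFException, next to a variant that checks the remaining bytes instead. Whether the consumer's internals follow this pattern is an assumption here; the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Generic illustration: end-of-data signalled by exception vs. by a bounds check.
final class EofSignalSketch {

    // Reads 4-byte ints until the stream runs dry. The end of the data is only
    // discovered when readInt() throws, so every call costs one EOFException.
    static int countIntsUntilEof(byte[] data) {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int count = 0;
        while (true) {
            try {
                in.readInt();
                count++;
            } catch (EOFException e) {
                return count; // swallowed here; the caller never sees it
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    // Same result without any exception: check how many bytes remain first.
    static int countIntsByRemaining(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        int count = 0;
        while (buf.remaining() >= Integer.BYTES) {
            buf.getInt();
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        byte[] data = new byte[12]; // three 4-byte ints
        System.out.println(countIntsUntilEof(data));    // prints 3
        System.out.println(countIntsByRemaining(data)); // prints 3
    }
}
```

In the first variant an empty buffer still costs one exception, so a reader that is invoked once per partition per fetch response could generate many exceptions even when almost no messages arrive.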

Are we doing anything wrong in how we are using the new consumer (longer
timeouts of around 100 seconds don't seem to help)?

Thanks in advance,

Rajiv

Re: Getting very poor performance from the new Kafka consumer

Posted by Rajiv Kurian <ra...@signalfx.com>.
Hi Jason,

Thanks for investigating. Indeed, we probably do have more than the usual
number of partitions. Our use case is such that we have many partitions
(128-256) overall but very few messages per second on each partition.

I have created a JIRA at https://issues.apache.org/jira/browse/KAFKA-3159.

Let me know if I can provide more details or help in any other way.

Thanks again,
Rajiv

On Wed, Jan 27, 2016 at 9:27 AM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Rajiv,
>
> Thanks for the detailed report. Can you go ahead and create a JIRA? I do
> see the exceptions locally, but not nearly at the rate that you're
> reporting. That might be a factor of the number of partitions, so I'll do
> some investigation.
>
> -Jason
>
> On Wed, Jan 27, 2016 at 8:40 AM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
> > Hi Guozhang,
> >
> > The Github link I pasted was from the 0.9.0 branch. The same line seems to
> > be throwing exceptions in my code built of the maven 0.9.0.0 package. Are
> > you saying that something else has changed higher up the call stack that
> > will probably not trigger so many exceptions ?
> >
> > Thanks,
> > Rajiv
> >
> > On Tue, Jan 26, 2016 at 10:44 PM, Guozhang Wang <wa...@gmail.com> wrote:
> >
> > > Rajiv,
> > >
> > > Could you try to build the new consumer from 0.9.0 branch and see if the
> > > issue can be re-produced?
> > >
> > > Guozhang
> > >
> > > On Mon, Jan 25, 2016 at 9:46 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
> > >
> > > > The exception seems to be thrown here
> > > >
> > > > https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java#L236
> > > >
> > > > Is this not expected to hit often?
> > > >
> > > > On Mon, Jan 25, 2016 at 9:22 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
> > > >
> > > > > Wanted to add that we are not using auto commit since we use custom
> > > > > partition assignments. In fact we never call consumer.commitAsync() or
> > > > > consumer.commitSync() calls. My assumption is that since we store our
> > > > > own offsets these calls are not necessary. Hopefully this is not
> > > > > responsible for the poor performance.
> > > > >

Re: Getting very poor performance from the new Kafka consumer

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Rajiv,

Thanks for the detailed report. Can you go ahead and create a JIRA? I do
see the exceptions locally, but not nearly at the rate that you're
reporting. That might be a factor of the number of partitions, so I'll do
some investigation.

-Jason


Re: Getting very poor performance from the new Kafka consumer

Posted by Rajiv Kurian <ra...@signalfx.com>.
Hi Guozhang,

The GitHub link I pasted was from the 0.9.0 branch. The same line seems to
be throwing exceptions in my code built of the maven 0.9.0.0 package. Are
you saying that something else has changed higher up the call stack that
will probably not trigger so many exceptions?

Thanks,
Rajiv

> > >>
> > >> 3. The single thread seems to allocate a lot too. The single thread is
> > >> responsible for 17.87% of our entire JVM allocation rate. Most of what
> > it
> > >> allocates seems to be those same EOFExceptions. Here is a chart
> showing
> > the
> > >> single thread's allocation proportion: http://imgur.com/GNUJQsz Here
> is
> > >> a chart that shows a breakdown of the allocations:
> > >> http://imgur.com/YjCXljE About 20% of the allocations are for the
> > >> EOFExceptions. This seems kind of crazy especially given that this
> > happens
> > >> about 10 thousand times a second. The rest of the allocations seem to
> be
> > >> spread all over but again seem excessive given how we are getting very
> > few
> > >> messages.
> > >>
> > >> As a comparison, we also run a wrapper over the old SimpleConsumer
> that
> > >> gets a lot more data (10 -15 thousand 70 byte messages/sec on a
> > different
> > >> topic) and it is able to handle that load without much trouble. At
> this
> > >> moment we are completely puzzled by this performance. Most of it does
> > seem
> > >> to be due to the crazy volumes of exceptions. Note: Our messages seem
> to
> > >> all be making through. The exceptions are caught by Kafka's stack and
> > never
> > >> bubble though to us.
> > >>
> > >> Are we doing anything wrong with how we are using the new consumer
> > >> (longer timeouts of a 100 second odd don't seem to help)?
> > >>
> > >> Thanks in advance,
> > >>
> > >> Rajiv
> > >>
> > >>
> > >>
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: Getting very poor performance from the new Kafka consumer

Posted by Guozhang Wang <wa...@gmail.com>.
Rajiv,

Could you try building the new consumer from the 0.9.0 branch and see
whether the issue can be reproduced?

Guozhang

-- 
-- Guozhang

Re: Getting very poor performance from the new Kafka consumer

Posted by Rajiv Kurian <ra...@signalfx.com>.
The exception seems to be thrown here:
https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java#L236

Is this path not expected to be hit often?
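
For readers following the link: a minimal sketch of the general pattern in
question, where a reader walks length-prefixed records and treats
EOFException as the "end of buffer" signal. This is not Kafka's code. The
class name, method names and record layout (8-byte offset, 4-byte size,
payload) are assumptions made for illustration. With this pattern every
buffer that is iterated to the end costs one exception, even an empty one.
A bounds check on the remaining bytes gives the same record count without
throwing. The sketch does not explain the rate seen in the profile; it only
shows why such a code path can be hit on every fetch.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Hypothetical illustration; not taken from the Kafka source.
public class EofIterationSketch {

    // Assumed layout per record: [8-byte offset][4-byte size][payload].
    public static byte[] encode(byte[][] payloads) {
        int total = 0;
        for (byte[] p : payloads) {
            total += 8 + 4 + p.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(total);
        long offset = 0;
        for (byte[] p : payloads) {
            buf.putLong(offset++);
            buf.putInt(p.length);
            buf.put(p);
        }
        return buf.array();
    }

    // Style 1: read until the stream throws.
    // Returns {recordsRead, exceptionsThrown}.
    public static int[] readUntilEof(byte[] data) {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int records = 0;
        int exceptions = 0;
        while (true) {
            try {
                in.readLong();                  // record offset
                int size = in.readInt();        // payload size
                in.readFully(new byte[size]);   // payload
                records++;
            } catch (EOFException e) {
                exceptions++;                   // end of buffer reached
                break;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return new int[] {records, exceptions};
    }

    // Style 2: check the remaining bytes instead of relying on an exception.
    public static int readWithBoundsCheck(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        int records = 0;
        while (buf.remaining() >= 12) {
            buf.getLong();
            int size = buf.getInt();
            if (size < 0 || size > buf.remaining()) {
                break;                          // partial trailing record
            }
            buf.position(buf.position() + size);
            records++;
        }
        return records;
    }

    public static void main(String[] args) {
        // 64 per-partition buffers, only one of which holds a message.
        int exceptions = 0;
        int records = 0;
        for (int partition = 0; partition < 64; partition++) {
            byte[] data = partition == 0
                    ? encode(new byte[][] {new byte[75]})
                    : new byte[0];
            int[] result = readUntilEof(data);
            records += result[0];
            exceptions += result[1];
        }
        System.out.println(records + " record(s), " + exceptions + " exception(s)");
    }
}
```

If the consumer iterates one such buffer per assigned partition per fetch
response, the number of exceptions scales with partitions times fetch
responses rather than with the number of messages.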


Re: Getting very poor performance from the new Kafka consumer

Posted by Rajiv Kurian <ra...@signalfx.com>.
Wanted to add that we are not using auto commit since we use custom
partition assignments. In fact, we never call consumer.commitAsync() or
consumer.commitSync(). My assumption is that since we store our own
offsets, these calls are not necessary. Hopefully this is not responsible
for the poor performance.
