You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Vijay Prakash <Vi...@microsoft.com.INVALID> on 2017/10/27 21:42:13 UTC

Producer performance is awful when acks=all

Hey all,

I'm currently running Kafka 0.10.1 on Windows and am doing some perf testing. I tried out the perf test setup described in this blog post: https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines. When I try "single producer thread, 3x asynchronous replication", I get about 550k records/sec which seems acceptable for the perf loss due to running on Windows. However, when I set acks=all to try synchronous replication, I drop to about 120k records/sec, which is a LOT worse than the numbers in the blog post. Are there some specific settings that might cause acks=all to cause more perf issues? My broker settings are below. Thanks!

Broker settings:
advertised.host.name = null
                advertised.listeners = null
                advertised.port = null
                authorizer.class.name =
                auto.create.topics.enable = false
                auto.leader.rebalance.enable = true
                background.threads = 10
                broker.id = 3
                broker.id.generation.enable = true
                broker.rack = sc3
                compression.type = producer
                connections.max.idle.ms = 600000
                controlled.shutdown.enable = true
                controlled.shutdown.max.retries = 3
                controlled.shutdown.retry.backoff.ms = 5000
                controller.socket.timeout.ms = 30000
                default.replication.factor = 1
                delete.topic.enable = true
                fetch.purgatory.purge.interval.requests = 1000
                group.max.session.timeout.ms = 300000
                group.min.session.timeout.ms = 6000
                host.name =
                inter.broker.protocol.version = 0.10.1-IV2
                leader.imbalance.check.interval.seconds = 300
                leader.imbalance.per.broker.percentage = 10
                listeners = <redacted>
                log.cleaner.backoff.ms = 15000
                log.cleaner.dedupe.buffer.size = 134217728
                log.cleaner.delete.retention.ms = 86400000
                log.cleaner.enable = true
                log.cleaner.io.buffer.load.factor = 0.9
                log.cleaner.io.buffer.size = 524288
                log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
                log.cleaner.min.cleanable.ratio = 0.5
                log.cleaner.min.compaction.lag.ms = 0
                log.cleaner.threads = 1
                log.cleanup.policy = [delete]
                log.dir = D:/data/kafka/kafka-logs,H:/data/kafka/kafka-logs,I:/data/kafka/kafka-logs,J:/data/kafka/kafka-logs
                log.dirs = null
                log.flush.interval.messages = 20000
                log.flush.interval.ms = 1000
                log.flush.offset.checkpoint.interval.ms = 60000
                log.flush.scheduler.interval.ms = 9223372036854775807
                log.index.interval.bytes = 4096
                log.index.size.max.bytes = 10485760
                log.message.format.version = 0.10.1-IV2
                log.message.timestamp.difference.max.ms = 9223372036854775807
                log.message.timestamp.type = CreateTime
                log.preallocate = false
                log.retention.bytes = -1
                log.retention.check.interval.ms = 300000
                log.retention.hours = 24
                log.retention.minutes = null
                log.retention.ms = null
                log.roll.hours = 168
                log.roll.jitter.hours = 0
                log.roll.jitter.ms = null
                log.roll.ms = null
                log.segment.bytes = 536870912
                log.segment.delete.delay.ms = 60000
                max.connections.per.ip = 2147483647
                max.connections.per.ip.overrides =
                message.max.bytes = 1000012
                metric.reporters = []
                metrics.num.samples = 2
                metrics.sample.window.ms = 30000
                min.insync.replicas = 3
                num.io.threads = 20
                num.network.threads = 20
                num.partitions = 1
                num.recovery.threads.per.data.dir = 1
                num.replica.fetchers = 1
                offset.metadata.max.bytes = 4096
                offsets.commit.required.acks = -1
                offsets.commit.timeout.ms = 5000
                offsets.load.buffer.size = 5242880
                offsets.retention.check.interval.ms = 600000
                offsets.retention.minutes = 1440
                offsets.topic.compression.codec = 0
                offsets.topic.num.partitions = 50
                offsets.topic.replication.factor = 5
                offsets.topic.segment.bytes = 104857600
                port = 9092
                principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
                producer.purgatory.purge.interval.requests = 1000
                queued.max.requests = 500
                quota.consumer.default = 9223372036854775807
                quota.producer.default = 9223372036854775807
                quota.window.num = 11
                quota.window.size.seconds = 1
                replica.fetch.backoff.ms = 1000
                replica.fetch.max.bytes = 1048576
                replica.fetch.min.bytes = 1
                replica.fetch.response.max.bytes = 10485760
                replica.fetch.wait.max.ms = 500
                replica.high.watermark.checkpoint.interval.ms = 5000
                replica.lag.time.max.ms = 10000
                replica.socket.receive.buffer.bytes = 65536
                replica.socket.timeout.ms = 30000
                replication.quota.window.num = 11
                replication.quota.window.size.seconds = 1
                request.timeout.ms = 30000
                reserved.broker.max.id = 1000
                sasl.enabled.mechanisms = [GSSAPI]
                sasl.kerberos.kinit.cmd = /usr/bin/kinit
                sasl.kerberos.min.time.before.relogin = 60000
                sasl.kerberos.principal.to.local.rules = [DEFAULT]
                sasl.kerberos.service.name = null
                sasl.kerberos.ticket.renew.jitter = 0.05
                sasl.kerberos.ticket.renew.window.factor = 0.8
                sasl.mechanism.inter.broker.protocol = GSSAPI
                security.inter.broker.protocol = PLAINTEXT
                socket.receive.buffer.bytes = 1048576
                socket.request.max.bytes = 104857600
                socket.send.buffer.bytes = 1048576
                ssl.cipher.suites = null
                ssl.client.auth = none
                ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
                ssl.endpoint.identification.algorithm = null
                ssl.key.password = null
                ssl.keymanager.algorithm = SunX509
                ssl.keystore.location = null
                ssl.keystore.password = null
                ssl.keystore.type = JKS
                ssl.protocol = TLS
                ssl.provider = null
                ssl.secure.random.implementation = null
                ssl.trustmanager.algorithm = PKIX
                ssl.truststore.location = null
                ssl.truststore.password = null
                ssl.truststore.type = JKS
                unclean.leader.election.enable = false
                zookeeper.connect = <redacted>
                zookeeper.connection.timeout.ms = 1000000
                zookeeper.session.timeout.ms = 6000
                zookeeper.set.acl = false
                zookeeper.sync.time.ms = 2000

Re: Producer performance is awful when acks=all

Posted by Colin McCabe <cm...@apache.org>.
Hi Vijay,

It is surprising to see such a large drop.  The first thing that it
suggests to me is that perhaps there is a network bottleneck that is
being glossed over in the acks=1 case, but not in the acks=all case.  In
the acks=1 case, is the broker co-located on the same machine as the
producer, or is it on a separate machine?

What kind of network are you using?  Is it gigabit ethernet?  10gigE?

If you run all three brokers on a single computer rather than on
multiple computers, do the results change?  If so, it suggests that the
network is acting as the bottleneck, again.

Are you using the new producer and consumer, or old versions?

best,
Colin


On Fri, Oct 27, 2017, at 14:42, Vijay Prakash wrote:
> Hey all,
> 
> I'm currently running Kafka 0.10.1 on Windows and am doing some perf
> testing. I tried out the perf test setup described in this blog post:
> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines.
> When I try "single producer thread, 3x asynchronous replication", I get
> about 550k records/sec which seems acceptable for the perf loss due to
> running on Windows. However, when I set acks=all to try synchronous
> replication, I drop to about 120k records/sec, which is a LOT worse than
> the numbers in the blog post. Are there some specific settings that might
> cause acks=all to cause more perf issues? My broker settings are below.
> Thanks!
> 
> Broker settings:
> advertised.host.name = null
>                 advertised.listeners = null
>                 advertised.port = null
>                 authorizer.class.name =
>                 auto.create.topics.enable = false
>                 auto.leader.rebalance.enable = true
>                 background.threads = 10
>                 broker.id = 3
>                 broker.id.generation.enable = true
>                 broker.rack = sc3
>                 compression.type = producer
>                 connections.max.idle.ms = 600000
>                 controlled.shutdown.enable = true
>                 controlled.shutdown.max.retries = 3
>                 controlled.shutdown.retry.backoff.ms = 5000
>                 controller.socket.timeout.ms = 30000
>                 default.replication.factor = 1
>                 delete.topic.enable = true
>                 fetch.purgatory.purge.interval.requests = 1000
>                 group.max.session.timeout.ms = 300000
>                 group.min.session.timeout.ms = 6000
>                 host.name =
>                 inter.broker.protocol.version = 0.10.1-IV2
>                 leader.imbalance.check.interval.seconds = 300
>                 leader.imbalance.per.broker.percentage = 10
>                 listeners = <redacted>
>                 log.cleaner.backoff.ms = 15000
>                 log.cleaner.dedupe.buffer.size = 134217728
>                 log.cleaner.delete.retention.ms = 86400000
>                 log.cleaner.enable = true
>                 log.cleaner.io.buffer.load.factor = 0.9
>                 log.cleaner.io.buffer.size = 524288
>                 log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
>                 log.cleaner.min.cleanable.ratio = 0.5
>                 log.cleaner.min.compaction.lag.ms = 0
>                 log.cleaner.threads = 1
>                 log.cleanup.policy = [delete]
>                 log.dir = D:/data/kafka/kafka-logs,H:/data/kafka/kafka-logs,I:/data/kafka/kafka-logs,J:/data/kafka/kafka-logs
>                 log.dirs = null
>                 log.flush.interval.messages = 20000
>                 log.flush.interval.ms = 1000
>                 log.flush.offset.checkpoint.interval.ms = 60000
>                 log.flush.scheduler.interval.ms = 9223372036854775807
>                 log.index.interval.bytes = 4096
>                 log.index.size.max.bytes = 10485760
>                 log.message.format.version = 0.10.1-IV2
>                 log.message.timestamp.difference.max.ms = 9223372036854775807
>                 log.message.timestamp.type = CreateTime
>                 log.preallocate = false
>                 log.retention.bytes = -1
>                 log.retention.check.interval.ms = 300000
>                 log.retention.hours = 24
>                 log.retention.minutes = null
>                 log.retention.ms = null
>                 log.roll.hours = 168
>                 log.roll.jitter.hours = 0
>                 log.roll.jitter.ms = null
>                 log.roll.ms = null
>                 log.segment.bytes = 536870912
>                 log.segment.delete.delay.ms = 60000
>                 max.connections.per.ip = 2147483647
>                 max.connections.per.ip.overrides =
>                 message.max.bytes = 1000012
>                 metric.reporters = []
>                 metrics.num.samples = 2
>                 metrics.sample.window.ms = 30000
>                 min.insync.replicas = 3
>                 num.io.threads = 20
>                 num.network.threads = 20
>                 num.partitions = 1
>                 num.recovery.threads.per.data.dir = 1
>                 num.replica.fetchers = 1
>                 offset.metadata.max.bytes = 4096
>                 offsets.commit.required.acks = -1
>                 offsets.commit.timeout.ms = 5000
>                 offsets.load.buffer.size = 5242880
>                 offsets.retention.check.interval.ms = 600000
>                 offsets.retention.minutes = 1440
>                 offsets.topic.compression.codec = 0
>                 offsets.topic.num.partitions = 50
>                 offsets.topic.replication.factor = 5
>                 offsets.topic.segment.bytes = 104857600
>                 port = 9092
>                 principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
>                 producer.purgatory.purge.interval.requests = 1000
>                 queued.max.requests = 500
>                 quota.consumer.default = 9223372036854775807
>                 quota.producer.default = 9223372036854775807
>                 quota.window.num = 11
>                 quota.window.size.seconds = 1
>                 replica.fetch.backoff.ms = 1000
>                 replica.fetch.max.bytes = 1048576
>                 replica.fetch.min.bytes = 1
>                 replica.fetch.response.max.bytes = 10485760
>                 replica.fetch.wait.max.ms = 500
>                 replica.high.watermark.checkpoint.interval.ms = 5000
>                 replica.lag.time.max.ms = 10000
>                 replica.socket.receive.buffer.bytes = 65536
>                 replica.socket.timeout.ms = 30000
>                 replication.quota.window.num = 11
>                 replication.quota.window.size.seconds = 1
>                 request.timeout.ms = 30000
>                 reserved.broker.max.id = 1000
>                 sasl.enabled.mechanisms = [GSSAPI]
>                 sasl.kerberos.kinit.cmd = /usr/bin/kinit
>                 sasl.kerberos.min.time.before.relogin = 60000
>                 sasl.kerberos.principal.to.local.rules = [DEFAULT]
>                 sasl.kerberos.service.name = null
>                 sasl.kerberos.ticket.renew.jitter = 0.05
>                 sasl.kerberos.ticket.renew.window.factor = 0.8
>                 sasl.mechanism.inter.broker.protocol = GSSAPI
>                 security.inter.broker.protocol = PLAINTEXT
>                 socket.receive.buffer.bytes = 1048576
>                 socket.request.max.bytes = 104857600
>                 socket.send.buffer.bytes = 1048576
>                 ssl.cipher.suites = null
>                 ssl.client.auth = none
>                 ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>                 ssl.endpoint.identification.algorithm = null
>                 ssl.key.password = null
>                 ssl.keymanager.algorithm = SunX509
>                 ssl.keystore.location = null
>                 ssl.keystore.password = null
>                 ssl.keystore.type = JKS
>                 ssl.protocol = TLS
>                 ssl.provider = null
>                 ssl.secure.random.implementation = null
>                 ssl.trustmanager.algorithm = PKIX
>                 ssl.truststore.location = null
>                 ssl.truststore.password = null
>                 ssl.truststore.type = JKS
>                 unclean.leader.election.enable = false
>                 zookeeper.connect = <redacted>
>                 zookeeper.connection.timeout.ms = 1000000
>                 zookeeper.session.timeout.ms = 6000
>                 zookeeper.set.acl = false
>                 zookeeper.sync.time.ms = 2000