Posted to users@kafka.apache.org by Eric Owhadi <er...@esgyn.com> on 2019/10/02 15:41:35 UTC

poor producing performance with very low CPU utilization?

Hi Kafka users,
I am new to Kafka and am struggling to get an acceptable producing rate.
I am using a cluster of 2 nodes, 40 CPU cores (80 counting hyperthreading), 256GB memory, on a 10Gbit network.
Kafka is installed as part of a Cloudera parcel, with a 5GB Java heap.
Producer version: Kafka client 2.2.1

Wed Oct  2 07:56:59 PDT 2019
JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-0.b11.el6_9.x86_64
Using -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/kafka_kafka-KAFKA_BROKER-c1871edf37153578a6fc7f41462d01d2_pid6908.hprof -XX:OnOutOfMemoryError=/usr/lib64/cmf/service/common/killparent.sh as CSD_JAVA_OPTS
Using /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER as conf dir
Using scripts/control.sh as process script
CONF_DIR=/var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
CMF_CONF_DIR=/etc/cloudera-scm-agent

Date: Wed Oct  2 07:56:59 PDT 2019
Host: xxxxx.esgyn.local
Pwd: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
CONF_DIR: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
KAFKA_HOME: /opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/lib/kafka
Zookeeper Quorum: xxx.esgyn.local:2181,xxx.esgyn.local:2181,xxx.esgyn.local:2181
Zookeeper Chroot:
PORT: 9092
JMX_PORT: 9393
SSL_PORT: 9093
ENABLE_MONITORING: true
METRIC_REPORTERS: nl.techop.kafka.KafkaHttpMetricsReporter
BROKER_HEAP_SIZE: 5120
BROKER_JAVA_OPTS: -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true
BROKER_SSL_ENABLED: false
KERBEROS_AUTH_ENABLED: false
KAFKA_PRINCIPAL:
SECURITY_INTER_BROKER_PROTOCOL: INFERRED
AUTHENTICATE_ZOOKEEPER_CONNECTION: true
SUPER_USERS: kafka
Kafka version found: 2.2.1-kafka4.1.0
Sentry version found: 1.5.1-cdh5.15.0
ZK_PRINCIPAL_NAME: zookeeper
Final Zookeeper Quorum is xxx.esgyn.local:2181,xx.esgyn.local:2181,x.esgyn.local:2181
security.inter.broker.protocol inferred as PLAINTEXT
LISTENERS=listeners=PLAINTEXT://xxxxx.esgyn.local:9092,

I am producing messages of 12016 bytes uncompressed, which Kafka then compresses with snappy.
I am using a topic with 200 partitions, and a custom partitioner that I verified is doing a good job of spreading the load across the 2 brokers.

My producer config looks like:
        kafkaProps.put("bootstrap.servers","nap052.esgyn.local:9092,localhost:9092");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
        kafkaProps.put("value.serializer","org.trafodion.sql.kafka.SmartpumpCollectorVectorSerializer");
        kafkaProps.put("partitioner.class","org.trafodion.sql.kafka.TimeSeriesPartitioner");
        kafkaProps.put("compression.type","snappy");
        kafkaProps.put("batch.size","65536");
        kafkaProps.put("acks", "all");
        kafkaProps.put("linger.ms","1");

I first tried fire-and-forget sends, thinking I would get the best performance.
Then I tried synchronous sends and, to my surprise, found that I got better performance with them.

However, after 1 or 2 minutes of load testing, I start getting errors on the synchronous send like this:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed since batch creation
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
        at org.trafodion.sql.kafka.TimeseriesEndPoint$customHandler.handle(TimeseriesEndPoint.java:315)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
        at org.eclipse.jetty.server.Server.handle(Server.java:505)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:370)
        at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
        at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
        at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
        at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:781)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:917)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed since batch creation

So I suspect that I am producing too fast and the brokers cannot keep up.
I tried bumping the I/O threads from the default of 8 to 40; that did not help.

I am getting a producing rate of only about 100 messages per second, or about 1 megabyte per second according to the Kafka metrics.
CPU utilization is barely noticeable (3%), the network is essentially idle, and from what I have found searching around, this is not the kind of performance I should expect from my setup. I was hoping for at least 10X, if not 100X, better. Were my expectations too high, or am I missing something in the config that is causing these performance numbers?
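
As a back-of-the-envelope check on the numbers above (a sketch, not a measurement): 100 messages/s at 12016 bytes each is about 1.2 MB/s before compression, which is roughly 0.1% of what a 10 Gbit/s link can carry. The class and method names below are made up for illustration, and the link is assumed to deliver its full nominal 10e9 bits/s.

```java
// Rough link-utilization estimate for the observed producing rate.
// All names here are hypothetical; only the input figures come from the post.
final class LinkUtilization {

    // Payload bytes per second for a given message rate and message size.
    static double bytesPerSecond(double messagesPerSecond, int messageBytes) {
        return messagesPerSecond * messageBytes;
    }

    // Fraction of the link consumed, assuming the nominal bit rate is fully usable.
    static double linkFraction(double bytesPerSecond, double linkBitsPerSecond) {
        return bytesPerSecond * 8.0 / linkBitsPerSecond;
    }

    public static void main(String[] args) {
        double rate = bytesPerSecond(100, 12016);    // uncompressed payload
        double fraction = linkFraction(rate, 10e9);  // 10 Gbit/s network
        System.out.printf("%.2f MB/s, %.4f%% of the link%n", rate / 1e6, fraction * 100);
    }
}
```

Since snappy shrinks the payload before it goes on the wire, the real network share would be even lower than this estimate.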

Some details: I produce from a custom Jetty handler that I verified to be very fast when I am not producing (with the send() commented out), and I am using a single producer (I also tried with 2) shared across all Jetty threads.

Any help/clue would be much appreciated,
Thanks in advance,
Eric Owhadi
Esgyn Corporation.




RE: poor producing performance with very low CPU utilization?

Posted by Eric Owhadi <er...@esgyn.com>.
I found that one big contributor to the poor performance was a bug in my custom partitioner (a missing Utils.toPositive call).
I also found that the default partitioner's use of murmur2 performs very badly compared to simply doing a hash, with a 5X performance degradation!
As you can see below, with a good custom partitioner I can achieve 1875 messages/s, while the default partitioner only gets me to 381 messages/s.

To share my findings, here are my notes:
	// Variant 1: murmur2 on the serialized key, the same computation the default partitioner uses for keyed records.
	public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
		if ((keyBytes == null) || (!(key instanceof Long)))
			throw new InvalidRecordException("We expect all message to have a long as key");
		return Utils.toPositive(Utils.murmur2(keyBytes))%cluster.partitionCountForTopic(topic);
	}

	381 messages/s 2.6 MB/s

	// Variant 2 (buggy): murmur2 on key/1000 without Utils.toPositive, so the result can be negative.
	public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
		if ((keyBytes == null) || (!(key instanceof Long)))
			throw new InvalidRecordException("We expect all message to have a long as key");
		long k = (long) key / 1000; //consecutive 1000 id are co located
		return Utils.murmur2(toBytes(k))%cluster.partitionCountForTopic(topic);
	}

	// Big-endian encoding of a long into 8 bytes.
	private static byte[] toBytes(long val) {
		byte [] b = new byte[8];
		for (int i = 7; i > 0; i--) {
			b[i] = (byte) val;
			val >>>= 8;
		}
		b[0] = (byte) val;
		return b;
	}

	205 messages/s 1.4 MB/s, including some errors

	// Variant 3: same as variant 2, with the Utils.toPositive fix.
	public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
		if ((keyBytes == null) || (!(key instanceof Long)))
			throw new InvalidRecordException("We expect all message to have a long as key");
		long k = (long) key / 1000; //consecutive 1000 id are co located
		return Utils.toPositive(Utils.murmur2(toBytes(k)))%cluster.partitionCountForTopic(topic);
	}

	381 messages/s 2.6 MB/s

	// Variant 4: Long.hashCode instead of murmur2.
	public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
		if ((keyBytes == null) || (!(key instanceof Long)))
			throw new InvalidRecordException("We expect all message to have a long as key");
		long k = (long) key / 1000; //consecutive 1000 id are co located
		return Long.hashCode(k)%cluster.partitionCountForTopic(topic);
	}

	1865 messages/s 3.7 MB/s

	// Variant 5: plain modulo on key/1000.
	public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
		if ((keyBytes == null) || (!(key instanceof Long)))
			throw new InvalidRecordException("We expect all message to have a long as key");
		long k = (long) key / 1000; //consecutive 1000 id are co located
		return (int)(k%cluster.partitionCountForTopic(topic));
	}

	1875 messages/s 3.7 MB/s
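
The effect of the missing Utils.toPositive call can be reproduced without Kafka. In Java, % keeps the sign of its left operand, so a negative hash yields a negative partition number. The sketch below mirrors Utils.toPositive by masking off the sign bit; the class name and the sample hash value are hypothetical and only serve to illustrate the point.

```java
// Stand-alone illustration of why hash % numPartitions needs a non-negative hash.
// PartitionIndexDemo and the sample hash are made up for this example.
final class PartitionIndexDemo {

    // Same idea as Kafka's Utils.toPositive: clear the sign bit (this is not Math.abs).
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Buggy form: the remainder is negative whenever the hash is negative.
    static int unsafePartition(int hash, int numPartitions) {
        return hash % numPartitions;
    }

    // Fixed form: the result is always in [0, numPartitions).
    static int safePartition(int hash, int numPartitions) {
        return toPositive(hash) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 200;   // partition count of the topic in this thread
        int hash = -1234567;       // hypothetical negative hash value
        System.out.println(unsafePartition(hash, numPartitions)); // prints -167
        System.out.println(safePartition(hash, numPartitions));   // prints 81
    }
}
```

The topic-partition in the timeout message from my first mail ends in "--112", which reads like partition -112 and would be consistent with this bug. Note that the Long.hashCode and plain modulo variants also only stay in range as long as the keys are non-negative, which is an assumption about the id space.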
	






-----Original Message-----
From: Eric Owhadi <er...@esgyn.com> 
Sent: Thursday, October 3, 2019 4:34 PM
To: users@kafka.apache.org
Subject: RE: poor producing performance with very low CPU utilization?


To test whether my backend was good, I tried kafka-producer-perf-test. Boy, that was fast!

Instead of my lame 200 messages per second, I am getting 20,000 messages per second. 100X....
That is more in line with my expectations. Granted, this test does not use my custom partitioner and serializer. I will try adding variables one at a time, but the bottleneck is definitely not the server :-).
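
For reading the output of the run below: the MB/sec figures line up with records/sec times the 12016-byte record size divided by 1024*1024, i.e. they are mebibytes of uncompressed payload. A small sketch that checks this against the final summary line (21145.896110 records/sec, 242.32 MB/sec) and against my earlier 200 messages/s; the class and method names are made up for illustration.

```java
// Cross-check of the perf-test summary figures; names are hypothetical.
final class PerfTestUnits {

    // Throughput in MiB/s for a given record rate and record size in bytes.
    static double mebibytesPerSecond(double recordsPerSecond, int recordBytes) {
        return recordsPerSecond * recordBytes / (1024.0 * 1024.0);
    }

    // Ratio between two message rates.
    static double speedup(double newRate, double oldRate) {
        return newRate / oldRate;
    }

    public static void main(String[] args) {
        double summaryRate = 21145.896110;  // records/sec from the final summary line
        System.out.printf("%.2f MiB/s%n", mebibytesPerSecond(summaryRate, 12016));
        System.out.printf("%.0fx over 200 messages/s%n", speedup(summaryRate, 200));
    }
}
```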


kafka-producer-perf-test --num-records 6000000 --record-size 12016 --topic DEFAULT-.TIMESERIES.SmartpumpCollectorVector --throughput 1000000   --print-metrics --producer-props bootstrap.servers=nap052.esgyn.local:9092,localhost:9092  compression.type=snappy batch.size=65536 acks=all linger.ms=85
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.15.0-1.cdh5.15.0.p0.21/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/ESGYNDB-2.7.0-A1/traf_home/export/lib/orc-tools-1.5.0-uber.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
19/10/03 14:19:33 INFO producer.ProducerConfig: ProducerConfig values:
        acks = all
        batch.size = 65536
        bootstrap.servers = [xxx.esgyn.local:9092, localhost:9092]
        buffer.memory = 33554432
        client.dns.lookup = default
        client.id =
        compression.type = snappy
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        linger.ms = 85
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

19/10/03 14:19:33 INFO utils.AppInfoParser: Kafka version: 2.2.1-kafka-4.1.0
19/10/03 14:19:33 INFO utils.AppInfoParser: Kafka commitId: unknown
19/10/03 14:19:33 WARN clients.NetworkClient: [Producer clientId=producer-1] Connection to node -2 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
19/10/03 14:19:33 WARN clients.NetworkClient: [Producer clientId=producer-1] Connection to node -2 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
19/10/03 14:19:33 INFO clients.Metadata: Cluster ID: t4ZcV5AwRq6AK0scvEyf6w
128382 records sent, 25630.3 records/sec (293.71 MB/sec), 106.6 ms avg latency, 630.0 ms max latency.
137267 records sent, 27447.9 records/sec (314.54 MB/sec), 103.1 ms avg latency, 618.0 ms max latency.
154956 records sent, 30960.2 records/sec (354.78 MB/sec), 108.1 ms avg latency, 523.0 ms max latency.
160987 records sent, 32165.2 records/sec (368.59 MB/sec), 109.2 ms avg latency, 583.0 ms max latency.
156126 records sent, 31194.0 records/sec (357.46 MB/sec), 110.2 ms avg latency, 890.0 ms max latency.
143638 records sent, 28727.6 records/sec (329.20 MB/sec), 100.5 ms avg latency, 517.0 ms max latency.
81710 records sent, 13403.9 records/sec (153.60 MB/sec), 144.7 ms avg latency, 2654.0 ms max latency.
5414 records sent, 1036.6 records/sec (11.88 MB/sec), 2016.0 ms avg latency, 7246.0 ms max latency.
82215 records sent, 16423.3 records/sec (188.20 MB/sec), 307.1 ms avg latency, 8694.0 ms max latency.
88897 records sent, 17768.7 records/sec (203.62 MB/sec), 167.4 ms avg latency, 1725.0 ms max latency.
151365 records sent, 30248.8 records/sec (346.63 MB/sec), 108.6 ms avg latency, 493.0 ms max latency.
156538 records sent, 31301.3 records/sec (358.69 MB/sec), 109.5 ms avg latency, 523.0 ms max latency.
130701 records sent, 26114.1 records/sec (299.25 MB/sec), 113.8 ms avg latency, 674.0 ms max latency.
67604 records sent, 13496.5 records/sec (154.66 MB/sec), 197.0 ms avg latency, 2677.0 ms max latency.
65918 records sent, 13181.0 records/sec (151.05 MB/sec), 184.6 ms avg latency, 2619.0 ms max latency.
101673 records sent, 20314.3 records/sec (232.79 MB/sec), 132.8 ms avg latency, 1593.0 ms max latency.
120567 records sent, 24089.3 records/sec (276.05 MB/sec), 125.1 ms avg latency, 847.0 ms max latency.
153552 records sent, 30685.9 records/sec (351.64 MB/sec), 111.2 ms avg latency, 501.0 ms max latency.
148710 records sent, 29730.1 records/sec (340.69 MB/sec), 109.8 ms avg latency, 596.0 ms max latency.
90535 records sent, 17945.5 records/sec (205.64 MB/sec), 140.3 ms avg latency, 1202.0 ms max latency.
43865 records sent, 8641.6 records/sec (99.03 MB/sec), 216.2 ms avg latency, 2082.0 ms max latency.
56323 records sent, 11257.8 records/sec (129.01 MB/sec), 389.2 ms avg latency, 3426.0 ms max latency.
136051 records sent, 27204.8 records/sec (311.75 MB/sec), 110.0 ms avg latency, 648.0 ms max latency.
85620 records sent, 17120.6 records/sec (196.19 MB/sec), 189.7 ms avg latency, 2418.0 ms max latency.
141306 records sent, 28261.2 records/sec (323.85 MB/sec), 101.9 ms avg latency, 536.0 ms max latency.
140048 records sent, 28009.6 records/sec (320.97 MB/sec), 108.6 ms avg latency, 566.0 ms max latency.
109297 records sent, 21841.9 records/sec (250.29 MB/sec), 154.9 ms avg latency, 1034.0 ms max latency.
60731 records sent, 11715.1 records/sec (134.25 MB/sec), 179.8 ms avg latency, 2625.0 ms max latency.
24637 records sent, 4919.5 records/sec (56.37 MB/sec), 810.5 ms avg latency, 3651.0 ms max latency.
140499 records sent, 28071.7 records/sec (321.68 MB/sec), 107.1 ms avg latency, 593.0 ms max latency.
82061 records sent, 16408.9 records/sec (188.04 MB/sec), 197.2 ms avg latency, 1949.0 ms max latency.
151387 records sent, 30204.9 records/sec (346.13 MB/sec), 101.1 ms avg latency, 582.0 ms max latency.
126133 records sent, 25181.3 records/sec (288.56 MB/sec), 121.5 ms avg latency, 749.0 ms max latency.
98287 records sent, 19637.8 records/sec (225.04 MB/sec), 148.9 ms avg latency, 1070.0 ms max latency.
57611 records sent, 11515.3 records/sec (131.96 MB/sec), 228.0 ms avg latency, 1301.0 ms max latency.
54744 records sent, 10940.0 records/sec (125.37 MB/sec), 219.9 ms avg latency, 3090.0 ms max latency.
127183 records sent, 25411.2 records/sec (291.20 MB/sec), 130.2 ms avg latency, 1317.0 ms max latency.
114095 records sent, 22782.5 records/sec (261.07 MB/sec), 128.0 ms avg latency, 827.0 ms max latency.
135414 records sent, 27066.6 records/sec (310.17 MB/sec), 114.4 ms avg latency, 963.0 ms max latency.
89328 records sent, 17865.6 records/sec (204.73 MB/sec), 155.0 ms avg latency, 1446.0 ms max latency.
117350 records sent, 23441.9 records/sec (268.63 MB/sec), 140.3 ms avg latency, 1357.0 ms max latency.
59659 records sent, 11927.0 records/sec (136.68 MB/sec), 287.4 ms avg latency, 2629.0 ms max latency.
121268 records sent, 24239.1 records/sec (277.76 MB/sec), 148.9 ms avg latency, 2729.0 ms max latency.
125586 records sent, 25117.2 records/sec (287.83 MB/sec), 125.8 ms avg latency, 823.0 ms max latency.
135528 records sent, 27078.5 records/sec (310.30 MB/sec), 112.3 ms avg latency, 792.0 ms max latency.
147679 records sent, 29535.8 records/sec (338.46 MB/sec), 105.7 ms avg latency, 524.0 ms max latency.
104386 records sent, 20868.9 records/sec (239.14 MB/sec), 148.7 ms avg latency, 1920.0 ms max latency.
103882 records sent, 20776.4 records/sec (238.08 MB/sec), 157.5 ms avg latency, 2064.0 ms max latency.
42445 records sent, 8451.8 records/sec (96.85 MB/sec), 252.5 ms avg latency, 2314.0 ms max latency.
93666 records sent, 18722.0 records/sec (214.54 MB/sec), 226.7 ms avg latency, 3462.0 ms max latency.
101172 records sent, 20190.0 records/sec (231.36 MB/sec), 140.0 ms avg latency, 1601.0 ms max latency.
91260 records sent, 18241.1 records/sec (209.03 MB/sec), 135.5 ms avg latency, 1447.0 ms max latency.
157822 records sent, 31545.5 records/sec (361.49 MB/sec), 110.2 ms avg latency, 488.0 ms max latency.
107820 records sent, 21525.3 records/sec (246.67 MB/sec), 131.8 ms avg latency, 859.0 ms max latency.
65499 records sent, 12302.6 records/sec (140.98 MB/sec), 180.2 ms avg latency, 2417.0 ms max latency.
87593 records sent, 17501.1 records/sec (200.55 MB/sec), 229.3 ms avg latency, 1865.0 ms max latency.
6000000 records sent, 21145.896110 records/sec (242.32 MB/sec), 144.03 ms avg latency, 8694.00 ms max latency, 91 ms 50th, 359 ms 95th, 1160 ms 99th, 3182 ms 99.9th.

Metric Name                                                                                                            Value
app-info:commit-id:{client-id=producer-1}                                                                            : unknown
app-info:version:{client-id=producer-1}                                                                              : 2.2.1-kafka-4.1.0
kafka-metrics-count:count:{client-id=producer-1}                                                                     : 125.000
producer-metrics:batch-size-avg:{client-id=producer-1}                                                               : 27928.990
producer-metrics:batch-size-max:{client-id=producer-1}                                                               : 53510.000
producer-metrics:batch-split-rate:{client-id=producer-1}                                                             : 0.000
producer-metrics:batch-split-total:{client-id=producer-1}                                                            : 0.000
producer-metrics:buffer-available-bytes:{client-id=producer-1}                                                       : 33554432.000
producer-metrics:buffer-exhausted-rate:{client-id=producer-1}                                                        : 0.000
producer-metrics:buffer-exhausted-total:{client-id=producer-1}                                                       : 0.000
producer-metrics:buffer-total-bytes:{client-id=producer-1}                                                           : 33554432.000
producer-metrics:bufferpool-wait-ratio:{client-id=producer-1}                                                        : 0.627
producer-metrics:bufferpool-wait-time-total:{client-id=producer-1}                                                   : 163044386821.000
producer-metrics:compression-rate-avg:{client-id=producer-1}                                                         : 0.605
producer-metrics:connection-close-rate:{client-id=producer-1}                                                        : 0.000
producer-metrics:connection-close-total:{client-id=producer-1}                                                       : 2.000
producer-metrics:connection-count:{client-id=producer-1}                                                             : 3.000
producer-metrics:connection-creation-rate:{client-id=producer-1}                                                     : 0.000
producer-metrics:connection-creation-total:{client-id=producer-1}                                                    : 3.000
producer-metrics:failed-authentication-rate:{client-id=producer-1}                                                   : 0.000
producer-metrics:failed-authentication-total:{client-id=producer-1}                                                  : 0.000
producer-metrics:failed-reauthentication-rate:{client-id=producer-1}                                                 : 0.000
producer-metrics:failed-reauthentication-total:{client-id=producer-1}                                                : 0.000
producer-metrics:incoming-byte-rate:{client-id=producer-1}                                                           : 122909.831
producer-metrics:incoming-byte-total:{client-id=producer-1}                                                          : 36624593.000
producer-metrics:io-ratio:{client-id=producer-1}                                                                     : 0.103
producer-metrics:io-time-ns-avg:{client-id=producer-1}                                                               : 155782.854
producer-metrics:io-wait-ratio:{client-id=producer-1}                                                                : 0.423
producer-metrics:io-wait-time-ns-avg:{client-id=producer-1}                                                          : 640556.224
producer-metrics:io-waittime-total:{client-id=producer-1}                                                            : 102808922802.000
producer-metrics:iotime-total:{client-id=producer-1}                                                                 : 33032357708.000
producer-metrics:metadata-age:{client-id=producer-1}                                                                 : 283.529
producer-metrics:network-io-rate:{client-id=producer-1}                                                              : 244.207
producer-metrics:network-io-total:{client-id=producer-1}                                                             : 72510.000
producer-metrics:outgoing-byte-rate:{client-id=producer-1}                                                           : 106935747.320
producer-metrics:outgoing-byte-total:{client-id=producer-1}                                                          : 33992901291.000
producer-metrics:produce-throttle-time-avg:{client-id=producer-1}                                                    : 0.000
producer-metrics:produce-throttle-time-max:{client-id=producer-1}                                                    : 0.000
producer-metrics:reauthentication-latency-avg:{client-id=producer-1}                                                 : NaN
producer-metrics:reauthentication-latency-max:{client-id=producer-1}                                                 : NaN
producer-metrics:record-error-rate:{client-id=producer-1}                                                            : 0.000
producer-metrics:record-error-total:{client-id=producer-1}                                                           : 0.000
producer-metrics:record-queue-time-avg:{client-id=producer-1}                                                        : 100.057
producer-metrics:record-queue-time-max:{client-id=producer-1}                                                        : 3440.000
producer-metrics:record-retry-rate:{client-id=producer-1}                                                            : 0.000
producer-metrics:record-retry-total:{client-id=producer-1}                                                           : 0.000
producer-metrics:record-send-rate:{client-id=producer-1}                                                             : 18598.670
producer-metrics:record-send-total:{client-id=producer-1}                                                            : 6000000.000
producer-metrics:record-size-avg:{client-id=producer-1}                                                              : 12103.000
producer-metrics:record-size-max:{client-id=producer-1}                                                              : 12103.000
producer-metrics:records-per-request-avg:{client-id=producer-1}                                                      : 152.407
producer-metrics:request-latency-avg:{client-id=producer-1}                                                          : 30.006
producer-metrics:request-latency-max:{client-id=producer-1}                                                          : 1693.000
producer-metrics:request-rate:{client-id=producer-1}                                                                 : 122.077
producer-metrics:request-size-avg:{client-id=producer-1}                                                             : 875950.384
producer-metrics:request-size-max:{client-id=producer-1}                                                             : 1054112.000
producer-metrics:request-total:{client-id=producer-1}                                                                : 36255.000
producer-metrics:requests-in-flight:{client-id=producer-1}                                                           : 0.000
producer-metrics:response-rate:{client-id=producer-1}                                                                : 122.141
producer-metrics:response-total:{client-id=producer-1}                                                               : 36255.000
producer-metrics:select-rate:{client-id=producer-1}                                                                  : 659.815
producer-metrics:select-total:{client-id=producer-1}                                                                 : 197285.000
producer-metrics:successful-authentication-no-reauth-total:{client-id=producer-1}                                    : 0.000
producer-metrics:successful-authentication-rate:{client-id=producer-1}                                               : 0.000
producer-metrics:successful-authentication-total:{client-id=producer-1}                                              : 0.000
producer-metrics:successful-reauthentication-rate:{client-id=producer-1}                                             : 0.000
producer-metrics:successful-reauthentication-total:{client-id=producer-1}                                            : 0.000
producer-metrics:waiting-threads:{client-id=producer-1}                                                              : 0.000
producer-node-metrics:incoming-byte-rate:{client-id=producer-1, node-id=node--1}                                     : 0.000
producer-node-metrics:incoming-byte-rate:{client-id=producer-1, node-id=node--2}                                     : 0.000
producer-node-metrics:incoming-byte-rate:{client-id=producer-1, node-id=node-357}                                    : 61091.292
producer-node-metrics:incoming-byte-rate:{client-id=producer-1, node-id=node-358}                                    : 62046.566
producer-node-metrics:incoming-byte-total:{client-id=producer-1, node-id=node--1}                                    : 8843.000
producer-node-metrics:incoming-byte-total:{client-id=producer-1, node-id=node--2}                                    : 0.000
producer-node-metrics:incoming-byte-total:{client-id=producer-1, node-id=node-357}                                   : 18882528.000
producer-node-metrics:incoming-byte-total:{client-id=producer-1, node-id=node-358}                                   : 17733222.000
producer-node-metrics:outgoing-byte-rate:{client-id=producer-1, node-id=node--1}                                     : 0.000
producer-node-metrics:outgoing-byte-rate:{client-id=producer-1, node-id=node--2}                                     : 0.000
producer-node-metrics:outgoing-byte-rate:{client-id=producer-1, node-id=node-357}                                    : 53121988.622
producer-node-metrics:outgoing-byte-rate:{client-id=producer-1, node-id=node-358}                                    : 54009460.328
producer-node-metrics:outgoing-byte-total:{client-id=producer-1, node-id=node--1}                                    : 99.000
producer-node-metrics:outgoing-byte-total:{client-id=producer-1, node-id=node--2}                                    : 0.000
producer-node-metrics:outgoing-byte-total:{client-id=producer-1, node-id=node-357}                                   : 17091951276.000
producer-node-metrics:outgoing-byte-total:{client-id=producer-1, node-id=node-358}                                   : 16900949916.000
producer-node-metrics:request-latency-avg:{client-id=producer-1, node-id=node--1}                                    : NaN
producer-node-metrics:request-latency-avg:{client-id=producer-1, node-id=node--2}                                    : NaN
producer-node-metrics:request-latency-avg:{client-id=producer-1, node-id=node-357}                                   : 30.531
producer-node-metrics:request-latency-avg:{client-id=producer-1, node-id=node-358}                                   : 29.481
producer-node-metrics:request-latency-max:{client-id=producer-1, node-id=node--1}                                    : NaN
producer-node-metrics:request-latency-max:{client-id=producer-1, node-id=node--2}                                    : NaN
producer-node-metrics:request-latency-max:{client-id=producer-1, node-id=node-357}                                   : 1430.000
producer-node-metrics:request-latency-max:{client-id=producer-1, node-id=node-358}                                   : 1693.000
producer-node-metrics:request-rate:{client-id=producer-1, node-id=node--1}                                           : 0.000
producer-node-metrics:request-rate:{client-id=producer-1, node-id=node--2}                                           : 0.000
producer-node-metrics:request-rate:{client-id=producer-1, node-id=node-357}                                          : 61.032
producer-node-metrics:request-rate:{client-id=producer-1, node-id=node-358}                                          : 61.253
producer-node-metrics:request-size-avg:{client-id=producer-1, node-id=node--1}                                       : NaN
producer-node-metrics:request-size-avg:{client-id=producer-1, node-id=node--2}                                       : NaN
producer-node-metrics:request-size-avg:{client-id=producer-1, node-id=node-357}                                      : 870389.622
producer-node-metrics:request-size-avg:{client-id=producer-1, node-id=node-358}                                      : 881740.177
producer-node-metrics:request-size-max:{client-id=producer-1, node-id=node--1}                                       : NaN
producer-node-metrics:request-size-max:{client-id=producer-1, node-id=node--2}                                       : NaN
producer-node-metrics:request-size-max:{client-id=producer-1, node-id=node-357}                                      : 1053528.000
producer-node-metrics:request-size-max:{client-id=producer-1, node-id=node-358}                                      : 1054112.000
producer-node-metrics:request-total:{client-id=producer-1, node-id=node--1}                                          : 2.000
producer-node-metrics:request-total:{client-id=producer-1, node-id=node--2}                                          : 0.000
producer-node-metrics:request-total:{client-id=producer-1, node-id=node-357}                                         : 18232.000
producer-node-metrics:request-total:{client-id=producer-1, node-id=node-358}                                         : 18021.000
producer-node-metrics:response-rate:{client-id=producer-1, node-id=node--1}                                          : 0.000
producer-node-metrics:response-rate:{client-id=producer-1, node-id=node--2}                                          : 0.000
producer-node-metrics:response-rate:{client-id=producer-1, node-id=node-357}                                         : 61.094
producer-node-metrics:response-rate:{client-id=producer-1, node-id=node-358}                                         : 61.272
producer-node-metrics:response-total:{client-id=producer-1, node-id=node--1}                                         : 2.000
producer-node-metrics:response-total:{client-id=producer-1, node-id=node--2}                                         : 0.000
producer-node-metrics:response-total:{client-id=producer-1, node-id=node-357}                                        : 18232.000
producer-node-metrics:response-total:{client-id=producer-1, node-id=node-358}                                        : 18021.000
producer-topic-metrics:byte-rate:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector}          : 106864118.709
producer-topic-metrics:byte-total:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector}         : 33980657526.000
producer-topic-metrics:compression-rate:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector}   : 0.605
producer-topic-metrics:record-error-rate:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector}  : 0.000
producer-topic-metrics:record-error-total:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector} : 0.000
producer-topic-metrics:record-retry-rate:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector}  : 0.000
producer-topic-metrics:record-retry-total:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector} : 0.000
producer-topic-metrics:record-send-rate:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector}   : 18598.670
producer-topic-metrics:record-send-total:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector}  : 6000000.000
19/10/03 14:24:16 INFO producer.KafkaProducer: [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
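
The per-node figures above can be sanity-checked against each other. The sketch below is only arithmetic on numbers copied from this metrics dump; the class and method names are made up for illustration, and it treats every request to nodes 357 and 358 as a produce request, which is an approximation.

```java
// Cross-check of the per-node producer metrics printed above.
// All inputs are copied from the --print-metrics output; nothing is measured here.
class NodeMetricsCheck {

    // Bytes per second implied by the request rate and the mean request size.
    static double impliedByteRate(double requestRate, double requestSizeAvg) {
        return requestRate * requestSizeAvg;
    }

    // Mean number of records carried by one request.
    static double recordsPerRequest(double recordTotal, double requestTotal) {
        return recordTotal / requestTotal;
    }

    public static void main(String[] args) {
        double implied357 = impliedByteRate(61.032, 870389.622);
        double implied358 = impliedByteRate(61.253, 881740.177);
        System.out.printf("node-357: implied %.0f B/s, reported %.0f B/s%n", implied357, 53121988.622);
        System.out.printf("node-358: implied %.0f B/s, reported %.0f B/s%n", implied358, 54009460.328);

        // record-send-total divided by the request-total of both brokers.
        double perRequest = recordsPerRequest(6000000.0, 18232.0 + 18021.0);
        System.out.printf("records per request: %.1f%n", perRequest);
    }
}
```

If the approximation holds, each request in this run carried far more than a handful of records, which suggests that one request can bundle batches for many partitions of the same broker.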
-----Original Message-----
From: Eric Owhadi <er...@esgyn.com>
Sent: Thursday, October 3, 2019 3:45 PM
To: users@kafka.apache.org
Subject: RE: poor producing performance with very low CPU utilization?

There is a key piece of information that should help narrow down where the problem is:

When I change from acks=all to acks=1, instead of increasing the messages per second, it actually cuts the rate in half!

It is as if the problem is tied to how fast I produce data (with acks=1 I assume I block for less time in the synchronous send, and therefore my producing rate increases).

I wonder if some sort of contention happens when the producer populates the 200 partition queues while the production rate in the user thread is high?
Eric
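
One way to reason about the blocking hypothesis above is a back-of-the-envelope model. The sketch assumes that every caller thread blocks on a synchronous send, and that each blocked call lasts roughly linger.ms plus the request latency because batches rarely fill up before linger.ms expires. The thread count and the latency are hypothetical values, not figures from this thread, and the model does not try to explain the acks=1 observation.

```java
// Rough upper bound on throughput when every send is synchronous.
// Assumption (not confirmed in this thread): each blocked send lasts about
// lingerMs + requestLatencyMs, because batches are flushed by the linger timer.
class BlockingSendModel {

    static double recordsPerSecond(int callerThreads, double lingerMs, double requestLatencyMs) {
        return callerThreads * 1000.0 / (lingerMs + requestLatencyMs);
    }

    public static void main(String[] args) {
        int callerThreads = 20;         // hypothetical number of Jetty threads calling send().get()
        double requestLatencyMs = 10.0; // hypothetical broker round trip

        for (double lingerMs : new double[] {85, 90, 100, 200}) {
            System.out.printf("linger.ms=%.0f -> about %.1f records/s%n",
                    lingerMs, recordsPerSecond(callerThreads, lingerMs, requestLatencyMs));
        }
    }
}
```

Under these assumptions the rate falls as linger.ms grows, which is the same direction as the linger.ms measurements quoted further down in this thread for values above 85. It says nothing about the drop below 85.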

-----Original Message-----
From: Eric Owhadi <er...@esgyn.com>
Sent: Thursday, October 3, 2019 1:33 PM
To: users@kafka.apache.org
Subject: RE: poor producing performance with very low CPU utilization?

Hi Eric,
Thanks a lot for your answer. Please find inline responses:

>>You've given hardware information about your brokers, but I don't think you've provided information about the machine your producer is running on.
>>Have you verified that you're not reaching any caps on your producer's machine?

The producer is on the same machine as the broker. It is running very quietly, at 3% CPU when I run my test. So no, there is no stress on the producing side.

>>I also think you might be hitting the limit of what a single producer is capable of pushing through with your current setup. With record size of ~12k and the default batch size configuration of 64k, you'll only be able to send 5 records per batch. The default number of in flight batches is 5.

I have 200 partitions on my topic, and the load is well balanced across all partitions. So the math you are doing should be multiplied by 200, right? In addition, I found that batch size had no effect, and that linger.ms was the factor triggering a buffer send. I increased both the batch size and the number of in-flight requests, and that had no effect.

>>This means at any given time, you'll only have 25 records in flight per connection. I'm assuming your partitions are configured with at least 2 replicas. Acks=all means your producer is going to wait for the records to be fully replicated before considering it complete.

>>Doing the math, you have ~200 records per second, but this is split between 2 brokers. This means you're producing 100 records per second per broker.
>>Simplifying a bit to 25 records in flight per broker, that's a latency of ~250 ms to move around 300kb. At minimum, this includes the time to [compress the batch], [send the batch over the network to the leader], [write the batch to the leader's log], [fetch the batch over the network to the replica], [write the batch to the replica's log], and all of the assorted responses to those calls.

Given that everything is local (the producer runs on the same node as a broker), and given the size of my node (80 vcores), I hope I don't need 250 ms to do that...
The equivalent workload on HBase 2.0 is 10 to 20X faster (and that includes the same replica config etc.).

-----Original Message-----
From: Eric Azama <ea...@gmail.com>
Sent: Thursday, October 3, 2019 1:07 PM
To: users@kafka.apache.org
Subject: Re: poor producing performance with very low CPU utilization?

Hi Eric,

You've given hardware information about your brokers, but I don't think you've provided information about the machine your producer is running on.
Have you verified that you're not reaching any caps on your producer's machine?

I also think you might be hitting the limit of what a single producer is capable of pushing through with your current setup. With record size of ~12k and the default batch size configuration of 64k, you'll only be able to send 5 records per batch. The default number of in flight batches is 5.
This means at any given time, you'll only have 25 records in flight per connection. I'm assuming your partitions are configured with at least 2 replicas. Acks=all means your producer is going to wait for the records to be fully replicated before considering it complete.

Doing the math, you have ~200 records per second, but this is split between 2 brokers. This means you're producing 100 records per second per broker.
Simplifying a bit to 25 records in flight per broker, that's a latency of ~250 ms to move around 300kb. At minimum, this includes the time to [compress the batch], [send the batch over the network to the leader], [write the batch to the leader's log], [fetch the batch over the network to the replica], [write the batch to the replica's log], and all of the assorted responses to those calls.
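
The estimate above can be written out as a small calculation. This is only a sketch of the arithmetic in this message, with made-up class and method names; like the message, it simplifies by counting the 5 in-flight requests per connection as 5 batches.

```java
// Arithmetic behind the "25 records in flight, ~250 ms" estimate.
class InFlightLatencyEstimate {

    // Little's law rearranged: latency = items in flight / throughput.
    static double impliedLatencyMs(int recordsInFlight, double recordsPerSecond) {
        return recordsInFlight / recordsPerSecond * 1000.0;
    }

    public static void main(String[] args) {
        int recordSizeBytes = 12016;                            // record size from the thread
        int batchSizeBytes = 65536;                             // configured batch.size
        int recordsPerBatch = batchSizeBytes / recordSizeBytes; // integer division gives 5
        int inFlightBatches = 5;                                // simplification used in the message
        int recordsInFlight = recordsPerBatch * inFlightBatches;

        double perBrokerRate = 200.0 / 2;                       // ~200 records/s split over 2 brokers

        System.out.println("records in flight per broker: " + recordsInFlight);
        System.out.println("bytes in flight per broker:   " + recordsInFlight * recordSizeBytes);
        System.out.println("implied latency (ms):         " + impliedLatencyMs(recordsInFlight, perBrokerRate));
    }
}
```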

On Wed, Oct 2, 2019 at 8:38 PM Eric Owhadi <er...@esgyn.com> wrote:

> Hi Jamie,
> Thanks for the hint. I played with these parameters, and found only 
> linger.ms is playing a significant role for my test case.
> It is very sensitive and highly non linear.
> I get these results:
> linger.ms (ms)  messages per second
> 80              100
> 84              205
> 85              215 (peak)
> 86              213
> 90              205
> 95              195
> 100             187
> 200             100
>
> So as you can see, this is very sensitive and one can easily miss the peak.
> However, 200 messages per second for 2 powerful nodes and relatively
> small messages (12016 bytes) is still at least 10X below what I would have hoped.
> When I see system resources barely moving, with CPU at 3%,
> I am sure something is not right.
> Regards,
> Eric
>
> -----Original Message-----
> From: Jamie <ja...@aol.co.uk.INVALID>
> Sent: Wednesday, October 2, 2019 4:27 PM
> To: users@kafka.apache.org
> Subject: Re: poor producing performance with very low CPU utilization?
>
> Hi Eric,
> I found that increasing linger.ms to between 50 and 100 ms significantly
> increases performance (fewer, larger requests instead of many small
> ones). I'd also increase the batch size and buffer.memory.
> Thanks,
> Jamie
>
>
> -----Original Message-----
> From: Eric Owhadi <er...@esgyn.com>
> To: users@kafka.apache.org <us...@kafka.apache.org>
> Sent: Wed, 2 Oct 2019 16:42
> Subject: poor producing performance with very low CPU utilization?
>
> Hi Kafka users,
> I am new to Kafka and am struggling with getting acceptable producing rate.
> I am using a cluster of 2 nodes, 40 CPU cores/ 80 if counting hyperthreading. 256GB memory on a 10Gbit network
> Kafka is installed as part of cloudera parcel, with 5GB java heap.
> Producer version: Kafka client 2.2.1
>
> Wed Oct  2 07:56:59 PDT 2019
> JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-0.b11.el6_9.x86_64
> Using -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/kafka_kafka-KAFKA_BROKER-c1871edf37153578a6fc7f41462d01d2_pid6908.hprof -XX:OnOutOfMemoryError=/usr/lib64/cmf/service/common/killparent.sh as CSD_JAVA_OPTS
> Using /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER as conf dir
> Using scripts/control.sh as process script
> CONF_DIR=/var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> CMF_CONF_DIR=/etc/cloudera-scm-agent
>
> Date: Wed Oct  2 07:56:59 PDT 2019
> Host: xxxxx.esgyn.local
> Pwd: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> CONF_DIR: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> KAFKA_HOME: /opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/lib/kafka
> Zookeeper Quorum: xxx.esgyn.local:2181,xxx.esgyn.local:2181,xxx.esgyn.local:2181
> Zookeeper Chroot:
> PORT: 9092
> JMX_PORT: 9393
> SSL_PORT: 9093
> ENABLE_MONITORING: true
> METRIC_REPORTERS: nl.techop.kafka.KafkaHttpMetricsReporter
> BROKER_HEAP_SIZE: 5120
> BROKER_JAVA_OPTS: -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true
> BROKER_SSL_ENABLED: false
> KERBEROS_AUTH_ENABLED: false
> KAFKA_PRINCIPAL:
> SECURITY_INTER_BROKER_PROTOCOL: INFERRED
> AUTHENTICATE_ZOOKEEPER_CONNECTION: true
> SUPER_USERS: kafka
> Kafka version found: 2.2.1-kafka4.1.0
> Sentry version found: 1.5.1-cdh5.15.0
> ZK_PRINCIPAL_NAME: zookeeper
> Final Zookeeper Quorum is xxx.esgyn.local:2181,xx.esgyn.local:2181,x.esgyn.local:2181
> security.inter.broker.protocol inferred as PLAINTEXT
> LISTENERS=listeners=PLAINTEXT://xxxxx.esgyn.local:9092,
>
> I am producing messages of 12016 bytes uncompressed, which are then snappy
> compressed by Kafka.
> I am using a topic with 200 partitions, and a custom partitioner that
> I verified is doing a good job of spreading the load across the 2 brokers.
>
> My producer config looks like this:
>
> kafkaProps.put("bootstrap.servers", "nap052.esgyn.local:9092,localhost:9092");
> kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
> kafkaProps.put("value.serializer", "org.trafodion.sql.kafka.SmartpumpCollectorVectorSerializer");
> kafkaProps.put("partitioner.class", "org.trafodion.sql.kafka.TimeSeriesPartitioner");
> kafkaProps.put("compression.type", "snappy");
> kafkaProps.put("batch.size", "65536");
> kafkaProps.put("acks", "all");
> kafkaProps.put("linger.ms", "1");
>
> I first tried fire-and-forget sends, thinking I would get the best
> performance.
> Then I tried synchronous sends, and to my surprise found that I get
> better performance with synchronous sends.
>
> However, after 1 or 2 minutes of load testing, I start getting errors on
> the synchronous send like this:
> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed since batch creation
>         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
>         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
>         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
>         at org.trafodion.sql.kafka.TimeseriesEndPoint$customHandler.handle(TimeseriesEndPoint.java:315)
>         at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>         at org.eclipse.jetty.server.Server.handle(Server.java:505)
>         at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:370)
>         at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
>         at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
>         at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
>         at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
>         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
>         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
>         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
>         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
>         at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
>         at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:781)
>         at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:917)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed since batch creation
>
> So I suspect that I am producing too fast, and the brokers cannot
> keep up.
> I tried bumping up the I/O threads from the default 8 to 40; that did not help.
>
> I am getting a producing rate of only about 100 messages per second,
> and about 1 megabyte per second according to the Kafka metrics.
> The CPU utilization is barely noticeable (3%), the network is barely
> touched, and having googled around, this is not the kind of performance I
> should expect from my config. I was hoping for at least 10X more, if
> not 100X. Were my expectations too high, or am I missing
> something in the config that is causing these performance numbers?
>
> Some details: I produce using a custom Jetty handler that I verified
> to be super-fast when I am not producing (commenting out the send()),
> and I am using a single producer (I also tried with 2) reused across all Jetty threads.
>
> Any help/clue would be much appreciated.
> Thanks in advance,
> Eric Owhadi
> Esgyn Corporation.
>
>
>

RE: poor producing performance with very low CPU utilization?

Posted by Eric Owhadi <er...@esgyn.com>.
To test whether my backend was good, I tried using kafka-producer-perf-test. Boy, that was fast!

Instead of my lame 200 messages per second, I am getting 20,000 messages per second. 100X....
That is more in line with my expectations. Granted, this test does not use my custom partitioner and serializer. I will try adding variables one after the other, but the bottleneck is definitely not the server :-).
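
A minimal sketch of how that "one variable at a time" comparison could be organized, using only java.util.Properties. The baseline mirrors the producer properties of the perf-test run below; the class name and the withOverride helper are hypothetical, and handing these properties to an actual KafkaProducer (which needs the kafka-clients jar) is not shown.

```java
import java.util.Properties;

// Builds a baseline producer config matching the perf-test run, then derives
// variants that each change exactly one setting.
class ProducerPropsSketch {

    static Properties perfTestBaseline() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "nap052.esgyn.local:9092,localhost:9092");
        p.put("compression.type", "snappy");
        p.put("batch.size", "65536");
        p.put("acks", "all");
        p.put("linger.ms", "85");
        // Serializers and partitioner reported in the ProducerConfig dump of the perf test.
        p.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return p;
    }

    // Returns a copy of base with one key overridden; base itself is left untouched.
    static Properties withOverride(Properties base, String key, String value) {
        Properties copy = new Properties();
        copy.putAll(base);
        copy.put(key, value);
        return copy;
    }

    public static void main(String[] args) {
        Properties baseline = perfTestBaseline();
        Properties withPartitioner = withOverride(baseline,
                "partitioner.class", "org.trafodion.sql.kafka.TimeSeriesPartitioner");
        Properties withSerializer = withOverride(baseline,
                "value.serializer", "org.trafodion.sql.kafka.SmartpumpCollectorVectorSerializer");

        System.out.println("baseline:         " + baseline);
        System.out.println("+ partitioner:    " + withPartitioner);
        System.out.println("+ value serializer: " + withSerializer);
    }
}
```

Measuring each variant separately would show which single change, if any, brings the rate back down toward 200 messages per second.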


kafka-producer-perf-test --num-records 6000000 --record-size 12016 --topic DEFAULT-.TIMESERIES.SmartpumpCollectorVector --throughput 1000000   --print-metrics --producer-props bootstrap.servers=nap052.esgyn.local:9092,localhost:9092  compression.type=snappy batch.size=65536 acks=all linger.ms=85
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.15.0-1.cdh5.15.0.p0.21/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/ESGYNDB-2.7.0-A1/traf_home/export/lib/orc-tools-1.5.0-uber.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/lib/kafka/libs/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
19/10/03 14:19:33 INFO producer.ProducerConfig: ProducerConfig values:
        acks = all
        batch.size = 65536
        bootstrap.servers = [xxx.esgyn.local:9092, localhost:9092]
        buffer.memory = 33554432
        client.dns.lookup = default
        client.id =
        compression.type = snappy
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        linger.ms = 85
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

19/10/03 14:19:33 INFO utils.AppInfoParser: Kafka version: 2.2.1-kafka-4.1.0
19/10/03 14:19:33 INFO utils.AppInfoParser: Kafka commitId: unknown
19/10/03 14:19:33 WARN clients.NetworkClient: [Producer clientId=producer-1] Connection to node -2 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
19/10/03 14:19:33 WARN clients.NetworkClient: [Producer clientId=producer-1] Connection to node -2 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
19/10/03 14:19:33 INFO clients.Metadata: Cluster ID: t4ZcV5AwRq6AK0scvEyf6w
128382 records sent, 25630.3 records/sec (293.71 MB/sec), 106.6 ms avg latency, 630.0 ms max latency.
137267 records sent, 27447.9 records/sec (314.54 MB/sec), 103.1 ms avg latency, 618.0 ms max latency.
154956 records sent, 30960.2 records/sec (354.78 MB/sec), 108.1 ms avg latency, 523.0 ms max latency.
160987 records sent, 32165.2 records/sec (368.59 MB/sec), 109.2 ms avg latency, 583.0 ms max latency.
156126 records sent, 31194.0 records/sec (357.46 MB/sec), 110.2 ms avg latency, 890.0 ms max latency.
143638 records sent, 28727.6 records/sec (329.20 MB/sec), 100.5 ms avg latency, 517.0 ms max latency.
81710 records sent, 13403.9 records/sec (153.60 MB/sec), 144.7 ms avg latency, 2654.0 ms max latency.
5414 records sent, 1036.6 records/sec (11.88 MB/sec), 2016.0 ms avg latency, 7246.0 ms max latency.
82215 records sent, 16423.3 records/sec (188.20 MB/sec), 307.1 ms avg latency, 8694.0 ms max latency.
88897 records sent, 17768.7 records/sec (203.62 MB/sec), 167.4 ms avg latency, 1725.0 ms max latency.
151365 records sent, 30248.8 records/sec (346.63 MB/sec), 108.6 ms avg latency, 493.0 ms max latency.
156538 records sent, 31301.3 records/sec (358.69 MB/sec), 109.5 ms avg latency, 523.0 ms max latency.
130701 records sent, 26114.1 records/sec (299.25 MB/sec), 113.8 ms avg latency, 674.0 ms max latency.
67604 records sent, 13496.5 records/sec (154.66 MB/sec), 197.0 ms avg latency, 2677.0 ms max latency.
65918 records sent, 13181.0 records/sec (151.05 MB/sec), 184.6 ms avg latency, 2619.0 ms max latency.
101673 records sent, 20314.3 records/sec (232.79 MB/sec), 132.8 ms avg latency, 1593.0 ms max latency.
120567 records sent, 24089.3 records/sec (276.05 MB/sec), 125.1 ms avg latency, 847.0 ms max latency.
153552 records sent, 30685.9 records/sec (351.64 MB/sec), 111.2 ms avg latency, 501.0 ms max latency.
148710 records sent, 29730.1 records/sec (340.69 MB/sec), 109.8 ms avg latency, 596.0 ms max latency.
90535 records sent, 17945.5 records/sec (205.64 MB/sec), 140.3 ms avg latency, 1202.0 ms max latency.
43865 records sent, 8641.6 records/sec (99.03 MB/sec), 216.2 ms avg latency, 2082.0 ms max latency.
56323 records sent, 11257.8 records/sec (129.01 MB/sec), 389.2 ms avg latency, 3426.0 ms max latency.
136051 records sent, 27204.8 records/sec (311.75 MB/sec), 110.0 ms avg latency, 648.0 ms max latency.
85620 records sent, 17120.6 records/sec (196.19 MB/sec), 189.7 ms avg latency, 2418.0 ms max latency.
141306 records sent, 28261.2 records/sec (323.85 MB/sec), 101.9 ms avg latency, 536.0 ms max latency.
140048 records sent, 28009.6 records/sec (320.97 MB/sec), 108.6 ms avg latency, 566.0 ms max latency.
109297 records sent, 21841.9 records/sec (250.29 MB/sec), 154.9 ms avg latency, 1034.0 ms max latency.
60731 records sent, 11715.1 records/sec (134.25 MB/sec), 179.8 ms avg latency, 2625.0 ms max latency.
24637 records sent, 4919.5 records/sec (56.37 MB/sec), 810.5 ms avg latency, 3651.0 ms max latency.
140499 records sent, 28071.7 records/sec (321.68 MB/sec), 107.1 ms avg latency, 593.0 ms max latency.
82061 records sent, 16408.9 records/sec (188.04 MB/sec), 197.2 ms avg latency, 1949.0 ms max latency.
151387 records sent, 30204.9 records/sec (346.13 MB/sec), 101.1 ms avg latency, 582.0 ms max latency.
126133 records sent, 25181.3 records/sec (288.56 MB/sec), 121.5 ms avg latency, 749.0 ms max latency.
98287 records sent, 19637.8 records/sec (225.04 MB/sec), 148.9 ms avg latency, 1070.0 ms max latency.
57611 records sent, 11515.3 records/sec (131.96 MB/sec), 228.0 ms avg latency, 1301.0 ms max latency.
54744 records sent, 10940.0 records/sec (125.37 MB/sec), 219.9 ms avg latency, 3090.0 ms max latency.
127183 records sent, 25411.2 records/sec (291.20 MB/sec), 130.2 ms avg latency, 1317.0 ms max latency.
114095 records sent, 22782.5 records/sec (261.07 MB/sec), 128.0 ms avg latency, 827.0 ms max latency.
135414 records sent, 27066.6 records/sec (310.17 MB/sec), 114.4 ms avg latency, 963.0 ms max latency.
89328 records sent, 17865.6 records/sec (204.73 MB/sec), 155.0 ms avg latency, 1446.0 ms max latency.
117350 records sent, 23441.9 records/sec (268.63 MB/sec), 140.3 ms avg latency, 1357.0 ms max latency.
59659 records sent, 11927.0 records/sec (136.68 MB/sec), 287.4 ms avg latency, 2629.0 ms max latency.
121268 records sent, 24239.1 records/sec (277.76 MB/sec), 148.9 ms avg latency, 2729.0 ms max latency.
125586 records sent, 25117.2 records/sec (287.83 MB/sec), 125.8 ms avg latency, 823.0 ms max latency.
135528 records sent, 27078.5 records/sec (310.30 MB/sec), 112.3 ms avg latency, 792.0 ms max latency.
147679 records sent, 29535.8 records/sec (338.46 MB/sec), 105.7 ms avg latency, 524.0 ms max latency.
104386 records sent, 20868.9 records/sec (239.14 MB/sec), 148.7 ms avg latency, 1920.0 ms max latency.
103882 records sent, 20776.4 records/sec (238.08 MB/sec), 157.5 ms avg latency, 2064.0 ms max latency.
42445 records sent, 8451.8 records/sec (96.85 MB/sec), 252.5 ms avg latency, 2314.0 ms max latency.
93666 records sent, 18722.0 records/sec (214.54 MB/sec), 226.7 ms avg latency, 3462.0 ms max latency.
101172 records sent, 20190.0 records/sec (231.36 MB/sec), 140.0 ms avg latency, 1601.0 ms max latency.
91260 records sent, 18241.1 records/sec (209.03 MB/sec), 135.5 ms avg latency, 1447.0 ms max latency.
157822 records sent, 31545.5 records/sec (361.49 MB/sec), 110.2 ms avg latency, 488.0 ms max latency.
107820 records sent, 21525.3 records/sec (246.67 MB/sec), 131.8 ms avg latency, 859.0 ms max latency.
65499 records sent, 12302.6 records/sec (140.98 MB/sec), 180.2 ms avg latency, 2417.0 ms max latency.
87593 records sent, 17501.1 records/sec (200.55 MB/sec), 229.3 ms avg latency, 1865.0 ms max latency.
6000000 records sent, 21145.896110 records/sec (242.32 MB/sec), 144.03 ms avg latency, 8694.00 ms max latency, 91 ms 50th, 359 ms 95th, 1160 ms 99th, 3182 ms 99.9th.
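
The summary line above can be reproduced from the record rate and the --record-size value. The sketch below assumes the tool reports MB as 1024 * 1024 bytes, which is consistent with the printed numbers; the class and method names are illustrative only.

```java
// Converts the perf-test record rate into payload throughput.
class PerfTestThroughput {

    // Uncompressed payload in MB/s, with 1 MB taken as 1024 * 1024 bytes.
    static double mbPerSec(double recordsPerSec, int recordSizeBytes) {
        return recordsPerSec * recordSizeBytes / (1024.0 * 1024.0);
    }

    // The same payload expressed in Gbit/s, for comparison with the 10Gbit network.
    static double gbitPerSec(double recordsPerSec, int recordSizeBytes) {
        return recordsPerSec * recordSizeBytes * 8.0 / 1e9;
    }

    public static void main(String[] args) {
        double rate = 21145.896110; // records/sec from the summary line
        int recordSize = 12016;     // --record-size
        System.out.printf("payload: %.2f MB/s, %.2f Gbit/s%n",
                mbPerSec(rate, recordSize), gbitPerSec(rate, recordSize));
    }
}
```

These are uncompressed payload figures; with snappy enabled, the bytes actually sent to the brokers are lower, as the compression-rate-avg metric in the dump indicates.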

Metric Name                                                                                                            Value
app-info:commit-id:{client-id=producer-1}                                                                            : unknown
app-info:version:{client-id=producer-1}                                                                              : 2.2.1-kafka-4.1.0
kafka-metrics-count:count:{client-id=producer-1}                                                                     : 125.000
producer-metrics:batch-size-avg:{client-id=producer-1}                                                               : 27928.990
producer-metrics:batch-size-max:{client-id=producer-1}                                                               : 53510.000
producer-metrics:batch-split-rate:{client-id=producer-1}                                                             : 0.000
producer-metrics:batch-split-total:{client-id=producer-1}                                                            : 0.000
producer-metrics:buffer-available-bytes:{client-id=producer-1}                                                       : 33554432.000
producer-metrics:buffer-exhausted-rate:{client-id=producer-1}                                                        : 0.000
producer-metrics:buffer-exhausted-total:{client-id=producer-1}                                                       : 0.000
producer-metrics:buffer-total-bytes:{client-id=producer-1}                                                           : 33554432.000
producer-metrics:bufferpool-wait-ratio:{client-id=producer-1}                                                        : 0.627
producer-metrics:bufferpool-wait-time-total:{client-id=producer-1}                                                   : 163044386821.000
producer-metrics:compression-rate-avg:{client-id=producer-1}                                                         : 0.605
producer-metrics:connection-close-rate:{client-id=producer-1}                                                        : 0.000
producer-metrics:connection-close-total:{client-id=producer-1}                                                       : 2.000
producer-metrics:connection-count:{client-id=producer-1}                                                             : 3.000
producer-metrics:connection-creation-rate:{client-id=producer-1}                                                     : 0.000
producer-metrics:connection-creation-total:{client-id=producer-1}                                                    : 3.000
producer-metrics:failed-authentication-rate:{client-id=producer-1}                                                   : 0.000
producer-metrics:failed-authentication-total:{client-id=producer-1}                                                  : 0.000
producer-metrics:failed-reauthentication-rate:{client-id=producer-1}                                                 : 0.000
producer-metrics:failed-reauthentication-total:{client-id=producer-1}                                                : 0.000
producer-metrics:incoming-byte-rate:{client-id=producer-1}                                                           : 122909.831
producer-metrics:incoming-byte-total:{client-id=producer-1}                                                          : 36624593.000
producer-metrics:io-ratio:{client-id=producer-1}                                                                     : 0.103
producer-metrics:io-time-ns-avg:{client-id=producer-1}                                                               : 155782.854
producer-metrics:io-wait-ratio:{client-id=producer-1}                                                                : 0.423
producer-metrics:io-wait-time-ns-avg:{client-id=producer-1}                                                          : 640556.224
producer-metrics:io-waittime-total:{client-id=producer-1}                                                            : 102808922802.000
producer-metrics:iotime-total:{client-id=producer-1}                                                                 : 33032357708.000
producer-metrics:metadata-age:{client-id=producer-1}                                                                 : 283.529
producer-metrics:network-io-rate:{client-id=producer-1}                                                              : 244.207
producer-metrics:network-io-total:{client-id=producer-1}                                                             : 72510.000
producer-metrics:outgoing-byte-rate:{client-id=producer-1}                                                           : 106935747.320
producer-metrics:outgoing-byte-total:{client-id=producer-1}                                                          : 33992901291.000
producer-metrics:produce-throttle-time-avg:{client-id=producer-1}                                                    : 0.000
producer-metrics:produce-throttle-time-max:{client-id=producer-1}                                                    : 0.000
producer-metrics:reauthentication-latency-avg:{client-id=producer-1}                                                 : NaN
producer-metrics:reauthentication-latency-max:{client-id=producer-1}                                                 : NaN
producer-metrics:record-error-rate:{client-id=producer-1}                                                            : 0.000
producer-metrics:record-error-total:{client-id=producer-1}                                                           : 0.000
producer-metrics:record-queue-time-avg:{client-id=producer-1}                                                        : 100.057
producer-metrics:record-queue-time-max:{client-id=producer-1}                                                        : 3440.000
producer-metrics:record-retry-rate:{client-id=producer-1}                                                            : 0.000
producer-metrics:record-retry-total:{client-id=producer-1}                                                           : 0.000
producer-metrics:record-send-rate:{client-id=producer-1}                                                             : 18598.670
producer-metrics:record-send-total:{client-id=producer-1}                                                            : 6000000.000
producer-metrics:record-size-avg:{client-id=producer-1}                                                              : 12103.000
producer-metrics:record-size-max:{client-id=producer-1}                                                              : 12103.000
producer-metrics:records-per-request-avg:{client-id=producer-1}                                                      : 152.407
producer-metrics:request-latency-avg:{client-id=producer-1}                                                          : 30.006
producer-metrics:request-latency-max:{client-id=producer-1}                                                          : 1693.000
producer-metrics:request-rate:{client-id=producer-1}                                                                 : 122.077
producer-metrics:request-size-avg:{client-id=producer-1}                                                             : 875950.384
producer-metrics:request-size-max:{client-id=producer-1}                                                             : 1054112.000
producer-metrics:request-total:{client-id=producer-1}                                                                : 36255.000
producer-metrics:requests-in-flight:{client-id=producer-1}                                                           : 0.000
producer-metrics:response-rate:{client-id=producer-1}                                                                : 122.141
producer-metrics:response-total:{client-id=producer-1}                                                               : 36255.000
producer-metrics:select-rate:{client-id=producer-1}                                                                  : 659.815
producer-metrics:select-total:{client-id=producer-1}                                                                 : 197285.000
producer-metrics:successful-authentication-no-reauth-total:{client-id=producer-1}                                    : 0.000
producer-metrics:successful-authentication-rate:{client-id=producer-1}                                               : 0.000
producer-metrics:successful-authentication-total:{client-id=producer-1}                                              : 0.000
producer-metrics:successful-reauthentication-rate:{client-id=producer-1}                                             : 0.000
producer-metrics:successful-reauthentication-total:{client-id=producer-1}                                            : 0.000
producer-metrics:waiting-threads:{client-id=producer-1}                                                              : 0.000
producer-node-metrics:incoming-byte-rate:{client-id=producer-1, node-id=node--1}                                     : 0.000
producer-node-metrics:incoming-byte-rate:{client-id=producer-1, node-id=node--2}                                     : 0.000
producer-node-metrics:incoming-byte-rate:{client-id=producer-1, node-id=node-357}                                    : 61091.292
producer-node-metrics:incoming-byte-rate:{client-id=producer-1, node-id=node-358}                                    : 62046.566
producer-node-metrics:incoming-byte-total:{client-id=producer-1, node-id=node--1}                                    : 8843.000
producer-node-metrics:incoming-byte-total:{client-id=producer-1, node-id=node--2}                                    : 0.000
producer-node-metrics:incoming-byte-total:{client-id=producer-1, node-id=node-357}                                   : 18882528.000
producer-node-metrics:incoming-byte-total:{client-id=producer-1, node-id=node-358}                                   : 17733222.000
producer-node-metrics:outgoing-byte-rate:{client-id=producer-1, node-id=node--1}                                     : 0.000
producer-node-metrics:outgoing-byte-rate:{client-id=producer-1, node-id=node--2}                                     : 0.000
producer-node-metrics:outgoing-byte-rate:{client-id=producer-1, node-id=node-357}                                    : 53121988.622
producer-node-metrics:outgoing-byte-rate:{client-id=producer-1, node-id=node-358}                                    : 54009460.328
producer-node-metrics:outgoing-byte-total:{client-id=producer-1, node-id=node--1}                                    : 99.000
producer-node-metrics:outgoing-byte-total:{client-id=producer-1, node-id=node--2}                                    : 0.000
producer-node-metrics:outgoing-byte-total:{client-id=producer-1, node-id=node-357}                                   : 17091951276.000
producer-node-metrics:outgoing-byte-total:{client-id=producer-1, node-id=node-358}                                   : 16900949916.000
producer-node-metrics:request-latency-avg:{client-id=producer-1, node-id=node--1}                                    : NaN
producer-node-metrics:request-latency-avg:{client-id=producer-1, node-id=node--2}                                    : NaN
producer-node-metrics:request-latency-avg:{client-id=producer-1, node-id=node-357}                                   : 30.531
producer-node-metrics:request-latency-avg:{client-id=producer-1, node-id=node-358}                                   : 29.481
producer-node-metrics:request-latency-max:{client-id=producer-1, node-id=node--1}                                    : NaN
producer-node-metrics:request-latency-max:{client-id=producer-1, node-id=node--2}                                    : NaN
producer-node-metrics:request-latency-max:{client-id=producer-1, node-id=node-357}                                   : 1430.000
producer-node-metrics:request-latency-max:{client-id=producer-1, node-id=node-358}                                   : 1693.000
producer-node-metrics:request-rate:{client-id=producer-1, node-id=node--1}                                           : 0.000
producer-node-metrics:request-rate:{client-id=producer-1, node-id=node--2}                                           : 0.000
producer-node-metrics:request-rate:{client-id=producer-1, node-id=node-357}                                          : 61.032
producer-node-metrics:request-rate:{client-id=producer-1, node-id=node-358}                                          : 61.253
producer-node-metrics:request-size-avg:{client-id=producer-1, node-id=node--1}                                       : NaN
producer-node-metrics:request-size-avg:{client-id=producer-1, node-id=node--2}                                       : NaN
producer-node-metrics:request-size-avg:{client-id=producer-1, node-id=node-357}                                      : 870389.622
producer-node-metrics:request-size-avg:{client-id=producer-1, node-id=node-358}                                      : 881740.177
producer-node-metrics:request-size-max:{client-id=producer-1, node-id=node--1}                                       : NaN
producer-node-metrics:request-size-max:{client-id=producer-1, node-id=node--2}                                       : NaN
producer-node-metrics:request-size-max:{client-id=producer-1, node-id=node-357}                                      : 1053528.000
producer-node-metrics:request-size-max:{client-id=producer-1, node-id=node-358}                                      : 1054112.000
producer-node-metrics:request-total:{client-id=producer-1, node-id=node--1}                                          : 2.000
producer-node-metrics:request-total:{client-id=producer-1, node-id=node--2}                                          : 0.000
producer-node-metrics:request-total:{client-id=producer-1, node-id=node-357}                                         : 18232.000
producer-node-metrics:request-total:{client-id=producer-1, node-id=node-358}                                         : 18021.000
producer-node-metrics:response-rate:{client-id=producer-1, node-id=node--1}                                          : 0.000
producer-node-metrics:response-rate:{client-id=producer-1, node-id=node--2}                                          : 0.000
producer-node-metrics:response-rate:{client-id=producer-1, node-id=node-357}                                         : 61.094
producer-node-metrics:response-rate:{client-id=producer-1, node-id=node-358}                                         : 61.272
producer-node-metrics:response-total:{client-id=producer-1, node-id=node--1}                                         : 2.000
producer-node-metrics:response-total:{client-id=producer-1, node-id=node--2}                                         : 0.000
producer-node-metrics:response-total:{client-id=producer-1, node-id=node-357}                                        : 18232.000
producer-node-metrics:response-total:{client-id=producer-1, node-id=node-358}                                        : 18021.000
producer-topic-metrics:byte-rate:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector}          : 106864118.709
producer-topic-metrics:byte-total:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector}         : 33980657526.000
producer-topic-metrics:compression-rate:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector}   : 0.605
producer-topic-metrics:record-error-rate:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector}  : 0.000
producer-topic-metrics:record-error-total:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector} : 0.000
producer-topic-metrics:record-retry-rate:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector}  : 0.000
producer-topic-metrics:record-retry-total:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector} : 0.000
producer-topic-metrics:record-send-rate:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector}   : 18598.670
producer-topic-metrics:record-send-total:{client-id=producer-1, topic=DEFAULT-.TIMESERIES.SmartpumpCollectorVector}  : 6000000.000
19/10/03 14:24:16 INFO producer.KafkaProducer: [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
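
As a quick sanity check on the dump above, a few of the client-level metrics can be cross-checked against each other. The sketch below is only arithmetic on three values copied from the dump (outgoing-byte-rate, request-rate, record-send-rate); the class and method names are made up for illustration and are not part of the Kafka client.

```java
final class ProducerMetricsCrossCheck {
    // Values copied from the producer-metrics lines of the dump.
    static final double OUTGOING_BYTE_RATE = 106935747.320; // bytes per second
    static final double REQUEST_RATE = 122.077;             // requests per second
    static final double RECORD_SEND_RATE = 18598.670;       // records per second

    /** Average request size (bytes) implied by byte rate / request rate. */
    public static double impliedRequestSize() {
        return OUTGOING_BYTE_RATE / REQUEST_RATE;
    }

    /** Average records per request implied by record rate / request rate. */
    public static double impliedRecordsPerRequest() {
        return RECORD_SEND_RATE / REQUEST_RATE;
    }

    /** Outgoing traffic in megabits per second (1 Mbit = 1,000,000 bits). */
    public static double outgoingMegabitsPerSecond() {
        return OUTGOING_BYTE_RATE * 8.0 / 1_000_000.0;
    }

    public static void main(String[] args) {
        System.out.printf("implied request size   : %.0f bytes%n", impliedRequestSize());
        System.out.printf("implied records/request: %.1f%n", impliedRecordsPerRequest());
        System.out.printf("outgoing traffic       : %.0f Mbit/s%n", outgoingMegabitsPerSecond());
    }
}
```

The implied values land within about 1% of the reported request-size-avg (875950.384) and records-per-request-avg (152.407), and the outgoing rate works out to roughly 855 Mbit/s, well under the 10Gbit link mentioned in the original post.
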
-----Original Message-----
From: Eric Owhadi <er...@esgyn.com> 
Sent: Thursday, October 3, 2019 3:45 PM
To: users@kafka.apache.org
Subject: RE: poor producing performance with very low CPU utilization?

There is a key piece of information that should help narrow down where the problem is:

When I change from acks=all to acks=1, instead of increasing the message rate, it actually cuts it in half!

It is as if the problem is tied to how fast I produce data (with acks=1 I assume I block for less time in the synchronous send, and therefore my producing rate increases).

I wonder if some sort of contention happens when the producer populates the 200 partition queues while the rate of production in the user thread is high?
Eric
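
One way to reason about the blocking effect described above: if each calling thread waits for the acknowledgement before sending its next record, a thread has only one record outstanding, and with records spread over many partitions a batch will usually wait out linger.ms rather than fill up. Under that simplifying assumption, the rate is bounded by the number of threads divided by the wait per send. The sketch below is only that arithmetic; the thread count is hypothetical (the thread does not state it), and the model is not meant to explain the non-linear linger.ms results reported elsewhere in this thread.

```java
final class SyncSendBound {
    /**
     * Rough upper bound on records per second when every thread blocks on each
     * send and each record waits for linger.ms plus one request round trip.
     */
    public static double maxRecordsPerSecond(int threads, double lingerMs, double requestLatencyMs) {
        double waitPerSendMs = lingerMs + requestLatencyMs;
        if (threads <= 0 || waitPerSendMs <= 0) {
            throw new IllegalArgumentException("threads and wait time must be positive");
        }
        return threads * 1000.0 / waitPerSendMs;
    }

    public static void main(String[] args) {
        int threads = 20;               // hypothetical number of Jetty threads
        double requestLatencyMs = 30.0; // close to request-latency-avg in the metrics dump
        for (double lingerMs : new double[] {1.0, 85.0, 200.0}) {
            System.out.printf("linger.ms=%.0f -> at most %.1f records/s%n",
                    lingerMs, maxRecordsPerSecond(threads, lingerMs, requestLatencyMs));
        }
    }
}
```

If this model applies, sending asynchronously (not blocking on each returned future) removes the per-thread bound, which may be worth measuring separately.
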

-----Original Message-----
From: Eric Owhadi <er...@esgyn.com>
Sent: Thursday, October 3, 2019 1:33 PM
To: users@kafka.apache.org
Subject: RE: poor producing performance with very low CPU utilization?

Hi Eric,
Thanks a lot for your answer. Please find inline responses:

>>You've given hardware information about your brokers, but I don't think you've provided information about the machine your producer is running on.
>>Have you verified that you're not reaching any caps on your producer's machine?

The producer is on the same machine as the broker. It runs very quiet, at 3% CPU, when I run my test. So no, there is no stress on the producing side.

>>I also think you might be hitting the limit of what a single producer is capable of pushing through with your current setup. With record size of ~12k and the default batch size configuration of 64k, you'll only be able to send 5 records per batch. The default number of in flight batches is 5.

I have 200 partitions on my topic, and the load is well balanced across all partitions. So the math you are doing should be multiplied by 200, right? In addition, I found that batch size had no effect, and that linger.ms was the factor that triggered a buffer send. I increased the batch size and the number of in-flight requests, and that had no effect.
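
For reference, the settings discussed in this exchange can be collected in one place. The sketch below only builds a java.util.Properties object, so it runs without the Kafka client on the classpath; the class and method names are hypothetical. The keys are standard producer configuration names, the values for acks, compression.type and batch.size come from the original post, 85 is the best linger.ms value measured elsewhere in this thread, and the remaining two values are the client defaults. None of this is a tuning recommendation.

```java
import java.util.Properties;

final class ProducerTuningSketch {
    /** Collects the batching-related settings that were varied in this thread. */
    public static Properties tuningProps(int lingerMs, int batchSizeBytes) {
        Properties props = new Properties();
        props.put("acks", "all");                // wait for all in-sync replicas
        props.put("compression.type", "snappy"); // as in the original post
        props.put("linger.ms", Integer.toString(lingerMs));
        props.put("batch.size", Integer.toString(batchSizeBytes)); // per-partition batch limit
        props.put("buffer.memory", "33554432");  // client default (32 MB)
        props.put("max.in.flight.requests.per.connection", "5"); // client default
        return props;
    }

    public static void main(String[] args) {
        Properties props = tuningProps(85, 65536);
        props.stringPropertyNames().stream().sorted()
                .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

In a real producer these entries would be added to the same Properties object that carries bootstrap.servers and the serializers before it is passed to the KafkaProducer constructor.
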

>>This means at any given time, you'll only have 25 records in flight per connection. I'm assuming your partitions are configured with at least 2 replicas. Acks=all means your producer is going to wait for the records to be fully replicated before considering it complete.

>>Doing the math, you have ~200 records per second, but this is split between 2 brokers. This means you're producing 100 records per second per broker.
>>Simplifying a bit to 25 records in flight per broker, that's a latency of ~250 ms to move around 300kb. At minimum, this includes the time to, [compress the batch], [send the batch over the network to the leader], [write the batch to the leader's log], [fetch the batch over the network to the replica], [write the batch to the replica's log], and all of the assorted responses to those calls.

Given that everything is local (the producer runs on the same node as a broker), and given the size of my node (80 vcores), I hope I don't need 250 ms to do that...
The equivalent workload on HBase 2.0 is 10 to 20X faster (and that includes the same replica config etc.).

-----Original Message-----
From: Eric Azama <ea...@gmail.com>
Sent: Thursday, October 3, 2019 1:07 PM
To: users@kafka.apache.org
Subject: Re: poor producing performance with very low CPU utilization?

Hi Eric,

You've given hardware information about your brokers, but I don't think you've provided information about the machine your producer is running on.
Have you verified that you're not reaching any caps on your producer's machine?

I also think you might be hitting the limit of what a single producer is capable of pushing through with your current setup. With record size of ~12k and the default batch size configuration of 64k, you'll only be able to send 5 records per batch. The default number of in flight batches is 5.
This means at any given time, you'll only have 25 records in flight per connection. I'm assuming your partitions are configured with at least 2 replicas. Acks=all means your producer is going to wait for the records to be fully replicated before considering it complete.

Doing the math, you have ~200 records per second, but this is split between
2 brokers. This means you're producing 100 records per second per broker.
Simplifying a bit to 25 records in flight per broker, that's a latency of
~250 ms to move around 300kb. At minimum, this includes the time to, [compress the batch], [send the batch over the network to the leader], [write the batch to the leader's log], [fetch the batch over the network to the replica], [write the batch to the replica's log], and all of the assorted responses to those calls.
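
The estimate above can be written out as a small calculation. This is a sketch of the same back-of-the-envelope reasoning, with hypothetical class and method names: it assumes one batch per in-flight request, ignores compression, and uses the 12016-byte record size from the original post. Because a produce request can carry batches for several partitions, the real number of records in flight may be higher, which is the objection raised elsewhere in this thread.

```java
final class InFlightLatencyEstimate {
    /** Whole records that fit in one batch of the given size. */
    public static int recordsPerBatch(int batchSizeBytes, int recordSizeBytes) {
        return batchSizeBytes / recordSizeBytes;
    }

    /** Latency implied by (records in flight) / (records per second), in milliseconds. */
    public static double impliedLatencyMs(int recordsInFlight, double recordsPerSecond) {
        return recordsInFlight / recordsPerSecond * 1000.0;
    }

    public static void main(String[] args) {
        int recordSize = 12016;          // bytes, from the original post
        int batchSize = 65536;           // batch.size from the original post
        int inFlightRequests = 5;        // max.in.flight.requests.per.connection default
        double perBrokerRate = 200.0 / 2; // ~200 records/s split across 2 brokers

        int perBatch = recordsPerBatch(batchSize, recordSize);
        int inFlightRecords = perBatch * inFlightRequests;
        long inFlightBytes = (long) inFlightRecords * recordSize;

        System.out.println("records per batch : " + perBatch);
        System.out.println("records in flight : " + inFlightRecords);
        System.out.println("bytes in flight   : " + inFlightBytes);
        System.out.printf("implied latency   : %.0f ms%n",
                impliedLatencyMs(inFlightRecords, perBrokerRate));
    }
}
```
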

On Wed, Oct 2, 2019 at 8:38 PM Eric Owhadi <er...@esgyn.com> wrote:

> Hi Jamie,
> Thanks for the hint. I played with these parameters and found that only
> linger.ms plays a significant role in my test case.
> It is very sensitive and highly non-linear.
> I get these results:
> Linger.ms       message per second
> 80              100
> 84              205
> 85              215 -> top
> 86              213
> 90              205
> 95              195
> 100             187
> 200             100
>
> So as you can see, this is very sensitive and one can miss the peak easily.
> However, 200 messages per second for 2 powerful nodes and relatively
> small messages (12016 bytes) is still at least 10X below what I would have hoped for.
> When I see system resources barely moving, with CPU at 3%,
> I am sure something is not right.
> Regards,
> Eric
>
> -----Original Message-----
> From: Jamie <ja...@aol.co.uk.INVALID>
> Sent: Wednesday, October 2, 2019 4:27 PM
> To: users@kafka.apache.org
> Subject: Re: poor producing performance with very low CPU utilization?
>
> Hi Eric,
> I found increasing the linger.ms to between 50-100 ms significantly 
> increases performance (fewer larger requests instead of many small 
> ones), I'd also increase the batch size and the buffer.memory.
> Thanks,
> Jamie
>
>
> -----Original Message-----
> From: Eric Owhadi <er...@esgyn.com>
> To: users@kafka.apache.org <us...@kafka.apache.org>
> Sent: Wed, 2 Oct 2019 16:42
> Subject: poor producing performance with very low CPU utilization?
>
> Hi Kafka users,
> I am new to Kafka and am struggling with getting acceptable producing rate.
> I am using a cluster of 2 nodes, 40 CPU cores/ 80 if counting hyperthreading. 256GB memory on a 10Gbit network
> Kafka is installed as part of cloudera parcel, with 5GB java heap.
> Producer version: Kafka client 2.2.1
>
> Wed Oct  2 07:56:59 PDT 2019
> JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-0.b11.el6_9.x86_64
> Using -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/kafka_kafka-KAFKA_BROKER-c1871edf37153578a6fc7f41462d01d2_pid6908.hprof -XX:OnOutOfMemoryError=/usr/lib64/cmf/service/common/killparent.sh as CSD_JAVA_OPTS
> Using /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER as conf dir
> Using scripts/control.sh as process script
> CONF_DIR=/var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> CMF_CONF_DIR=/etc/cloudera-scm-agent
>
> Date: Wed Oct  2 07:56:59 PDT 2019
> Host: xxxxx.esgyn.local
> Pwd: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> CONF_DIR: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> KAFKA_HOME: /opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/lib/kafka
> Zookeeper Quorum: xxx.esgyn.local:2181,xxx.esgyn.local:2181,xxx.esgyn.local:2181
> Zookeeper Chroot:
> PORT: 9092
> JMX_PORT: 9393
> SSL_PORT: 9093
> ENABLE_MONITORING: true
> METRIC_REPORTERS: nl.techop.kafka.KafkaHttpMetricsReporter
> BROKER_HEAP_SIZE: 5120
> BROKER_JAVA_OPTS: -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true
> BROKER_SSL_ENABLED: false
> KERBEROS_AUTH_ENABLED: false
> KAFKA_PRINCIPAL:
> SECURITY_INTER_BROKER_PROTOCOL: INFERRED
> AUTHENTICATE_ZOOKEEPER_CONNECTION: true
> SUPER_USERS: kafka
> Kafka version found: 2.2.1-kafka4.1.0
> Sentry version found: 1.5.1-cdh5.15.0
> ZK_PRINCIPAL_NAME: zookeeper
> Final Zookeeper Quorum is xxx.esgyn.local:2181,xx.esgyn.local:2181,x.esgyn.local:2181
> security.inter.broker.protocol inferred as PLAINTEXT
> LISTENERS=listeners=PLAINTEXT://xxxxx.esgyn.local:9092,
>
> I am producing messages of 12016 bytes uncompressed, which are then
> snappy-compressed by Kafka.
> I am using a topic with 200 partitions, and a custom partitioner that
> I verified does a good job of spreading the load across the 2 brokers.
>
> My producer config looks like this:
>
> kafkaProps.put("bootstrap.servers", "nap052.esgyn.local:9092,localhost:9092");
> kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
> kafkaProps.put("value.serializer", "org.trafodion.sql.kafka.SmartpumpCollectorVectorSerializer");
> kafkaProps.put("partitioner.class", "org.trafodion.sql.kafka.TimeSeriesPartitioner");
> kafkaProps.put("compression.type", "snappy");
> kafkaProps.put("batch.size", "65536");
> kafkaProps.put("acks", "all");
> kafkaProps.put("linger.ms", "1");
>
> I first tried fire-and-forget sends, thinking I would get the best
> performance.
> Then I tried synchronous sends and, to my surprise, got better
> performance with them.
>
> However, after 1 or 2 minutes of load testing, I start getting errors on
> the synchronous send like this:
> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed since batch creation
>         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
>         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
>         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
>         at org.trafodion.sql.kafka.TimeseriesEndPoint$customHandler.handle(TimeseriesEndPoint.java:315)
>         at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>         at org.eclipse.jetty.server.Server.handle(Server.java:505)
>         at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:370)
>         at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
>         at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
>         at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
>         at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
>         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
>         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
>         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
>         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
>         at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
>         at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:781)
>         at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:917)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed since batch creation
>
> So I suspect that I am producing too fast and the brokers cannot
> keep up.
> I tried bumping the I/O threads from the default of 8 to 40; that did not help.
>
> I am getting a producing rate of only about 100 messages per second,
> and about 1 megabyte per second according to the Kafka metrics.
> The CPU utilization is barely noticeable (3%), the network is barely
> touched, and having googled around, this is not the kind of performance I
> should expect from my configuration. I was hoping for at least 10X, if
> not 100X, better. Were my expectations too high, or am I missing
> something in the config that is causing these performance numbers?
>
> Some details: I produce from a custom Jetty handler that I verified
> to be very fast when I am not producing (with the send() commented out),
> and I am using a single producer (I also tried with 2) shared by all Jetty threads.
>
> Any help or clue would be much appreciated.
> Thanks in advance,
> Eric Owhadi
> Esgyn Corporation.
>
>
>

Re: poor producing performance with very low CPU utilization?

Posted by Xiyuan Hu <xi...@gmail.com>.
Hi Eric,

I had a similar issue in my application. One thing I noticed is that the
metric `kafka_consumer_io_wait_time_avg_seconds` swings from 2
minutes to 6 hours! The average is about 1-2 hours. This is a built-in
Micrometer metric. I'm still searching for the source code behind it
to understand it better.

Thanks

On Thu, Oct 3, 2019 at 4:54 PM Alexandru Ionita
<al...@gmail.com> wrote:
>
> This might help.
>
> Try to replicate the configuration this guy is using for benchmarking kafka.
> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
>
> On Thu, Oct 3, 2019 at 22:45, Eric Owhadi <eric.owhadi@esgyn.com> wrote:
>
> > There is a key piece of information that should be critical to guess where
> > the problem is:
> >
> > When I change from ack = all to ack = 1, instead of increasing message/s,
> > it actually devises it by half!
> >
> > As if the problem is about how fast I produce data (given when I use ack 1
> > I assume I block less time in the synchronous send, and therefore my
> > producing pump increases.
> >
> > I wonder if some sort of contention happen when producer populate the 200
> > partition queues when the rate of production is high in the user thread?
> > Eric
> >
> > -----Original Message-----
> > From: Eric Owhadi <er...@esgyn.com>
> > Sent: Thursday, October 3, 2019 1:33 PM
> > To: users@kafka.apache.org
> > Subject: RE: poor producing performance with very low CPU utilization?
> >
> > External
> >
> > Hi Eric,
> > Thanks a lot for your answer. Please find inline responses:
> >
> > >>You've given hardware information about your brokers, but I don't think
> > you've provided information about the machine your producer is running on.
> > >>Have you verified that you're not reaching any caps on your producer's
> > machine?
> >
> > The producer is on the same machine that the broker. Running very quiet,
> > 3% CPU when I run my test. So no there is no stress on the producing side
> >
> > >>I also think you might be hitting the limit of what a single producer is
> > capable of pushing through with your current setup. With record size of
> > ~12k and the >>default batch size configuration of 64k, you'll only be able
> > to send 5 records per batch. The default number of in flight batches is 5.
> >
> > I have 200 partition on my topic, and the load is well balanced across all
> > partition. So the math you are doing should be X200 right? In addition, I
> > found that batch size had no effect, and the linger.ms was the triggering
> > factor to cause a buffer send. I played with batch size and in flight
> > number of request upward, and that had no effect.
> >
> > >>This means at any given time, you'll only have 25 records in flight per
> > connection. I'm assuming your partitions are configured with at least 2
> > replicas. Acks=all >>means your producer is going to wait for the records
> > to be fully replicated before considering it complete.
> >
> > >>Doing the math, you have ~200 records per second, but this is split
> > >>between
> > >>2 brokers. This means you're producing 100 records per second per broker.
> > >>Simplifying a bit to 25 records in flight per broker, that's a latency
> > >>of
> > >>~250 ms to move around 300kb. At minimum, this includes the time to,
> > [compress the batch], [send the batch over the network to the leader],
> > [write the batch >>to the leader's log], [fetch the batch over the network
> > to the replica], [write the batch to the replica's log], and all of the
> > assorted responses to those calls.
> >
> > given all is local (producer running on same node as broker), and the size
> > of my node (80 vcore), I hope I don t need 250ms to do that...
> > The equivalent workload on hbase2.0  is 10 to 20X faster (and that include
> > same replica config etc).
> >
> > On Wed, Oct 2, 2019 at 8:38 PM Eric Owhadi <er...@esgyn.com> wrote:
> >
> >
> > -----Original Message-----
> > From: Eric Azama <ea...@gmail.com>
> > Sent: Thursday, October 3, 2019 1:07 PM
> > To: users@kafka.apache.org
> > Subject: Re: poor producing performance with very low CPU utilization?
> >
> > External
> >
> > Hi Eric,
> >
> > You've given hardware information about your brokers, but I don't think
> > you've provided information about the machine your producer is running on.
> > Have you verified that you're not reaching any caps on your producer's
> > machine?
> >
> > I also think you might be hitting the limit of what a single producer is
> > capable of pushing through with your current setup. With record size of
> > ~12k and the default batch size configuration of 64k, you'll only be able
> > to send 5 records per batch. The default number of in flight batches is 5.
> > This means at any given time, you'll only have 25 records in flight per
> > connection. I'm assuming your partitions are configured with at least 2
> > replicas. Acks=all means your producer is going to wait for the records to
> > be fully replicated before considering it complete.
> >
> > Doing the math, you have ~200 records per second, but this is split between
> > 2 brokers. This means you're producing 100 records per second per broker.
> > Simplifying a bit to 25 records in flight per broker, that's a latency of
> > ~250 ms to move around 300kb. At minimum, this includes the time to,
> > [compress the batch], [send the batch over the network to the leader],
> > [write the batch to the leader's log], [fetch the batch over the network to
> > the replica], [write the batch to the replica's log], and all of the
> > assorted responses to those calls.

Re: poor producing performance with very low CPU utilization?

Posted by Alexandru Ionita <al...@gmail.com>.
This might help.

Try replicating the configuration used in this Kafka benchmark:
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

On Thu, Oct 3, 2019 at 22:45, Eric Owhadi <eric.owhadi@esgyn.com> wrote:

> There is a key piece of information that should help pinpoint where
> the problem is:
>
> When I change from acks=all to acks=1, instead of increasing, the message
> rate is actually cut in half!
>
> It is as if the problem is tied to how fast I produce data (with acks=1 I
> assume I block for less time in the synchronous send, and therefore my
> producing rate increases).
>
> I wonder if some sort of contention happens when the producer populates the
> 200 partition queues while the production rate in the user thread is high?
> Eric

RE: poor producing performance with very low CPU utilization?

Posted by Eric Owhadi <er...@esgyn.com>.
There is a key piece of information that should help pinpoint where the problem is:

When I change from acks=all to acks=1, instead of increasing, the message rate is actually cut in half!

It is as if the problem is tied to how fast I produce data (with acks=1 I assume I block for less time in the synchronous send, and therefore my producing rate increases).

I wonder if some sort of contention happens when the producer populates the 200 partition queues while the production rate in the user thread is high?
Eric

-----Original Message-----
From: Eric Owhadi <er...@esgyn.com> 
Sent: Thursday, October 3, 2019 1:33 PM
To: users@kafka.apache.org
Subject: RE: poor producing performance with very low CPU utilization?

External

Hi Eric,
Thanks a lot for your answer. Please find inline responses:

>>You've given hardware information about your brokers, but I don't think you've provided information about the machine your producer is running on.
>>Have you verified that you're not reaching any caps on your producer's machine?

The producer is on the same machine as the broker. It runs very quietly, at 3% CPU during my test. So no, there is no stress on the producing side.

>>I also think you might be hitting the limit of what a single producer is capable of pushing through with your current setup. With record size of ~12k and your configured batch size of 64k, you'll only be able to send 5 records per batch. The default number of in flight batches is 5.

I have 200 partitions on my topic, and the load is well balanced across all partitions. So the math you are doing should be x200, right? In addition, I found that batch size had no effect, and that linger.ms was the factor triggering a buffer send. I increased the batch size and the number of in-flight requests, and that had no effect.
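
As a rough illustration of the knobs mentioned in this reply, the following sketch collects them in a plain java.util.Properties object. The keys are standard Kafka producer configuration names; the 85 ms linger comes from the linger.ms results reported in this thread, while the 256 KB batch size is an arbitrary assumption and the class and method names are hypothetical.

```java
import java.util.Properties;

public class ProducerTuningSketch {

    // Builds only the tuning-related producer settings discussed in the thread.
    // The keys are standard Kafka producer config names; the values passed in
    // are illustrative assumptions, not settings validated on this cluster.
    static Properties tuningProps(int lingerMs, int batchBytes, int maxInFlight) {
        Properties props = new Properties();
        props.setProperty("linger.ms", Integer.toString(lingerMs));
        props.setProperty("batch.size", Integer.toString(batchBytes));
        props.setProperty("max.in.flight.requests.per.connection", Integer.toString(maxInFlight));
        props.setProperty("acks", "all");
        props.setProperty("compression.type", "snappy");
        return props;
    }

    public static void main(String[] args) {
        // 85 ms linger, 256 KB batches, 5 in-flight requests per connection.
        Properties props = tuningProps(85, 262144, 5);
        props.list(System.out);
    }
}
```

These properties would be merged with the bootstrap servers, serializers, and partitioner before constructing the producer; the sketch leaves those out so it runs without the Kafka client on the classpath.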

>>This means at any given time, you'll only have 25 records in flight per connection. I'm assuming your partitions are configured with at least 2 replicas. Acks=all means your producer is going to wait for the records to be fully replicated before considering them complete.

>>Doing the math, you have ~200 records per second, but this is split between 2 brokers. This means you're producing 100 records per second per broker. Simplifying a bit to 25 records in flight per broker, that's a latency of ~250 ms to move around 300 KB. At minimum, this includes the time to [compress the batch], [send the batch over the network to the leader], [write the batch to the leader's log], [fetch the batch over the network to the replica], [write the batch to the replica's log], plus all of the assorted responses to those calls.

Given that everything is local (the producer runs on the same node as a broker) and given the size of my node (80 vcores), I hope I don't need 250 ms to do that...
The equivalent workload on HBase 2.0 is 10 to 20X faster (and that includes the same replica config, etc.).

-----Original Message-----
From: Eric Azama <ea...@gmail.com>
Sent: Thursday, October 3, 2019 1:07 PM
To: users@kafka.apache.org
Subject: Re: poor producing performance with very low CPU utilization?

External

Hi Eric,

You've given hardware information about your brokers, but I don't think you've provided information about the machine your producer is running on.
Have you verified that you're not reaching any caps on your producer's machine?

I also think you might be hitting the limit of what a single producer is capable of pushing through with your current setup. With record size of ~12k and your configured batch size of 64k, you'll only be able to send 5 records per batch. The default number of in flight batches is 5.
This means at any given time, you'll only have 25 records in flight per connection. I'm assuming your partitions are configured with at least 2 replicas. Acks=all means your producer is going to wait for the records to be fully replicated before considering them complete.

Doing the math, you have ~200 records per second, but this is split between 2 brokers. This means you're producing 100 records per second per broker. Simplifying a bit to 25 records in flight per broker, that's a latency of ~250 ms to move around 300 KB. At minimum, this includes the time to [compress the batch], [send the batch over the network to the leader], [write the batch to the leader's log], [fetch the batch over the network to the replica], [write the batch to the replica's log], plus all of the assorted responses to those calls.
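
The arithmetic in the message above can be checked with a short sketch. It assumes, as the message does, one batch per in-flight request, and takes the record size, batch size, and per-broker rate from this thread; the class and method names are hypothetical and nothing here calls Kafka.

```java
// Back-of-the-envelope check of the in-flight arithmetic in the message above.
// All inputs are figures quoted in this thread; this does not talk to Kafka.
public class InFlightMath {

    // Number of whole records of recordBytes that fit in one batch of batchBytes.
    static int recordsPerBatch(int batchBytes, int recordBytes) {
        return batchBytes / recordBytes;
    }

    // Little's law rearranged: latency = items in flight / throughput.
    static double impliedLatencyMs(int recordsInFlight, double recordsPerSecond) {
        return 1000.0 * recordsInFlight / recordsPerSecond;
    }

    public static void main(String[] args) {
        int recordBytes = 12016;       // message size reported in the thread
        int batchBytes = 65536;        // batch.size from the producer config
        int inFlightBatches = 5;       // default max.in.flight.requests.per.connection
        double perBrokerRate = 100.0;  // ~200 records/s split across 2 brokers

        int perBatch = recordsPerBatch(batchBytes, recordBytes);
        int inFlight = perBatch * inFlightBatches;
        long bytesInFlight = (long) inFlight * recordBytes;

        System.out.println("records per batch: " + perBatch);
        System.out.println("records in flight: " + inFlight);
        System.out.println("bytes in flight:   " + bytesInFlight);
        System.out.println("implied latency:   "
                + impliedLatencyMs(inFlight, perBrokerRate) + " ms");
    }
}
```

The implied latency is simply records in flight divided by throughput, so the result holds only under the stated simplification of 25 records in flight per broker.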

On Wed, Oct 2, 2019 at 8:38 PM Eric Owhadi <er...@esgyn.com> wrote:

> Hi Jamie,
> Thanks for the hint. I played with these parameters and found that only
> linger.ms plays a significant role in my test case.
> It is very sensitive and highly nonlinear.
> I get these results:
> linger.ms (ms)  messages per second
> 80              100
> 84              205
> 85              215 (peak)
> 86              213
> 90              205
> 95              195
> 100             187
> 200             100
>
> So as you can see, this is very sensitive and one can easily miss the peak.
> However, 200 messages per second for 2 powerful nodes and relatively
> small messages (12016 bytes) is still at least 10X below what I would have hoped.
> When I see system resources barely moving, with CPU at 3%,
> I am sure something is not right.
> Regards,
> Eric
>
> -----Original Message-----
> From: Jamie <ja...@aol.co.uk.INVALID>
> Sent: Wednesday, October 2, 2019 4:27 PM
> To: users@kafka.apache.org
> Subject: Re: poor producing performance with very low CPU utilization?
>
> External
>
> Hi Eric,
> I found that increasing linger.ms to between 50 and 100 ms significantly
> increases performance (fewer, larger requests instead of many small
> ones). I'd also increase the batch size and buffer.memory.
> Thanks,
> Jamie
>
>
> -----Original Message-----
> From: Eric Owhadi <er...@esgyn.com>
> To: users@kafka.apache.org <us...@kafka.apache.org>
> Sent: Wed, 2 Oct 2019 16:42
> Subject: poor producing performance with very low CPU utilization?
>
> Hi Kafka users,
> I am new to Kafka and am struggling to get an acceptable producing rate.
> I am using a cluster of 2 nodes, 40 CPU cores (80 if counting
> hyperthreading), 256GB memory, on a 10Gbit network.
> Kafka is installed as part of a Cloudera parcel, with a 5GB Java heap.
> Producer version: Kafka client 2.2.1
>
> Wed Oct  2 07:56:59 PDT 2019
> JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-0.b11.el6_9.x86_64
> Using -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/kafka_kafka-KAFKA_BROKER-c1871edf37153578a6fc7f41462d01d2_pid6908.hprof -XX:OnOutOfMemoryError=/usr/lib64/cmf/service/common/killparent.sh as CSD_JAVA_OPTS
> Using /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER as conf dir
> Using scripts/control.sh as process script
> CONF_DIR=/var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> CMF_CONF_DIR=/etc/cloudera-scm-agent
>
> Date: Wed Oct  2 07:56:59 PDT 2019
> Host: xxxxx.esgyn.local
> Pwd: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> CONF_DIR: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> KAFKA_HOME: /opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/lib/kafka
> Zookeeper Quorum: xxx.esgyn.local:2181,xxx.esgyn.local:2181,xxx.esgyn.local:2181
> Zookeeper Chroot:
> PORT: 9092
> JMX_PORT: 9393
> SSL_PORT: 9093
> ENABLE_MONITORING: true
> METRIC_REPORTERS: nl.techop.kafka.KafkaHttpMetricsReporter
> BROKER_HEAP_SIZE: 5120
> BROKER_JAVA_OPTS: -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true
> BROKER_SSL_ENABLED: false
> KERBEROS_AUTH_ENABLED: false
> KAFKA_PRINCIPAL:
> SECURITY_INTER_BROKER_PROTOCOL: INFERRED
> AUTHENTICATE_ZOOKEEPER_CONNECTION: true
> SUPER_USERS: kafka
> Kafka version found: 2.2.1-kafka4.1.0
> Sentry version found: 1.5.1-cdh5.15.0
> ZK_PRINCIPAL_NAME: zookeeper
> Final Zookeeper Quorum is xxx.esgyn.local:2181,xx.esgyn.local:2181,x.esgyn.local:2181
> security.inter.broker.protocol inferred as PLAINTEXT 
> LISTENERS=listeners=PLAINTEXT://xxxxx.esgyn.local:9092,
>
> I am producing messages of 12016 bytes uncompressed, which are then snappy
> compressed by Kafka.
> I am using a topic with 200 partitions, and a custom partitioner that
> I verified is doing a good job of spreading the load across the 2 brokers.
>
> My producer config looks like:
>
>     kafkaProps.put("bootstrap.servers", "nap052.esgyn.local:9092,localhost:9092");
>     kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
>     kafkaProps.put("value.serializer", "org.trafodion.sql.kafka.SmartpumpCollectorVectorSerializer");
>     kafkaProps.put("partitioner.class", "org.trafodion.sql.kafka.TimeSeriesPartitioner");
>     kafkaProps.put("compression.type", "snappy");
>     kafkaProps.put("batch.size", "65536");
>     kafkaProps.put("acks", "all");
>     kafkaProps.put("linger.ms", "1");
>
> I first tried fire-and-forget sends, thinking I would get the best
> performance.
> Then I tried synchronous sends, and to my surprise found that I got
> better performance with them.
>
> However, after 1 or 2 minutes of load testing, I start getting errors on
> the synchronous send like this:
> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed since batch creation
>         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
>         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
>         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
>         at org.trafodion.sql.kafka.TimeseriesEndPoint$customHandler.handle(TimeseriesEndPoint.java:315)
>         at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
>         at org.eclipse.jetty.server.Server.handle(Server.java:505)
>         at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:370)
>         at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
>         at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
>         at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
>         at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
>         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
>         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
>         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
>         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
>         at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
>         at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:781)
>         at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:917)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed since batch creation
>
> So I am suspecting that I am producing too fast, and brokers cannot 
> catch up.
> I tried bumping up the io thread from default 8 to 40, that did not help.
>
> I am getting a producing rate of only about 100 message per seconds, 
> and about 1 Megabyte per seconds according to kafka metrics.
> The CPU utilization is barely noticeable (3%), network is ridiculously 
> unaffected, and having googled around, this is not the kind of perf I 
> should expect out of my config. I was hoping for at least 10X more if 
> not 100X better. Was my expectations too high, or am I missing 
> something in config that is causing this performance numbers?
>
> Some details: I produce using a jetty custom handler that I verified 
> to be super-fast when I am not producing (commenting out the send()), 
> and I am using a single (I also tried with 2) producer reused on all jetty threads.
>
> Any help/clue would be much appreciated, Thanks in advance, Eric 
> Owhadi Esgyn Corporation.
>
>
>

RE: poor producing performance with very low CPU utilization?

Posted by Eric Owhadi <er...@esgyn.com>.
Hi Eric,
Thanks a lot for your answer. Please find inline responses:
 
>>You've given hardware information about your brokers, but I don't think you've provided information about the machine your producer is running on.
>>Have you verified that you're not reaching any caps on your producer's machine?

The producer is on the same machine as the broker. The machine is very quiet, at 3% CPU when I run my test, so no, there is no stress on the producing side.

>>I also think you might be hitting the limit of what a single producer is capable of pushing through with your current setup. With record size of ~12k and your configured batch size of 64k, you'll only be able to send 5 records per batch. The default number of in flight batches is 5.

I have 200 partitions on my topic, and the load is well balanced across all partitions. So the math you are doing should be multiplied by 200, right? In addition, I found that batch size had no effect, and that linger.ms was the factor that triggered a buffer send. I increased both the batch size and the number of in-flight requests, and that had no effect.

>>This means at any given time, you'll only have 25 records in flight per connection. I'm assuming your partitions are configured with at least 2 replicas. Acks=all means your producer is going to wait for the records to be fully replicated before considering it complete.

>>Doing the math, you have ~200 records per second, but this is split between
>>2 brokers. This means you're producing 100 records per second per broker.
>>Simplifying a bit to 25 records in flight per broker, that's a latency of
>>~250 ms to move around 300kb. At minimum, this includes the time to, [compress the batch], [send the batch over the network to the leader], [write the batch to the leader's log], [fetch the batch over the network to the replica], [write the batch to the replica's log], and all of the assorted responses to those calls.

Given that everything is local (the producer runs on the same node as a broker), and given the size of my node (80 vcores), I hope I don't need 250 ms to do that...
The equivalent workload on HBase 2.0 is 10 to 20X faster (and that includes the same replica config, etc.).


Re: poor producing performance with very low CPU utilization?

Posted by Eric Azama <ea...@gmail.com>.
Hi Eric,

You've given hardware information about your brokers, but I don't think
you've provided information about the machine your producer is running on.
Have you verified that you're not reaching any caps on your producer's
machine?

I also think you might be hitting the limit of what a single producer is
capable of pushing through with your current setup. With record size of
~12k and your configured batch size of 64k (batch.size=65536), you'll only be
able to send 5 records per batch. The default number of in flight batches is 5.
This means at any given time, you'll only have 25 records in flight per
connection. I'm assuming your partitions are configured with at least 2
replicas. Acks=all means your producer is going to wait for the records to
be fully replicated before considering it complete.

Doing the math, you have ~200 records per second, but this is split between
2 brokers. This means you're producing 100 records per second per broker.
Simplifying a bit to 25 records in flight per broker, that's a latency of
~250 ms to move around 300 KB. At minimum, this includes the time to
[compress the batch], [send the batch over the network to the leader],
[write the batch to the leader's log], [fetch the batch over the network to
the replica], [write the batch to the replica's log], and all of the
assorted responses to those calls.
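
The arithmetic above can be reproduced with a short sketch. It assumes uncompressed 12016-byte records (the size given in the original post), batch.size=65536, 5 in-flight batches per connection, and 100 records per second per broker. The class and method names are made up for illustration. Snappy compression and per-batch overhead would change the exact numbers, so treat the result as an order-of-magnitude estimate only.

```java
// Back-of-the-envelope estimate of the latency implied by a capped number
// of in-flight records (Little's law: latency = in-flight / throughput).
// All names here are illustrative; nothing in this class is part of Kafka.
final class InFlightEstimate {

    // How many whole records fit in one batch, ignoring compression and headers.
    static int recordsPerBatch(int batchSizeBytes, int recordSizeBytes) {
        return batchSizeBytes / recordSizeBytes;
    }

    // Records that can be outstanding on one connection at a time.
    static int recordsInFlight(int recordsPerBatch, int inFlightBatches) {
        return recordsPerBatch * inFlightBatches;
    }

    // Round-trip latency (ms) implied by the in-flight cap and observed rate.
    static double impliedLatencyMs(int recordsInFlight, double recordsPerSecond) {
        return 1000.0 * recordsInFlight / recordsPerSecond;
    }

    public static void main(String[] args) {
        int perBatch = recordsPerBatch(65536, 12016);       // 5
        int inFlight = recordsInFlight(perBatch, 5);        // 25
        double latencyMs = impliedLatencyMs(inFlight, 100); // 250.0
        long bytesInFlight = (long) inFlight * 12016;       // 300400, about 300 KB
        System.out.println(perBatch + " records/batch, " + inFlight
                + " in flight, " + bytesInFlight + " bytes, "
                + latencyMs + " ms implied latency");
    }
}
```

Read the other way around, the same relation says that at a fixed in-flight cap, throughput can only rise if the round-trip latency falls.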


RE: poor producing performance with very low CPU utilization?

Posted by Eric Owhadi <er...@esgyn.com>.
Hi Jamie,
Thanks for the hint. I played with these parameters, and found that only linger.ms plays a significant role for my test case.
It is very sensitive and highly nonlinear.
I get these results:
linger.ms (ms)   messages per second
80               100
84               205
85               215  <- peak
86               213
90               205
95               195
100              187
200              100

So as you can see, this is very sensitive and one can miss the peak easily.
However, 200 messages per second for 2 powerful nodes and relatively small messages (12016 bytes) is still at least 10X below what I would have hoped. When I see system resources barely moving, with CPU at 3%, I am sure something is not right.
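
For anyone repeating this kind of sweep, a minimal sketch of picking the best setting out of the measurements follows. The numbers are copied from the table above; the class name and layout are only illustrative, and the byte rate assumes the uncompressed 12016-byte message size.

```java
// Scans the linger.ms sweep reported in this message for its best setting.
// The measurements are copied from the table above; the class is illustrative.
final class LingerSweep {

    // {linger.ms, messages per second} pairs as measured in the test.
    static final int[][] RESULTS = {
        {80, 100}, {84, 205}, {85, 215}, {86, 213},
        {90, 205}, {95, 195}, {100, 187}, {200, 100}
    };

    // Returns the {linger.ms, rate} pair with the highest rate.
    static int[] best(int[][] results) {
        int[] top = results[0];
        for (int[] row : results) {
            if (row[1] > top[1]) {
                top = row;
            }
        }
        return top;
    }

    public static void main(String[] args) {
        int[] top = best(RESULTS);
        System.out.println("best linger.ms=" + top[0] + " at " + top[1] + " msg/s");
        // Even the peak is small in bytes: 215 * 12016 bytes is about 2.6 MB/s.
        System.out.println("peak bytes/s=" + (long) top[1] * 12016);
    }
}
```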
Regards,
Eric


Re: poor producing performance with very low CPU utilization?

Posted by Jamie <ja...@aol.co.uk.INVALID>.
Hi Eric, 
I found that increasing linger.ms to between 50 and 100 ms significantly increases performance (fewer, larger requests instead of many small ones). I'd also increase batch.size and buffer.memory.
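
As a rough sketch of what that could look like in the producer setup: the three keys are the real producer config names, but the values below are untested placeholders (assumptions for illustration, not recommendations), and the class name is made up. The sketch uses plain java.util.Properties so it runs without the Kafka client on the classpath.

```java
import java.util.Properties;

// Builds producer settings along the lines suggested above. Only the three
// keys are real Kafka producer configs; the values are untested placeholders.
final class ProducerTuning {

    static Properties tunedProps() {
        Properties props = new Properties();
        props.put("linger.ms", "75");            // middle of the 50-100 ms range
        props.put("batch.size", "262144");       // assumed: 4x the 65536 used so far
        props.put("buffer.memory", "134217728"); // assumed: 128 MB, above the 32 MB default
        return props;
    }

    public static void main(String[] args) {
        // These entries would be merged into the Properties passed to KafkaProducer.
        tunedProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Each value is worth sweeping separately, since the best setting depends on record size and load.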
Thanks, 
Jamie


-----Original Message-----
From: Eric Owhadi <er...@esgyn.com>
To: users@kafka.apache.org <us...@kafka.apache.org>
Sent: Wed, 2 Oct 2019 16:42
Subject: poor producing performance with very low CPU utilization?

Hi Kafka users,
I am new to Kafka and am struggling with getting acceptable producing rate.
I am using a cluster of 2 nodes, 40 CPU cores/ 80 if counting hyperthreading. 256GB memory on a 10Gbit network
Kafka is installed as part of cloudera parcel, with 5GB java heap.
Producer version: Kafka client 2.2.1

Wed Oct  2 07:56:59 PDT 2019
JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-0.b11.el6_9.x86_64
Using -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/kafka_kafka-KAFKA_BROKER-c1871edf37153578a6fc7f41462d01d2_pid6908.hprof -XX:OnOutOfMemoryError=/usr/lib64/cmf/service/common/killparent.sh as CSD_JAVA_OPTS
Using /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER as conf dir
Using scripts/control.sh as process script
CONF_DIR=/var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
CMF_CONF_DIR=/etc/cloudera-scm-agent

Date: Wed Oct  2 07:56:59 PDT 2019
Host: xxxxx.esgyn.local
Pwd: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
CONF_DIR: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
KAFKA_HOME: /opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/lib/kafka
Zookeeper Quorum: xxx.esgyn.local:2181,xxx.esgyn.local:2181,xxx.esgyn.local:2181
Zookeeper Chroot:
PORT: 9092
JMX_PORT: 9393
SSL_PORT: 9093
ENABLE_MONITORING: true
METRIC_REPORTERS: nl.techop.kafka.KafkaHttpMetricsReporter
BROKER_HEAP_SIZE: 5120
BROKER_JAVA_OPTS: -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true
BROKER_SSL_ENABLED: false
KERBEROS_AUTH_ENABLED: false
KAFKA_PRINCIPAL:
SECURITY_INTER_BROKER_PROTOCOL: INFERRED
AUTHENTICATE_ZOOKEEPER_CONNECTION: true
SUPER_USERS: kafka
Kafka version found: 2.2.1-kafka4.1.0
Sentry version found: 1.5.1-cdh5.15.0
ZK_PRINCIPAL_NAME: zookeeper
Final Zookeeper Quorum is xxx.esgyn.local:2181,xx.esgyn.local:2181,x.esgyn.local:2181
security.inter.broker.protocol inferred as PLAINTEXT
LISTENERS=listeners=PLAINTEXT://xxxxx.esgyn.local:9092,

I am producing messages of 12016 bytes uncompressed, then snappy compressed by kafka.
I am using a topic with 200 partitions, and a custom partitioner that I verified is doing a good job of spreading the load across the 2 brokers.

My producer config looks like this:
    kafkaProps.put("bootstrap.servers", "nap052.esgyn.local:9092,localhost:9092");
    kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
    kafkaProps.put("value.serializer", "org.trafodion.sql.kafka.SmartpumpCollectorVectorSerializer");
    kafkaProps.put("partitioner.class", "org.trafodion.sql.kafka.TimeSeriesPartitioner");
    kafkaProps.put("compression.type", "snappy");
    kafkaProps.put("batch.size", "65536");
    kafkaProps.put("acks", "all");
    kafkaProps.put("linger.ms", "1");
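
For a rough sense of what this batch.size means for messages of this size, here is a back-of-the-envelope sketch. It ignores snappy compression and per-record overhead, so the real count can differ. The class and method names are made up for illustration.

```java
class BatchMath {
    // Number of whole records of recordBytes that fit in one batch of batchBytes,
    // ignoring compression and per-record framing overhead.
    static int recordsPerBatch(int batchBytes, int recordBytes) {
        return batchBytes / recordBytes;
    }

    public static void main(String[] args) {
        int batchSize = 65536;    // batch.size from the config above
        int messageSize = 12016;  // uncompressed message size stated in the post
        // 65536 / 12016 -> 5 whole records per batch
        System.out.println(recordsPerBatch(batchSize, messageSize));
    }
}
```

At about five uncompressed messages per batch, each partition's batch fills quickly when traffic to that partition is heavy, and holds very little when traffic is spread thinly over many partitions.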

I tried first doing fire and forget send, thinking I would get best performance.
Then I tried synchronous send, and amazingly found that I would get better performance with sync send.

However, after 1 or 2 minutes of load testing, I start getting errors on the synchronous send like this:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed since batch creation
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
        at org.trafodion.sql.kafka.TimeseriesEndPoint$customHandler.handle(TimeseriesEndPoint.java:315)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
        at org.eclipse.jetty.server.Server.handle(Server.java:505)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:370)
        at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
        at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
        at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
        at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:781)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:917)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed since batch creation

So I suspect that I am producing too fast, and the brokers cannot keep up.
I tried bumping up the I/O threads from the default of 8 to 40; that did not help.

I am getting a producing rate of only about 100 messages per second, and about 1 megabyte per second according to Kafka metrics.
The CPU utilization is barely noticeable (3%), the network is barely touched, and from what I found googling around, this is not the kind of performance I should expect from my config. I was hoping for at least 10X, if not 100X, better. Were my expectations too high, or am I missing something in the config that is causing these performance numbers?
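
As a sanity check on the figures reported above, the arithmetic can be sketched as follows. It uses only numbers from the post (about 100 messages per second, 12016-byte messages, 200 partitions, a 10Gbit network) and ignores compression and replication traffic. The class and method names are hypothetical.

```java
class ThroughputMath {
    // Uncompressed payload bytes produced per second.
    static double bytesPerSecond(double messagesPerSecond, int messageBytes) {
        return messagesPerSecond * messageBytes;
    }

    // Fraction of the link used, with link capacity given in bits per second.
    static double linkUtilization(double bytesPerSecond, double linkBitsPerSecond) {
        return bytesPerSecond / (linkBitsPerSecond / 8.0);
    }

    // Average message rate seen by each partition if the load is spread evenly.
    static double perPartitionRate(double messagesPerSecond, int partitions) {
        return messagesPerSecond / partitions;
    }

    public static void main(String[] args) {
        double rate = bytesPerSecond(100, 12016);
        System.out.println("bytes/s: " + rate);
        System.out.println("link utilization: " + linkUtilization(rate, 10e9));
        System.out.println("msgs/s per partition: " + perPartitionRate(100, 200));
    }
}
```

With the reported figures this works out to roughly 1.2 MB/s, which agrees with the metric of about 1 megabyte per second, uses under 0.1% of a 10Gbit link, and averages 0.5 messages per second per partition.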

Some details: I produce using a custom Jetty handler that I verified to be super-fast when I am not producing (commenting out the send()), and I am using a single producer (I also tried with 2) shared by all Jetty threads.

Any help/clue would be much appreciated,
Thanks in advance,
Eric Owhadi
Esgyn Corporation.