Posted to dev@kafka.apache.org by "Alexander Sibiryakov (Jira)" <ji...@apache.org> on 2020/07/31 13:11:00 UTC

[jira] [Created] (KAFKA-10335) Blocking of producer IO thread when calling send() from callback

Alexander Sibiryakov created KAFKA-10335:
--------------------------------------------

             Summary: Blocking of producer IO thread when calling send() from callback
                 Key: KAFKA-10335
                 URL: https://issues.apache.org/jira/browse/KAFKA-10335
             Project: Kafka
          Issue Type: Bug
          Components: clients, producer 
            Reporter: Alexander Sibiryakov


We had an application that was supposed to use KafkaProducer to deliver the results of some work. Sometimes delivery of the results wasn't successful because of network connectivity errors or maintenance happening on the broker side. In such cases we wanted the application to send an event with the error and the original message details. All good, but we wanted those errors to be delivered to a separate topic. So we implemented a callback in the send() method, using the same producer instance and calling send() from there.

This application worked for some time, but then we found that its producer was stuck: almost no CPU consumption and batches expiring for hours. After attaching a debugger it turned out that the sender IO thread was blocking. When a record expired, the callback was invoked, and it contained a call to send() for a new topic whose metadata was not present in the producer's client cache. When send() is missing metadata, it is allowed to block for up to the max.block.ms interval, which is 60 seconds by default. If the application is active, it quickly accumulates a large number of records, and every record blocks the IO thread for 60 s, so the sender IO thread is essentially blocked. In the producer, only the sender IO thread calls the client's poll() method, which is responsible for all network communication and metadata updates. If poll() runs with significant delay, the result is errors connected with various timeouts. So we end up with a stuck producer that has little chance to recover.
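To make this failure mode easier to spot, a caller-side guard can log whenever send() is invoked from the producer's network thread. This is just a sketch against the public API, not a proposed client change; the thread-name prefix "kafka-producer-network-thread" is the one the client uses today and is visible in the output below.
{code:java}
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class GuardedSend {
    /** Wrapper around producer.send() that warns when called from the sender IO thread. */
    public static <K, V> Future<RecordMetadata> send(KafkaProducer<K, V> producer,
                                                     ProducerRecord<K, V> record) {
        if (Thread.currentThread().getName().startsWith("kafka-producer-network-thread")) {
            System.err.println("WARNING: send() called from the producer IO thread; "
                    + "a metadata fetch here can block poll() for up to max.block.ms");
        }
        return producer.send(record);
    }
}
{code}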

To summarise, the prerequisites for the problem are: sending from a callback, using the same producer instance, and using a topic which the producer hasn't seen before.
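One partial mitigation on the application side (again just a sketch, not a proposed fix) is to warm the metadata cache for the error topic from the main thread before any callback can fire, assuming the error topic is known up front. partitionsFor() may still block for up to max.block.ms, but on the main thread rather than the sender IO thread, and the cached entry only ages out after metadata.max.age.ms:
{code:java}
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;

public class WarmErrorTopicMetadata {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");  // placeholder
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(properties)) {
            // Blocking happens here, on the main thread, before the first send() from a callback.
            List<PartitionInfo> partitions = producer.partitionsFor("alex-test-errors");
            System.out.println("cached metadata for " + partitions.size() + " partition(s)");
        }
    }
}
{code}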

I think it is important to decide whether the issue is KafkaProducer misuse or a bug in the producer. Code in callbacks shouldn't block, that is clear, but at the same time, no one expects an already-initialised producer to block.

Depending on the decision I could produce a fix: either a warning when the user calls send() from a callback, or a reduction of the maximum allowed blocking time for the metadata update. It could also be just a documentation change, or even nothing.
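Independently of the fix, here is a sketch of how an application can avoid the trap entirely with today's API (class name and queue size are illustrative): never call send() from the callback, but hand the failed record to a bounded queue drained by a dedicated thread. A blocking metadata fetch then stalls that thread instead of the sender IO thread, and a full queue degrades to dropped error events rather than a stuck producer.
{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ErrorForwarder {
    // Bounded queue: offer() never blocks the caller, so the callback returns immediately.
    private final BlockingQueue<ProducerRecord<String, byte[]>> errors = new LinkedBlockingQueue<>(10_000);
    private final KafkaProducer<String, byte[]> producer;

    public ErrorForwarder(KafkaProducer<String, byte[]> producer) {
        this.producer = producer;
        Thread drainer = new Thread(this::drain, "error-forwarder");
        drainer.setDaemon(true);
        drainer.start();
    }

    /** Called from the producer callback instead of producer.send(). */
    public void submit(ProducerRecord<String, byte[]> errorRecord) {
        if (!errors.offer(errorRecord)) {
            System.err.println("error queue full, dropping error event");
        }
    }

    private void drain() {
        try {
            while (true) {
                // A metadata update for a new topic blocks only this thread.
                producer.send(errors.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}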

Here is code to reproduce the issue; the output it produces follows the code snippet. Tested on Confluent Cloud, from my desktop with a 100 Mbps connection.
{code:java}
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Class name is just a placeholder for the reproduction.
public class CallbackSendRepro {
    public static void main(String[] args) throws IOException {
        byte[] blob = new byte[262144];
        Properties properties = new Properties();
        properties.load(new FileReader("kafka-staging.properties"));
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.setProperty("request.timeout.ms", "5000");
        properties.setProperty("delivery.timeout.ms", "5000");
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(properties);
        while (true) {
            ProducerRecord<String, byte[]> record = new ProducerRecord<>("alex-test-valid-data", blob);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    // This callback runs on the sender IO thread
                    // ("kafka-producer-network-thread | producer-1" in the output below).
                    if (exception != null) {
                        System.err.println(exception);
                        long start = System.currentTimeMillis();
                        // "alex-test-errors" has never been seen by this producer, so
                        // send() waits for a metadata update (up to max.block.ms).
                        ProducerRecord<String, byte[]> record = new ProducerRecord<>("alex-test-errors", blob);
                        producer.send(record);  // blocking caused by metadata update
                        long timeElapsed = System.currentTimeMillis() - start;
                        System.err.println("time spent blocking IO thread: " + timeElapsed);
                    }
                }
            });
        }
    }
}
{code}
{noformat}
[2020-07-31 14:35:51,936: INFO/main] (AbstractConfig.java:347) - ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [pkc-l915e.europe-west1.gcp.confluent.cloud:9092]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 5000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 5000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = [hidden]
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = PLAIN
	security.protocol = SASL_SSL
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

[2020-07-31 14:35:52,099: INFO/main] (AbstractLogin.java:61) - Successfully logged in.
[2020-07-31 14:35:52,291: INFO/main] (AppInfoParser.java:117) - Kafka version: 5.4.0-ccs
[2020-07-31 14:35:52,291: INFO/main] (AppInfoParser.java:118) - Kafka commitId: f4201a82bea68cc7
[2020-07-31 14:35:52,291: INFO/main] (AppInfoParser.java:119) - Kafka startTimeMs: 1596198952287
[2020-07-31 14:35:52,853: INFO/kafka-producer-network-thread | producer-1] (Metadata.java:261) - [Producer clientId=producer-1] Cluster ID: lkc-43m2m
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-0:5001 ms has passed since batch creation
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
time spent blocking IO thread: 60001
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
time spent blocking IO thread: 60002
time spent blocking IO thread: 60017
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
[2020-07-31 14:38:07,219: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-3 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
time spent blocking IO thread: 60003
[2020-07-31 14:39:07,223: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
time spent blocking IO thread: 60002
time spent blocking IO thread: 60001
[2020-07-31 14:40:07,224: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-5 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
time spent blocking IO thread: 60001
[2020-07-31 14:41:07,225: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-1 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
time spent blocking IO thread: 60004
time spent blocking IO thread: 60004
[2020-07-31 14:42:07,229: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-4 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
time spent blocking IO thread: 60000
[2020-07-31 14:43:07,229: WARN/kafka-producer-network-thread | producer-1] (Sender.java:682) - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition alex-test-valid-data-2 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-5:422600 ms has passed since batch creation
time spent blocking IO thread: 60003
time spent blocking IO thread: 60001
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-5:422490 ms has passed since batch creation
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
time spent blocking IO thread: 60002
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-5:422315 ms has passed since batch creation
time spent blocking IO thread: 60003
time spent blocking IO thread: 60003
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for alex-test-valid-data-5:422124 ms has passed since batch creation
{noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)