You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Viczai Gábor (Jira)" <ji...@apache.org> on 2022/12/26 20:15:00 UTC
[jira] [Created] (KAFKA-14553) RecordAccumulator hangs in infinite NOP loop
Viczai Gábor created KAFKA-14553:
------------------------------------
Summary: RecordAccumulator hangs in infinite NOP loop
Key: KAFKA-14553
URL: https://issues.apache.org/jira/browse/KAFKA-14553
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 3.3.1
Environment: - Spring Boot 3.0.1
- Spring Cloud 2022.0.0
Versions of dependencies are defined in boms of SB and SC:
- micrometer-tracing-bridge-brave 1.0.0
- zipkin-reporter-brave 2.16.3
- zipkin-sender-kafka 2.16.3
Reporter: Viczai Gábor
*Summary:*
There is an infinite loop in RecordAccumulator, if stickyBatchSize is configured to be 0 in BuiltinPartitioner.
(Which is the default case when using KafkaSender's default Builder.)
*Details:*
The infinite loop is caused by this while:
https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L293
and this continue particularly:
https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L316
because the partitionChanged() call in the condition always return true if batchSize is 0.
So program flow never reaches this point:
https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L318
Thus no span data sent to Kafka ever.
The problematic line in partitionChanged() is when it calls an update on the BuiltInPartitioner:
https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L242
which in fact always updates the partition because of this condition:
https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218
therefore the next confdition in RecordAccumulator will evaluate to true also:
https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L243
thus returning 'true' and forcing the 'continue' in the while(true) loop.
Suggested fix:
I think these conditions should be changed:
https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218
The equal signs should be removed from the conditions:
{code}if (producedBytes > stickyBatchSize && enableSwitch || producedBytes > stickyBatchSize * 2) {{code}
(Btw: line 213 also needs this modification.)
*Note:*
The problem arises because KafkaSender sets the batchSize to 0.
https://github.com/openzipkin/zipkin-reporter-java/blob/2.16.3/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java#L88
*Workaround:*
Simply set the batch size greater than zero.
{code:java}@Configuration
public class SenderConfiguration {
@Bean
KafkaSender kafkaSender() {
Properties overrides = new Properties();
overrides.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
return KafkaSender.newBuilder()
.bootstrapServers("localhost:9092")
.topic("zipkin")
.overrides(overrides)
.build();
}
}{code}
*Using:*
- Spring Boot 3.0.1
- Spring Cloud 2022.0.0
pom.xml (fragment):
{code:xml} <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter-brave</artifactId>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-sender-kafka</artifactId>
</dependency>{code}
Everything is on default settings, except a KafkaSender is explicitely created as illustrated above. (No autoconfiguration available for Kafka sender.)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)