Posted to jira@kafka.apache.org by "Tomasz Nguyen (Jira)" <ji...@apache.org> on 2021/02/25 19:15:00 UTC
[jira] [Created] (KAFKA-12375) ReplaceStreamThread creates a new
consumer with the same name as the one it's replacing
Tomasz Nguyen created KAFKA-12375:
-------------------------------------
Summary: ReplaceStreamThread creates a new consumer with the same name as the one it's replacing
Key: KAFKA-12375
URL: https://issues.apache.org/jira/browse/KAFKA-12375
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.8.0
Reporter: Tomasz Nguyen
Fix For: 2.8.0
I was debugging the kafka-streams soak cluster and noticed that replacing a stream thread caused the Streams application to fail. I found the following stack trace:
{code:java}
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=i-0cdac8830ee1b8f01-StreamThread-1-restore-consumer
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:815)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
    at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getRestoreConsumer(DefaultKafkaClientSupplier.java:56)
    at org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:338)
    at org.apache.kafka.streams.KafkaStreams.createAndAddStreamThread(KafkaStreams.java:896)
    at org.apache.kafka.streams.KafkaStreams.addStreamThread(KafkaStreams.java:977)
    at org.apache.kafka.streams.KafkaStreams.replaceStreamThread(KafkaStreams.java:467)
    at org.apache.kafka.streams.KafkaStreams.handleStreamsUncaughtException(KafkaStreams.java:487)
    at org.apache.kafka.streams.KafkaStreams.lambda$setUncaughtExceptionHandler$1(KafkaStreams.java:427)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
{code}
followed by:
{code:java}
Exception in thread "i-0e4d869ffd67ec825-StreamThread-1" java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2446)
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2430)
    at org.apache.kafka.clients.consumer.KafkaConsumer.enforceRebalance(KafkaConsumer.java:2261)
    at org.apache.kafka.streams.processor.internals.StreamThread.sendShutdownRequest(StreamThread.java:666)
    at org.apache.kafka.streams.KafkaStreams.lambda$handleStreamsUncaughtException$4(KafkaStreams.java:508)
    at org.apache.kafka.streams.KafkaStreams.processStreamThread(KafkaStreams.java:1579)
    at org.apache.kafka.streams.KafkaStreams.handleStreamsUncaughtException(KafkaStreams.java:508)
    at org.apache.kafka.streams.KafkaStreams.lambda$setUncaughtExceptionHandler$1(KafkaStreams.java:427)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
{code}
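My understanding so far is that we reuse the consumer name across thread generations, so the consumers created for the replacement thread can collide with those of the thread being replaced (the first trace shows the JMX registration for the restore consumer failing because the name already exists). This can hit a few flavours of race condition. My suggestion would be to add a generation id to the consumer name.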
This could be done either by adding a thread generation id here:
https://github.com/apache/kafka/blob/b35ca4349dabb199411cb6bc4c80ef89f19d9328/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java#L66
or by adding an overload here:
https://github.com/apache/kafka/blob/b35ca4349dabb199411cb6bc4c80ef89f19d9328/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L390
{code:java}
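// Proposed (not yet existing) overload that also takes the thread generation id,
// so that the resulting consumer name is unique per thread generation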
final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, getConsumerClientId(threadId), threadIdx, generationId);
{code}
I have not yet checked whether either of these solutions has any further implications.
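To illustrate the naming collision and the suggested fix outside of Kafka, here is a minimal sketch that uses only the standard `javax.management` API. Everything Kafka-shaped in it is an assumption made for illustration: the class `ClientIdCollisionSketch`, the helper `restoreConsumerClientId`, the `-gen<N>` suffix format and the `sketch.consumer` JMX domain are hypothetical and do not exist in Kafka. It registers an MBean twice under a reused name, which the MBean server rejects, and then registers generation-qualified names, which do not collide.

```java
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;

final class ClientIdCollisionSketch {

    // Standard MBean convention: the interface must be public and named <Class>MBean.
    public interface AppInfoMBean {
        String getId();
    }

    public static class AppInfo implements AppInfoMBean {
        private final String id;

        public AppInfo(String id) {
            this.id = id;
        }

        @Override
        public String getId() {
            return id;
        }
    }

    // Hypothetical naming scheme: qualify the client id with the thread generation.
    static String restoreConsumerClientId(String threadClientId, int generationId) {
        return threadClientId + "-gen" + generationId + "-restore-consumer";
    }

    // Returns true if the name was free, false if an MBean with that name already exists.
    static boolean tryRegister(MBeanServer server, String clientId) {
        try {
            ObjectName name = new ObjectName("sketch.consumer:type=app-info,id=" + clientId);
            server.registerMBean(new AppInfo(clientId), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            return false;
        } catch (JMException e) {
            // Malformed name or non-compliant MBean: a bug in the sketch itself.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // A private MBean server keeps the sketch isolated from the platform server.
        MBeanServer server = MBeanServerFactory.newMBeanServer();
        String threadClientId = "app-StreamThread-1";

        // Name reused across generations: the second registration is rejected.
        String reused = threadClientId + "-restore-consumer";
        System.out.println("reused name, old thread:  " + tryRegister(server, reused));
        System.out.println("reused name, replacement: " + tryRegister(server, reused));

        // Generation-qualified names: each generation gets a distinct MBean name.
        System.out.println("generation 1: " + tryRegister(server, restoreConsumerClientId(threadClientId, 1)));
        System.out.println("generation 2: " + tryRegister(server, restoreConsumerClientId(threadClientId, 2)));
    }
}
```

The sketch only covers the JMX name clash from the first trace. It does not model the `ConcurrentModificationException` from the second trace, and it says nothing about how a changed client id would interact with metrics or other code that relies on the current naming.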
--
This message was sent by Atlassian Jira
(v8.3.4#803005)