Posted to jira@kafka.apache.org by "Raman Gupta (JIRA)" <ji...@apache.org> on 2018/07/10 04:48:00 UTC
[jira] [Created] (KAFKA-7143) Cannot use KafkaConsumer with Kotlin coroutines due to Thread id check
Raman Gupta created KAFKA-7143:
----------------------------------
Summary: Cannot use KafkaConsumer with Kotlin coroutines due to Thread id check
Key: KAFKA-7143
URL: https://issues.apache.org/jira/browse/KAFKA-7143
Project: Kafka
Issue Type: Improvement
Components: clients
Affects Versions: 1.1.0
Reporter: Raman Gupta
I am using a new KafkaConsumer via Kotlin. My application makes use of Kotlin [coroutines|https://kotlinlang.org/docs/reference/coroutines.html], which support a style of async programming that avoids the need for callbacks (existing callback-based APIs such as Kafka's can easily be adapted to it). With coroutines, a method that would otherwise take a callback is suspended, and resumed once the call is complete. With this approach, access to the KafkaConsumer is done in a thread-safe way, but it does NOT happen from a single thread: a different underlying thread may execute the code after the suspension point.
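As a rough Java analogy for the thread hop described above (not Kotlin coroutine code, and not taken from the application in question), the sketch below runs a continuation with {{CompletableFuture.thenApplyAsync}} and compares thread ids. The class and method names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative only: shows that code running after an async boundary
// can execute on a different thread than the one that started the call.
class ThreadHopDemo {
    // Returns {callerThreadId, continuationThreadId}.
    static long[] callerAndContinuationIds() {
        long caller = Thread.currentThread().getId();
        long continuation = CompletableFuture
                .supplyAsync(() -> "result")                          // runs on the default async executor
                .thenApplyAsync(r -> Thread.currentThread().getId())  // continuation, also on the executor
                .join();
        return new long[] {caller, continuation};
    }

    public static void main(String[] args) {
        long[] ids = callerAndContinuationIds();
        System.out.println("caller thread id: " + ids[0]);
        System.out.println("continuation thread id: " + ids[1]);
    }
}
```

When started from an ordinary (non-pool) thread, the two ids differ, which is the situation a per-thread-id check will observe.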
However, the KafkaConsumer includes additional checks to verify not only the thread safety of the client, but also that the *same thread* is being used. If the same thread (by id) is not being used, the consumer throws an exception like:
{code}
Exception in thread "ForkJoinPool.commonPool-worker-25" java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1321)
{code}
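To make the failure mode concrete, here is a minimal, self-contained sketch of a thread-id ownership guard of the kind the {{acquire}} frame in the stack trace suggests. It is an assumption about the general shape of such a check, not a copy of Kafka's source; {{GuardedClient}}, {{GuardDemo}}, and the exception message are hypothetical.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a client that guards its methods by thread id.
// This is NOT Kafka's actual implementation.
class GuardedClient {
    private static final long NO_OWNER = -1L;
    private final AtomicLong owner = new AtomicLong(NO_OWNER);
    private final AtomicInteger depth = new AtomicInteger(0);

    // Called on entry to every guarded method.
    void acquire() {
        long id = Thread.currentThread().getId();
        // Pass if this thread already owns the client, or if it can claim an unowned client.
        if (id != owner.get() && !owner.compareAndSet(NO_OWNER, id)) {
            throw new ConcurrentModificationException("client is not safe for multi-threaded access");
        }
        depth.incrementAndGet();
    }

    // Called on exit from every guarded method.
    void release() {
        if (depth.decrementAndGet() == 0) {
            owner.set(NO_OWNER);
        }
    }
}

class GuardDemo {
    // Returns true if a second thread is rejected while the first still holds the guard.
    static boolean secondThreadRejected() {
        GuardedClient client = new GuardedClient();
        client.acquire(); // the calling thread now owns the client
        boolean[] rejected = {false};
        Thread other = new Thread(() -> {
            try {
                client.acquire();
                client.release();
            } catch (ConcurrentModificationException e) {
                rejected[0] = true;
            }
        });
        other.start();
        try {
            other.join(); // join() makes the write to rejected[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            client.release();
        }
        return rejected[0];
    }

    public static void main(String[] args) {
        System.out.println("second thread rejected: " + secondThreadRejected());
    }
}
```

In this sketch the guard compares thread ids rather than using a lock, so a call arriving from another thread fails immediately instead of waiting its turn.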
I understand this check is present to protect people from themselves, but I'd like the ability to disable this check so that this code can be used effectively by libraries such as Kotlin coroutines.