Posted to dev@kafka.apache.org by slaunay <gi...@git.apache.org> on 2017/09/19 18:25:09 UTC
[GitHub] kafka pull request #3907: KAFKA-4950: Fix ConcurrentModificationException on...
GitHub user slaunay opened a pull request:
https://github.com/apache/kafka/pull/3907
KAFKA-4950: Fix ConcurrentModificationException on assigned-partitions metric
Code changes:
- use a volatile field to store the number of assigned partitions. This prevents a race condition in which a `java.util.ConcurrentModificationException` is thrown when a `MetricsReporter` (e.g. a reporter that periodically exports metrics from a separate thread) reads the consumer coordinator's assigned-partitions metric while the assignment is being updated:
```
java.util.ConcurrentModificationException: null
at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
at java.util.HashSet.<init>(HashSet.java:119)
at org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:293)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:880)
...
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
- add a unit test that reproduces the issue and guards against future regressions
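The exception in the stack trace is raised by the fail-fast iterator of `LinkedHashMap`, which `new HashSet<>(...)` walks (through `AbstractCollection.addAll`) to copy the key set. The sketch below is a minimal single-threaded illustration of that mechanism, not the unit test from this PR: the class name and partition names are hypothetical. It modifies the map structurally during iteration, so the exception is thrown every time. In the reported bug the modification comes from a different thread, which is why the failure only shows up intermittently.

```java
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of LinkedHashMap's fail-fast iteration.
// In KAFKA-4950 the structural change happens on another thread while a
// reporter thread copies the key set; here it happens in the same loop.
public class FailFastDemo {

    /** Returns true if copying the key set while modifying the map throws a CME. */
    public static boolean copyWhileModifyingThrows() {
        Map<String, Integer> partitions = new LinkedHashMap<>();
        partitions.put("topic-0", 0);
        partitions.put("topic-1", 1);
        partitions.put("topic-2", 2);

        Set<String> copy = new HashSet<>();
        try {
            for (String key : partitions.keySet()) {
                copy.add(key);
                if (key.equals("topic-0")) {
                    // Adding a new key is a structural modification; the iterator's
                    // next call to next() notices the changed modCount and throws.
                    partitions.put("topic-3", 3);
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("CME thrown: " + copyWhileModifyingThrows()); // prints "CME thrown: true"
    }
}
```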
I am using a volatile field on `SubscriptionState` rather than replacing the `PartitionStates.map` field with a thread-safe alternative to `LinkedHashMap`, so as not to impose an unnecessary concurrent data structure on the other components that rely on `PartitionStates`.
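The volatile-field approach can be sketched as follows. This is a simplified, hypothetical stand-in (`AssignmentTracker`, `assign` and `assignedPartitionsGauge` are illustrative names, not Kafka's `SubscriptionState` API). It assumes that only the owning consumer thread mutates or iterates the map, while other threads read only the published count. It uses `java.util.function.IntSupplier` in place of Kafka's metric interfaces so the example stays self-contained.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntSupplier;

// Hypothetical sketch: the map is confined to the owning thread, and the
// partition count is published through a volatile field for other threads.
public class AssignmentTracker {
    private final Map<String, Long> assignment = new LinkedHashMap<>(); // owner thread only
    private volatile int assignedPartitionsCount = 0;                  // readable from any thread

    /** Replaces the assignment; must only be called by the owning thread. */
    public void assign(Iterable<String> partitions) {
        assignment.clear();
        for (String p : partitions) {
            assignment.put(p, 0L);
        }
        // Publish the size only after the map is fully updated.
        assignedPartitionsCount = assignment.size();
    }

    /** Metric-style accessor: reads the volatile field and never iterates the map. */
    public IntSupplier assignedPartitionsGauge() {
        return () -> assignedPartitionsCount;
    }

    /** Polls the gauge from a "reporter" thread while this thread keeps reassigning. */
    public static Throwable pollWhileReassigning(int iterations) {
        AssignmentTracker tracker = new AssignmentTracker();
        IntSupplier gauge = tracker.assignedPartitionsGauge();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread reporter = new Thread(() -> {
            try {
                for (int i = 0; i < iterations; i++) {
                    int n = gauge.getAsInt();
                    // Only fully published sizes (0, 2 or 3) should ever be observed.
                    if (n != 0 && n != 2 && n != 3) {
                        throw new IllegalStateException("unexpected count " + n);
                    }
                }
            } catch (Throwable t) {
                failure.set(t);
            }
        });
        reporter.start();
        for (int i = 0; i < iterations; i++) {
            tracker.assign(i % 2 == 0
                    ? Arrays.asList("t-0", "t-1")
                    : Arrays.asList("t-0", "t-1", "t-2"));
        }
        try {
            reporter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
        return failure.get();
    }

    public static void main(String[] args) {
        System.out.println("reporter failure: " + pollWhileReassigning(100_000)); // prints "reporter failure: null"
    }
}
```

With this design, a reader may briefly see the previous count while a reassignment is in progress. That is usually acceptable for a metric, and it avoids making the map itself concurrent.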
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/slaunay/kafka bugfix/KAFKA-4950-cme-assigned-partitions-metric
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/3907.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3907
----
commit 3787ac4c6070d0d4271cac27d9b43b2fdbe8b78f
Author: Sebastien Launay <se...@opendns.com>
Date: 2017-09-15T23:26:44Z
KAFKA-4950; Fix CME on assigned-partitions metric
----