Posted to dev@kafka.apache.org by slaunay <gi...@git.apache.org> on 2017/09/19 18:25:09 UTC

[GitHub] kafka pull request #3907: KAFKA-4950: Fix ConcurrentModificationException on...

GitHub user slaunay opened a pull request:

    https://github.com/apache/kafka/pull/3907

    KAFKA-4950: Fix ConcurrentModificationException on assigned-partitions metric

    Code change:
    - prevent a `java.util.ConcurrentModificationException` when a `MetricsReporter` reads the consumer coordinator `assigned-partitions` metric value from a separate thread (e.g. a reporter that exports metrics periodically). The exception is caused by a race condition; the fix stores the number of assigned partitions in a volatile field:
    ```
    java.util.ConcurrentModificationException: null
            at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
            at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
            at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
            at java.util.HashSet.<init>(HashSet.java:119)
            at org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66)
            at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:293)
            at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:880)
            ...
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    ```
    - add a unit test that reproduces the issue and guards against future regressions
    
    I used a volatile field on `SubscriptionState` instead of replacing the `PartitionStates.map` field with a thread-safe `LinkedHashMap` alternative, so that other components relying on `PartitionStates` do not get a concurrent structure they do not need.
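
The approach described above can be illustrated with a minimal sketch. This is not the code from the pull request: `AssignedPartitionsSketch`, `assign` and `numAssignedPartitions` are hypothetical names, and the map stands in for the `LinkedHashMap` held by `PartitionStates`. It assumes a single thread mutates the assignment while any number of reporter threads read the count.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for SubscriptionState; all names are illustrative only.
final class AssignedPartitionsSketch {
    // Mutated only by the consumer thread; LinkedHashMap is not thread safe.
    private final Map<String, Long> assignment = new LinkedHashMap<>();

    // Published copy of assignment.size(); safe to read from any thread.
    private volatile int assignedCount = 0;

    // Called from the consumer thread whenever the assignment changes.
    void assign(Collection<String> partitions) {
        assignment.clear();
        for (String partition : partitions) {
            assignment.put(partition, 0L);
        }
        assignedCount = assignment.size(); // single volatile write
    }

    // Called from a reporter thread; never iterates or copies the map.
    int numAssignedPartitions() {
        return assignedCount;
    }

    public static void main(String[] args) throws InterruptedException {
        AssignedPartitionsSketch state = new AssignedPartitionsSketch();
        // Reporter thread polls the metric while the assignment keeps changing.
        Thread reporter = new Thread(() -> {
            for (int i = 0; i < 1_000; i++) {
                state.numAssignedPartitions();
            }
        });
        reporter.start();
        for (int i = 0; i < 1_000; i++) {
            state.assign(Arrays.asList("topic-0", "topic-1", "topic-" + i));
        }
        reporter.join();
        System.out.println(state.numAssignedPartitions());
    }
}
```

The reader only ever sees a count that was published by a completed write, at the cost of the value lagging briefly behind the map while an assignment is being rebuilt. That trade-off seems acceptable for a gauge that is sampled periodically.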

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/slaunay/kafka bugfix/KAFKA-4950-cme-assigned-partitions-metric

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/3907.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3907
    
----
commit 3787ac4c6070d0d4271cac27d9b43b2fdbe8b78f
Author: Sebastien Launay <se...@opendns.com>
Date:   2017-09-15T23:26:44Z

    KAFKA-4950; Fix CME on assigned-partitions metric
    
    - prevent java.util.ConcurrentModificationException being thrown when
      fetching the consumer coordinator assigned-partitions metric value from
      a MetricsReporter (e.g. a reporter exporting metrics periodically running
      in a separate thread) because of a race condition by using a volatile field
      for storing the number of assigned partitions (the stack trace is
      identical to the one in the pull request description above)
    - new unit test to reproduce the issue and detect potential future regression

----

