Posted to dev@kafka.apache.org by "Eric Wu (Jira)" <ji...@apache.org> on 2022/07/27 20:11:00 UTC

[jira] [Resolved] (KAFKA-14119) Sensor in metrics has potential thread safety issues

     [ https://issues.apache.org/jira/browse/KAFKA-14119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eric Wu resolved KAFKA-14119.
-----------------------------
    Resolution: Not A Bug

> Sensor in metrics has potential thread safety issues
> ----------------------------------------------------
>
>                 Key: KAFKA-14119
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14119
>             Project: Kafka
>          Issue Type: Bug
>          Components: metrics
>            Reporter: Eric Wu
>            Priority: Major
>
> A `Sensor` may not be protected from race conditions when it [records|https://github.com/apache/kafka/blob/6ac58ac6fcd53a512ea0bc0b3dc66f49870ff0cb/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java#L230] a value.
> It can be reproduced with a unit test, e.g., in `SensorTest`:
> {code:java}
> @Test
> public void testSensorRecordThreadSafety() {
>     Time time = new MockTime(0, System.currentTimeMillis(), 0);
>     Metrics metrics = new Metrics(time);
>     Sensor sensor = metrics.sensor("sensor");
>     MetricName metric = new MetricName("test", "test", "test", Collections.emptyMap());
>     sensor.add(metric, new Value());
>     int totalRequests = 10;
>     AtomicInteger count = new AtomicInteger();
>     ExecutorService threadPool = Executors.newFixedThreadPool(totalRequests);
>     CompletableFuture<?>[] futures = new CompletableFuture<?>[totalRequests];
>     for (int i = 0; i < totalRequests; i++) {
>         futures[i] = CompletableFuture.runAsync(() -> {
>             try {
>                 Thread.sleep(10); // to make it easier to repro
>             } catch (InterruptedException e) {
>                 throw new RuntimeException(e);
>             }
>             // the increment and the record are two separate steps
>             sensor.record(count.incrementAndGet());
>         }, threadPool);
>     }
>     CompletableFuture.allOf(futures).join();
>     threadPool.shutdown(); // release the worker threads
>
>     assertEquals(1, sensor.metrics().size());
>     double value = (double) sensor.metrics().get(0).metricValue();
>     assertEquals(totalRequests, value);
> }{code}
> Some tweaks are needed to make the fields visible in the test. Over a few runs, the test should fail, which demonstrates the thread safety issue.
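The notice above does not say why the issue was resolved as Not A Bug. One reading, offered here only as an assumption, is that the quoted test can fail even if recording is fully synchronized: the `Value` stat keeps only the most recently recorded value, and the test increments the counter and calls `record` in two separate steps, so the thread that drew the highest count is not guaranteed to record last. The sketch below forces that interleaving deterministically. `LastValueSensor` and `LastValueDemo` are hypothetical stand-ins written for this illustration, not Kafka classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a sensor with a "last value" stat; not Kafka's Sensor.
class LastValueSensor {
    private double last;

    // Each call is atomic on its own, like a lock-protected record().
    synchronized void record(double v) { last = v; }

    synchronized double value() { return last; }
}

class LastValueDemo {
    // Forces the order: A draws 1, B draws 2, B records 2, then A records 1.
    static double run() {
        LastValueSensor sensor = new LastValueSensor();
        AtomicInteger count = new AtomicInteger();
        CountDownLatch aDrew = new CountDownLatch(1);
        CountDownLatch bRecorded = new CountDownLatch(1);

        Thread a = new Thread(() -> {
            int ticket = count.incrementAndGet(); // draws 1
            aDrew.countDown();
            await(bRecorded);                     // A is delayed between the two steps
            sensor.record(ticket);                // overwrites B's 2 with 1
        });
        Thread b = new Thread(() -> {
            await(aDrew);
            sensor.record(count.incrementAndGet()); // draws and records 2
            bRecorded.countDown();
        });
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sensor.value();
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 1.0
    }
}
```

In this sketch the final value is 1 rather than 2 although no two `record` calls overlap, so an assertion that the last recorded value equals the total number of requests checks scheduling order, not the sensor's locking.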



--
This message was sent by Atlassian Jira
(v8.20.10#820010)