Posted to jira@kafka.apache.org by "Eric Wu (Jira)" <ji...@apache.org> on 2022/07/27 19:17:00 UTC
[jira] [Created] (KAFKA-14119) Sensor in metrics has potential thread safety issues
Eric Wu created KAFKA-14119:
-------------------------------
Summary: Sensor in metrics has potential thread safety issues
Key: KAFKA-14119
URL: https://issues.apache.org/jira/browse/KAFKA-14119
Project: Kafka
Issue Type: Bug
Components: metrics
Reporter: Eric Wu
A `Sensor` is potentially not protected from race conditions when it [records|https://github.com/apache/kafka/blob/6ac58ac6fcd53a512ea0bc0b3dc66f49870ff0cb/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java#L230] a value.
It can be reproduced with a unit test, e.g., in `SensorTest`:
{code:java}
@Test
public void testSensorRecordThreadSafety() {
    Time time = new MockTime(0, System.currentTimeMillis(), 0);
    Metrics metrics = new Metrics(time);
    Sensor sensor = metrics.sensor("sensor");
    MetricName metric = new MetricName("test", "test", "test", Collections.emptyMap());
    sensor.add(metric, new Value());

    int totalRequests = 10;
    AtomicInteger count = new AtomicInteger();
    Executor threadPool = Executors.newFixedThreadPool(totalRequests);
    CompletableFuture[] futures = new CompletableFuture[totalRequests];

    for (int i = 0; i < totalRequests; i++) {
        futures[i] = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(10); // to make it easier to repro
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            sensor.record(count.addAndGet(1));
        }, threadPool);
    }
    CompletableFuture.allOf(futures).join();

    assertEquals(1, sensor.metrics().size());
    double value = (double) sensor.metrics().get(0).metricValue();
    assertEquals(totalRequests, value);
}
{code}
The test needs some tweaks to make the relevant fields visible. Over a few runs, it should fail, which demonstrates the thread-safety issue.
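The repro above relies on `Thread.sleep(10)` to make the worker threads overlap. As a rough sketch of an alternative, the harness below releases all workers at once with a `CountDownLatch`. Everything in it is an assumption made for illustration: `runConcurrently` is a hypothetical helper, and `SynchronizedSum` is a stand-in recorder, not Kafka's `Sensor`. It records an order-independent aggregate (a sum), so the expected result does not depend on thread scheduling.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;

public class ConcurrentRecordSketch {

    /** Hypothetical stand-in for the object under test; NOT Kafka's Sensor. */
    static final class SynchronizedSum {
        private double sum = 0.0;

        // synchronized makes the read-modify-write of sum atomic
        synchronized void record(double value) {
            sum += value;
        }

        synchronized double value() {
            return sum;
        }
    }

    /**
     * Hypothetical helper: calls task.accept(id) for id = 1..threads,
     * one call per thread, releasing all workers at the same moment.
     */
    static void runConcurrently(int threads, IntConsumer task) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch ready = new CountDownLatch(threads); // workers that have started
        CountDownLatch start = new CountDownLatch(1);       // single "go" signal
        CountDownLatch done = new CountDownLatch(threads);  // workers that have finished
        try {
            for (int i = 1; i <= threads; i++) {
                final int id = i;
                pool.execute(() -> {
                    ready.countDown();
                    try {
                        start.await(); // block until every worker is ready
                        task.accept(id);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        done.countDown();
                    }
                });
            }
            ready.await();      // wait until all workers are parked on the start latch
            start.countDown();  // release them together
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int totalRequests = 10;
        SynchronizedSum recorder = new SynchronizedSum();
        runConcurrently(totalRequests, id -> recorder.record(id));
        System.out.println(recorder.value()); // 1 + 2 + ... + 10 = 55.0
    }
}
```

In a real test, the stand-in recorder would be replaced by the object under test. A recorder that is not thread-safe could then lose updates and report a sum below 55.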