You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/07/06 17:54:34 UTC
[kafka] branch trunk updated: KAFKA-7136: Avoid deadlocks in synchronized metrics reporters (#5341)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1f8527b KAFKA-7136: Avoid deadlocks in synchronized metrics reporters (#5341)
1f8527b is described below
commit 1f8527b331f3d4c8e3ea16993d96caae2ea18fc5
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Jul 6 18:54:28 2018 +0100
KAFKA-7136: Avoid deadlocks in synchronized metrics reporters (#5341)
We need to use the same lock for metric update and read to avoid NPE and concurrent modification exceptions. Sensor add/remove/update are synchronized on Sensor since they access lists and maps that are not thread-safe. Reporters are notified of metrics add/remove while holding (Sensor, Metrics) locks and reporters may synchronize on the reporter lock. Metric read may be invoked by metrics reporters while holding a reporter lock. So read/update cannot be synchronized using Sensor since that could deadlock with synchronized reporters. A separate lock (Sensor#metricLock) is therefore used to synchronize metric update and metric read.
Locking order:
- Sensor#add: Sensor -> Metrics -> MetricsReporter
- Metrics#removeSensor: Sensor -> Metrics -> MetricsReporter
- KafkaMetric#metricValue: MetricsReporter -> Sensor#metricLock
- Sensor#record: Sensor -> Sensor#metricLock
Reviewers: Jun Rao <ju...@gmail.com>, Guozhang Wang <wa...@gmail.com>
---
checkstyle/suppressions.xml | 2 +-
.../org/apache/kafka/common/metrics/Sensor.java | 38 +++++--
.../apache/kafka/common/metrics/MetricsTest.java | 117 ++++++++++++++++++---
3 files changed, 135 insertions(+), 22 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 5bf69b6..e80d5bf 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -73,7 +73,7 @@
files="RequestResponseTest.java"/>
<suppress checks="NPathComplexity"
- files="MemoryRecordsTest.java"/>
+ files="MemoryRecordsTest|MetricsTest"/>
<!-- Connect -->
<suppress checks="ClassFanOutComplexity"
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index e4bf1ae..ccbe8aa 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -48,6 +48,7 @@ public final class Sensor {
private final Time time;
private volatile long lastRecordTime;
private final long inactiveSensorExpirationTimeMs;
+ private final Object metricLock;
public enum RecordingLevel {
INFO(0, "INFO"), DEBUG(1, "DEBUG");
@@ -113,6 +114,7 @@ public final class Sensor {
this.inactiveSensorExpirationTimeMs = TimeUnit.MILLISECONDS.convert(inactiveSensorExpirationTimeSeconds, TimeUnit.SECONDS);
this.lastRecordTime = time.milliseconds();
this.recordingLevel = recordingLevel;
+ this.metricLock = new Object();
checkForest(new HashSet<Sensor>());
}
@@ -174,9 +176,11 @@ public final class Sensor {
if (shouldRecord()) {
this.lastRecordTime = timeMs;
synchronized (this) {
- // increment all the stats
- for (Stat stat : this.stats)
- stat.record(config, value, timeMs);
+ synchronized (metricLock()) {
+ // increment all the stats
+ for (Stat stat : this.stats)
+ stat.record(config, value, timeMs);
+ }
if (checkQuotas)
checkQuotas(timeMs);
}
@@ -229,7 +233,7 @@ public final class Sensor {
return false;
this.stats.add(Utils.notNull(stat));
- Object lock = metricLock(stat);
+ Object lock = metricLock();
for (NamedMeasurable m : stat.stats()) {
final KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), config == null ? this.config : config, time);
if (!metrics.containsKey(metric.metricName())) {
@@ -265,7 +269,7 @@ public final class Sensor {
return true;
} else {
final KafkaMetric metric = new KafkaMetric(
- metricLock(stat),
+ metricLock(),
Utils.notNull(metricName),
Utils.notNull(stat),
config == null ? this.config : config,
@@ -291,10 +295,26 @@ public final class Sensor {
}
/**
- * KafkaMetrics of sensors which use SampledStat should be synchronized on the Sensor object
- * to allow concurrent reads and updates. For simplicity, all sensors are synchronized on Sensor.
+ * KafkaMetrics of sensors which use SampledStat should be synchronized on the same lock
+ * for sensor record and metric value read to allow concurrent reads and updates. For simplicity,
+ * all sensors are synchronized on this object.
+ * <p>
+ * Sensor object is not used as a lock for reading metric value since metrics reporter is
+ * invoked while holding Sensor and Metrics locks to report addition and removal of metrics
+ * and synchronized reporters may deadlock if Sensor lock is used for reading metrics values.
+ * Note that Sensor object itself is used as a lock to protect the access to stats and metrics
+ * while recording metric values, adding and deleting sensors.
+ * </p><p>
+ * Locking order (assume all MetricsReporter methods may be synchronized):
+ * <ul>
+ * <li>Sensor#add: Sensor -> Metrics -> MetricsReporter</li>
+ * <li>Metrics#removeSensor: Sensor -> Metrics -> MetricsReporter</li>
+ * <li>KafkaMetric#metricValue: MetricsReporter -> Sensor#metricLock</li>
+ * <li>Sensor#record: Sensor -> Sensor#metricLock</li>
+ * </ul>
+ * </p>
*/
- private Object metricLock(Stat stat) {
- return this;
+ private Object metricLock() {
+ return metricLock;
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 6acc39d..59bc84e 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -26,13 +26,16 @@ import static org.junit.Assert.fail;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
+import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.Metric;
@@ -54,9 +57,12 @@ import org.apache.kafka.common.utils.MockTime;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@SuppressWarnings("deprecation")
public class MetricsTest {
+ private static final Logger log = LoggerFactory.getLogger(MetricsTest.class);
private static final double EPS = 0.000001;
private MockTime time = new MockTime();
@@ -604,8 +610,12 @@ public class MetricsTest {
}
}
+ /**
+ * Verifies that concurrent sensor add, remove, updates and read don't result
+ * in errors or deadlock.
+ */
@Test
- public void testConcurrentAccess() throws Exception {
+ public void testConcurrentReadUpdate() throws Exception {
final Random random = new Random();
final Deque<Sensor> sensors = new ConcurrentLinkedDeque<>();
metrics = new Metrics(new MockTime(10));
@@ -613,16 +623,8 @@ public class MetricsTest {
final AtomicBoolean alive = new AtomicBoolean(true);
executorService = Executors.newSingleThreadExecutor();
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- while (alive.get()) {
- for (Sensor sensor : sensors) {
- sensor.record(random.nextInt(10000));
- }
- }
- }
- });
+ executorService.submit(new ConcurrentMetricOperation(alive, "record",
+ () -> sensors.forEach(sensor -> sensor.record(random.nextInt(10000)))));
for (int i = 0; i < 10000; i++) {
if (sensors.size() > 5) {
@@ -640,6 +642,97 @@ public class MetricsTest {
alive.set(false);
}
+ /**
+ * Verifies that concurrent sensor add, remove, updates and read with a metrics reporter
+ * that synchronizes on every reporter method doesn't result in errors or deadlock.
+ */
+ @Test
+ public void testConcurrentReadUpdateReport() throws Exception {
+
+ class LockingReporter implements MetricsReporter {
+ Map<MetricName, KafkaMetric> activeMetrics = new HashMap<>();
+ @Override
+ public synchronized void init(List<KafkaMetric> metrics) {
+ }
+
+ @Override
+ public synchronized void metricChange(KafkaMetric metric) {
+ activeMetrics.put(metric.metricName(), metric);
+ }
+
+ @Override
+ public synchronized void metricRemoval(KafkaMetric metric) {
+ activeMetrics.remove(metric.metricName(), metric);
+ }
+
+ @Override
+ public synchronized void close() {
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs) {
+ }
+
+ synchronized void processMetrics() {
+ for (KafkaMetric metric : activeMetrics.values()) {
+ assertNotNull("Invalid metric value", metric.metricValue());
+ }
+ }
+ }
+
+ final LockingReporter reporter = new LockingReporter();
+ this.metrics.close();
+ this.metrics = new Metrics(config, Arrays.asList((MetricsReporter) reporter), new MockTime(10), true);
+ final Deque<Sensor> sensors = new ConcurrentLinkedDeque<>();
+ SensorCreator sensorCreator = new SensorCreator(metrics);
+
+ final Random random = new Random();
+ final AtomicBoolean alive = new AtomicBoolean(true);
+ executorService = Executors.newFixedThreadPool(3);
+
+ Future<?> writeFuture = executorService.submit(new ConcurrentMetricOperation(alive, "record",
+ () -> sensors.forEach(sensor -> sensor.record(random.nextInt(10000)))));
+ Future<?> readFuture = executorService.submit(new ConcurrentMetricOperation(alive, "read",
+ () -> sensors.forEach(sensor -> sensor.metrics().forEach(metric ->
+ assertNotNull("Invalid metric value", metric.metricValue())))));
+ Future<?> reportFuture = executorService.submit(new ConcurrentMetricOperation(alive, "report",
+ () -> reporter.processMetrics()));
+
+ for (int i = 0; i < 10000; i++) {
+ if (sensors.size() > 10) {
+ Sensor sensor = random.nextBoolean() ? sensors.removeFirst() : sensors.removeLast();
+ metrics.removeSensor(sensor.name());
+ }
+ StatType statType = StatType.forId(random.nextInt(StatType.values().length));
+ sensors.add(sensorCreator.createSensor(statType, i));
+ }
+ assertFalse("Read failed", readFuture.isDone());
+ assertFalse("Write failed", writeFuture.isDone());
+ assertFalse("Report failed", reportFuture.isDone());
+
+ alive.set(false);
+ }
+
+ private class ConcurrentMetricOperation implements Runnable {
+ private final AtomicBoolean alive;
+ private final String opName;
+ private final Runnable op;
+ ConcurrentMetricOperation(AtomicBoolean alive, String opName, Runnable op) {
+ this.alive = alive;
+ this.opName = opName;
+ this.op = op;
+ }
+ public void run() {
+ try {
+ while (alive.get()) {
+ op.run();
+ }
+ } catch (Throwable t) {
+ log.error("Metric {} failed with exception", opName, t);
+ }
+ }
+ }
+
enum StatType {
AVG(0),
TOTAL(1),
@@ -676,7 +769,7 @@ public class MetricsTest {
}
private Sensor createSensor(StatType statType, int index) {
- Sensor sensor = metrics.sensor("kafka.requests");
+ Sensor sensor = metrics.sensor("kafka.requests." + index);
Map<String, String> tags = Collections.singletonMap("tag", "tag" + index);
switch (statType) {
case AVG: