You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/07/06 17:54:34 UTC

[kafka] branch trunk updated: KAFKA-7136: Avoid deadlocks in synchronized metrics reporters (#5341)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1f8527b  KAFKA-7136: Avoid deadlocks in synchronized metrics reporters (#5341)
1f8527b is described below

commit 1f8527b331f3d4c8e3ea16993d96caae2ea18fc5
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Jul 6 18:54:28 2018 +0100

    KAFKA-7136: Avoid deadlocks in synchronized metrics reporters (#5341)
    
    We need to use the same lock for metric update and read to avoid NPE and concurrent modification exceptions. Sensor add/remove/update are synchronized on Sensor since they access lists and maps that are not thread-safe. Reporters are notified of metrics add/remove while holding (Sensor, Metrics) locks and reporters may synchronize on the reporter lock. Metric read may be invoked by metrics reporters while holding a reporter lock. So read/update cannot be synchronized using Sensor since that could lead to deadlock. This PR introduces a new lock in Sensor for update/read.
    Locking order:
    
    - Sensor#add: Sensor -> Metrics -> MetricsReporter
    - Metrics#removeSensor: Sensor -> Metrics -> MetricsReporter
    - KafkaMetric#metricValue: MetricsReporter -> Sensor#metricLock
    - Sensor#record: Sensor -> Sensor#metricLock
    
    
    Reviewers: Jun Rao <ju...@gmail.com>, Guozhang Wang <wa...@gmail.com>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../org/apache/kafka/common/metrics/Sensor.java    |  38 +++++--
 .../apache/kafka/common/metrics/MetricsTest.java   | 117 ++++++++++++++++++---
 3 files changed, 135 insertions(+), 22 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 5bf69b6..e80d5bf 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -73,7 +73,7 @@
               files="RequestResponseTest.java"/>
 
     <suppress checks="NPathComplexity"
-              files="MemoryRecordsTest.java"/>
+              files="MemoryRecordsTest|MetricsTest"/>
 
     <!-- Connect -->
     <suppress checks="ClassFanOutComplexity"
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index e4bf1ae..ccbe8aa 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -48,6 +48,7 @@ public final class Sensor {
     private final Time time;
     private volatile long lastRecordTime;
     private final long inactiveSensorExpirationTimeMs;
+    private final Object metricLock;
 
     public enum RecordingLevel {
         INFO(0, "INFO"), DEBUG(1, "DEBUG");
@@ -113,6 +114,7 @@ public final class Sensor {
         this.inactiveSensorExpirationTimeMs = TimeUnit.MILLISECONDS.convert(inactiveSensorExpirationTimeSeconds, TimeUnit.SECONDS);
         this.lastRecordTime = time.milliseconds();
         this.recordingLevel = recordingLevel;
+        this.metricLock = new Object();
         checkForest(new HashSet<Sensor>());
     }
 
@@ -174,9 +176,11 @@ public final class Sensor {
         if (shouldRecord()) {
             this.lastRecordTime = timeMs;
             synchronized (this) {
-                // increment all the stats
-                for (Stat stat : this.stats)
-                    stat.record(config, value, timeMs);
+                synchronized (metricLock()) {
+                    // increment all the stats
+                    for (Stat stat : this.stats)
+                        stat.record(config, value, timeMs);
+                }
                 if (checkQuotas)
                     checkQuotas(timeMs);
             }
@@ -229,7 +233,7 @@ public final class Sensor {
             return false;
 
         this.stats.add(Utils.notNull(stat));
-        Object lock = metricLock(stat);
+        Object lock = metricLock();
         for (NamedMeasurable m : stat.stats()) {
             final KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), config == null ? this.config : config, time);
             if (!metrics.containsKey(metric.metricName())) {
@@ -265,7 +269,7 @@ public final class Sensor {
             return true;
         } else {
             final KafkaMetric metric = new KafkaMetric(
-                metricLock(stat),
+                metricLock(),
                 Utils.notNull(metricName),
                 Utils.notNull(stat),
                 config == null ? this.config : config,
@@ -291,10 +295,26 @@ public final class Sensor {
     }
 
     /**
-     * KafkaMetrics of sensors which use SampledStat should be synchronized on the Sensor object
-     * to allow concurrent reads and updates. For simplicity, all sensors are synchronized on Sensor.
+     * KafkaMetrics of sensors which use SampledStat should be synchronized on the same lock
+     * for sensor record and metric value read to allow concurrent reads and updates. For simplicity,
+     * all sensors are synchronized on this object.
+     * <p>
+     * Sensor object is not used as a lock for reading metric value since metrics reporter is
+     * invoked while holding Sensor and Metrics locks to report addition and removal of metrics
+     * and synchronized reporters may deadlock if Sensor lock is used for reading metrics values.
+     * Note that Sensor object itself is used as a lock to protect the access to stats and metrics
+     * while recording metric values, adding and deleting sensors.
+     * </p><p>
+     * Locking order (assume all MetricsReporter methods may be synchronized):
+     * <ul>
+     *   <li>Sensor#add: Sensor -> Metrics -> MetricsReporter</li>
+     *   <li>Metrics#removeSensor: Sensor -> Metrics -> MetricsReporter</li>
+     *   <li>KafkaMetric#metricValue: MetricsReporter -> Sensor#metricLock</li>
+     *   <li>Sensor#record: Sensor -> Sensor#metricLock</li>
+     * </ul>
+     * </p>
      */
-    private Object metricLock(Stat stat) {
-        return this;
+    private Object metricLock() {
+        return metricLock;
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 6acc39d..59bc84e 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -26,13 +26,16 @@ import static org.junit.Assert.fail;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Deque;
+import java.util.List;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.kafka.common.Metric;
@@ -54,9 +57,12 @@ import org.apache.kafka.common.utils.MockTime;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("deprecation")
 public class MetricsTest {
+    private static final Logger log = LoggerFactory.getLogger(MetricsTest.class);
 
     private static final double EPS = 0.000001;
     private MockTime time = new MockTime();
@@ -604,8 +610,12 @@ public class MetricsTest {
         }
     }
 
+    /**
+     * Verifies that concurrent sensor add, remove, updates and read don't result
+     * in errors or deadlock.
+     */
     @Test
-    public void testConcurrentAccess() throws Exception {
+    public void testConcurrentReadUpdate() throws Exception {
         final Random random = new Random();
         final Deque<Sensor> sensors = new ConcurrentLinkedDeque<>();
         metrics = new Metrics(new MockTime(10));
@@ -613,16 +623,8 @@ public class MetricsTest {
 
         final AtomicBoolean alive = new AtomicBoolean(true);
         executorService = Executors.newSingleThreadExecutor();
-        executorService.submit(new Runnable() {
-            @Override
-            public void run() {
-                while (alive.get()) {
-                    for (Sensor sensor : sensors) {
-                        sensor.record(random.nextInt(10000));
-                    }
-                }
-            }
-        });
+        executorService.submit(new ConcurrentMetricOperation(alive, "record",
+            () -> sensors.forEach(sensor -> sensor.record(random.nextInt(10000)))));
 
         for (int i = 0; i < 10000; i++) {
             if (sensors.size() > 5) {
@@ -640,6 +642,97 @@ public class MetricsTest {
         alive.set(false);
     }
 
+    /**
+     * Verifies that concurrent sensor add, remove, updates and read with a metrics reporter
+     * that synchronizes on every reporter method doesn't result in errors or deadlock.
+     */
+    @Test
+    public void testConcurrentReadUpdateReport() throws Exception {
+
+        class LockingReporter implements MetricsReporter {
+            Map<MetricName, KafkaMetric> activeMetrics = new HashMap<>();
+            @Override
+            public synchronized void init(List<KafkaMetric> metrics) {
+            }
+
+            @Override
+            public synchronized void metricChange(KafkaMetric metric) {
+                activeMetrics.put(metric.metricName(), metric);
+            }
+
+            @Override
+            public synchronized void metricRemoval(KafkaMetric metric) {
+                activeMetrics.remove(metric.metricName(), metric);
+            }
+
+            @Override
+            public synchronized void close() {
+            }
+
+            @Override
+            public void configure(Map<String, ?> configs) {
+            }
+
+            synchronized void processMetrics() {
+                for (KafkaMetric metric : activeMetrics.values()) {
+                    assertNotNull("Invalid metric value", metric.metricValue());
+                }
+            }
+        }
+
+        final LockingReporter reporter = new LockingReporter();
+        this.metrics.close();
+        this.metrics = new Metrics(config, Arrays.asList((MetricsReporter) reporter), new MockTime(10), true);
+        final Deque<Sensor> sensors = new ConcurrentLinkedDeque<>();
+        SensorCreator sensorCreator = new SensorCreator(metrics);
+
+        final Random random = new Random();
+        final AtomicBoolean alive = new AtomicBoolean(true);
+        executorService = Executors.newFixedThreadPool(3);
+
+        Future<?> writeFuture = executorService.submit(new ConcurrentMetricOperation(alive, "record",
+            () -> sensors.forEach(sensor -> sensor.record(random.nextInt(10000)))));
+        Future<?> readFuture = executorService.submit(new ConcurrentMetricOperation(alive, "read",
+            () -> sensors.forEach(sensor -> sensor.metrics().forEach(metric ->
+                assertNotNull("Invalid metric value", metric.metricValue())))));
+        Future<?> reportFuture = executorService.submit(new ConcurrentMetricOperation(alive, "report",
+            () -> reporter.processMetrics()));
+
+        for (int i = 0; i < 10000; i++) {
+            if (sensors.size() > 10) {
+                Sensor sensor = random.nextBoolean() ? sensors.removeFirst() : sensors.removeLast();
+                metrics.removeSensor(sensor.name());
+            }
+            StatType statType = StatType.forId(random.nextInt(StatType.values().length));
+            sensors.add(sensorCreator.createSensor(statType, i));
+        }
+        assertFalse("Read failed", readFuture.isDone());
+        assertFalse("Write failed", writeFuture.isDone());
+        assertFalse("Report failed", reportFuture.isDone());
+
+        alive.set(false);
+    }
+
+    private class ConcurrentMetricOperation implements Runnable {
+        private final AtomicBoolean alive;
+        private final String opName;
+        private final Runnable op;
+        ConcurrentMetricOperation(AtomicBoolean alive, String opName, Runnable op) {
+            this.alive = alive;
+            this.opName = opName;
+            this.op = op;
+        }
+        public void run() {
+            try {
+                while (alive.get()) {
+                    op.run();
+                }
+            } catch (Throwable t) {
+                log.error("Metric {} failed with exception", opName, t);
+            }
+        }
+    }
+
     enum StatType {
         AVG(0),
         TOTAL(1),
@@ -676,7 +769,7 @@ public class MetricsTest {
         }
 
         private Sensor createSensor(StatType statType, int index) {
-            Sensor sensor = metrics.sensor("kafka.requests");
+            Sensor sensor = metrics.sensor("kafka.requests." + index);
             Map<String, String> tags = Collections.singletonMap("tag", "tag" + index);
             switch (statType) {
                 case AVG: