You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/05/10 12:13:21 UTC

[kafka] 02/02: KAFKA-6870 Concurrency conflicts in SampledStat (#4985)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit cf703662698b75b0f163b9191c0a6c2d49895a1b
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Thu May 10 19:27:45 2018 +0800

    KAFKA-6870 Concurrency conflicts in SampledStat (#4985)
    
    Make `KafkaMetric.measurableValue` thread-safe
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
 .../apache/kafka/common/metrics/KafkaMetric.java   | 14 ++---
 .../apache/kafka/common/metrics/SensorTest.java    | 70 +++++++++++++++++++++-
 2 files changed, 74 insertions(+), 10 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
index f04981a..48999e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
@@ -55,9 +55,7 @@ public final class KafkaMetric implements Metric {
     @Override
     @Deprecated
     public double value() {
-        synchronized (this.lock) {
-            return measurableValue(time.milliseconds());
-        }
+        return measurableValue(time.milliseconds());
     }
 
     @Override
@@ -81,10 +79,12 @@ public final class KafkaMetric implements Metric {
     }
 
     double measurableValue(long timeMs) {
-        if (this.metricValueProvider instanceof Measurable)
-            return ((Measurable) metricValueProvider).measure(config, timeMs);
-        else
-            return 0;
+        synchronized (this.lock) {
+            if (this.metricValueProvider instanceof Measurable)
+                return ((Measurable) metricValueProvider).measure(config, timeMs);
+            else
+                return 0;
+        }
     }
 
     public void config(MetricConfig config) {
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
index d22111e..74d4036 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
@@ -16,13 +16,25 @@
  */
 package org.apache.kafka.common.metrics;
 
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.utils.SystemTime;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.kafka.common.utils.SystemTime;
-import org.junit.Test;
-
 public class SensorTest {
     @Test
     public void testRecordLevelEnum() {
@@ -59,4 +71,56 @@ public class SensorTest {
             0, Sensor.RecordingLevel.DEBUG);
         assertFalse(debugSensor.shouldRecord());
     }
+
+    /**
+     * Sensor#checkQuotas should be thread-safe, since the method may be used by many ReplicaFetcherThreads.
+     */
+    @Test
+    public void testCheckQuotasInMultiThreads() throws InterruptedException, ExecutionException {
+        final Metrics metrics = new Metrics(new MetricConfig().quota(Quota.upperBound(Double.MAX_VALUE))
+            // decreasing the time window makes SampledStat always record the given value
+            .timeWindow(1, TimeUnit.MILLISECONDS)
+            // increasing the number of samples makes SampledStat store more samples
+            .samples(100));
+        final Sensor sensor = metrics.sensor("sensor");
+
+        sensor.add(metrics.metricName("test-metric", "test-group"), new Rate());
+        final int threadCount = 10;
+        final CountDownLatch latch = new CountDownLatch(1);
+        ExecutorService service = Executors.newFixedThreadPool(threadCount);
+        List<Future<Throwable>> workers = new ArrayList<>(threadCount);
+        boolean needShutdown = true;
+        try {
+            for (int i = 0; i != threadCount; ++i) {
+                final int index = i;
+                workers.add(service.submit(new Callable<Throwable>() {
+                    @Override
+                    public Throwable call() {
+                        try {
+                            assertTrue(latch.await(5, TimeUnit.SECONDS));
+                            for (int j = 0; j != 20; ++j) {
+                                sensor.record(j * index, System.currentTimeMillis() + j, false);
+                                sensor.checkQuotas();
+                            }
+                            return null;
+                        } catch (Throwable e) {
+                            return e;
+                        }
+                    }
+                }));
+            }
+            latch.countDown();
+            service.shutdown();
+            assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+            needShutdown = false;
+            for (Future<Throwable> callable : workers) {
+                assertTrue("If this failure happen frequently, we can try to increase the wait time", callable.isDone());
+                assertNull("Sensor#checkQuotas SHOULD be thread-safe!", callable.get());
+            }
+        } finally {
+            if (needShutdown) {
+                service.shutdownNow();
+            }
+        }
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.