You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/05/10 11:27:56 UTC

[kafka] branch trunk updated: KAFKA-6870 Concurrency conflicts in SampledStat (#4985)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4f7c11a  KAFKA-6870 Concurrency conflicts in SampledStat (#4985)
4f7c11a is described below

commit 4f7c11a1df26836c7a15f062a5431adb3d371a86
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Thu May 10 19:27:45 2018 +0800

    KAFKA-6870 Concurrency conflicts in SampledStat (#4985)
    
    Make `KafkaMetric.measurableValue` thread-safe
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
 .../apache/kafka/common/metrics/KafkaMetric.java   | 14 ++---
 .../apache/kafka/common/metrics/SensorTest.java    | 62 ++++++++++++++++++++++
 2 files changed, 69 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
index f04981a..48999e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
@@ -55,9 +55,7 @@ public final class KafkaMetric implements Metric {
     @Override
     @Deprecated
     public double value() {
-        synchronized (this.lock) {
-            return measurableValue(time.milliseconds());
-        }
+        return measurableValue(time.milliseconds());
     }
 
     @Override
@@ -81,10 +79,12 @@ public final class KafkaMetric implements Metric {
     }
 
     double measurableValue(long timeMs) {
-        if (this.metricValueProvider instanceof Measurable)
-            return ((Measurable) metricValueProvider).measure(config, timeMs);
-        else
-            return 0;
+        synchronized (this.lock) {
+            if (this.metricValueProvider instanceof Measurable)
+                return ((Measurable) metricValueProvider).measure(config, timeMs);
+            else
+                return 0;
+        }
     }
 
     public void config(MetricConfig config) {
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
index a1c0814..8e3dfeb 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
@@ -19,19 +19,29 @@ package org.apache.kafka.common.metrics;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Sum;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -124,4 +134,56 @@ public class SensorTest {
         assertEquals(1, sensor.metrics().size());
         assertEquals(org.apache.kafka.common.metrics.stats.Avg.class, sensor.metrics().get(0).measurable().getClass());
     }
+
+    /**
+     * Sensor#checkQuotas should be thread-safe, since it may be used by many ReplicaFetcherThreads concurrently.
+     */
+    @Test
+    public void testCheckQuotasInMultiThreads() throws InterruptedException, ExecutionException {
+        final Metrics metrics = new Metrics(new MetricConfig().quota(Quota.upperBound(Double.MAX_VALUE))
+            // decreasing the time window makes SampledStat always record the given value
+            .timeWindow(1, TimeUnit.MILLISECONDS)
+            // increasing the number of samples makes SampledStat store more samples
+            .samples(100));
+        final Sensor sensor = metrics.sensor("sensor");
+
+        assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Rate()));
+        final int threadCount = 10;
+        final CountDownLatch latch = new CountDownLatch(1);
+        ExecutorService service = Executors.newFixedThreadPool(threadCount);
+        List<Future<Throwable>> workers = new ArrayList<>(threadCount);
+        boolean needShutdown = true;
+        try {
+            for (int i = 0; i != threadCount; ++i) {
+                final int index = i;
+                workers.add(service.submit(new Callable<Throwable>() {
+                    @Override
+                    public Throwable call() {
+                        try {
+                            assertTrue(latch.await(5, TimeUnit.SECONDS));
+                            for (int j = 0; j != 20; ++j) {
+                                sensor.record(j * index, System.currentTimeMillis() + j, false);
+                                sensor.checkQuotas();
+                            }
+                            return null;
+                        } catch (Throwable e) {
+                            return e;
+                        }
+                    }
+                }));
+            }
+            latch.countDown();
+            service.shutdown();
+            assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+            needShutdown = false;
+            for (Future<Throwable> callable : workers) {
+                assertTrue("If this failure happen frequently, we can try to increase the wait time", callable.isDone());
+                assertNull("Sensor#checkQuotas SHOULD be thread-safe!", callable.get());
+            }
+        } finally {
+            if (needShutdown) {
+                service.shutdownNow();
+            }
+        }
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.