You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/05/10 12:13:21 UTC
[kafka] 02/02: KAFKA-6870 Concurrency conflicts in SampledStat
(#4985)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit cf703662698b75b0f163b9191c0a6c2d49895a1b
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Thu May 10 19:27:45 2018 +0800
KAFKA-6870 Concurrency conflicts in SampledStat (#4985)
Make `KafkaMetric.measurableValue` thread-safe
Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
.../apache/kafka/common/metrics/KafkaMetric.java | 14 ++---
.../apache/kafka/common/metrics/SensorTest.java | 70 +++++++++++++++++++++-
2 files changed, 74 insertions(+), 10 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
index f04981a..48999e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
@@ -55,9 +55,7 @@ public final class KafkaMetric implements Metric {
@Override
@Deprecated
public double value() {
- synchronized (this.lock) {
- return measurableValue(time.milliseconds());
- }
+ return measurableValue(time.milliseconds());
}
@Override
@@ -81,10 +79,12 @@ public final class KafkaMetric implements Metric {
}
double measurableValue(long timeMs) {
- if (this.metricValueProvider instanceof Measurable)
- return ((Measurable) metricValueProvider).measure(config, timeMs);
- else
- return 0;
+ synchronized (this.lock) {
+ if (this.metricValueProvider instanceof Measurable)
+ return ((Measurable) metricValueProvider).measure(config, timeMs);
+ else
+ return 0;
+ }
}
public void config(MetricConfig config) {
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
index d22111e..74d4036 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
@@ -16,13 +16,25 @@
*/
package org.apache.kafka.common.metrics;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.utils.SystemTime;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import org.apache.kafka.common.utils.SystemTime;
-import org.junit.Test;
-
public class SensorTest {
@Test
public void testRecordLevelEnum() {
@@ -59,4 +71,56 @@ public class SensorTest {
0, Sensor.RecordingLevel.DEBUG);
assertFalse(debugSensor.shouldRecord());
}
+
+ /**
+ * The Sensor#checkQuotas should be thread-safe since the method may be used by many ReplicaFetcherThreads.
+ */
+ @Test
+ public void testCheckQuotasInMultiThreads() throws InterruptedException, ExecutionException {
+ final Metrics metrics = new Metrics(new MetricConfig().quota(Quota.upperBound(Double.MAX_VALUE))
+ // decreasing the value of time window make SampledStat always record the given value
+ .timeWindow(1, TimeUnit.MILLISECONDS)
+ // increasing the value of samples make SampledStat store more samples
+ .samples(100));
+ final Sensor sensor = metrics.sensor("sensor");
+
+ sensor.add(metrics.metricName("test-metric", "test-group"), new Rate());
+ final int threadCount = 10;
+ final CountDownLatch latch = new CountDownLatch(1);
+ ExecutorService service = Executors.newFixedThreadPool(threadCount);
+ List<Future<Throwable>> workers = new ArrayList<>(threadCount);
+ boolean needShutdown = true;
+ try {
+ for (int i = 0; i != threadCount; ++i) {
+ final int index = i;
+ workers.add(service.submit(new Callable<Throwable>() {
+ @Override
+ public Throwable call() {
+ try {
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ for (int j = 0; j != 20; ++j) {
+ sensor.record(j * index, System.currentTimeMillis() + j, false);
+ sensor.checkQuotas();
+ }
+ return null;
+ } catch (Throwable e) {
+ return e;
+ }
+ }
+ }));
+ }
+ latch.countDown();
+ service.shutdown();
+ assertTrue(service.awaitTermination(10, TimeUnit.SECONDS));
+ needShutdown = false;
+ for (Future<Throwable> callable : workers) {
+ assertTrue("If this failure happen frequently, we can try to increase the wait time", callable.isDone());
+ assertNull("Sensor#checkQuotas SHOULD be thread-safe!", callable.get());
+ }
+ } finally {
+ if (needShutdown) {
+ service.shutdownNow();
+ }
+ }
+ }
}
--
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.