You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/06/19 11:03:52 UTC

[james-project] 04/06: JAMES-3170 Fix metric measurement upon reactor publisher replay

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 520cdac2f54674f96ea7edaca913a42778e8d4d1
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jun 12 10:36:48 2020 +0700

    JAMES-3170 Fix metric measurement upon reactor publisher replay
    
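    A single time metric was created when the publisher was decorated and was
    then shared by every subscription, so all retries and repeats were measured
    from that same start, leading to inaccurate measurements.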
---
 .../dropwizard/DropWizardMetricFactory.java        | 15 ++++++-----
 .../metrics/dropwizard/DropWizardTimeMetric.java   | 11 ++++++++
 .../dropwizard/DropWizardMetricFactoryTest.java    | 30 ++++++++++++++++++++++
 .../james/metrics/logger/DefaultMetricFactory.java | 12 +++++----
 .../metrics/tests/RecordingMetricFactory.java      | 12 +++++----
 .../metrics/tests/RecordingMetricFactoryTest.java  | 26 +++++++++++++++++++
 .../blob/cassandra/cache/CachedBlobStore.java      | 10 +++-----
 .../blob/cassandra/cache/CachedBlobStoreTest.java  | 30 ++++++++++++++++++++++
 8 files changed, 123 insertions(+), 23 deletions(-)

diff --git a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java
index 2b0081c..b18a04e 100644
--- a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java
+++ b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java
@@ -28,13 +28,13 @@ import javax.inject.Inject;
 import org.apache.james.lifecycle.api.Startable;
 import org.apache.james.metrics.api.Metric;
 import org.apache.james.metrics.api.MetricFactory;
-import org.apache.james.metrics.api.TimeMetric;
 import org.reactivestreams.Publisher;
 
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.jmx.JmxReporter;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class DropWizardMetricFactory implements MetricFactory, Startable {
 
@@ -54,21 +54,22 @@ public class DropWizardMetricFactory implements MetricFactory, Startable {
     }
 
     @Override
-    public TimeMetric timer(String name) {
+    public DropWizardTimeMetric timer(String name) {
         return new DropWizardTimeMetric(name, metricRegistry.timer(name));
     }
 
     @Override
     public <T> Publisher<T> decoratePublisherWithTimerMetric(String name, Publisher<T> publisher) {
-        TimeMetric timer = timer(name);
-        return Flux.from(publisher).doOnComplete(timer::stopAndPublish);
+        return Mono.fromCallable(() -> timer(name))
+            .flatMapMany(timer ->  Flux.from(publisher)
+                .doOnComplete(timer::stopAndPublish));
     }
 
     @Override
     public <T> Publisher<T> decoratePublisherWithTimerMetricLogP99(String name, Publisher<T> publisher) {
-        TimeMetric timer = timer(name);
-        return Flux.from(publisher)
-            .doOnComplete(() -> timer.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD));
+        return Mono.fromCallable(() -> timer(name))
+            .flatMapMany(timer ->  Flux.from(publisher)
+                .doOnComplete(() -> timer.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD)));
     }
 
     @PostConstruct
diff --git a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardTimeMetric.java b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardTimeMetric.java
index adf2c7a..834da92 100644
--- a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardTimeMetric.java
+++ b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardTimeMetric.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 public class DropWizardTimeMetric implements TimeMetric {
@@ -73,6 +74,16 @@ public class DropWizardTimeMetric implements TimeMetric {
         this.context = this.timer.time();
     }
 
+    @VisibleForTesting
+    Timer.Context getContext() {
+        return context;
+    }
+
+    @VisibleForTesting
+    Timer getTimer() {
+        return timer;
+    }
+
     @Override
     public String name() {
         return name;
diff --git a/metrics/metrics-dropwizard/src/test/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactoryTest.java b/metrics/metrics-dropwizard/src/test/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactoryTest.java
index 75aada0..368075c 100644
--- a/metrics/metrics-dropwizard/src/test/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactoryTest.java
+++ b/metrics/metrics-dropwizard/src/test/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactoryTest.java
@@ -19,12 +19,20 @@
 
 package org.apache.james.metrics.dropwizard;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.metrics.api.MetricFactoryContract;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import com.codahale.metrics.MetricRegistry;
 
+import reactor.core.publisher.Mono;
+
 class DropWizardMetricFactoryTest implements MetricFactoryContract {
 
     private DropWizardMetricFactory testee;
@@ -38,4 +46,26 @@ class DropWizardMetricFactoryTest implements MetricFactoryContract {
     public MetricFactory testee() {
         return testee;
     }
+
+    @Test
+    void decoratePublisherWithTimerMetricShouldRecordANewValueForEachRetry() {
+        Duration duration = Duration.ofMillis(100);
+        Mono.from(testee.decoratePublisherWithTimerMetric("any", Mono.delay(duration)))
+            .repeat(5)
+            .blockLast();
+
+        assertThat(testee.timer("any").getTimer().getSnapshot().get99thPercentile())
+            .isLessThan(duration.get(ChronoUnit.NANOS) * 2);
+    }
+
+    @Test
+    void decoratePublisherWithTimerMetricLogP99ShouldRecordANewValueForEachRetry() {
+        Duration duration = Duration.ofMillis(100);
+        Mono.from(testee.decoratePublisherWithTimerMetricLogP99("any", Mono.delay(duration)))
+            .repeat(5)
+            .blockLast();
+
+        assertThat(testee.timer("any").getTimer().getSnapshot().get99thPercentile())
+            .isLessThan(duration.get(ChronoUnit.NANOS) * 2);
+    }
 }
\ No newline at end of file
diff --git a/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java b/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java
index 919363c..e0aa75f 100644
--- a/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java
+++ b/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class DefaultMetricFactory implements MetricFactory {
 
@@ -45,14 +46,15 @@ public class DefaultMetricFactory implements MetricFactory {
 
     @Override
     public <T> Publisher<T> decoratePublisherWithTimerMetric(String name, Publisher<T> publisher) {
-        TimeMetric timer = timer(name);
-        return Flux.from(publisher).doOnComplete(timer::stopAndPublish);
+        return Mono.fromCallable(() -> timer(name))
+            .flatMapMany(timer ->  Flux.from(publisher)
+                .doOnComplete(timer::stopAndPublish));
     }
 
     @Override
     public <T> Publisher<T> decoratePublisherWithTimerMetricLogP99(String name, Publisher<T> publisher) {
-        TimeMetric timer = timer(name);
-        return Flux.from(publisher)
-            .doOnComplete(() -> timer.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD));
+        return Mono.fromCallable(() -> timer(name))
+            .flatMapMany(timer ->  Flux.from(publisher)
+                .doOnComplete(() -> timer.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD)));
     }
 }
diff --git a/metrics/metrics-tests/src/main/java/org/apache/james/metrics/tests/RecordingMetricFactory.java b/metrics/metrics-tests/src/main/java/org/apache/james/metrics/tests/RecordingMetricFactory.java
index 63a8922..78aeecd 100644
--- a/metrics/metrics-tests/src/main/java/org/apache/james/metrics/tests/RecordingMetricFactory.java
+++ b/metrics/metrics-tests/src/main/java/org/apache/james/metrics/tests/RecordingMetricFactory.java
@@ -38,6 +38,7 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class RecordingMetricFactory implements MetricFactory {
     private final Multimap<String, Duration> executionTimes = Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
@@ -63,15 +64,16 @@ public class RecordingMetricFactory implements MetricFactory {
 
     @Override
     public <T> Publisher<T> decoratePublisherWithTimerMetric(String name, Publisher<T> publisher) {
-        TimeMetric timer = timer(name);
-        return Flux.from(publisher).doOnComplete(timer::stopAndPublish);
+        return Mono.fromCallable(() -> timer(name))
+            .flatMapMany(timer ->  Flux.from(publisher)
+                .doOnComplete(timer::stopAndPublish));
     }
 
     @Override
     public <T> Publisher<T> decoratePublisherWithTimerMetricLogP99(String name, Publisher<T> publisher) {
-        TimeMetric timer = timer(name);
-        return Flux.from(publisher)
-            .doOnComplete(() -> timer.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD));
+        return Mono.fromCallable(() -> timer(name))
+            .flatMapMany(timer ->  Flux.from(publisher)
+                .doOnComplete(() -> timer.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD)));
     }
 
     public Collection<Duration> executionTimesFor(String name) {
diff --git a/metrics/metrics-tests/src/test/java/org/apache/james/metrics/tests/RecordingMetricFactoryTest.java b/metrics/metrics-tests/src/test/java/org/apache/james/metrics/tests/RecordingMetricFactoryTest.java
index c125ea3..bbdcf92 100644
--- a/metrics/metrics-tests/src/test/java/org/apache/james/metrics/tests/RecordingMetricFactoryTest.java
+++ b/metrics/metrics-tests/src/test/java/org/apache/james/metrics/tests/RecordingMetricFactoryTest.java
@@ -32,6 +32,8 @@ import org.apache.james.util.concurrency.ConcurrentTestRunner;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import reactor.core.publisher.Mono;
+
 class RecordingMetricFactoryTest implements MetricFactoryContract {
 
     private static final String TIME_METRIC_NAME = "timerMetric";
@@ -118,4 +120,28 @@ class RecordingMetricFactoryTest implements MetricFactoryContract {
         assertThat(testee.countFor(METRIC_NAME))
             .isEqualTo(5);
     }
+
+    @Test
+    void decoratePublisherWithTimerMetricShouldRecordANewValueForEachRetry() {
+        Duration duration = Duration.ofMillis(100);
+        Mono.from(testee.decoratePublisherWithTimerMetric("any", Mono.delay(duration)))
+            .repeat(5)
+            .blockLast();
+
+        assertThat(testee.executionTimesFor("any"))
+            .hasSize(6)
+            .allSatisfy(timing -> assertThat(timing).isLessThan(duration.multipliedBy(2)));
+    }
+
+    @Test
+    void decoratePublisherWithTimerMetricLogP99ShouldRecordANewValueForEachRetry() {
+        Duration duration = Duration.ofMillis(100);
+        Mono.from(testee.decoratePublisherWithTimerMetricLogP99("any", Mono.delay(duration)))
+            .repeat(5)
+            .blockLast();
+
+        assertThat(testee.executionTimesFor("any"))
+            .hasSize(6)
+            .allSatisfy(timing -> assertThat(timing).isLessThan(duration.multipliedBy(2)));
+    }
 }
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
index 48a8cfd..0e998c0 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
@@ -38,7 +38,6 @@ import org.apache.james.blob.api.ObjectNotFoundException;
 import org.apache.james.blob.api.ObjectStoreIOException;
 import org.apache.james.metrics.api.Metric;
 import org.apache.james.metrics.api.MetricFactory;
-import org.apache.james.metrics.api.TimeMetric;
 import org.reactivestreams.Publisher;
 
 import com.google.common.base.Preconditions;
@@ -277,10 +276,9 @@ public class CachedBlobStore implements BlobStore {
     }
 
     private Mono<byte[]> readBytesFromBackend(BucketName bucketName, BlobId blobId) {
-        TimeMetric timer = metricFactory.timer(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME);
-
-        return Mono.from(backend.readBytes(bucketName, blobId))
-            .doOnSuccess(any -> timer.stopAndPublish())
-            .doOnError(ObjectNotFoundException.class, any -> timer.stopAndPublish());
+        return Mono.fromCallable(() -> metricFactory.timer(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME))
+            .flatMap(timer -> Mono.from(backend.readBytes(bucketName, blobId))
+                .doOnSuccess(any -> timer.stopAndPublish())
+                .doOnError(ObjectNotFoundException.class, any -> timer.stopAndPublish()));
     }
 }
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
index 5caff3f..1d2abcd 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
@@ -446,6 +446,36 @@ public class CachedBlobStoreTest implements BlobStoreContract {
         }
 
         @Test
+        void readBytesShouldRecordDistinctTimingsWhenRepeatAndBackendRead() {
+            BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, ELEVEN_KILOBYTES, SIZE_BASED)).block();
+
+            Duration delay = Duration.ofMillis(500);
+            Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId))
+                .then(Mono.delay(delay))
+                .repeat(2)
+                .blockLast();
+
+            assertThat(metricFactory.executionTimesFor(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME))
+                .hasSize(3)
+                .allSatisfy(timing -> assertThat(timing).isLessThan(delay));
+        }
+
+        @Test
+        void readBytesShouldRecordDistinctTimingsWhenRepeat() {
+            BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
+
+            Duration delay = Duration.ofMillis(500);
+            Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId))
+                .then(Mono.delay(delay))
+                .repeat(2)
+                .blockLast();
+
+            assertThat(metricFactory.executionTimesFor(BLOBSTORE_CACHED_LATENCY_METRIC_NAME))
+                .hasSize(3)
+                .allSatisfy(timing -> assertThat(timing).isLessThan(delay));
+        }
+
+        @Test
         void readBlobStoreCacheShouldCountWhenHit() {
             BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org