You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/06/19 11:03:52 UTC
[james-project] 04/06: JAMES-3170 Fix metric measurement upon reactor publisher replay
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 520cdac2f54674f96ea7edaca913a42778e8d4d1
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jun 12 10:36:48 2020 +0700
JAMES-3170 Fix metric measurement upon reactor publisher replay
A single metric was used for all the retries, leading to an inaccurate
measurement.
---
.../dropwizard/DropWizardMetricFactory.java | 15 ++++++-----
.../metrics/dropwizard/DropWizardTimeMetric.java | 11 ++++++++
.../dropwizard/DropWizardMetricFactoryTest.java | 30 ++++++++++++++++++++++
.../james/metrics/logger/DefaultMetricFactory.java | 12 +++++----
.../metrics/tests/RecordingMetricFactory.java | 12 +++++----
.../metrics/tests/RecordingMetricFactoryTest.java | 26 +++++++++++++++++++
.../blob/cassandra/cache/CachedBlobStore.java | 10 +++-----
.../blob/cassandra/cache/CachedBlobStoreTest.java | 30 ++++++++++++++++++++++
8 files changed, 123 insertions(+), 23 deletions(-)
diff --git a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java
index 2b0081c..b18a04e 100644
--- a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java
+++ b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactory.java
@@ -28,13 +28,13 @@ import javax.inject.Inject;
import org.apache.james.lifecycle.api.Startable;
import org.apache.james.metrics.api.Metric;
import org.apache.james.metrics.api.MetricFactory;
-import org.apache.james.metrics.api.TimeMetric;
import org.reactivestreams.Publisher;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jmx.JmxReporter;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class DropWizardMetricFactory implements MetricFactory, Startable {
@@ -54,21 +54,22 @@ public class DropWizardMetricFactory implements MetricFactory, Startable {
}
@Override
- public TimeMetric timer(String name) {
+ public DropWizardTimeMetric timer(String name) {
return new DropWizardTimeMetric(name, metricRegistry.timer(name));
}
@Override
public <T> Publisher<T> decoratePublisherWithTimerMetric(String name, Publisher<T> publisher) {
- TimeMetric timer = timer(name);
- return Flux.from(publisher).doOnComplete(timer::stopAndPublish);
+ return Mono.fromCallable(() -> timer(name))
+ .flatMapMany(timer -> Flux.from(publisher)
+ .doOnComplete(timer::stopAndPublish));
}
@Override
public <T> Publisher<T> decoratePublisherWithTimerMetricLogP99(String name, Publisher<T> publisher) {
- TimeMetric timer = timer(name);
- return Flux.from(publisher)
- .doOnComplete(() -> timer.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD));
+ return Mono.fromCallable(() -> timer(name))
+ .flatMapMany(timer -> Flux.from(publisher)
+ .doOnComplete(() -> timer.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD)));
}
@PostConstruct
diff --git a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardTimeMetric.java b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardTimeMetric.java
index adf2c7a..834da92 100644
--- a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardTimeMetric.java
+++ b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardTimeMetric.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
public class DropWizardTimeMetric implements TimeMetric {
@@ -73,6 +74,16 @@ public class DropWizardTimeMetric implements TimeMetric {
this.context = this.timer.time();
}
+ @VisibleForTesting
+ Timer.Context getContext() {
+ return context;
+ }
+
+ @VisibleForTesting
+ Timer getTimer() {
+ return timer;
+ }
+
@Override
public String name() {
return name;
diff --git a/metrics/metrics-dropwizard/src/test/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactoryTest.java b/metrics/metrics-dropwizard/src/test/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactoryTest.java
index 75aada0..368075c 100644
--- a/metrics/metrics-dropwizard/src/test/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactoryTest.java
+++ b/metrics/metrics-dropwizard/src/test/java/org/apache/james/metrics/dropwizard/DropWizardMetricFactoryTest.java
@@ -19,12 +19,20 @@
package org.apache.james.metrics.dropwizard;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.metrics.api.MetricFactoryContract;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import com.codahale.metrics.MetricRegistry;
+import reactor.core.publisher.Mono;
+
class DropWizardMetricFactoryTest implements MetricFactoryContract {
private DropWizardMetricFactory testee;
@@ -38,4 +46,26 @@ class DropWizardMetricFactoryTest implements MetricFactoryContract {
public MetricFactory testee() {
return testee;
}
+
+ @Test
+ void decoratePublisherWithTimerMetricShouldRecordANewValueForEachRetry() {
+ Duration duration = Duration.ofMillis(100);
+ Mono.from(testee.decoratePublisherWithTimerMetric("any", Mono.delay(duration)))
+ .repeat(5)
+ .blockLast();
+
+ assertThat(testee.timer("any").getTimer().getSnapshot().get99thPercentile())
+ .isLessThan(duration.get(ChronoUnit.NANOS) * 2);
+ }
+
+ @Test
+ void decoratePublisherWithTimerMetricLogP99ShouldRecordANewValueForEachRetry() {
+ Duration duration = Duration.ofMillis(100);
+ Mono.from(testee.decoratePublisherWithTimerMetricLogP99("any", Mono.delay(duration)))
+ .repeat(5)
+ .blockLast();
+
+ assertThat(testee.timer("any").getTimer().getSnapshot().get99thPercentile())
+ .isLessThan(duration.get(ChronoUnit.NANOS) * 2);
+ }
}
\ No newline at end of file
diff --git a/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java b/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java
index 919363c..e0aa75f 100644
--- a/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java
+++ b/metrics/metrics-logger/src/main/java/org/apache/james/metrics/logger/DefaultMetricFactory.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class DefaultMetricFactory implements MetricFactory {
@@ -45,14 +46,15 @@ public class DefaultMetricFactory implements MetricFactory {
@Override
public <T> Publisher<T> decoratePublisherWithTimerMetric(String name, Publisher<T> publisher) {
- TimeMetric timer = timer(name);
- return Flux.from(publisher).doOnComplete(timer::stopAndPublish);
+ return Mono.fromCallable(() -> timer(name))
+ .flatMapMany(timer -> Flux.from(publisher)
+ .doOnComplete(timer::stopAndPublish));
}
@Override
public <T> Publisher<T> decoratePublisherWithTimerMetricLogP99(String name, Publisher<T> publisher) {
- TimeMetric timer = timer(name);
- return Flux.from(publisher)
- .doOnComplete(() -> timer.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD));
+ return Mono.fromCallable(() -> timer(name))
+ .flatMapMany(timer -> Flux.from(publisher)
+ .doOnComplete(() -> timer.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD)));
}
}
diff --git a/metrics/metrics-tests/src/main/java/org/apache/james/metrics/tests/RecordingMetricFactory.java b/metrics/metrics-tests/src/main/java/org/apache/james/metrics/tests/RecordingMetricFactory.java
index 63a8922..78aeecd 100644
--- a/metrics/metrics-tests/src/main/java/org/apache/james/metrics/tests/RecordingMetricFactory.java
+++ b/metrics/metrics-tests/src/main/java/org/apache/james/metrics/tests/RecordingMetricFactory.java
@@ -38,6 +38,7 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class RecordingMetricFactory implements MetricFactory {
private final Multimap<String, Duration> executionTimes = Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
@@ -63,15 +64,16 @@ public class RecordingMetricFactory implements MetricFactory {
@Override
public <T> Publisher<T> decoratePublisherWithTimerMetric(String name, Publisher<T> publisher) {
- TimeMetric timer = timer(name);
- return Flux.from(publisher).doOnComplete(timer::stopAndPublish);
+ return Mono.fromCallable(() -> timer(name))
+ .flatMapMany(timer -> Flux.from(publisher)
+ .doOnComplete(timer::stopAndPublish));
}
@Override
public <T> Publisher<T> decoratePublisherWithTimerMetricLogP99(String name, Publisher<T> publisher) {
- TimeMetric timer = timer(name);
- return Flux.from(publisher)
- .doOnComplete(() -> timer.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD));
+ return Mono.fromCallable(() -> timer(name))
+ .flatMapMany(timer -> Flux.from(publisher)
+ .doOnComplete(() -> timer.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD)));
}
public Collection<Duration> executionTimesFor(String name) {
diff --git a/metrics/metrics-tests/src/test/java/org/apache/james/metrics/tests/RecordingMetricFactoryTest.java b/metrics/metrics-tests/src/test/java/org/apache/james/metrics/tests/RecordingMetricFactoryTest.java
index c125ea3..bbdcf92 100644
--- a/metrics/metrics-tests/src/test/java/org/apache/james/metrics/tests/RecordingMetricFactoryTest.java
+++ b/metrics/metrics-tests/src/test/java/org/apache/james/metrics/tests/RecordingMetricFactoryTest.java
@@ -32,6 +32,8 @@ import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+
class RecordingMetricFactoryTest implements MetricFactoryContract {
private static final String TIME_METRIC_NAME = "timerMetric";
@@ -118,4 +120,28 @@ class RecordingMetricFactoryTest implements MetricFactoryContract {
assertThat(testee.countFor(METRIC_NAME))
.isEqualTo(5);
}
+
+ @Test
+ void decoratePublisherWithTimerMetricShouldRecordANewValueForEachRetry() {
+ Duration duration = Duration.ofMillis(100);
+ Mono.from(testee.decoratePublisherWithTimerMetric("any", Mono.delay(duration)))
+ .repeat(5)
+ .blockLast();
+
+ assertThat(testee.executionTimesFor("any"))
+ .hasSize(6)
+ .allSatisfy(timing -> assertThat(timing).isLessThan(duration.multipliedBy(2)));
+ }
+
+ @Test
+ void decoratePublisherWithTimerMetricLogP99ShouldRecordANewValueForEachRetry() {
+ Duration duration = Duration.ofMillis(100);
+ Mono.from(testee.decoratePublisherWithTimerMetricLogP99("any", Mono.delay(duration)))
+ .repeat(5)
+ .blockLast();
+
+ assertThat(testee.executionTimesFor("any"))
+ .hasSize(6)
+ .allSatisfy(timing -> assertThat(timing).isLessThan(duration.multipliedBy(2)));
+ }
}
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
index 48a8cfd..0e998c0 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
@@ -38,7 +38,6 @@ import org.apache.james.blob.api.ObjectNotFoundException;
import org.apache.james.blob.api.ObjectStoreIOException;
import org.apache.james.metrics.api.Metric;
import org.apache.james.metrics.api.MetricFactory;
-import org.apache.james.metrics.api.TimeMetric;
import org.reactivestreams.Publisher;
import com.google.common.base.Preconditions;
@@ -277,10 +276,9 @@ public class CachedBlobStore implements BlobStore {
}
private Mono<byte[]> readBytesFromBackend(BucketName bucketName, BlobId blobId) {
- TimeMetric timer = metricFactory.timer(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME);
-
- return Mono.from(backend.readBytes(bucketName, blobId))
- .doOnSuccess(any -> timer.stopAndPublish())
- .doOnError(ObjectNotFoundException.class, any -> timer.stopAndPublish());
+ return Mono.fromCallable(() -> metricFactory.timer(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME))
+ .flatMap(timer -> Mono.from(backend.readBytes(bucketName, blobId))
+ .doOnSuccess(any -> timer.stopAndPublish())
+ .doOnError(ObjectNotFoundException.class, any -> timer.stopAndPublish()));
}
}
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
index 5caff3f..1d2abcd 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java
@@ -446,6 +446,36 @@ public class CachedBlobStoreTest implements BlobStoreContract {
}
@Test
+ void readBytesShouldRecordDistinctTimingsWhenRepeatAndBackendRead() {
+ BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, ELEVEN_KILOBYTES, SIZE_BASED)).block();
+
+ Duration delay = Duration.ofMillis(500);
+ Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId))
+ .then(Mono.delay(delay))
+ .repeat(2)
+ .blockLast();
+
+ assertThat(metricFactory.executionTimesFor(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME))
+ .hasSize(3)
+ .allSatisfy(timing -> assertThat(timing).isLessThan(delay));
+ }
+
+ @Test
+ void readBytesShouldRecordDistinctTimingsWhenRepeat() {
+ BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
+
+ Duration delay = Duration.ofMillis(500);
+ Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId))
+ .then(Mono.delay(delay))
+ .repeat(2)
+ .blockLast();
+
+ assertThat(metricFactory.executionTimesFor(BLOBSTORE_CACHED_LATENCY_METRIC_NAME))
+ .hasSize(3)
+ .allSatisfy(timing -> assertThat(timing).isLessThan(delay));
+ }
+
+ @Test
void readBlobStoreCacheShouldCountWhenHit() {
BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org