You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/10/08 04:17:06 UTC
[james-project] 02/03: JAMES-2760 Add a configuration parameter for
enabling/disabling the metrics on mail queue size of RabbitMQ
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7bf2df7c270889ae8ae2e0975c7b239957386e5a
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Thu Oct 3 15:35:35 2019 +0700
JAMES-2760 Add a configuration parameter for enabling/disabling the metrics on mail queue size of RabbitMQ
---
.../apache/james/modules/TestRabbitMQModule.java | 7 +
.../james/modules/rabbitmq/RabbitMQModule.java | 7 +
.../queue/rabbitmq/RabbitMQMailQueueFactory.java | 10 +-
.../view/RabbitMQMailQueueConfiguration.java | 96 ++++++
.../RabbitMQMailQueueConfigurationChangeTest.java | 10 +-
.../queue/rabbitmq/RabbitMQMailQueueTest.java | 352 ++++++++++++---------
.../rabbitmq/RabbitMqMailQueueFactoryTest.java | 8 +-
.../view/RabbitMQMailQueueConfigurationTest.java | 52 +++
8 files changed, 391 insertions(+), 151 deletions(-)
diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/modules/TestRabbitMQModule.java b/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/modules/TestRabbitMQModule.java
index e17539b..35438ac 100644
--- a/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/modules/TestRabbitMQModule.java
+++ b/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/modules/TestRabbitMQModule.java
@@ -31,6 +31,7 @@ import org.apache.james.CleanupTasksPerformer;
import org.apache.james.backends.rabbitmq.DockerRabbitMQ;
import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.queue.rabbitmq.RabbitMQMailQueueManagement;
+import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
import com.google.inject.AbstractModule;
@@ -69,6 +70,12 @@ public class TestRabbitMQModule extends AbstractModule {
.build();
}
+ @Provides
+ @Singleton
+ private RabbitMQMailQueueConfiguration getMailQueueSizeConfiguration() {
+ return RabbitMQMailQueueConfiguration.sizeMetricsEnabled();
+ }
+
public static class QueueCleanUp implements CleanupTasksPerformer.CleanupTask {
private final RabbitMQMailQueueManagement api;
diff --git a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
index 09a7bfe..16a168e 100644
--- a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
+++ b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java
@@ -34,6 +34,7 @@ import org.apache.james.core.healthcheck.HealthCheck;
import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.rabbitmq.RabbitMQMailQueueFactory;
+import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import org.apache.james.queue.rabbitmq.view.cassandra.BrowseStartDAO;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser;
@@ -119,4 +120,10 @@ public class RabbitMQModule extends AbstractModule {
private CassandraMailQueueViewConfiguration getMailQueueViewConfiguration(@Named(RABBITMQ_CONFIGURATION_NAME) Configuration configuration) {
return CassandraMailQueueViewConfiguration.from(configuration);
}
+
+ @Provides
+ @Singleton
+ private RabbitMQMailQueueConfiguration getMailQueueSizeConfiguration(@Named(RABBITMQ_CONFIGURATION_NAME) Configuration configuration) {
+ return RabbitMQMailQueueConfiguration.from(configuration);
+ }
}
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
index adaf3de..a888da0 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
@@ -38,6 +38,7 @@ import org.apache.james.metrics.api.GaugeRegistry;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
+import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import com.github.fge.lambdas.Throwing;
@@ -56,6 +57,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
private final MailQueueView.Factory mailQueueViewFactory;
private final Clock clock;
private final MailQueueItemDecoratorFactory decoratorFactory;
+ private final RabbitMQMailQueueConfiguration configuration;
@Inject
@VisibleForTesting PrivateFactory(MetricFactory metricFactory,
@@ -65,7 +67,8 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
BlobId.Factory blobIdFactory,
MailQueueView.Factory mailQueueViewFactory,
Clock clock,
- MailQueueItemDecoratorFactory decoratorFactory) {
+ MailQueueItemDecoratorFactory decoratorFactory,
+ RabbitMQMailQueueConfiguration configuration) {
this.metricFactory = metricFactory;
this.gaugeRegistry = gaugeRegistry;
this.rabbitClient = rabbitClient;
@@ -75,6 +78,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
this.decoratorFactory = decoratorFactory;
this.mailReferenceSerializer = new MailReferenceSerializer();
this.mailLoader = Throwing.function(new MailLoader(mimeMessageStore, blobIdFactory)::load).sneakyThrow();
+ this.configuration = configuration;
}
RabbitMQMailQueue create(MailQueueName mailQueueName) {
@@ -96,7 +100,9 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
}
private void registerGaugeFor(RabbitMQMailQueue rabbitMQMailQueue) {
- this.gaugeRegistry.register(QUEUE_SIZE_METRIC_NAME_PREFIX + rabbitMQMailQueue.getName(), rabbitMQMailQueue::getSize);
+ if (configuration.isSizeMetricsEnabled()) {
+ this.gaugeRegistry.register(QUEUE_SIZE_METRIC_NAME_PREFIX + rabbitMQMailQueue.getName(), rabbitMQMailQueue::getSize);
+ }
}
}
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/RabbitMQMailQueueConfiguration.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/RabbitMQMailQueueConfiguration.java
new file mode 100644
index 0000000..b05c0bc
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/RabbitMQMailQueueConfiguration.java
@@ -0,0 +1,96 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.commons.configuration2.Configuration;
+
+public class RabbitMQMailQueueConfiguration {
+ private static final boolean DEFAULT_SIZE_METRICS_ENABLED = true;
+
+ public static class Builder {
+ private Optional<Boolean> sizeMetricsEnabled;
+
+ public Builder sizeMetricsEnabled(boolean sizeMetricsEnabled) {
+ this.sizeMetricsEnabled = Optional.of(sizeMetricsEnabled);
+ return this;
+ }
+
+ public Builder sizeMetricsEnabled(Optional<Boolean> sizeMetricsEnabled) {
+ this.sizeMetricsEnabled = sizeMetricsEnabled;
+ return this;
+ }
+
+ public RabbitMQMailQueueConfiguration build() {
+ return new RabbitMQMailQueueConfiguration(sizeMetricsEnabled.orElse(DEFAULT_SIZE_METRICS_ENABLED));
+ }
+ }
+
+ public static final String SIZE_METRICS_ENABLED_PROPERTY = "mailqueue.size.metricsEnabled";
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static RabbitMQMailQueueConfiguration from(Configuration configuration) {
+ return builder()
+ .sizeMetricsEnabled(Optional.ofNullable(configuration.getBoolean(SIZE_METRICS_ENABLED_PROPERTY, null)))
+ .build();
+ }
+
+ public static RabbitMQMailQueueConfiguration sizeMetricsEnabled() {
+ return builder()
+ .sizeMetricsEnabled(true)
+ .build();
+ }
+
+ public static RabbitMQMailQueueConfiguration sizeMetricsDisabled() {
+ return builder()
+ .sizeMetricsEnabled(false)
+ .build();
+ }
+
+ private final boolean sizeMetricsEnabled;
+
+ private RabbitMQMailQueueConfiguration(boolean sizeMetricsEnabled) {
+ this.sizeMetricsEnabled = sizeMetricsEnabled;
+ }
+
+ public boolean isSizeMetricsEnabled() {
+ return sizeMetricsEnabled;
+ }
+
+ @Override
+ public final boolean equals(Object o) {
+ if (o instanceof RabbitMQMailQueueConfiguration) {
+ RabbitMQMailQueueConfiguration that = (RabbitMQMailQueueConfiguration) o;
+
+ return Objects.equals(this.sizeMetricsEnabled, that.sizeMetricsEnabled);
+ }
+ return false;
+ }
+
+ @Override
+ public final int hashCode() {
+ return Objects.hash(sizeMetricsEnabled);
+ }
+}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
index b17e938..464faf2 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java
@@ -31,11 +31,11 @@ import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;
-import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
+import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.cassandra.CassandraBlobModule;
import org.apache.james.blob.cassandra.CassandraBlobStore;
@@ -46,6 +46,7 @@ import org.apache.james.metrics.api.NoopMetricFactory;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
+import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueView;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewTestFactory;
@@ -110,6 +111,10 @@ class RabbitMQMailQueueConfigurationChangeTest {
mailQueueViewConfiguration,
mimeMessageStoreFactory);
+ RabbitMQMailQueueConfiguration mailQueueSizeConfiguration = RabbitMQMailQueueConfiguration.builder()
+ .sizeMetricsEnabled(true)
+ .build();
+
RabbitMQMailQueueFactory.PrivateFactory privateFactory = new RabbitMQMailQueueFactory.PrivateFactory(
new NoopMetricFactory(),
new NoopGaugeRegistry(),
@@ -118,7 +123,8 @@ class RabbitMQMailQueueConfigurationChangeTest {
BLOB_ID_FACTORY,
mailQueueViewFactory,
clock,
- new RawMailQueueItemDecoratorFactory());
+ new RawMailQueueItemDecoratorFactory(),
+ mailQueueSizeConfiguration);
RabbitMQMailQueueFactory mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, privateFactory);
return mailQueueFactory.createQueue(SPOOL);
}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 703b233..ea738d9 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -23,6 +23,9 @@ import static java.time.temporal.ChronoUnit.HOURS;
import static org.apache.james.queue.api.Mails.defaultMail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import java.time.Duration;
import java.time.Instant;
@@ -30,23 +33,24 @@ import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;
-import org.apache.james.backends.rabbitmq.DockerRabbitMQ;
-import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
+import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.cassandra.CassandraBlobModule;
import org.apache.james.blob.cassandra.CassandraBlobStore;
import org.apache.james.blob.mail.MimeMessageStore;
import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule;
+import org.apache.james.metrics.api.Gauge;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueMetricContract;
import org.apache.james.queue.api.MailQueueMetricExtension;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.api.ManageableMailQueueContract;
import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
+import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModule;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewTestFactory;
@@ -57,15 +61,17 @@ import org.apache.mailet.Mail;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import org.mockito.ArgumentCaptor;
import com.github.fge.lambdas.Throwing;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQueueMetricContract {
+class RabbitMQMailQueueTest {
private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
private static final int THREE_BUCKET_COUNT = 3;
private static final int UPDATE_BROWSE_START_PACE = 2;
@@ -92,159 +98,213 @@ public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQ
private RabbitMQMailQueue mailQueue;
private RabbitMQMailQueueManagement mqManagementApi;
- @Override
- public void enQueue(Mail mail) throws MailQueue.MailQueueException {
- ManageableMailQueueContract.super.enQueue(mail);
- clock.tick();
+ @AfterEach
+ void tearDown() {
+ mqManagementApi.deleteAllQueues();
}
- @BeforeEach
- void setup(DockerRabbitMQ rabbitMQ, CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws Exception {
- CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf());
- MimeMessageStore.Factory mimeMessageStoreFactory = MimeMessageStore.factory(blobStore);
- clock = new UpdatableTickingClock(IN_SLICE_1);
+ @Nested
+ class MailQueueSizeMetricsEnabled implements ManageableMailQueueContract, MailQueueMetricContract {
+ @BeforeEach
+ void setup(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws Exception {
+ CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf());
+ MimeMessageStore.Factory mimeMessageStoreFactory = MimeMessageStore.factory(blobStore);
+ clock = new UpdatableTickingClock(IN_SLICE_1);
- MailQueueView.Factory mailQueueViewFactory = CassandraMailQueueViewTestFactory.factory(clock, cassandra.getConf(),
- CassandraMailQueueViewConfiguration.builder()
+ MailQueueView.Factory mailQueueViewFactory = CassandraMailQueueViewTestFactory.factory(clock, cassandra.getConf(),
+ CassandraMailQueueViewConfiguration.builder()
.bucketCount(THREE_BUCKET_COUNT)
.updateBrowseStartPace(UPDATE_BROWSE_START_PACE)
.sliceWindow(ONE_HOUR_SLICE_WINDOW)
.build(),
- mimeMessageStoreFactory);
-
- RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool());
- RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory(
- metricTestSystem.getMetricFactory(),
- metricTestSystem.getSpyGaugeRegistry(),
- rabbitClient,
- mimeMessageStoreFactory,
- BLOB_ID_FACTORY,
- mailQueueViewFactory,
- clock,
- new RawMailQueueItemDecoratorFactory());
- mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
- mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory);
- mailQueue = mailQueueFactory.createQueue(SPOOL);
- }
-
- @AfterEach
- void tearDown() {
- mqManagementApi.deleteAllQueues();
- }
-
- @Override
- public MailQueue getMailQueue() {
- return mailQueue;
+ mimeMessageStoreFactory);
+
+ RabbitMQMailQueueConfiguration configuration = RabbitMQMailQueueConfiguration.builder()
+ .sizeMetricsEnabled(true)
+ .build();
+
+ RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool());
+ RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory(
+ metricTestSystem.getMetricFactory(),
+ metricTestSystem.getSpyGaugeRegistry(),
+ rabbitClient,
+ mimeMessageStoreFactory,
+ BLOB_ID_FACTORY,
+ mailQueueViewFactory,
+ clock,
+ new RawMailQueueItemDecoratorFactory(),
+ configuration);
+ mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
+ mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory);
+ mailQueue = mailQueueFactory.createQueue(SPOOL);
+ }
+
+ @Override
+ public void enQueue(Mail mail) throws MailQueue.MailQueueException {
+ ManageableMailQueueContract.super.enQueue(mail);
+ clock.tick();
+ }
+
+ @Override
+ public MailQueue getMailQueue() {
+ return mailQueue;
+ }
+
+ @Override
+ public ManageableMailQueue getManageableMailQueue() {
+ return mailQueue;
+ }
+
+ @Test
+ void browseShouldReturnCurrentlyEnqueuedMailFromAllSlices() throws Exception {
+ ManageableMailQueue mailQueue = getManageableMailQueue();
+ int emailCount = 5;
+
+ clock.setInstant(IN_SLICE_1);
+ enqueueSomeMails(namePatternForSlice(1), emailCount);
+
+ clock.setInstant(IN_SLICE_2);
+ enqueueSomeMails(namePatternForSlice(2), emailCount);
+
+ clock.setInstant(IN_SLICE_3);
+ enqueueSomeMails(namePatternForSlice(3), emailCount);
+
+ clock.setInstant(IN_SLICE_5);
+ enqueueSomeMails(namePatternForSlice(5), emailCount);
+
+ clock.setInstant(IN_SLICE_7);
+ Stream<String> names = Iterators.toStream(mailQueue.browse())
+ .map(ManageableMailQueue.MailQueueItemView::getMail)
+ .map(Mail::getName);
+
+ assertThat(names).containsExactly(
+ "1-1", "1-2", "1-3", "1-4", "1-5",
+ "2-1", "2-2", "2-3", "2-4", "2-5",
+ "3-1", "3-2", "3-3", "3-4", "3-5",
+ "5-1", "5-2", "5-3", "5-4", "5-5");
+ }
+
+ @Test
+ void browseAndDequeueShouldCombineWellWhenDifferentSlices() throws Exception {
+ ManageableMailQueue mailQueue = getManageableMailQueue();
+ int emailCount = 5;
+
+ clock.setInstant(IN_SLICE_1);
+ enqueueSomeMails(namePatternForSlice(1), emailCount);
+
+ clock.setInstant(IN_SLICE_2);
+ enqueueSomeMails(namePatternForSlice(2), emailCount);
+
+ clock.setInstant(IN_SLICE_3);
+ enqueueSomeMails(namePatternForSlice(3), emailCount);
+
+ clock.setInstant(IN_SLICE_5);
+ enqueueSomeMails(namePatternForSlice(5), emailCount);
+
+ clock.setInstant(IN_SLICE_7);
+ dequeueMails(5);
+ dequeueMails(5);
+ dequeueMails(3);
+
+ Stream<String> names = Iterators.toStream(mailQueue.browse())
+ .map(ManageableMailQueue.MailQueueItemView::getMail)
+ .map(Mail::getName);
+
+ assertThat(names)
+ .containsExactly("3-4", "3-5", "5-1", "5-2", "5-3", "5-4", "5-5");
+ }
+
+ private Function<Integer, String> namePatternForSlice(int sliceId) {
+ return i -> sliceId + "-" + i;
+ }
+
+ @Test
+ void mailQueueShouldBeInitializedWhenCreating(CassandraCluster cassandra) {
+ String name = "myQueue";
+ mailQueueFactory.createQueue(name);
+
+ boolean initialized = CassandraMailQueueViewTestFactory.isInitialized(cassandra.getConf(), MailQueueName.fromString(name));
+ assertThat(initialized).isTrue();
+ }
+
+ @Test
+ void enQueueShouldNotThrowOnMailNameWithNegativeHash() {
+ String negativehashedString = "this sting will have a negative hash"; //hash value: -1256871313
+
+ assertThatCode(() -> getMailQueue().enQueue(defaultMail().name(negativehashedString).build()))
+ .doesNotThrowAnyException();
+ }
+
+ @Disabled("JAMES-2614 RabbitMQMailQueueTest::concurrentEnqueueDequeueShouldNotFail is unstable." +
+ "The related test is disabled, and need to be re-enabled after investigation and a fix.")
+ @Test
+ @Override
+ public void concurrentEnqueueDequeueShouldNotFail() {
+
+ }
+
+ private void enqueueSomeMails(Function<Integer, String> namePattern, int emailCount) {
+ IntStream.rangeClosed(1, emailCount)
+ .forEach(Throwing.intConsumer(i -> enQueue(defaultMail()
+ .name(namePattern.apply(i))
+ .build())));
+ }
+
+ private void dequeueMails(int times) {
+ Flux.from(getManageableMailQueue()
+ .deQueue())
+ .take(times)
+ .flatMap(mailQueueItem -> Mono.fromCallable(() -> {
+ mailQueueItem.done(true);
+ return mailQueueItem;
+ }))
+ .blockLast();
+ }
}
- @Override
- public ManageableMailQueue getManageableMailQueue() {
- return mailQueue;
- }
-
- @Test
- void browseShouldReturnCurrentlyEnqueuedMailFromAllSlices() throws Exception {
- ManageableMailQueue mailQueue = getManageableMailQueue();
- int emailCount = 5;
-
- clock.setInstant(IN_SLICE_1);
- enqueueSomeMails(namePatternForSlice(1), emailCount);
-
- clock.setInstant(IN_SLICE_2);
- enqueueSomeMails(namePatternForSlice(2), emailCount);
-
- clock.setInstant(IN_SLICE_3);
- enqueueSomeMails(namePatternForSlice(3), emailCount);
-
- clock.setInstant(IN_SLICE_5);
- enqueueSomeMails(namePatternForSlice(5), emailCount);
-
- clock.setInstant(IN_SLICE_7);
- Stream<String> names = Iterators.toStream(mailQueue.browse())
- .map(ManageableMailQueue.MailQueueItemView::getMail)
- .map(Mail::getName);
-
- assertThat(names).containsExactly(
- "1-1", "1-2", "1-3", "1-4", "1-5",
- "2-1", "2-2", "2-3", "2-4", "2-5",
- "3-1", "3-2", "3-3", "3-4", "3-5",
- "5-1", "5-2", "5-3", "5-4", "5-5");
- }
-
- @Test
- void browseAndDequeueShouldCombineWellWhenDifferentSlices() throws Exception {
- ManageableMailQueue mailQueue = getManageableMailQueue();
- int emailCount = 5;
-
- clock.setInstant(IN_SLICE_1);
- enqueueSomeMails(namePatternForSlice(1), emailCount);
+ @Nested
+ class MailQueueSizeMetricsDisabled {
+ @RegisterExtension
+ MailQueueMetricExtension mailQueueMetricExtension = new MailQueueMetricExtension();
- clock.setInstant(IN_SLICE_2);
- enqueueSomeMails(namePatternForSlice(2), emailCount);
+ @BeforeEach
+ void setup(CassandraCluster cassandra, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) throws Exception {
+ CassandraBlobStore blobStore = new CassandraBlobStore(cassandra.getConf());
+ MimeMessageStore.Factory mimeMessageStoreFactory = MimeMessageStore.factory(blobStore);
+ clock = new UpdatableTickingClock(IN_SLICE_1);
- clock.setInstant(IN_SLICE_3);
- enqueueSomeMails(namePatternForSlice(3), emailCount);
-
- clock.setInstant(IN_SLICE_5);
- enqueueSomeMails(namePatternForSlice(5), emailCount);
-
- clock.setInstant(IN_SLICE_7);
- dequeueMails(5);
- dequeueMails(5);
- dequeueMails(3);
-
- Stream<String> names = Iterators.toStream(mailQueue.browse())
- .map(ManageableMailQueue.MailQueueItemView::getMail)
- .map(Mail::getName);
-
- assertThat(names)
- .containsExactly("3-4", "3-5", "5-1", "5-2", "5-3", "5-4", "5-5");
- }
-
- private Function<Integer, String> namePatternForSlice(int sliceId) {
- return i -> sliceId + "-" + i;
- }
-
- @Test
- void mailQueueShouldBeInitializedWhenCreating(CassandraCluster cassandra) {
- String name = "myQueue";
- mailQueueFactory.createQueue(name);
-
- boolean initialized = CassandraMailQueueViewTestFactory.isInitialized(cassandra.getConf(), MailQueueName.fromString(name));
- assertThat(initialized).isTrue();
- }
-
- @Test
- void enQueueShouldNotThrowOnMailNameWithNegativeHash() {
- String negativehashedString = "this sting will have a negative hash"; //hash value: -1256871313
-
- assertThatCode(() -> getMailQueue().enQueue(defaultMail().name(negativehashedString).build()))
- .doesNotThrowAnyException();
- }
-
- @Disabled("JAMES-2614 RabbitMQMailQueueTest::concurrentEnqueueDequeueShouldNotFail is unstable." +
- "The related test is disabled, and need to be re-enabled after investigation and a fix.")
- @Test
- @Override
- public void concurrentEnqueueDequeueShouldNotFail() {
-
- }
-
- private void enqueueSomeMails(Function<Integer, String> namePattern, int emailCount) {
- IntStream.rangeClosed(1, emailCount)
- .forEach(Throwing.intConsumer(i -> enQueue(defaultMail()
- .name(namePattern.apply(i))
- .build())));
- }
-
- private void dequeueMails(int times) {
- Flux.from(getManageableMailQueue()
- .deQueue())
- .take(times)
- .flatMap(mailQueueItem -> Mono.fromCallable(() -> {
- mailQueueItem.done(true);
- return mailQueueItem;
- }))
- .blockLast();
+ MailQueueView.Factory mailQueueViewFactory = CassandraMailQueueViewTestFactory.factory(clock, cassandra.getConf(),
+ CassandraMailQueueViewConfiguration.builder()
+ .bucketCount(THREE_BUCKET_COUNT)
+ .updateBrowseStartPace(UPDATE_BROWSE_START_PACE)
+ .sliceWindow(ONE_HOUR_SLICE_WINDOW)
+ .build(),
+ mimeMessageStoreFactory);
+
+ RabbitMQMailQueueConfiguration configuration = RabbitMQMailQueueConfiguration.builder()
+ .sizeMetricsEnabled(false)
+ .build();
+
+ RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool());
+ RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory(
+ metricTestSystem.getMetricFactory(),
+ metricTestSystem.getSpyGaugeRegistry(),
+ rabbitClient,
+ mimeMessageStoreFactory,
+ BLOB_ID_FACTORY,
+ mailQueueViewFactory,
+ clock,
+ new RawMailQueueItemDecoratorFactory(),
+ configuration);
+ mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
+ mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory);
+ mailQueue = mailQueueFactory.createQueue(SPOOL);
+ }
+
+ @Test
+ void constructorShouldNotRegisterGetQueueSizeGaugeWhenSizeMetricsDisabled(MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) {
+ ArgumentCaptor<Gauge<?>> gaugeCaptor = ArgumentCaptor.forClass(Gauge.class);
+ verify(metricTestSystem.getSpyGaugeRegistry(), never()).register(any(), gaugeCaptor.capture());
+ }
}
}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
index 6280d41..625f988 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
@@ -37,6 +37,7 @@ import org.apache.james.metrics.api.NoopMetricFactory;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.api.MailQueueFactoryContract;
import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
+import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.junit.jupiter.api.AfterEach;
@@ -61,6 +62,10 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
when(mailQueueViewFactory.create(any()))
.thenReturn(mailQueueView);
+ RabbitMQMailQueueConfiguration configuration = RabbitMQMailQueueConfiguration.builder()
+ .sizeMetricsEnabled(true)
+ .build();
+
RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool());
RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory(
new NoopMetricFactory(),
@@ -70,7 +75,8 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
BLOB_ID_FACTORY,
mailQueueViewFactory,
Clock.systemUTC(),
- new RawMailQueueItemDecoratorFactory());
+ new RawMailQueueItemDecoratorFactory(),
+ configuration);
mqManagementApi = new RabbitMQMailQueueManagement(rabbitMQExtension.managementAPI());
mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory);
}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/RabbitMQMailQueueConfigurationTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/RabbitMQMailQueueConfigurationTest.java
new file mode 100644
index 0000000..fefaff8
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/RabbitMQMailQueueConfigurationTest.java
@@ -0,0 +1,52 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.commons.configuration2.PropertiesConfiguration;
+import org.junit.jupiter.api.Test;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+
+class RabbitMQMailQueueConfigurationTest {
+ @Test
+ void shouldMatchBeanContract() {
+ EqualsVerifier.forClass(RabbitMQMailQueueConfiguration.class).verify();
+ }
+
+ @Test
+ void fromShouldReturnDefaultForEmptyConfiguration() {
+ RabbitMQMailQueueConfiguration actual = RabbitMQMailQueueConfiguration.from(new PropertiesConfiguration());
+
+ assertThat(actual)
+ .isEqualTo(RabbitMQMailQueueConfiguration.sizeMetricsEnabled());
+ }
+
+ @Test
+ void fromShouldReturnConfiguredSizeMetricsEnabled() {
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ configuration.addProperty(RabbitMQMailQueueConfiguration.SIZE_METRICS_ENABLED_PROPERTY, false);
+ RabbitMQMailQueueConfiguration actual = RabbitMQMailQueueConfiguration.from(configuration);
+
+ assertThat(actual.isSizeMetricsEnabled())
+ .isEqualTo(false);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org