You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/12/01 05:08:58 UTC
[james-project] 05/08: [JAMES-3841] ActiveMQ: allow configuring metric collection (i.e. conf/activemq.properties)
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit ba2a05d016cfac4a7154bd0b11830896befc42c0
Author: ouvtam <ou...@8n4.pw>
AuthorDate: Tue Nov 29 12:53:00 2022 +0100
[JAMES-3841] ActiveMQ: allow configuring metric collection (i.e. conf/activemq.properties)
---
.../queue/activemq/ActiveMQQueueModule.java | 23 ++++++
.../queue/activemq/ActiveMQConfiguration.java | 26 ++++++
.../metric/ActiveMQMetricCollectorImpl.java | 31 ++++---
.../metric/ActiveMQMetricConfiguration.java | 94 ++++++++++++++++++++++
.../metric/ActiveMQMetricCollectorTest.java | 14 ++--
.../metric/ActiveMQMetricConfigurationTest.java | 72 +++++++++++++++++
6 files changed, 241 insertions(+), 19 deletions(-)
diff --git a/server/container/guice/queue/activemq/src/main/java/org/apache/james/modules/queue/activemq/ActiveMQQueueModule.java b/server/container/guice/queue/activemq/src/main/java/org/apache/james/modules/queue/activemq/ActiveMQQueueModule.java
index da56c98306..2113c3cf47 100644
--- a/server/container/guice/queue/activemq/src/main/java/org/apache/james/modules/queue/activemq/ActiveMQQueueModule.java
+++ b/server/container/guice/queue/activemq/src/main/java/org/apache/james/modules/queue/activemq/ActiveMQQueueModule.java
@@ -19,11 +19,16 @@
package org.apache.james.modules.queue.activemq;
+import java.io.FileNotFoundException;
+
import javax.jms.ConnectionFactory;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.commons.configuration2.Configuration;
+import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.james.core.healthcheck.HealthCheck;
+import org.apache.james.queue.activemq.ActiveMQConfiguration;
import org.apache.james.queue.activemq.ActiveMQHealthCheck;
import org.apache.james.queue.activemq.ActiveMQMailQueueFactory;
import org.apache.james.queue.activemq.EmbeddedActiveMQ;
@@ -34,6 +39,9 @@ import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.utils.InitializationOperation;
import org.apache.james.utils.InitilizationOperationBuilder;
+import org.apache.james.utils.PropertiesProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
@@ -44,6 +52,9 @@ import com.google.inject.multibindings.ProvidesIntoSet;
public class ActiveMQQueueModule extends AbstractModule {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQQueueModule.class);
+ private static final String FILENAME = "activemq";
+
@Override
protected void configure() {
bind(PersistenceAdapter.class).to(KahaDBPersistenceAdapter.class);
@@ -82,6 +93,18 @@ public class ActiveMQQueueModule extends AbstractModule {
return queueFactory;
}
+ @Singleton
+ @Provides
+ ActiveMQConfiguration activeMQConfiguration(PropertiesProvider propertiesProvider) throws ConfigurationException {
+ try {
+ Configuration configuration = propertiesProvider.getConfigurations(FILENAME);
+ return ActiveMQConfiguration.from(configuration);
+ } catch (FileNotFoundException e) {
+ LOGGER.warn("Could not find {} configuration file, using default configuration", FILENAME);
+ return ActiveMQConfiguration.getDefault();
+ }
+ }
+
@ProvidesIntoSet
InitializationOperation configureMetricCollector(ActiveMQMetricCollector metricCollector) {
return InitilizationOperationBuilder
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQConfiguration.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQConfiguration.java
new file mode 100644
index 0000000000..f1b47780eb
--- /dev/null
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQConfiguration.java
@@ -0,0 +1,26 @@
+package org.apache.james.queue.activemq;
+
+import org.apache.commons.configuration2.BaseConfiguration;
+import org.apache.commons.configuration2.Configuration;
+import org.apache.james.queue.activemq.metric.ActiveMQMetricConfiguration;
+
+public class ActiveMQConfiguration {
+
+ private final ActiveMQMetricConfiguration metricConfiguration;
+
+ public static ActiveMQConfiguration getDefault() {
+ return from(new BaseConfiguration());
+ }
+
+ public static ActiveMQConfiguration from(Configuration configuration) {
+ return new ActiveMQConfiguration(ActiveMQMetricConfiguration.from(configuration));
+ }
+
+ private ActiveMQConfiguration(ActiveMQMetricConfiguration metricConfiguration) {
+ this.metricConfiguration = metricConfiguration;
+ }
+
+ public ActiveMQMetricConfiguration getMetricConfiguration() {
+ return metricConfiguration;
+ }
+}
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorImpl.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorImpl.java
index d4ed4856b0..fc002990c9 100644
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorImpl.java
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorImpl.java
@@ -19,7 +19,6 @@
package org.apache.james.queue.activemq.metric;
-import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
@@ -38,6 +37,7 @@ import javax.jms.TemporaryQueue;
import org.apache.james.metrics.api.GaugeRegistry;
import org.apache.james.metrics.api.MetricFactory;
+import org.apache.james.queue.activemq.ActiveMQConfiguration;
import org.apache.james.queue.api.MailQueueName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,11 +53,7 @@ public class ActiveMQMetricCollectorImpl implements ActiveMQMetricCollector {
private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQMetricCollectorImpl.class);
- public static final Duration REFRESH_DELAY = Duration.ofSeconds(2);
- public static final Duration REFRESH_INTERVAL = Duration.ofSeconds(5);
- public static final Duration RECEIVE_TIMEOUT = Duration.ofSeconds(1);
- public static final Duration REFRESH_TIMEOUT = RECEIVE_TIMEOUT.multipliedBy(2);
-
+ private final ActiveMQMetricConfiguration config;
private final ConnectionFactory connectionFactory;
private final MetricFactory metricFactory;
private final GaugeRegistry gaugeRegistry;
@@ -67,7 +63,8 @@ public class ActiveMQMetricCollectorImpl implements ActiveMQMetricCollector {
private Disposable disposable;
@Inject
- public ActiveMQMetricCollectorImpl(ConnectionFactory connectionFactory, MetricFactory metricFactory, GaugeRegistry gaugeRegistry) {
+ public ActiveMQMetricCollectorImpl(ActiveMQConfiguration activeMQConfiguration, ConnectionFactory connectionFactory, MetricFactory metricFactory, GaugeRegistry gaugeRegistry) {
+ this.config = activeMQConfiguration.getMetricConfiguration();
this.connectionFactory = connectionFactory;
this.metricFactory = metricFactory;
this.gaugeRegistry = gaugeRegistry;
@@ -84,7 +81,7 @@ public class ActiveMQMetricCollectorImpl implements ActiveMQMetricCollector {
}
private void collectStatistics(ActiveMQMetrics statistics) {
- if (!registeredStatistics.containsKey(statistics.getName())) {
+ if (config.isEnabled() && !registeredStatistics.containsKey(statistics.getName())) {
LOGGER.info("collecting statistics for {}", statistics.getName());
registeredStatistics.put(statistics.getName(), statistics);
}
@@ -92,13 +89,20 @@ public class ActiveMQMetricCollectorImpl implements ActiveMQMetricCollector {
@Override
public void start() {
+ if (!config.isEnabled()) {
+ LOGGER.info("collecting statistics disabled");
+ return;
+ }
+
collectBrokerStatistics();
- LOGGER.info("start delay={} interval={}", REFRESH_DELAY, REFRESH_INTERVAL);
- disposable = Flux.interval(REFRESH_DELAY, REFRESH_INTERVAL)
+ LOGGER.info("start delay={} interval={} timeout={} aqmp_timeout={}",
+ config.getStartDelay(), config.getInterval(), config.getTimeout(), config.getAqmpTimeout());
+
+ disposable = Flux.interval(config.getStartDelay(), config.getInterval())
.flatMap(any -> Flux.fromStream(() -> registeredStatistics.values().stream())
.flatMap((s) -> {
- Mono<Void> task = Mono.fromCallable(() -> fetchAndUpdate(s)).timeout(REFRESH_TIMEOUT);
+ Mono<Void> task = Mono.fromCallable(() -> fetchAndUpdate(s)).timeout(config.getTimeout());
return metricFactory.decoratePublisherWithTimerMetric(s.getName() + "._time", task);
})
)
@@ -139,9 +143,10 @@ public class ActiveMQMetricCollectorImpl implements ActiveMQMetricCollector {
msg.setJMSReplyTo(replyTo);
producer.send(msg);
- Message reply = consumer.receive(RECEIVE_TIMEOUT.toMillis());
+ long timeoutMs = config.getAqmpTimeout().toMillis();
+ Message reply = consumer.receive(timeoutMs);
if (reply == null) {
- throw new JMSException("no message received, timed out after " + RECEIVE_TIMEOUT);
+ throw new JMSException("no message received, timed out after " + timeoutMs + " ms");
} else if (!(reply instanceof MapMessage)) {
throw new JMSException("expected MapMessage but got " + reply.getClass());
}
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricConfiguration.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricConfiguration.java
new file mode 100644
index 0000000000..60381cc4b9
--- /dev/null
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricConfiguration.java
@@ -0,0 +1,94 @@
+package org.apache.james.queue.activemq.metric;
+
+import java.time.Duration;
+import java.util.Optional;
+
+import org.apache.commons.configuration2.Configuration;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.james.util.DurationParser;
+
+import com.google.common.base.Preconditions;
+
+public class ActiveMQMetricConfiguration {
+ private static final String ENABLED = "metrics.enabled";
+ private static final String START_DELAY = "metrics.start_delay";
+ private static final String INTERVAL = "metrics.interval";
+ private static final String TIMEOUT = "metrics.timeout";
+ private static final String AQMP_TIMEOUT = "metrics.aqmp_timeout";
+
+ private static final Duration MINIMAL_START_DELAY = Duration.ofSeconds(1);
+ private static final Duration MINIMAL_INTERVAL = Duration.ofSeconds(5);
+ private static final Duration MINIMAL_TIMEOUT = Duration.ofSeconds(2);
+ private static final Duration MINIMAL_AQMP_TIMEOUT = Duration.ofSeconds(1);
+
+ private final boolean enabled;
+ private final Duration startDelay;
+ private final Duration interval;
+ private final Duration timeout;
+ private final Duration aqmpTimeout;
+
+ public static ActiveMQMetricConfiguration from(Configuration configuration) {
+ return new ActiveMQMetricConfiguration(
+ configuration.getBoolean(ENABLED, true),
+ getDurationFromConfiguration(configuration, START_DELAY).orElse(MINIMAL_START_DELAY),
+ getDurationFromConfiguration(configuration, INTERVAL).orElse(MINIMAL_INTERVAL),
+ getDurationFromConfiguration(configuration, TIMEOUT).orElse(MINIMAL_TIMEOUT),
+ getDurationFromConfiguration(configuration, AQMP_TIMEOUT).orElse(MINIMAL_AQMP_TIMEOUT)
+ );
+ }
+
+ public ActiveMQMetricConfiguration(boolean enabled, Duration startDelay, Duration interval,
+ Duration timeout, Duration aqmpTimeout) {
+ this.enabled = enabled;
+ this.startDelay = startDelay;
+ this.interval = interval;
+ this.timeout = timeout;
+ this.aqmpTimeout = aqmpTimeout;
+ checkConfiguration();
+ }
+
+ private void checkConfiguration() {
+ Preconditions.checkArgument(startDelay.compareTo(MINIMAL_START_DELAY) >= 0,
+ "'%s' must be equal or greater than %d ms",
+ START_DELAY, MINIMAL_START_DELAY.toMillis());
+ Preconditions.checkArgument(interval.compareTo(MINIMAL_INTERVAL) >= 0,
+ "'%s' must be equal or greater than %d ms",
+ INTERVAL, MINIMAL_INTERVAL.toMillis());
+ Preconditions.checkArgument(timeout.compareTo(MINIMAL_TIMEOUT) >= 0,
+ "'%s' must be equal or greater than %d ms",
+ TIMEOUT, MINIMAL_TIMEOUT.toMillis());
+ Preconditions.checkArgument(aqmpTimeout.compareTo(MINIMAL_AQMP_TIMEOUT) >= 0,
+ "'%s' must be equal or greater than %d ms",
+ AQMP_TIMEOUT, MINIMAL_AQMP_TIMEOUT.toMillis());
+
+ Preconditions.checkArgument(interval.compareTo(timeout) > 0,
+ "'%s' must be less than '%s'", TIMEOUT, INTERVAL);
+ Preconditions.checkArgument(timeout.compareTo(aqmpTimeout) > 0,
+ "'%s' must be less than '%s'", AQMP_TIMEOUT, TIMEOUT);
+ }
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public Duration getStartDelay() {
+ return startDelay;
+ }
+
+ public Duration getInterval() {
+ return interval;
+ }
+
+ public Duration getTimeout() {
+ return timeout;
+ }
+
+ public Duration getAqmpTimeout() {
+ return aqmpTimeout;
+ }
+
+ private static Optional<Duration> getDurationFromConfiguration(Configuration configuration, String key) {
+ return StringUtils.isEmpty(configuration.getString(key))
+ ? Optional.empty() : Optional.of(DurationParser.parse(configuration.getString(key)));
+ }
+}
diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorTest.java
index d1d02dea2e..5b3d9d3312 100644
--- a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorTest.java
+++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorTest.java
@@ -22,7 +22,6 @@ package org.apache.james.queue.activemq.metric;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Supplier;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import static org.assertj.core.api.Assertions.assertThat;
@@ -37,6 +36,7 @@ import org.apache.james.metrics.api.Gauge;
import org.apache.james.metrics.api.GaugeRegistry;
import org.apache.james.metrics.api.NoopGaugeRegistry;
import org.apache.james.metrics.tests.RecordingMetricFactory;
+import org.apache.james.queue.activemq.ActiveMQConfiguration;
import org.apache.james.queue.api.MailQueueName;
import org.apache.james.queue.jms.BrokerExtension;
import org.junit.jupiter.api.BeforeAll;
@@ -49,6 +49,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
class ActiveMQMetricCollectorTest {
private static ActiveMQConnectionFactory connectionFactory;
+ private static final ActiveMQConfiguration EMPTY_CONFIGURATION = ActiveMQConfiguration.getDefault();
@BeforeAll
static void setup(BrokerService broker) {
@@ -61,7 +62,7 @@ class ActiveMQMetricCollectorTest {
@Test
void shouldFailToFetchAndUpdateStatisticsForUnknownQueue() {
SimpleGaugeRegistry gaugeRegistry = new SimpleGaugeRegistry();
- ActiveMQMetricCollectorImpl testee = new ActiveMQMetricCollectorImpl(connectionFactory, new RecordingMetricFactory(), gaugeRegistry);
+ ActiveMQMetricCollectorImpl testee = new ActiveMQMetricCollectorImpl(EMPTY_CONFIGURATION, connectionFactory, new RecordingMetricFactory(), gaugeRegistry);
ActiveMQMetrics queueStatistics = ActiveMQMetrics.forQueue("UNKNOWN", gaugeRegistry);
assertThatThrownBy(() -> testee.fetchAndUpdate(queueStatistics))
@@ -73,7 +74,7 @@ class ActiveMQMetricCollectorTest {
@Test
void shouldFetchAndUpdateBrokerStatistics() throws Exception {
SimpleGaugeRegistry gaugeRegistry = new SimpleGaugeRegistry();
- ActiveMQMetricCollectorImpl testee = new ActiveMQMetricCollectorImpl(connectionFactory, new RecordingMetricFactory(), gaugeRegistry);
+ ActiveMQMetricCollectorImpl testee = new ActiveMQMetricCollectorImpl(EMPTY_CONFIGURATION, connectionFactory, new RecordingMetricFactory(), gaugeRegistry);
ActiveMQMetrics brokerStatistics = ActiveMQMetrics.forBroker(gaugeRegistry);
long notBefore = System.currentTimeMillis();
@@ -86,7 +87,7 @@ class ActiveMQMetricCollectorTest {
@Test
void shouldFetchAndUpdateBrokerStatisticsInGaugeRegistry() throws Exception {
SimpleGaugeRegistry gaugeRegistry = new SimpleGaugeRegistry();
- ActiveMQMetricCollectorImpl testee = new ActiveMQMetricCollectorImpl(connectionFactory, new RecordingMetricFactory(), gaugeRegistry);
+ ActiveMQMetricCollectorImpl testee = new ActiveMQMetricCollectorImpl(EMPTY_CONFIGURATION, connectionFactory, new RecordingMetricFactory(), gaugeRegistry);
ActiveMQMetrics brokerStatistics = ActiveMQMetrics.forBroker(gaugeRegistry);
testee.fetchAndUpdate(brokerStatistics);
@@ -100,12 +101,13 @@ class ActiveMQMetricCollectorTest {
void hasExecutionTimeMetrics() {
RecordingMetricFactory metricFactory = new RecordingMetricFactory();
NoopGaugeRegistry gaugeRegistry = new NoopGaugeRegistry();
- ActiveMQMetricCollector testee = new ActiveMQMetricCollectorImpl(connectionFactory, metricFactory, gaugeRegistry);
+ ActiveMQMetricCollector testee = new ActiveMQMetricCollectorImpl(EMPTY_CONFIGURATION, connectionFactory, metricFactory, gaugeRegistry);
testee.start();
testee.collectBrokerStatistics();
testee.collectQueueStatistics(MailQueueName.of("UNKNOWN"));
- Integer executionTimeCount = Flux.interval(ActiveMQMetricCollectorImpl.REFRESH_DELAY, Duration.ofSeconds(1))
+ Duration startDelay = EMPTY_CONFIGURATION.getMetricConfiguration().getStartDelay();
+ Integer executionTimeCount = Flux.interval(startDelay, Duration.ofSeconds(1))
.take(3,true)
.flatMap(n -> Mono.fromCallable(() -> metricFactory.executionTimesForPrefixName("ActiveMQ.").size()))
.blockLast();
diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricConfigurationTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricConfigurationTest.java
new file mode 100644
index 0000000000..7c3e31448d
--- /dev/null
+++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricConfigurationTest.java
@@ -0,0 +1,72 @@
+package org.apache.james.queue.activemq.metric;
+
+import java.time.Duration;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.apache.commons.configuration2.BaseConfiguration;
+import org.junit.jupiter.api.Test;
+
+public class ActiveMQMetricConfigurationTest {
+
+ @Test
+ void shouldUseDefaultForEmptyConfiguration() {
+ assertThat(ActiveMQMetricConfiguration.from(new BaseConfiguration()))
+ .isNotNull();
+ }
+
+ @Test
+ void shouldNotFailForValidConfiguration() {
+ assertThat(getSampleConfig(1,10,4,3))
+ .isNotNull();
+ }
+
+ @Test
+ void shouldThrowWhenStartDelayIsLessThanMinimal() {
+ assertThatThrownBy(() -> getSampleConfig(0,10,3,3))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ void shouldThrowWhenIntervalIsLessThanMinimal() {
+ assertThatThrownBy(() -> getSampleConfig(1,1,3,3))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ void shouldThrowWhenTimeoutIsLessThanMinimal() {
+ assertThatThrownBy(() -> getSampleConfig(1,10,1,3))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ void shouldThrowWhenAqmpTimeoutIsLessThanMinimal() {
+ assertThatThrownBy(() -> getSampleConfig(1,10,3,0))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ void shouldThrowWhenIntervalIsLessThanTimeout() {
+ assertThatThrownBy(() -> getSampleConfig(1,5,10,2))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ void shouldThrowWhenTimeoutIsLessThanAqmpTimeout() {
+ assertThatThrownBy(() -> getSampleConfig(1,10,3,5))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ void shouldThrowWhenIntervalIsLessThanAqmpTimeout() {
+ assertThatThrownBy(() -> getSampleConfig(1,5,10,9))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ private ActiveMQMetricConfiguration getSampleConfig(int startDelaySec, int intervalSec, int timeoutSec, int aqmpTimeoutSec) {
+ return new ActiveMQMetricConfiguration(true,
+ Duration.ofSeconds(startDelaySec), Duration.ofSeconds(intervalSec),
+ Duration.ofSeconds(timeoutSec), Duration.ofSeconds(aqmpTimeoutSec));
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org