You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/08/26 01:33:15 UTC
[james-project] 03/05: JAMES-3810 Propose a health check for CassandraMailQueueView browse start update process
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 75f68a20f3df5dad5c942407cf1493a21f6b7913
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Aug 23 18:04:09 2022 +0700
JAMES-3810 Propose a health check for CassandraMailQueueView browse start update process
---
.../rabbitmq/CassandraMailQueueViewModule.java | 5 ++
.../rabbitmq/view/cassandra/BrowseStartDAO.java | 12 +++
.../view/cassandra/BrowseStartHealthCheck.java | 69 +++++++++++++++
.../view/cassandra/BrowseStartDAOTest.java | 2 +-
.../view/cassandra/BrowseStartHealthCheckTest.java | 97 ++++++++++++++++++++++
5 files changed, 184 insertions(+), 1 deletion(-)
diff --git a/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/CassandraMailQueueViewModule.java b/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/CassandraMailQueueViewModule.java
index 986110dc55..625e96e1b5 100644
--- a/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/CassandraMailQueueViewModule.java
+++ b/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/CassandraMailQueueViewModule.java
@@ -23,12 +23,14 @@ import javax.inject.Singleton;
import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.core.healthcheck.HealthCheck;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTO;
import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
import org.apache.james.lifecycle.api.StartUpCheck;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import org.apache.james.queue.rabbitmq.view.cassandra.BrowseStartDAO;
+import org.apache.james.queue.rabbitmq.view.cassandra.BrowseStartHealthCheck;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueMailDelete;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueMailStore;
@@ -72,6 +74,9 @@ public class CassandraMailQueueViewModule extends AbstractModule {
Multibinder.newSetBinder(binder(), BlobReferenceSource.class)
.addBinding().to(MailQueueViewBlobReferenceSource.class);
+
+ Multibinder.newSetBinder(binder(), HealthCheck.class)
+ .addBinding().to(BrowseStartHealthCheck.class);
}
@Provides
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
index 01c0d811ca..89bb23deb6 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
@@ -31,6 +31,7 @@ import java.time.Instant;
import javax.inject.Inject;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.queue.rabbitmq.MailQueueName;
@@ -39,12 +40,14 @@ import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.google.common.annotations.VisibleForTesting;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class BrowseStartDAO {
private final CassandraAsyncExecutor executor;
private final PreparedStatement selectOne;
+ private final PreparedStatement selectAll;
private final PreparedStatement insertOne;
private final PreparedStatement updateOne;
@@ -57,6 +60,10 @@ public class BrowseStartDAO {
.whereColumn(QUEUE_NAME).isEqualTo(bindMarker(QUEUE_NAME))
.build());
+ this.selectAll = session.prepare(selectFrom(TABLE_NAME)
+ .all()
+ .build());
+
this.updateOne = session.prepare(update(TABLE_NAME)
.setColumn(BROWSE_START, bindMarker(BROWSE_START))
.whereColumn(QUEUE_NAME).isEqualTo(bindMarker(QUEUE_NAME))
@@ -86,6 +93,11 @@ public class BrowseStartDAO {
.setString(QUEUE_NAME, mailQueueName.asString()));
}
+    Flux<Pair<MailQueueName, Instant>> listAll() { // unfiltered read: one (queue name, browse start) pair per stored row
+        return executor.executeRows(selectAll.bind()).map(entry -> Pair.of(
+            MailQueueName.fromString(entry.getString(QUEUE_NAME)), entry.getInstant(BROWSE_START)));
+    }
+
@VisibleForTesting
Mono<Row> selectOne(MailQueueName queueName) {
return executor.executeSingleRow(
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartHealthCheck.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartHealthCheck.java
new file mode 100644
index 0000000000..5920733305
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartHealthCheck.java
@@ -0,0 +1,69 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra;
+
+import java.time.Clock;
+import java.time.Duration;
+
+import javax.inject.Inject;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.core.healthcheck.ComponentName;
+import org.apache.james.core.healthcheck.HealthCheck;
+import org.apache.james.core.healthcheck.Result;
+import org.reactivestreams.Publisher;
+
+import com.google.common.collect.ImmutableList;
+
+import reactor.core.publisher.Mono;
+
+public class BrowseStartHealthCheck implements HealthCheck {
+ private static final ComponentName COMPONENT_NAME = new ComponentName("RabbitMQMailQueue BrowseStart");
+ private static final Duration GRACE_PERIOD = Duration.ofDays(7);
+
+ private final BrowseStartDAO browseStartDAO;
+ private final Clock clock;
+
+ @Inject
+ public BrowseStartHealthCheck(BrowseStartDAO browseStartDAO, Clock clock) {
+ this.browseStartDAO = browseStartDAO;
+ this.clock = clock;
+ }
+
+ @Override
+ public ComponentName componentName() {
+ return COMPONENT_NAME;
+ }
+
+ @Override
+ public Publisher<Result> check() {
+ return browseStartDAO.listAll()
+ .filter(pair -> pair.getValue().isBefore(clock.instant().minus(GRACE_PERIOD)))
+ .map(Pair::getKey)
+ .collect(ImmutableList.toImmutableList())
+ .filter(list -> !list.isEmpty())
+ .map(tooOldBrowseStart -> Result.degraded(COMPONENT_NAME, String.format("The following mail queues %s have out of date browse starts (older than 7 days)" +
+ " which can cause performance issues. We recommend auditing the mail queue content, and resuming the delivery of oldest items, which would " +
+ "allow browse start updates to take place correctly again.", tooOldBrowseStart.toString())))
+ .switchIfEmpty(Mono.just(Result.healthy(COMPONENT_NAME)))
+ .onErrorResume(e -> Mono.just(
+ Result.unhealthy(COMPONENT_NAME, "Error while checking browse start", e)));
+ }
+}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
index 7b1bc73947..53e7b7c33e 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
@@ -44,7 +44,7 @@ class BrowseStartDAOTest {
@RegisterExtension
static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(
- CassandraModule.aggregateModules(CassandraSchemaVersionModule.MODULE,CassandraMailQueueViewModule.MODULE));
+ CassandraModule.aggregateModules(CassandraSchemaVersionModule.MODULE, CassandraMailQueueViewModule.MODULE));
private BrowseStartDAO testee;
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartHealthCheckTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartHealthCheckTest.java
new file mode 100644
index 0000000000..eb7b8266fa
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartHealthCheckTest.java
@@ -0,0 +1,97 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Clock;
+import java.time.Duration;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
+import org.apache.james.core.healthcheck.ResultStatus;
+import org.apache.james.queue.rabbitmq.MailQueueName;
+import org.apache.james.utils.UpdatableTickingClock;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import reactor.core.publisher.Mono;
+/** Checks the status returned by {@link BrowseStartHealthCheck} for empty, recent, future, stale and mixed browse starts. */
+class BrowseStartHealthCheckTest {
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(
+        CassandraModule.aggregateModules(CassandraSchemaVersionModule.MODULE, CassandraMailQueueViewModule.MODULE));
+
+    private BrowseStartHealthCheck testee;
+    private BrowseStartDAO browseStartDAO;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        browseStartDAO = new BrowseStartDAO(cassandra.getConf());
+        testee = new BrowseStartHealthCheck(browseStartDAO, Clock.systemUTC());
+    }
+
+    @Test
+    void checkShouldReturnHealthyWhenEmpty() {
+        assertThat(Mono.from(testee.check()).block().getStatus())
+            .isEqualTo(ResultStatus.HEALTHY);
+    }
+
+    @Test
+    void checkShouldReturnHealthyWhenSingleValue() {
+        browseStartDAO.insertInitialBrowseStart(MailQueueName.fromString("abc"),
+            Clock.systemUTC().instant().minus(Duration.ofDays(6))).block(); // 6 days old: inside the 7 day grace period
+
+        assertThat(Mono.from(testee.check()).block().getStatus())
+            .isEqualTo(ResultStatus.HEALTHY);
+    }
+
+    @Test
+    void checkShouldReturnHealthyWhenSingleFutureValue() {
+        browseStartDAO.insertInitialBrowseStart(MailQueueName.fromString("abc"),
+            Clock.systemUTC().instant().plus(Duration.ofDays(6))).block(); // a browse start in the future is never too old
+
+        assertThat(Mono.from(testee.check()).block().getStatus())
+            .isEqualTo(ResultStatus.HEALTHY);
+    }
+
+    @Test
+    void checkShouldReturnDegradedWhenSingleOldValue() {
+        browseStartDAO.insertInitialBrowseStart(MailQueueName.fromString("abc"),
+            Clock.systemUTC().instant().minus(Duration.ofDays(8))).block(); // 8 days old: past the 7 day grace period
+
+        assertThat(Mono.from(testee.check()).block().getStatus())
+            .isEqualTo(ResultStatus.DEGRADED);
+    }
+
+    @Test
+    void checkShouldReturnDegradedWhenMixed() {
+        browseStartDAO.insertInitialBrowseStart(MailQueueName.fromString("abc"),
+            Clock.systemUTC().instant().minus(Duration.ofDays(8))).block();
+        browseStartDAO.insertInitialBrowseStart(MailQueueName.fromString("def"), // second, distinct queue so stale and fresh entries coexist
+            Clock.systemUTC().instant().minus(Duration.ofDays(6))).block();
+
+        assertThat(Mono.from(testee.check()).block().getStatus())
+            .isEqualTo(ResultStatus.DEGRADED);
+    }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org