You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/08/26 01:33:15 UTC

[james-project] 03/05: JAMES-3810 Propose a health check for CassandraMailQueueView browse start update process

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 75f68a20f3df5dad5c942407cf1493a21f6b7913
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Aug 23 18:04:09 2022 +0700

    JAMES-3810 Propose a health check for CassandraMailQueueView browse start update process
---
 .../rabbitmq/CassandraMailQueueViewModule.java     |  5 ++
 .../rabbitmq/view/cassandra/BrowseStartDAO.java    | 12 +++
 .../view/cassandra/BrowseStartHealthCheck.java     | 69 +++++++++++++++
 .../view/cassandra/BrowseStartDAOTest.java         |  2 +-
 .../view/cassandra/BrowseStartHealthCheckTest.java | 97 ++++++++++++++++++++++
 5 files changed, 184 insertions(+), 1 deletion(-)

diff --git a/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/CassandraMailQueueViewModule.java b/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/CassandraMailQueueViewModule.java
index 986110dc55..625e96e1b5 100644
--- a/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/CassandraMailQueueViewModule.java
+++ b/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/CassandraMailQueueViewModule.java
@@ -23,12 +23,14 @@ import javax.inject.Singleton;
 
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.blob.api.BlobReferenceSource;
+import org.apache.james.core.healthcheck.HealthCheck;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTO;
 import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
 import org.apache.james.lifecycle.api.StartUpCheck;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
 import org.apache.james.queue.rabbitmq.view.cassandra.BrowseStartDAO;
+import org.apache.james.queue.rabbitmq.view.cassandra.BrowseStartHealthCheck;
 import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser;
 import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueMailDelete;
 import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueMailStore;
@@ -72,6 +74,9 @@ public class CassandraMailQueueViewModule extends AbstractModule {
 
         Multibinder.newSetBinder(binder(), BlobReferenceSource.class)
             .addBinding().to(MailQueueViewBlobReferenceSource.class);
+
+        Multibinder.newSetBinder(binder(), HealthCheck.class)
+            .addBinding().to(BrowseStartHealthCheck.class);
     }
 
     @Provides
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
index 01c0d811ca..89bb23deb6 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAO.java
@@ -31,6 +31,7 @@ import java.time.Instant;
 
 import javax.inject.Inject;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.queue.rabbitmq.MailQueueName;
 
@@ -39,12 +40,14 @@ import com.datastax.oss.driver.api.core.cql.PreparedStatement;
 import com.datastax.oss.driver.api.core.cql.Row;
 import com.google.common.annotations.VisibleForTesting;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class BrowseStartDAO {
 
     private final CassandraAsyncExecutor executor;
     private final PreparedStatement selectOne;
+    private final PreparedStatement selectAll;
     private final PreparedStatement insertOne;
     private final PreparedStatement updateOne;
 
@@ -57,6 +60,10 @@ public class BrowseStartDAO {
             .whereColumn(QUEUE_NAME).isEqualTo(bindMarker(QUEUE_NAME))
             .build());
 
+        this.selectAll = session.prepare(selectFrom(TABLE_NAME)
+            .all()
+            .build());
+
         this.updateOne = session.prepare(update(TABLE_NAME)
             .setColumn(BROWSE_START, bindMarker(BROWSE_START))
             .whereColumn(QUEUE_NAME).isEqualTo(bindMarker(QUEUE_NAME))
@@ -86,6 +93,20 @@ public class BrowseStartDAO {
             .setString(QUEUE_NAME, mailQueueName.asString()));
     }
 
+    /**
+     * Lists the browse start stored for each row of the browse start table.
+     *
+     * @return one (queue name, browse start) pair per row returned by the select-all statement
+     */
+    Flux<Pair<MailQueueName, Instant>> listAll() {
+        return executor.executeRows(selectAll.bind())
+            .map(row -> {
+                MailQueueName queueName = MailQueueName.fromString(row.getString(QUEUE_NAME));
+                Instant browseStart = row.getInstant(BROWSE_START);
+                return Pair.of(queueName, browseStart);
+            });
+    }
+
     @VisibleForTesting
     Mono<Row> selectOne(MailQueueName queueName) {
         return executor.executeSingleRow(
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartHealthCheck.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartHealthCheck.java
new file mode 100644
index 0000000000..5920733305
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartHealthCheck.java
@@ -0,0 +1,81 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra;
+
+import java.time.Clock;
+import java.time.Duration;
+
+import javax.inject.Inject;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.core.healthcheck.ComponentName;
+import org.apache.james.core.healthcheck.HealthCheck;
+import org.apache.james.core.healthcheck.Result;
+import org.reactivestreams.Publisher;
+
+import com.google.common.collect.ImmutableList;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * Health check flagging mail queues whose browse start is older than {@code GRACE_PERIOD}.
+ *
+ * <p>Reports degraded when at least one such queue exists, unhealthy when the listing fails, healthy otherwise.
+ */
+public class BrowseStartHealthCheck implements HealthCheck {
+    private static final ComponentName COMPONENT_NAME = new ComponentName("RabbitMQMailQueue BrowseStart");
+    private static final Duration GRACE_PERIOD = Duration.ofDays(7);
+
+    private final BrowseStartDAO browseStartDAO;
+    private final Clock clock;
+
+    @Inject
+    public BrowseStartHealthCheck(BrowseStartDAO browseStartDAO, Clock clock) {
+        this.browseStartDAO = browseStartDAO;
+        this.clock = clock;
+    }
+
+    @Override
+    public ComponentName componentName() {
+        return COMPONENT_NAME;
+    }
+
+    /**
+     * Lists every stored browse start and collects the queues whose value is older than the grace period.
+     *
+     * @return a publisher emitting a single result: degraded, healthy, or unhealthy on error
+     */
+    @Override
+    public Publisher<Result> check() {
+        // Read the clock once per subscription so every queue is compared against the same threshold.
+        return Mono.fromCallable(() -> clock.instant().minus(GRACE_PERIOD))
+            .flatMapMany(threshold -> browseStartDAO.listAll()
+                .filter(pair -> pair.getValue().isBefore(threshold)))
+            .map(Pair::getKey)
+            .collect(ImmutableList.toImmutableList())
+            .filter(list -> !list.isEmpty())
+            .map(tooOldBrowseStart -> Result.degraded(COMPONENT_NAME, String.format("The following mail queues %s have out of date browse starts (older than %s days)" +
+                " which can cause performance issues. We recommend auditing the mail queue content, and resuming the delivery of oldest items, which would " +
+                "allow browse start updates to take place correctly again.", tooOldBrowseStart, GRACE_PERIOD.toDays())))
+            .switchIfEmpty(Mono.just(Result.healthy(COMPONENT_NAME)))
+            .onErrorResume(e -> Mono.just(
+                Result.unhealthy(COMPONENT_NAME, "Error while checking browse start", e)));
+    }
+}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
index 7b1bc73947..53e7b7c33e 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartDAOTest.java
@@ -44,7 +44,7 @@ class BrowseStartDAOTest {
 
     @RegisterExtension
     static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(
-            CassandraModule.aggregateModules(CassandraSchemaVersionModule.MODULE,CassandraMailQueueViewModule.MODULE));
+            CassandraModule.aggregateModules(CassandraSchemaVersionModule.MODULE, CassandraMailQueueViewModule.MODULE));
 
     private BrowseStartDAO testee;
 
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartHealthCheckTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartHealthCheckTest.java
new file mode 100644
index 0000000000..eb7b8266fa
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/BrowseStartHealthCheckTest.java
@@ -0,0 +1,101 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq.view.cassandra;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Clock;
+import java.time.Duration;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
+import org.apache.james.core.healthcheck.ResultStatus;
+import org.apache.james.queue.rabbitmq.MailQueueName;
+import org.apache.james.utils.UpdatableTickingClock;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * Checks the status reported by BrowseStartHealthCheck for empty, recent, future, old and mixed browse starts.
+ */
+class BrowseStartHealthCheckTest {
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(
+        CassandraModule.aggregateModules(CassandraSchemaVersionModule.MODULE, CassandraMailQueueViewModule.MODULE));
+
+    private BrowseStartHealthCheck testee;
+    private BrowseStartDAO browseStartDAO;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        browseStartDAO = new BrowseStartDAO(cassandra.getConf());
+        testee = new BrowseStartHealthCheck(browseStartDAO, Clock.systemUTC());
+    }
+
+    @Test
+    void checkShouldReturnHealthyWhenEmpty() {
+        assertThat(Mono.from(testee.check()).block().getStatus())
+            .isEqualTo(ResultStatus.HEALTHY);
+    }
+
+    @Test
+    void checkShouldReturnHealthyWhenSingleValue() {
+        browseStartDAO.insertInitialBrowseStart(MailQueueName.fromString("abc"),
+            Clock.systemUTC().instant().minus(Duration.ofDays(6))).block();
+
+        assertThat(Mono.from(testee.check()).block().getStatus())
+            .isEqualTo(ResultStatus.HEALTHY);
+    }
+
+    @Test
+    void checkShouldReturnHealthyWhenSingleFutureValue() {
+        browseStartDAO.insertInitialBrowseStart(MailQueueName.fromString("abc"),
+            Clock.systemUTC().instant().plus(Duration.ofDays(6))).block();
+
+        assertThat(Mono.from(testee.check()).block().getStatus())
+            .isEqualTo(ResultStatus.HEALTHY);
+    }
+
+    @Test
+    void checkShouldReturnDegradedWhenSingleOldValue() {
+        browseStartDAO.insertInitialBrowseStart(MailQueueName.fromString("abc"),
+            Clock.systemUTC().instant().minus(Duration.ofDays(8))).block();
+
+        assertThat(Mono.from(testee.check()).block().getStatus())
+            .isEqualTo(ResultStatus.DEGRADED);
+    }
+
+    @Test
+    void checkShouldReturnDegradedWhenMixed() {
+        // Use two distinct queue names so that an old and a recent browse start are both stored.
+        browseStartDAO.insertInitialBrowseStart(MailQueueName.fromString("abc"),
+            Clock.systemUTC().instant().minus(Duration.ofDays(8))).block();
+        browseStartDAO.insertInitialBrowseStart(MailQueueName.fromString("def"),
+            Clock.systemUTC().instant().minus(Duration.ofDays(6))).block();
+
+        assertThat(Mono.from(testee.check()).block().getStatus())
+            .isEqualTo(ResultStatus.DEGRADED);
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org