You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/06/28 14:00:05 UTC

[james-project] branch 3.8.x updated: JAMES-3919 RabbitMQMailQueue: clean up cassandra projection when we c… (#1609) (#1615)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch 3.8.x
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/3.8.x by this push:
     new 295f757b81 JAMES-3919 RabbitMQMailQueue: clean up cassandra projection when we c… (#1609) (#1615)
295f757b81 is described below

commit 295f757b811abf49d4c7a4c7109ea9b50c62a3b0
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Wed Jun 28 15:59:59 2023 +0200

    JAMES-3919 RabbitMQMailQueue: clean up cassandra projection when we c… (#1609) (#1615)
---
 .../backends/rabbitmq/RabbitMQManagementAPI.java   |  3 ++
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  |  2 +-
 .../org/apache/james/queue/rabbitmq/Enqueuer.java  | 13 ++++++++-
 .../james/queue/rabbitmq/RabbitMQMailQueue.java    |  4 +--
 .../queue/rabbitmq/view/api/FakeMailQueueView.java |  4 +--
 .../queue/rabbitmq/view/api/MailQueueView.java     |  3 +-
 .../view/cassandra/CassandraMailQueueView.java     | 11 ++++---
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      | 34 ++++++++++++----------
 8 files changed, 46 insertions(+), 28 deletions(-)

diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java
index 9b7dc78089..390d09137c 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java
@@ -454,6 +454,9 @@ public interface RabbitMQManagementAPI {
     @RequestLine(value = "DELETE /api/queues/{vhost}/{name}", decodeSlash = false)
     void deleteQueue(@Param("vhost") String vhost, @Param("name") String name);
 
+    @RequestLine(value = "DELETE /api/queues/{vhost}/{name}/contents", decodeSlash = false)
+    void purgeQueue(@Param("vhost") String vhost, @Param("name") String name);
+
     @RequestLine(value = "GET /api/exchanges/{vhost}/{name}/bindings/source", decodeSlash = false)
     List<BindingSource> listBindings(@Param("vhost") String vhost, @Param("name") String name);
 
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index f9440c03fa..98d2d28b6d 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -141,7 +141,7 @@ class Dequeuer {
                 case SUCCESS:
                     dequeueMetric.increment();
                     response.ack();
-                    mailQueueView.delete(DeleteCondition.withEnqueueId(mailWithEnqueueId.getEnqueueId(), mailWithEnqueueId.getBlobIds()));
+                    Mono.from(mailQueueView.delete(DeleteCondition.withEnqueueId(mailWithEnqueueId.getEnqueueId(), mailWithEnqueueId.getBlobIds()))).block();
                     break;
                 case RETRY:
                     response.nack(REQUEUE);
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
index aacb1e06b6..658de9ba43 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
@@ -25,6 +25,7 @@ import static org.apache.james.queue.api.MailQueue.ENQUEUED_METRIC_NAME_PREFIX;
 
 import java.time.Clock;
 import java.time.Duration;
+import java.util.function.Function;
 
 import javax.mail.MessagingException;
 import javax.mail.internet.MimeMessage;
@@ -35,6 +36,7 @@ import org.apache.james.metrics.api.Metric;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
+import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
 import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser;
 import org.apache.mailet.Mail;
@@ -82,11 +84,20 @@ class Enqueuer {
                 return Flux.mergeDelayError(2,
                         mailQueueView.storeMail(enqueuedItem),
                         publishReferenceToRabbit(mailReference))
-                        .then();
+                        .then()
+                        .onErrorResume(cleanupMailQueueView(enqueueId, mailReference));
             }).sneakyThrow())
             .thenEmpty(Mono.fromRunnable(enqueueMetric::increment));
     }
 
+    private Function<Throwable, Mono<Void>> cleanupMailQueueView(EnqueueId enqueueId, MailReference mailReference) {
+        return (Throwable e) -> { // onErrorResume handler: delete the view entry for this enqueueId, then fail with e
+            DeleteCondition.WithEnqueueId deleteCondition = DeleteCondition.withEnqueueId(enqueueId, mailReference.getPartsId());
+            return Mono.from(mailQueueView.delete(deleteCondition))
+                    .then(Mono.error(e)); // then(...) re-signals the original error; thenReturn(...) would emit the Mono as a value
+        };
+    }
+
     Mono<Void> reQueue(CassandraMailQueueBrowser.CassandraMailQueueItemView item) {
         Mail mail = item.getMail();
         return Mono.fromCallable(() -> new MailReference(item.getEnqueuedId(), mail, item.getEnqueuedPartsId()))
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index c02365d272..fe01937033 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -121,12 +121,12 @@ public class RabbitMQMailQueue implements ManageableMailQueue {
 
     @Override
     public long clear() {
-        return mailQueueView.delete(DeleteCondition.all());
+        return Mono.from(mailQueueView.delete(DeleteCondition.all())).block();
     }
 
     @Override
     public long remove(Type type, String value) {
-        return mailQueueView.delete(DeleteCondition.from(type, value));
+        return Mono.from(mailQueueView.delete(DeleteCondition.from(type, value))).block();
     }
 
     @Override
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/FakeMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/FakeMailQueueView.java
index 5399b1e11e..e7e5af5b09 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/FakeMailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/FakeMailQueueView.java
@@ -54,8 +54,8 @@ public class FakeMailQueueView<V extends ManageableMailQueue.MailQueueItemView>
     }
 
     @Override
-    public long delete(DeleteCondition deleteCondition) {
-        return 0;
+    public Mono<Long> delete(DeleteCondition deleteCondition) {
+        return Mono.just(0L);
     }
 
     @Override
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
index 1649f63328..9cffba255a 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
@@ -25,6 +25,7 @@ import org.apache.james.queue.api.ManageableMailQueue;
 import org.apache.james.queue.rabbitmq.EnqueueId;
 import org.apache.james.queue.rabbitmq.EnqueuedItem;
 import org.apache.james.queue.rabbitmq.MailQueueName;
+import org.reactivestreams.Publisher;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -39,7 +40,7 @@ public interface MailQueueView<V extends ManageableMailQueue.MailQueueItemView>
 
     Mono<Void> storeMail(EnqueuedItem enqueuedItem);
 
-    long delete(DeleteCondition deleteCondition);
+    Publisher<Long> delete(DeleteCondition deleteCondition);
 
     Mono<Boolean> isPresent(EnqueueId id);
 
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
index b72f637d1c..94643777b9 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
@@ -128,24 +128,23 @@ public class CassandraMailQueueView implements MailQueueView<CassandraMailQueueB
     }
 
     @Override
-    public long delete(DeleteCondition deleteCondition) {
+    public Mono<Long> delete(DeleteCondition deleteCondition) {
         if (deleteCondition instanceof DeleteCondition.WithEnqueueId) {
             DeleteCondition.WithEnqueueId enqueueIdCondition = (DeleteCondition.WithEnqueueId) deleteCondition;
-            delete(enqueueIdCondition.getEnqueueId(), enqueueIdCondition.getBlobIds()).block();
-            return 1L;
+            return delete(enqueueIdCondition.getEnqueueId(), enqueueIdCondition.getBlobIds())
+                .thenReturn(1L);
         }
         return browseThenDelete(deleteCondition);
     }
 
-    private long browseThenDelete(DeleteCondition deleteCondition) {
+    private Mono<Long> browseThenDelete(DeleteCondition deleteCondition) {
         return cassandraMailQueueBrowser.browseReferences(mailQueueName)
             .map(EnqueuedItemWithSlicingContext::getEnqueuedItem)
             .filter(deleteCondition::shouldBeDeleted)
             .flatMap(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getEnqueueId(), mailQueueName)
                 .then(Mono.from(mimeMessageStore.delete(mailReference.getPartsId()))), DELETION_CONCURRENCY)
             .count()
-            .doOnNext(ignored -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName))
-            .block();
+            .doOnNext(ignored -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName));
     }
 
     private Mono<Void> delete(EnqueueId enqueueId,
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 72034b03af..0e037d5ca3 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -32,6 +32,7 @@ import static org.apache.mailet.base.MailAddressFixture.RECIPIENT1;
 import static org.apache.mailet.base.MailAddressFixture.SENDER;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.awaitility.Awaitility.await;
 import static org.awaitility.Durations.TEN_SECONDS;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.never;
@@ -41,6 +42,7 @@ import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutorService;
@@ -386,7 +388,12 @@ class RabbitMQMailQueueTest {
             String name1 = "myMail1";
             String name2 = "myMail2";
             String name3 = "myMail3";
-            Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(getMailQueue().deQueue());
+
+            List<MailQueue.MailQueueItem> receivedItem = new ArrayList<>();
+            Flux.from(getMailQueue().deQueue())
+                    .doOnNext(receivedItem::add)
+                    .subscribe();
+
             getMailQueue().enQueue(defaultMail()
                 .name(name1)
                 .build());
@@ -409,19 +416,16 @@ class RabbitMQMailQueueTest {
                 .name(name3)
                 .build());
 
-            List<MailQueue.MailQueueItem> items = dequeueFlux.take(3).collectList().block(Duration.ofSeconds(10));
-
-            assertThat(items)
-                .extracting(item -> item.getMail().getName())
-                .contains(name1, name3);
+            await().atMost(Duration.ofSeconds(10))
+                    .untilAsserted(() -> assertThat(receivedItem)
+                            .extracting(item -> item.getMail().getName())
+                            .contains(name1, name3));
         }
 
         @Test
         void enqueuedEmailsShouldNotBeLostDuringRabbitMQOutages() throws Exception {
             String name = "myMail";
 
-            rabbitMQExtension.getRabbitMQ().pause();
-            Thread.sleep(2000);
 
             try {
                 getMailQueue().enQueue(defaultMail()
@@ -430,8 +434,8 @@ class RabbitMQMailQueueTest {
             } catch (Exception e) {
                 // Ignore
             }
-            rabbitMQExtension.getRabbitMQ().unpause();
-            Thread.sleep(100);
+            rabbitMQExtension.managementAPI().purgeQueue(rabbitMQExtension.getRabbitMQ().getConfiguration().getVhost().orElse("/"),
+                    "JamesMailQueue-workqueue-spool");
 
             getMailQueue().republishNotProcessedMails(clock.instant().plus(30, ChronoUnit.MINUTES)).blockLast();
 
@@ -669,7 +673,7 @@ class RabbitMQMailQueueTest {
                 .subscribe();
 
             try {
-                Awaitility.await().untilAsserted(() -> assertThat(counter.get()).isGreaterThanOrEqualTo(times));
+                await().untilAsserted(() -> assertThat(counter.get()).isGreaterThanOrEqualTo(times));
             } finally {
                 disposable.dispose();
             }
@@ -786,7 +790,7 @@ class RabbitMQMailQueueTest {
                     .subscribeOn(Schedulers.fromExecutor(EXECUTOR)))
                 .subscribe();
 
-            Awaitility.await().atMost(TEN_SECONDS)
+            await().atMost(TEN_SECONDS)
                 .untilAsserted(() -> assertThat(dequeuedMailNames)
                     .containsExactly(name1, name2, name3));
         }
@@ -826,7 +830,7 @@ class RabbitMQMailQueueTest {
                     .subscribeOn(Schedulers.fromExecutor(EXECUTOR)))
                 .subscribe();
 
-            Awaitility.await().atMost(TEN_SECONDS)
+            await().atMost(TEN_SECONDS)
                 .untilAsserted(() -> assertThat(dequeuedMailNames)
                     .containsExactly(name1, name2, name3));
         }
@@ -854,7 +858,7 @@ class RabbitMQMailQueueTest {
                 .subscribe();
 
 
-            Awaitility.await().atMost(TEN_SECONDS)
+            await().atMost(TEN_SECONDS)
                 .untilAsserted(() -> assertThat(deadLetteredCount.get()).isEqualTo(1));
         }
 
@@ -880,7 +884,7 @@ class RabbitMQMailQueueTest {
                 .subscribeOn(Schedulers.fromExecutor(EXECUTOR))
                 .subscribe();
 
-            Awaitility.await().atMost(TEN_SECONDS)
+            await().atMost(TEN_SECONDS)
                 .untilAsserted(() -> assertThat(deadLetteredCount.get()).isEqualTo(1));
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org