You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/06/28 14:00:05 UTC
[james-project] branch 3.8.x updated: JAMES-3919 RabbitMQMailQueue: clean up cassandra projection when we c… (#1609) (#1615)
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch 3.8.x
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/3.8.x by this push:
new 295f757b81 JAMES-3919 RabbitMQMailQueue: clean up cassandra projection when we c… (#1609) (#1615)
295f757b81 is described below
commit 295f757b811abf49d4c7a4c7109ea9b50c62a3b0
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Wed Jun 28 15:59:59 2023 +0200
JAMES-3919 RabbitMQMailQueue: clean up cassandra projection when we c… (#1609) (#1615)
---
.../backends/rabbitmq/RabbitMQManagementAPI.java | 3 ++
.../org/apache/james/queue/rabbitmq/Dequeuer.java | 2 +-
.../org/apache/james/queue/rabbitmq/Enqueuer.java | 13 ++++++++-
.../james/queue/rabbitmq/RabbitMQMailQueue.java | 4 +--
.../queue/rabbitmq/view/api/FakeMailQueueView.java | 4 +--
.../queue/rabbitmq/view/api/MailQueueView.java | 3 +-
.../view/cassandra/CassandraMailQueueView.java | 11 ++++---
.../queue/rabbitmq/RabbitMQMailQueueTest.java | 34 ++++++++++++----------
8 files changed, 46 insertions(+), 28 deletions(-)
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java
index 9b7dc78089..390d09137c 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQManagementAPI.java
@@ -454,6 +454,9 @@ public interface RabbitMQManagementAPI {
@RequestLine(value = "DELETE /api/queues/{vhost}/{name}", decodeSlash = false)
void deleteQueue(@Param("vhost") String vhost, @Param("name") String name);
+ @RequestLine(value = "DELETE /api/queues/{vhost}/{name}/contents", decodeSlash = false)
+ void purgeQueue(@Param("vhost") String vhost, @Param("name") String name);
+
@RequestLine(value = "GET /api/exchanges/{vhost}/{name}/bindings/source", decodeSlash = false)
List<BindingSource> listBindings(@Param("vhost") String vhost, @Param("name") String name);
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index f9440c03fa..98d2d28b6d 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -141,7 +141,7 @@ class Dequeuer {
case SUCCESS:
dequeueMetric.increment();
response.ack();
- mailQueueView.delete(DeleteCondition.withEnqueueId(mailWithEnqueueId.getEnqueueId(), mailWithEnqueueId.getBlobIds()));
+ Mono.from(mailQueueView.delete(DeleteCondition.withEnqueueId(mailWithEnqueueId.getEnqueueId(), mailWithEnqueueId.getBlobIds()))).block();
break;
case RETRY:
response.nack(REQUEUE);
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
index aacb1e06b6..658de9ba43 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
@@ -25,6 +25,7 @@ import static org.apache.james.queue.api.MailQueue.ENQUEUED_METRIC_NAME_PREFIX;
import java.time.Clock;
import java.time.Duration;
+import java.util.function.Function;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
@@ -35,6 +36,7 @@ import org.apache.james.metrics.api.Metric;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
+import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser;
import org.apache.mailet.Mail;
@@ -82,11 +84,20 @@ class Enqueuer {
return Flux.mergeDelayError(2,
mailQueueView.storeMail(enqueuedItem),
publishReferenceToRabbit(mailReference))
- .then();
+ .then()
+ .onErrorResume(cleanupMailQueueView(enqueueId, mailReference));
}).sneakyThrow())
.thenEmpty(Mono.fromRunnable(enqueueMetric::increment));
}
+ private Function<Throwable, Mono<Void>> cleanupMailQueueView(EnqueueId enqueueId, MailReference mailReference) {
+ return (Throwable e) -> {
+ DeleteCondition.WithEnqueueId deleteCondition = DeleteCondition.withEnqueueId(enqueueId, mailReference.getPartsId());
+ return Mono.from(mailQueueView.delete(deleteCondition))
+ .thenReturn(Mono.<Void>error(e));
+ };
+ }
+
Mono<Void> reQueue(CassandraMailQueueBrowser.CassandraMailQueueItemView item) {
Mail mail = item.getMail();
return Mono.fromCallable(() -> new MailReference(item.getEnqueuedId(), mail, item.getEnqueuedPartsId()))
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index c02365d272..fe01937033 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -121,12 +121,12 @@ public class RabbitMQMailQueue implements ManageableMailQueue {
@Override
public long clear() {
- return mailQueueView.delete(DeleteCondition.all());
+ return Mono.from(mailQueueView.delete(DeleteCondition.all())).block();
}
@Override
public long remove(Type type, String value) {
- return mailQueueView.delete(DeleteCondition.from(type, value));
+ return Mono.from(mailQueueView.delete(DeleteCondition.from(type, value))).block();
}
@Override
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/FakeMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/FakeMailQueueView.java
index 5399b1e11e..e7e5af5b09 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/FakeMailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/FakeMailQueueView.java
@@ -54,8 +54,8 @@ public class FakeMailQueueView<V extends ManageableMailQueue.MailQueueItemView>
}
@Override
- public long delete(DeleteCondition deleteCondition) {
- return 0;
+ public Mono<Long> delete(DeleteCondition deleteCondition) {
+ return Mono.just(0L);
}
@Override
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
index 1649f63328..9cffba255a 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
@@ -25,6 +25,7 @@ import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.rabbitmq.EnqueueId;
import org.apache.james.queue.rabbitmq.EnqueuedItem;
import org.apache.james.queue.rabbitmq.MailQueueName;
+import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -39,7 +40,7 @@ public interface MailQueueView<V extends ManageableMailQueue.MailQueueItemView>
Mono<Void> storeMail(EnqueuedItem enqueuedItem);
- long delete(DeleteCondition deleteCondition);
+ Publisher<Long> delete(DeleteCondition deleteCondition);
Mono<Boolean> isPresent(EnqueueId id);
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
index b72f637d1c..94643777b9 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
@@ -128,24 +128,23 @@ public class CassandraMailQueueView implements MailQueueView<CassandraMailQueueB
}
@Override
- public long delete(DeleteCondition deleteCondition) {
+ public Mono<Long> delete(DeleteCondition deleteCondition) {
if (deleteCondition instanceof DeleteCondition.WithEnqueueId) {
DeleteCondition.WithEnqueueId enqueueIdCondition = (DeleteCondition.WithEnqueueId) deleteCondition;
- delete(enqueueIdCondition.getEnqueueId(), enqueueIdCondition.getBlobIds()).block();
- return 1L;
+ return delete(enqueueIdCondition.getEnqueueId(), enqueueIdCondition.getBlobIds())
+ .thenReturn(1L);
}
return browseThenDelete(deleteCondition);
}
- private long browseThenDelete(DeleteCondition deleteCondition) {
+ private Mono<Long> browseThenDelete(DeleteCondition deleteCondition) {
return cassandraMailQueueBrowser.browseReferences(mailQueueName)
.map(EnqueuedItemWithSlicingContext::getEnqueuedItem)
.filter(deleteCondition::shouldBeDeleted)
.flatMap(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getEnqueueId(), mailQueueName)
.then(Mono.from(mimeMessageStore.delete(mailReference.getPartsId()))), DELETION_CONCURRENCY)
.count()
- .doOnNext(ignored -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName))
- .block();
+ .doOnNext(ignored -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName));
}
private Mono<Void> delete(EnqueueId enqueueId,
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 72034b03af..0e037d5ca3 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -32,6 +32,7 @@ import static org.apache.mailet.base.MailAddressFixture.RECIPIENT1;
import static org.apache.mailet.base.MailAddressFixture.SENDER;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.awaitility.Awaitility.await;
import static org.awaitility.Durations.TEN_SECONDS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
@@ -41,6 +42,7 @@ import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
@@ -386,7 +388,12 @@ class RabbitMQMailQueueTest {
String name1 = "myMail1";
String name2 = "myMail2";
String name3 = "myMail3";
- Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(getMailQueue().deQueue());
+
+ List<MailQueue.MailQueueItem> receivedItem = new ArrayList<>();
+ Flux.from(getMailQueue().deQueue())
+ .doOnNext(receivedItem::add)
+ .subscribe();
+
getMailQueue().enQueue(defaultMail()
.name(name1)
.build());
@@ -409,19 +416,16 @@ class RabbitMQMailQueueTest {
.name(name3)
.build());
- List<MailQueue.MailQueueItem> items = dequeueFlux.take(3).collectList().block(Duration.ofSeconds(10));
-
- assertThat(items)
- .extracting(item -> item.getMail().getName())
- .contains(name1, name3);
+ await().atMost(Duration.ofSeconds(10))
+ .untilAsserted(() -> assertThat(receivedItem)
+ .extracting(item -> item.getMail().getName())
+ .contains(name1, name3));
}
@Test
void enqueuedEmailsShouldNotBeLostDuringRabbitMQOutages() throws Exception {
String name = "myMail";
- rabbitMQExtension.getRabbitMQ().pause();
- Thread.sleep(2000);
try {
getMailQueue().enQueue(defaultMail()
@@ -430,8 +434,8 @@ class RabbitMQMailQueueTest {
} catch (Exception e) {
// Ignore
}
- rabbitMQExtension.getRabbitMQ().unpause();
- Thread.sleep(100);
+ rabbitMQExtension.managementAPI().purgeQueue(rabbitMQExtension.getRabbitMQ().getConfiguration().getVhost().orElse("/"),
+ "JamesMailQueue-workqueue-spool");
getMailQueue().republishNotProcessedMails(clock.instant().plus(30, ChronoUnit.MINUTES)).blockLast();
@@ -669,7 +673,7 @@ class RabbitMQMailQueueTest {
.subscribe();
try {
- Awaitility.await().untilAsserted(() -> assertThat(counter.get()).isGreaterThanOrEqualTo(times));
+ await().untilAsserted(() -> assertThat(counter.get()).isGreaterThanOrEqualTo(times));
} finally {
disposable.dispose();
}
@@ -786,7 +790,7 @@ class RabbitMQMailQueueTest {
.subscribeOn(Schedulers.fromExecutor(EXECUTOR)))
.subscribe();
- Awaitility.await().atMost(TEN_SECONDS)
+ await().atMost(TEN_SECONDS)
.untilAsserted(() -> assertThat(dequeuedMailNames)
.containsExactly(name1, name2, name3));
}
@@ -826,7 +830,7 @@ class RabbitMQMailQueueTest {
.subscribeOn(Schedulers.fromExecutor(EXECUTOR)))
.subscribe();
- Awaitility.await().atMost(TEN_SECONDS)
+ await().atMost(TEN_SECONDS)
.untilAsserted(() -> assertThat(dequeuedMailNames)
.containsExactly(name1, name2, name3));
}
@@ -854,7 +858,7 @@ class RabbitMQMailQueueTest {
.subscribe();
- Awaitility.await().atMost(TEN_SECONDS)
+ await().atMost(TEN_SECONDS)
.untilAsserted(() -> assertThat(deadLetteredCount.get()).isEqualTo(1));
}
@@ -880,7 +884,7 @@ class RabbitMQMailQueueTest {
.subscribeOn(Schedulers.fromExecutor(EXECUTOR))
.subscribe();
- Awaitility.await().atMost(TEN_SECONDS)
+ await().atMost(TEN_SECONDS)
.untilAsserted(() -> assertThat(deadLetteredCount.get()).isEqualTo(1));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org