You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/17 02:24:15 UTC
[james-project] 01/31: JAMES-3296 Add republishing to RabbitMQMailQueue from Cassandra capability
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 17f8fc5696b56c159a23d6d4abbde0d996756baf
Author: Rémi Kowalski <rk...@linagora.com>
AuthorDate: Mon Jul 6 15:16:48 2020 +0200
JAMES-3296 Add republishing to RabbitMQMailQueue from Cassandra capability
---
.../james/webadmin/dto/MailQueueItemDTOTest.java | 3 +-
.../james/queue/api/ManageableMailQueue.java | 18 +-
.../james/queue/file/FileCacheableMailQueue.java | 2 +-
.../james/queue/jms/JMSCacheableMailQueue.java | 2 +-
.../james/queue/memory/MemoryMailQueueFactory.java | 4 +-
.../org/apache/james/queue/rabbitmq/Dequeuer.java | 6 +-
.../org/apache/james/queue/rabbitmq/Enqueuer.java | 8 +
.../james/queue/rabbitmq/RabbitMQMailQueue.java | 17 +-
.../queue/rabbitmq/view/api/MailQueueView.java | 9 +-
.../view/cassandra/CassandraMailQueueBrowser.java | 64 ++++++-
.../view/cassandra/CassandraMailQueueView.java | 22 ++-
.../queue/rabbitmq/RabbitMQMailQueueTest.java | 193 ++++++++++++++++++++-
.../rabbitmq/RabbitMqMailQueueFactoryTest.java | 3 +-
13 files changed, 322 insertions(+), 29 deletions(-)
diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/dto/MailQueueItemDTOTest.java b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/dto/MailQueueItemDTOTest.java
index 18b14d2..4b177de 100644
--- a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/dto/MailQueueItemDTOTest.java
+++ b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/dto/MailQueueItemDTOTest.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.james.core.MailAddress;
import org.apache.james.queue.api.Mails;
+import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.api.ManageableMailQueue.MailQueueItemView;
import org.apache.mailet.base.test.FakeMail;
import org.assertj.core.api.JUnitSoftAssertions;
@@ -54,7 +55,7 @@ public class MailQueueItemDTOTest {
public void fromShouldCreateTheRightObject() throws Exception {
FakeMail mail = Mails.defaultMail().name("name").build();
ZonedDateTime date = ZonedDateTime.parse("2018-01-02T11:22:02Z");
- MailQueueItemView mailQueueItemView = new MailQueueItemView(mail, date);
+ MailQueueItemView mailQueueItemView = new ManageableMailQueue.DefaultMailQueueItemView(mail, date);
MailQueueItemDTO mailQueueItemDTO = MailQueueItemDTO.from(mailQueueItemView);
List<String> expectedRecipients = mail.getRecipients().stream()
.map(MailAddress::asString)
diff --git a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/ManageableMailQueue.java b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/ManageableMailQueue.java
index 20635a6..5309dbc 100644
--- a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/ManageableMailQueue.java
+++ b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/ManageableMailQueue.java
@@ -91,20 +91,30 @@ public interface ManageableMailQueue extends MailQueue {
/**
* Represent a View over a queue {@link MailQueue.MailQueueItem}
*/
- class MailQueueItemView {
+ interface MailQueueItemView {
+ Mail getMail();
+
+ Optional<ZonedDateTime> getNextDelivery();
+ }
+
+
+ /**
+ * Represent a View over a queue {@link MailQueue.MailQueueItem}
+ */
+ class DefaultMailQueueItemView implements MailQueueItemView {
private final Mail mail;
private final Optional<ZonedDateTime> nextDelivery;
- public MailQueueItemView(Mail mail) {
+ public DefaultMailQueueItemView(Mail mail) {
this(mail, Optional.empty());
}
- public MailQueueItemView(Mail mail, ZonedDateTime nextDelivery) {
+ public DefaultMailQueueItemView(Mail mail, ZonedDateTime nextDelivery) {
this(mail, Optional.of(nextDelivery));
}
- public MailQueueItemView(Mail mail, Optional<ZonedDateTime> nextDelivery) {
+ public DefaultMailQueueItemView(Mail mail, Optional<ZonedDateTime> nextDelivery) {
this.mail = mail;
this.nextDelivery = nextDelivery;
}
diff --git a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java
index d32968e..cb69709 100644
--- a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java
+++ b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java
@@ -477,7 +477,7 @@ public class FileCacheableMailQueue implements ManageableMailQueue {
while (items.hasNext()) {
try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(items.next().getObjectFile()))) {
final Mail mail = (Mail) in.readObject();
- item = new MailQueueItemView(mail, getNextDelivery(mail));
+ item = new DefaultMailQueueItemView(mail, getNextDelivery(mail));
return true;
} catch (IOException | ClassNotFoundException e) {
LOGGER.info("Unable to load mail", e);
diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
index 4e7cee9..1084b17 100644
--- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
+++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
@@ -647,7 +647,7 @@ public class JMSCacheableMailQueue implements ManageableMailQueue, JMSSupport, M
while (hasNext()) {
try {
Message m = messages.nextElement();
- return new MailQueueItemView(createMail(m), nextDeliveryDate(m));
+ return new DefaultMailQueueItemView(createMail(m), nextDeliveryDate(m));
} catch (MessagingException | JMSException e) {
LOGGER.error("Unable to browse queue", e);
}
diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
index da9e8b1..93f12ca 100644
--- a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
+++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
@@ -221,9 +221,9 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
@Override
public MailQueueIterator browse() throws MailQueueException {
- Iterator<MailQueueItemView> underlying = ImmutableList.copyOf(mailItems)
+ Iterator<DefaultMailQueueItemView> underlying = ImmutableList.copyOf(mailItems)
.stream()
- .map(item -> new MailQueueItemView(item.getMail(), item.delivery))
+ .map(item -> new DefaultMailQueueItemView(item.getMail(), item.delivery))
.iterator();
return new MailQueueIterator() {
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index fe209f9..b477335 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -32,6 +32,7 @@ import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
+import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser;
import org.apache.mailet.Mail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,13 +82,13 @@ class Dequeuer implements Closeable {
private final MailLoader mailLoader;
private final Metric dequeueMetric;
private final MailReferenceSerializer mailReferenceSerializer;
- private final MailQueueView mailQueueView;
+ private final MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> mailQueueView;
private final Receiver receiver;
private final Flux<AcknowledgableDelivery> flux;
Dequeuer(MailQueueName name, ReceiverProvider receiverProvider, MailLoader mailLoader,
MailReferenceSerializer serializer, MetricFactory metricFactory,
- MailQueueView mailQueueView, MailQueueFactory.PrefetchCount prefetchCount) {
+ MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> mailQueueView, MailQueueFactory.PrefetchCount prefetchCount) {
this.mailLoader = mailLoader;
this.mailReferenceSerializer = serializer;
this.mailQueueView = mailQueueView;
@@ -162,5 +163,4 @@ class Dequeuer implements Closeable {
return Mono.empty();
});
}
-
}
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
index c134798..af30c14 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
@@ -34,6 +34,7 @@ import org.apache.james.metrics.api.Metric;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
+import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser;
import org.apache.mailet.Mail;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -76,6 +77,13 @@ class Enqueuer {
.block();
}
+ Mono<Void> reQueue(CassandraMailQueueBrowser.CassandraMailQueueItemView item) {
+ Mail mail = item.getMail();
+ return Mono.fromCallable(() -> new MailReference(item.getEnqueuedId(), mail, item.getEnqueuedPartsId()))
+ .flatMap(Throwing.function(this::publishReferenceToRabbit).sneakyThrow())
+ .then();
+ }
+
private Mono<MimeMessagePartsId> saveMail(Mail mail) throws MailQueue.MailQueueException {
try {
return mimeMessageStore.save(mail.getMessage());
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index 3ed3ba9..5e838db 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -20,20 +20,24 @@
package org.apache.james.queue.rabbitmq;
import java.time.Duration;
+import java.time.Instant;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
+import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser;
import org.apache.mailet.Mail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.fge.lambdas.Throwing;
+import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class RabbitMQMailQueue implements ManageableMailQueue {
@@ -43,12 +47,12 @@ public class RabbitMQMailQueue implements ManageableMailQueue {
private final MetricFactory metricFactory;
private final Enqueuer enqueuer;
private final Dequeuer dequeuer;
- private final MailQueueView mailQueueView;
+ private final MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> mailQueueView;
private final MailQueueItemDecoratorFactory decoratorFactory;
RabbitMQMailQueue(MetricFactory metricFactory, MailQueueName name,
Enqueuer enqueuer, Dequeuer dequeuer,
- MailQueueView mailQueueView, MailQueueItemDecoratorFactory decoratorFactory) {
+ MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> mailQueueView, MailQueueItemDecoratorFactory decoratorFactory) {
this.metricFactory = metricFactory;
this.name = name;
this.enqueuer = enqueuer;
@@ -119,4 +123,13 @@ public class RabbitMQMailQueue implements ManageableMailQueue {
.add("name", name)
.toString();
}
+
+ public Flux<String> republishNotProcessedMails(Instant olderThan) {
+ Function<CassandraMailQueueBrowser.CassandraMailQueueItemView, Mono<String>> requeue = item ->
+ enqueuer.reQueue(item)
+ .thenReturn(item.getMail().getName());
+
+ return mailQueueView.browseOlderThanReactive(olderThan)
+ .flatMap(requeue);
+ }
}
\ No newline at end of file
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
index 33eec80..921a08b 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
@@ -19,14 +19,17 @@
package org.apache.james.queue.rabbitmq.view.api;
+import java.time.Instant;
+
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.rabbitmq.EnqueueId;
import org.apache.james.queue.rabbitmq.EnqueuedItem;
import org.apache.james.queue.rabbitmq.MailQueueName;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-public interface MailQueueView {
+public interface MailQueueView<V extends ManageableMailQueue.MailQueueItemView> {
interface Factory {
MailQueueView create(MailQueueName mailQueueName);
@@ -42,5 +45,9 @@ public interface MailQueueView {
ManageableMailQueue.MailQueueIterator browse();
+ Flux<V> browseReactive();
+
+ Flux<V> browseOlderThanReactive(Instant olderThan);
+
long getSize();
}
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
index 91223ad..455025c 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
@@ -24,17 +24,21 @@ import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlice
import java.time.Clock;
import java.time.Instant;
+import java.time.ZonedDateTime;
import java.util.Comparator;
import java.util.Iterator;
+import java.util.Optional;
import javax.inject.Inject;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.blob.api.Store;
import org.apache.james.blob.mail.MimeMessagePartsId;
import org.apache.james.blob.mail.MimeMessageStore;
import org.apache.james.queue.api.ManageableMailQueue;
+import org.apache.james.queue.rabbitmq.EnqueueId;
import org.apache.james.queue.rabbitmq.EnqueuedItem;
import org.apache.james.queue.rabbitmq.MailQueueName;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
@@ -52,9 +56,9 @@ public class CassandraMailQueueBrowser {
static class CassandraMailQueueIterator implements ManageableMailQueue.MailQueueIterator {
- private final Iterator<ManageableMailQueue.MailQueueItemView> iterator;
+ private final Iterator<CassandraMailQueueItemView> iterator;
- CassandraMailQueueIterator(Iterator<ManageableMailQueue.MailQueueItemView> iterator) {
+ CassandraMailQueueIterator(Iterator<CassandraMailQueueItemView> iterator) {
Preconditions.checkNotNull(iterator);
this.iterator = iterator;
@@ -71,7 +75,7 @@ public class CassandraMailQueueBrowser {
}
@Override
- public ManageableMailQueue.MailQueueItemView next() {
+ public CassandraMailQueueItemView next() {
return iterator.next();
}
}
@@ -100,10 +104,24 @@ public class CassandraMailQueueBrowser {
this.clock = clock;
}
- Flux<ManageableMailQueue.MailQueueItemView> browse(MailQueueName queueName) {
+ Flux<CassandraMailQueueItemView> browse(MailQueueName queueName) {
return browseReferences(queueName)
.flatMapSequential(this::toMailFuture)
- .map(ManageableMailQueue.MailQueueItemView::new);
+ .map(CassandraMailQueueItemView::new);
+ }
+
+ Flux<CassandraMailQueueItemView> browseOlderThan(MailQueueName queueName, Instant olderThan) {
+ return browseReferencesOlderThan(queueName, olderThan)
+ .flatMapSequential(this::toMailFuture)
+ .map(CassandraMailQueueItemView::new);
+ }
+
+ Flux<EnqueuedItemWithSlicingContext> browseReferencesOlderThan(MailQueueName queueName, Instant olderThan) {
+ return browseStartDao.findBrowseStart(queueName)
+ .flatMapMany(this::allSlicesStartingAt)
+ .filter(slice -> slice.getStartSliceInstant().isBefore(olderThan))
+ .flatMapSequential(slice -> browseSlice(queueName, slice))
+ .filter(item -> item.getEnqueuedItem().getEnqueuedTime().isBefore(olderThan));
}
Flux<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName queueName) {
@@ -112,10 +130,10 @@ public class CassandraMailQueueBrowser {
.flatMapSequential(slice -> browseSlice(queueName, slice));
}
- private Mono<Mail> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
+ private Mono<Pair<EnqueuedItem, Mail>> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
EnqueuedItem enqueuedItem = enqueuedItemWithSlicingContext.getEnqueuedItem();
return mimeMessageStore.read(enqueuedItem.getPartsId())
- .map(mimeMessage -> toMail(enqueuedItem, mimeMessage));
+ .map(mimeMessage -> Pair.of(enqueuedItem, toMail(enqueuedItem, mimeMessage)));
}
private Mail toMail(EnqueuedItem enqueuedItem, MimeMessage mimeMessage) {
@@ -151,4 +169,36 @@ public class CassandraMailQueueBrowser {
.range(0, configuration.getBucketCount())
.map(BucketId::of);
}
+
+ public static class CassandraMailQueueItemView implements ManageableMailQueue.MailQueueItemView {
+ private final EnqueuedItem enqueuedItem;
+ private final Mail mail;
+
+ public CassandraMailQueueItemView(Pair<EnqueuedItem, Mail> pair) {
+ this(pair.getLeft(), pair.getRight());
+ }
+
+ public CassandraMailQueueItemView(EnqueuedItem enqueuedItem, Mail mail) {
+ this.enqueuedItem = enqueuedItem;
+ this.mail = mail;
+ }
+
+ public EnqueueId getEnqueuedId() {
+ return enqueuedItem.getEnqueueId();
+ }
+
+ public MimeMessagePartsId getEnqueuedPartsId() {
+ return enqueuedItem.getPartsId();
+ }
+
+ @Override
+ public Mail getMail() {
+ return mail;
+ }
+
+ @Override
+ public Optional<ZonedDateTime> getNextDelivery() {
+ return Optional.empty();
+ }
+ }
}
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
index b82de97..72b26a3 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
@@ -21,6 +21,8 @@ package org.apache.james.queue.rabbitmq.view.cassandra;
import static org.apache.james.util.FunctionalUtils.negate;
+import java.time.Instant;
+
import javax.inject.Inject;
import org.apache.james.queue.api.ManageableMailQueue;
@@ -33,10 +35,11 @@ import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMai
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.EventsourcingConfigurationManagement;
import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
-public class CassandraMailQueueView implements MailQueueView {
+public class CassandraMailQueueView implements MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> {
public static class Factory implements MailQueueView.Factory {
private final CassandraMailQueueMailStore storeHelper;
@@ -91,13 +94,24 @@ public class CassandraMailQueueView implements MailQueueView {
@Override
public ManageableMailQueue.MailQueueIterator browse() {
return new CassandraMailQueueBrowser.CassandraMailQueueIterator(
- cassandraMailQueueBrowser.browse(mailQueueName)
- .subscribeOn(Schedulers.elastic())
+ browseReactive()
.toIterable()
.iterator());
}
@Override
+ public Flux<CassandraMailQueueBrowser.CassandraMailQueueItemView> browseReactive() {
+ return cassandraMailQueueBrowser.browse(mailQueueName)
+ .subscribeOn(Schedulers.elastic());
+ }
+
+ @Override
+ public Flux<CassandraMailQueueBrowser.CassandraMailQueueItemView> browseOlderThanReactive(Instant olderThan) {
+ return cassandraMailQueueBrowser.browseOlderThan(mailQueueName, olderThan)
+ .subscribeOn(Schedulers.elastic());
+ }
+
+ @Override
public long getSize() {
return cassandraMailQueueBrowser.browseReferences(mailQueueName)
.count()
@@ -133,6 +147,6 @@ public class CassandraMailQueueView implements MailQueueView {
@Override
public Mono<Boolean> isPresent(EnqueueId id) {
return cassandraMailQueueMailDelete.isDeleted(id, mailQueueName)
- .map(negate());
+ .map(negate());
}
}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index bf2d209..8808bfc 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -23,6 +23,7 @@ import static java.time.temporal.ChronoUnit.HOURS;
import static org.apache.james.backends.cassandra.Scenario.Builder.executeNormally;
import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
import static org.apache.james.backends.cassandra.Scenario.Builder.returnEmpty;
+import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
import static org.apache.james.queue.api.Mails.defaultMail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
@@ -75,11 +76,12 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentCaptor;
import com.github.fge.lambdas.Throwing;
-
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.OutboundMessage;
+import reactor.rabbitmq.Sender;
class RabbitMQMailQueueTest {
private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
@@ -132,7 +134,7 @@ class RabbitMQMailQueueTest {
}
@Override
- public MailQueue getMailQueue() {
+ public RabbitMQMailQueue getMailQueue() {
return mailQueue;
}
@@ -288,6 +290,177 @@ class RabbitMQMailQueueTest {
.containsExactly(name1, name2, name3);
}
+ @Test
+ void messagesShouldBeProcessedAfterNotPublishedMailsHaveBeenReprocessed() throws Exception {
+ clock.setInstant(Instant.now().minus(Duration.ofHours(2)));
+ String name1 = "myMail1";
+ String name2 = "myMail2";
+ String name3 = "myMail3";
+ Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(getMailQueue().deQueue());
+
+ // Avoid early processing and prefetching
+ Sender sender = rabbitMQExtension.getSender();
+
+ suspendDequeuing(sender);
+
+ getMailQueue().enQueue(defaultMail()
+ .name(name1)
+ .build());
+
+ getMailQueue().enQueue(defaultMail()
+ .name(name2)
+ .build());
+
+ getMailQueue().enQueue(defaultMail()
+ .name(name3)
+ .build());
+
+ resumeDequeuing(sender);
+ assertThat(getMailQueue()
+ .republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1)))
+ .collectList()
+ .block())
+ .containsExactlyInAnyOrder(name1, name2, name3);
+
+ List<MailQueue.MailQueueItem> items = dequeueFlux.take(Duration.ofSeconds(10)).collectList().block();
+
+ assertThat(items)
+ .extracting(item -> item.getMail().getName())
+ .containsExactlyInAnyOrder(name1, name2, name3);
+ }
+
+ @Test
+ void onlyOldMessagesShouldBeProcessedAfterNotPublishedMailsHaveBeenReprocessed() throws Exception {
+ clock.setInstant(Instant.now().minus(Duration.ofHours(2)));
+ String name1 = "myMail1";
+ String name2 = "myMail2";
+ String name3 = "myMail3";
+ Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(getMailQueue().deQueue());
+
+ // Avoid early processing and prefetching
+ Sender sender = rabbitMQExtension.getSender();
+
+ suspendDequeuing(sender);
+
+ getMailQueue().enQueue(defaultMail()
+ .name(name1)
+ .build());
+
+ getMailQueue().enQueue(defaultMail()
+ .name(name2)
+ .build());
+
+ clock.setInstant(Instant.now());
+ getMailQueue().enQueue(defaultMail()
+ .name(name3)
+ .build());
+
+ resumeDequeuing(sender);
+ assertThat(getMailQueue()
+ .republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1)))
+ .collectList()
+ .block())
+ .containsExactlyInAnyOrder(name1, name2);
+
+ List<MailQueue.MailQueueItem> items = dequeueFlux.take(Duration.ofSeconds(10)).collectList().block();
+
+ assertThat(items)
+ .extracting(item -> item.getMail().getName())
+ .containsExactlyInAnyOrder(name1, name2);
+ }
+
+ @Test
+ void messagesShouldBeProcessedAfterTwoMailsReprocessing() throws Exception {
+ clock.setInstant(Instant.now().minus(Duration.ofHours(2)));
+ String name1 = "myMail1";
+ String name2 = "myMail2";
+ String name3 = "myMail3";
+ Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(getMailQueue().deQueue());
+
+ // Avoid early processing and prefetching
+ Sender sender = rabbitMQExtension.getSender();
+
+ suspendDequeuing(sender);
+
+ getMailQueue().enQueue(defaultMail()
+ .name(name1)
+ .build());
+
+ getMailQueue().enQueue(defaultMail()
+ .name(name2)
+ .build());
+
+ getMailQueue().enQueue(defaultMail()
+ .name(name3)
+ .build());
+
+ assertThat(getMailQueue()
+ .republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1)))
+ .collectList()
+ .block())
+ .containsExactlyInAnyOrder(name1, name2, name3);
+ resumeDequeuing(sender);
+ assertThat(getMailQueue()
+ .republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1)))
+ .collectList()
+ .block())
+ .containsExactlyInAnyOrder(name1, name2, name3);
+
+ List<MailQueue.MailQueueItem> items = dequeueFlux.take(Duration.ofSeconds(10)).collectList().block();
+
+ assertThat(items)
+ .extracting(item -> item.getMail().getName())
+ .containsExactlyInAnyOrder(name1, name2, name3);
+ }
+
+ @Test
+ void messagesShouldBeProcessedAfterNotPublishedMailsHaveBeenReprocessedAndNewMessagesShouldNotBeLost() throws Exception {
+ clock.setInstant(Instant.now().minus(Duration.ofHours(2)));
+ String name1 = "myMail1";
+ String name2 = "myMail2";
+ String name3 = "myMail3";
+ Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(getMailQueue().deQueue());
+
+ // Avoid early processing and prefetching
+ Sender sender = rabbitMQExtension.getSender();
+
+ suspendDequeuing(sender);
+ //mail send when rabbit down
+ getMailQueue().enQueue(defaultMail()
+ .name(name1)
+ .build());
+ resumeDequeuing(sender);
+
+ //mail send when rabbit is up again and before rebuild
+ clock.setInstant(Instant.now());
+ getMailQueue().enQueue(defaultMail()
+ .name(name3)
+ .build());
+
+ Flux.merge(Mono.fromCallable(() -> {
+ //mail send concurently with rebuild
+ getMailQueue().enQueue(defaultMail()
+ .name(name2)
+ .build());
+ return true;
+
+ }), Mono.fromRunnable(() ->
+ assertThat(getMailQueue()
+ .republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1)))
+ .collectList()
+ .block())
+ .containsOnly(name1)
+ ))
+ .then()
+ .block(Duration.ofSeconds(10));
+
+ List<MailQueue.MailQueueItem> items = dequeueFlux.take(Duration.ofSeconds(10)).collectList().block();
+
+ assertThat(items)
+ .extracting(item -> item.getMail().getName())
+ .containsExactlyInAnyOrder(name1, name2, name3);
+ }
+
private void enqueueSomeMails(Function<Integer, String> namePattern, int emailCount) {
IntStream.rangeClosed(1, emailCount)
.forEach(Throwing.intConsumer(i -> enQueue(defaultMail()
@@ -485,6 +658,22 @@ class RabbitMQMailQueueTest {
Awaitility.await().atMost(org.awaitility.Duration.TEN_SECONDS)
.untilAsserted(() -> assertThat(deadLetteredCount.get()).isEqualTo(1));
}
+
+ private void resumeDequeuing(Sender sender) {
+ sender.bindQueue(getMailQueueBindingSpecification()).block();
+ }
+
+ private void suspendDequeuing(Sender sender) {
+ sender.unbindQueue(getMailQueueBindingSpecification()).block();
+ }
+
+ private BindingSpecification getMailQueueBindingSpecification() {
+ MailQueueName mailQueueName = MailQueueName.fromString(getMailQueue().getName().asString());
+ return BindingSpecification.binding()
+ .exchange(mailQueueName.toRabbitExchangeName().asString())
+ .queue(mailQueueName.toWorkQueueName().asString())
+ .routingKey(EMPTY_ROUTING_KEY);
+ }
}
@Nested
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
index c40fb11..2917b35 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
@@ -35,6 +35,7 @@ import org.apache.james.queue.api.MailQueueFactoryContract;
import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
+import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -52,7 +53,7 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM
void setup() throws Exception {
MimeMessageStore.Factory mimeMessageStoreFactory = mock(MimeMessageStore.Factory.class);
MailQueueView.Factory mailQueueViewFactory = mock(MailQueueView.Factory.class);
- MailQueueView mailQueueView = mock(MailQueueView.class);
+ MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> mailQueueView = mock(MailQueueView.class);
when(mailQueueViewFactory.create(any()))
.thenReturn(mailQueueView);
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org