You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/13 02:53:28 UTC
[james-project] 01/13: JAMES-3041 Change the way we control
resources for Spooler mail processing
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit d6c0a6b283161d4ee3b2f0d0f6f04804e9feb881
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Tue Feb 18 15:47:05 2020 +0100
JAMES-3041 Change the way we control resources for Spooler mail processing
Until now, we controlled resources using a bounded thread pool
for mail processing.
However, this makes concurrent dequeuing harder to achieve
and doesn't leverage the modern tools Reactor brings,
such as explicit control over concurrency for a given Flux.
With this commit we start using the elastic Scheduler everywhere
and control resources by specifying a concurrency parameter.
---
.../mailetcontainer/impl/JamesMailSpooler.java | 53 ++++++++--------------
.../mailetcontainer/impl/JamesMailSpoolerTest.java | 5 +-
.../remote/delivery/RemoteDeliveryTest.java | 2 +-
.../SetMessagesOutboxFlagUpdateTest.java | 4 +-
.../routes/MailRepositoriesRoutesTest.java | 2 +-
.../webadmin/service/ReprocessingServiceTest.java | 2 +-
.../apache/james/queue/api/MailQueueFactory.java | 47 ++++++++++++++++++-
.../james/queue/api/MailQueueFactoryTest.java | 26 +++++++++++
.../api/ManageableMailQueueFactoryContract.java | 6 +--
.../james/queue/file/FileMailQueueFactory.java | 4 +-
.../queue/library/AbstractMailQueueFactory.java | 6 +--
.../james/queue/memory/MemoryMailQueueFactory.java | 6 +--
.../MemoryCacheableMailQueueFactoryTest.java | 6 +--
.../org/apache/james/queue/rabbitmq/Dequeuer.java | 11 +++--
.../queue/rabbitmq/RabbitMQMailQueueFactory.java | 22 ++++-----
15 files changed, 130 insertions(+), 72 deletions(-)
diff --git a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
index 28383cd..aaa86d7 100644
--- a/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
+++ b/server/mailet/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/impl/JamesMailSpooler.java
@@ -22,7 +22,6 @@ package org.apache.james.mailetcontainer.impl;
import static org.apache.james.metrics.api.TimeMetric.ExecutionResult.DEFAULT_100_MS_THRESHOLD;
import java.io.IOException;
-import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct;
@@ -41,14 +40,12 @@ import org.apache.james.metrics.api.TimeMetric;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueue.MailQueueItem;
import org.apache.james.queue.api.MailQueueFactory;
-import org.apache.james.util.concurrent.NamedThreadFactory;
import org.apache.mailet.Mail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
@@ -62,9 +59,10 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
public static final String SPOOL_PROCESSING = "spoolProcessing";
/**
- * The number of threads used to move mail through the spool.
+ * concurrency level to use for dequeuing mails from spool, allows to throttle resources dedicated to that async
+ * process.
*/
- private int numThreads;
+ private int concurrencyLevel;
private final AtomicInteger processingActive = new AtomicInteger(0);
@@ -77,11 +75,8 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
private final MailQueueFactory<?> queueFactory;
private reactor.core.Disposable disposable;
- private Scheduler spooler;
- private int parallelismLevel;
private MailQueue queue;
-
@Inject
public JamesMailSpooler(MetricFactory metricFactory, MailProcessor mailProcessor, MailQueueFactory<?> queueFactory) {
this.metricFactory = metricFactory;
@@ -91,10 +86,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
@Override
public void configure(HierarchicalConfiguration<ImmutableNode> config) {
- numThreads = config.getInt("threads", 100);
- //Reactor helps us run things in parallel but we have to ensure there are always threads available
- //in the threadpool to avoid starvation.
- parallelismLevel = Math.max(1, numThreads - 2);
+ concurrencyLevel = config.getInt("threads", 100);
}
/**
@@ -103,18 +95,17 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
@PostConstruct
public void init() {
LOGGER.info("init...");
- queue = queueFactory.createQueue(MailQueueFactory.SPOOL);
- spooler = Schedulers.fromExecutor(Executors.newFixedThreadPool(numThreads, NamedThreadFactory.withName("spooler")));
- LOGGER.info("uses {} Thread(s)", numThreads);
- run();
+ LOGGER.info("Concurrency level is {}", concurrencyLevel);
+ queue = queueFactory.createQueue(MailQueueFactory.SPOOL, MailQueueFactory.prefetchCount(concurrencyLevel));
+ disposable = run(queue);
+ LOGGER.info("Spooler started");
}
- private void run() {
- LOGGER.info("Queue={}", queue);
- disposable = Flux.from(queue.deQueue())
- .flatMap(item -> handleOnQueueItem(item).subscribeOn(spooler), parallelismLevel)
+ private reactor.core.Disposable run(MailQueue queue) {
+ return Flux.from(queue.deQueue())
+ .flatMap(item -> handleOnQueueItem(item).subscribeOn(Schedulers.elastic()), concurrencyLevel)
.onErrorContinue((throwable, item) -> LOGGER.error("Exception processing mail while spooling {}", item, throwable))
- .subscribeOn(spooler)
+ .subscribeOn(Schedulers.elastic())
.subscribe();
}
@@ -122,7 +113,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
TimeMetric timeMetric = metricFactory.timer(SPOOL_PROCESSING);
try {
return Mono.fromCallable(processingActive::incrementAndGet)
- .flatMap(ignore -> processMail(queueItem).subscribeOn(spooler))
+ .flatMap(ignore -> processMail(queueItem))
.doOnSuccess(any -> timeMetric.stopAndPublish().logWhenExceedP99(DEFAULT_100_MS_THRESHOLD))
.doOnSuccess(any -> processingActive.decrementAndGet());
} catch (Throwable e) {
@@ -134,16 +125,12 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
return Mono
.using(
queueItem::getMail,
- resource -> Mono.just(resource)
- .doOnNext(mail -> LOGGER.debug("==== Begin processing mail {} ====", mail.getName()))
- .map(mail -> performProcessMail(queueItem, mail))
- .doOnNext(mail -> LOGGER.debug("==== End processing mail {} ====", mail.getName())),
- LifecycleUtil::dispose)
- .then();
+ mail -> Mono.fromRunnable(() -> performProcessMail(queueItem, mail)),
+ LifecycleUtil::dispose);
}
-
- private Mail performProcessMail(MailQueueItem queueItem, Mail mail) {
+ private void performProcessMail(MailQueueItem queueItem, Mail mail) {
+ LOGGER.debug("==== Begin processing mail {} ====", mail.getName());
try {
mailProcessor.service(mail);
@@ -157,8 +144,9 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
} catch (MailQueue.MailQueueException ex) {
throw new RuntimeException(e);
}
+ } finally {
+ LOGGER.debug("==== End processing mail {} ====", mail.getName());
}
- return mail;
}
/**
@@ -174,7 +162,6 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
public void dispose() {
LOGGER.info("start dispose() ...");
disposable.dispose();
- spooler.dispose();
try {
queue.close();
} catch (IOException e) {
@@ -185,7 +172,7 @@ public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMB
@Override
public int getThreadCount() {
- return numThreads;
+ return concurrencyLevel;
}
@Override
diff --git a/server/mailet/mailetcontainer-camel/src/test/java/org/apache/james/mailetcontainer/impl/JamesMailSpoolerTest.java b/server/mailet/mailetcontainer-camel/src/test/java/org/apache/james/mailetcontainer/impl/JamesMailSpoolerTest.java
index 0b62199..9485686 100644
--- a/server/mailet/mailetcontainer-camel/src/test/java/org/apache/james/mailetcontainer/impl/JamesMailSpoolerTest.java
+++ b/server/mailet/mailetcontainer-camel/src/test/java/org/apache/james/mailetcontainer/impl/JamesMailSpoolerTest.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS;
import static org.awaitility.Duration.TEN_SECONDS;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -73,7 +74,7 @@ class JamesMailSpoolerTest {
MailQueue queue = mock(MailQueue.class);
workQueue.onNext(item);
when(queue.deQueue()).thenAnswer(any -> workQueue.limitRate(1).filter(MockedMailQueueItem::isNotDone));
- when(queueFactory.createQueue(MailQueueFactory.SPOOL)).thenAnswer(any -> queue);
+ when(queueFactory.createQueue(eq(MailQueueFactory.SPOOL), any())).thenAnswer(any -> queue);
doThrow(new RuntimeException("Arbitrary failure"))
.doNothing()
@@ -107,7 +108,7 @@ class JamesMailSpoolerTest {
MailQueue queue = mock(MailQueue.class);
workQueue.onNext(item);
when(queue.deQueue()).thenAnswer(any -> workQueue.limitRate(1).filter(MockedMailQueueItem::isNotDone));
- when(queueFactory.createQueue(MailQueueFactory.SPOOL)).thenAnswer(any -> queue);
+ when(queueFactory.createQueue(eq(MailQueueFactory.SPOOL), any())).thenAnswer(any -> queue);
doAnswer(ignored -> {
Thread.currentThread().interrupt();
diff --git a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryTest.java b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryTest.java
index 1e62160..cfcc648 100644
--- a/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryTest.java
+++ b/server/mailet/mailets/src/test/java/org/apache/james/transport/mailets/remote/delivery/RemoteDeliveryTest.java
@@ -99,7 +99,7 @@ public class RemoteDeliveryTest {
@Before
public void setUp() throws ConfigurationException {
- MailQueueFactory<ManageableMailQueue> queueFactory = new MemoryMailQueueFactory(new RawMailQueueItemDecoratorFactory());
+ MailQueueFactory<? extends ManageableMailQueue> queueFactory = new MemoryMailQueueFactory(new RawMailQueueItemDecoratorFactory());
mailQueue = queueFactory.createQueue(RemoteDeliveryConfiguration.DEFAULT_OUTGOING_QUEUE_NAME);
DNSService dnsService = mock(DNSService.class);
MemoryDomainList domainList = new MemoryDomainList(dnsService);
diff --git a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java
index 0f82362..bf21d1e 100644
--- a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java
+++ b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java
@@ -73,12 +73,12 @@ public abstract class SetMessagesOutboxFlagUpdateTest {
protected MailQueueFactory<MailQueue> noopMailQueueFactory = new MailQueueFactory<MailQueue>() {
@Override
- public Optional<MailQueue> getQueue(MailQueueName name) {
+ public Optional<MailQueue> getQueue(MailQueueName name, PrefetchCount prefetchCount) {
return Optional.of(createQueue(name));
}
@Override
- public MailQueue createQueue(MailQueueName name) {
+ public MailQueue createQueue(MailQueueName name, PrefetchCount prefetchCount) {
return new MailQueue() {
@Override
public void close() throws IOException {
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
index 613c506..b69d674 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/routes/MailRepositoriesRoutesTest.java
@@ -114,7 +114,7 @@ public class MailRepositoriesRoutesTest {
MemoryTaskManager taskManager = new MemoryTaskManager(new Hostname("foo"));
JsonTransformer jsonTransformer = new JsonTransformer();
- MailQueueFactory<ManageableMailQueue> queueFactory = new MemoryMailQueueFactory(new RawMailQueueItemDecoratorFactory());
+ MailQueueFactory<? extends ManageableMailQueue> queueFactory = new MemoryMailQueueFactory(new RawMailQueueItemDecoratorFactory());
spoolQueue = queueFactory.createQueue(MailQueueFactory.SPOOL);
customQueue = queueFactory.createQueue(CUSTOM_QUEUE);
diff --git a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java
index 994c7aa..02cbc0d 100644
--- a/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java
+++ b/server/protocols/webadmin/webadmin-mailrepository/src/test/java/org/apache/james/webadmin/service/ReprocessingServiceTest.java
@@ -64,7 +64,7 @@ public class ReprocessingServiceTest {
private ReprocessingService reprocessingService;
private MemoryMailRepositoryStore mailRepositoryStore;
- private MailQueueFactory<ManageableMailQueue> queueFactory;
+ private MailQueueFactory<? extends ManageableMailQueue> queueFactory;
private FakeMail mail1;
private FakeMail mail2;
private FakeMail mail3;
diff --git a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueueFactory.java b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueueFactory.java
index 3f0a5dc..35a4967 100644
--- a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueueFactory.java
+++ b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueueFactory.java
@@ -22,11 +22,46 @@ package org.apache.james.queue.api;
import java.util.Optional;
import java.util.Set;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
/**
* Factory for {@link MailQueue}
*/
public interface MailQueueFactory<T extends MailQueue> {
+ static PrefetchCount defaultPrefetchCount() {
+ return prefetchCount(5);
+ }
+
+ static PrefetchCount prefetchCount(int count) {
+ return new PrefetchCount(count);
+ }
+
+ /**
+ * {@link PrefetchCount} provides producers insights about what kind of load consumers expect.
+ * If you expect to consume the mailqueue with 10 concurrent workers, it's a good idea to
+ * ensure the producer will fill the stream with at least 10 elements.
+ */
+ class PrefetchCount {
+ private final int value;
+
+ @VisibleForTesting
+ PrefetchCount(int value) {
+ Preconditions.checkArgument(value >= 0, "only non-negative values are allowed");
+ this.value = value;
+ }
+
+ public int asInt() {
+ return value;
+ }
+
+ @Override
+ public String toString() {
+ return "PrefetchCount = " + value;
+ }
+ }
+
/**
* {@link MailQueue} which is used for spooling the messages
*/
@@ -38,9 +73,17 @@ public interface MailQueueFactory<T extends MailQueue> {
* @param name
* @return queue
*/
- Optional<T> getQueue(MailQueueName name);
+ default Optional<T> getQueue(MailQueueName name) {
+ return getQueue(name, defaultPrefetchCount());
+ }
+
+ Optional<T> getQueue(MailQueueName name, PrefetchCount count);
+
+ default T createQueue(MailQueueName name) {
+ return createQueue(name, defaultPrefetchCount());
+ }
- T createQueue(MailQueueName name);
+ T createQueue(MailQueueName name, PrefetchCount count);
Set<MailQueueName> listCreatedMailQueues();
}
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueFactoryTest.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueFactoryTest.java
new file mode 100644
index 0000000..c565603
--- /dev/null
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueFactoryTest.java
@@ -0,0 +1,26 @@
+package org.apache.james.queue.api;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class MailQueueFactoryTest {
+
+ @Test
+ void prefetchCountShouldNotBeNegative() {
+ Assertions.assertThatThrownBy(() -> new MailQueueFactory.PrefetchCount(-12))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ void prefetchCountCouldBeZero() {
+ Assertions.assertThatCode(() -> new MailQueueFactory.PrefetchCount(0)).doesNotThrowAnyException();
+ }
+
+ @Test
+ void prefetchCountCouldBePositive() {
+ Assertions.assertThatCode(() -> new MailQueueFactory.PrefetchCount(12)).doesNotThrowAnyException();
+ }
+
+}
\ No newline at end of file
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueFactoryContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueFactoryContract.java
index 5ae065f..fc671b8 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueFactoryContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueFactoryContract.java
@@ -25,15 +25,15 @@ import javax.mail.MessagingException;
import org.junit.jupiter.api.Test;
-public interface ManageableMailQueueFactoryContract {
+public interface ManageableMailQueueFactoryContract<T extends ManageableMailQueue> {
MailQueueName NAME_1 = MailQueueName.of("name1");
- MailQueueFactory<ManageableMailQueue> getMailQueueFactory();
+ MailQueueFactory<T> getMailQueueFactory();
@Test
default void createMailQueueShouldNotConflictIfAlreadyExists() throws MessagingException {
- MailQueueFactory<ManageableMailQueue> mailQueueFactory = getMailQueueFactory();
+ MailQueueFactory<T> mailQueueFactory = getMailQueueFactory();
MailQueue firstCreation = mailQueueFactory.createQueue(NAME_1);
firstCreation.enQueue(Mails.defaultMail().name("name").build());
diff --git a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java
index 0587620..f727bcd 100644
--- a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java
+++ b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileMailQueueFactory.java
@@ -76,12 +76,12 @@ public class FileMailQueueFactory implements MailQueueFactory<ManageableMailQueu
}
@Override
- public Optional<ManageableMailQueue> getQueue(MailQueueName name) {
+ public Optional<ManageableMailQueue> getQueue(MailQueueName name, PrefetchCount prefetchCount) {
return Optional.ofNullable(queues.get(name));
}
@Override
- public ManageableMailQueue createQueue(MailQueueName name) {
+ public ManageableMailQueue createQueue(MailQueueName name, PrefetchCount prefetchCount) {
return queues.computeIfAbsent(name, mailQueueName -> {
try {
return new FileCacheableMailQueue(mailQueueActionItemDecoratorFactory, fs.getFile("file://var/store/queue"), mailQueueName, sync);
diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java
index 5fa22ef..052b0d2 100644
--- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java
+++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/library/AbstractMailQueueFactory.java
@@ -96,13 +96,13 @@ public abstract class AbstractMailQueueFactory<T extends MailQueue> implements M
}
@Override
- public final synchronized Optional<T> getQueue(MailQueueName name) {
+ public final synchronized Optional<T> getQueue(MailQueueName name, PrefetchCount prefetchCount) {
return Optional.ofNullable(queues.get(name));
}
@Override
- public synchronized T createQueue(MailQueueName name) {
- return getQueue(name).orElseGet(() -> createAndRegisterQueue(name));
+ public synchronized T createQueue(MailQueueName name, PrefetchCount prefetchCount) {
+ return getQueue(name, prefetchCount).orElseGet(() -> createAndRegisterQueue(name));
}
private T createAndRegisterQueue(MailQueueName name) {
diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
index e13cb10..da9e8b1 100644
--- a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
+++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
@@ -58,7 +58,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
-public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQueue> {
+public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueFactory.MemoryCacheableMailQueue> {
private final ConcurrentHashMap<MailQueueName, MemoryCacheableMailQueue> mailQueues;
private final MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory;
@@ -78,12 +78,12 @@ public class MemoryMailQueueFactory implements MailQueueFactory<ManageableMailQu
}
@Override
- public Optional<ManageableMailQueue> getQueue(MailQueueName name) {
+ public Optional<MemoryCacheableMailQueue> getQueue(MailQueueName name, PrefetchCount count) {
return Optional.ofNullable(mailQueues.get(name));
}
@Override
- public MemoryCacheableMailQueue createQueue(MailQueueName name) {
+ public MemoryCacheableMailQueue createQueue(MailQueueName name, PrefetchCount prefetchCount) {
return mailQueues.computeIfAbsent(name, mailQueueName -> new MemoryCacheableMailQueue(mailQueueName, mailQueueItemDecoratorFactory));
}
diff --git a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueFactoryTest.java b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueFactoryTest.java
index aa2e294..a50722b 100644
--- a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueFactoryTest.java
+++ b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueFactoryTest.java
@@ -21,12 +21,12 @@ package org.apache.james.queue.memory;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.api.MailQueueFactoryContract;
-import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.api.ManageableMailQueueFactoryContract;
import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
import org.junit.jupiter.api.BeforeEach;
-class MemoryCacheableMailQueueFactoryTest implements MailQueueFactoryContract<ManageableMailQueue>, ManageableMailQueueFactoryContract {
+class MemoryCacheableMailQueueFactoryTest implements MailQueueFactoryContract<MemoryMailQueueFactory.MemoryCacheableMailQueue>,
+ ManageableMailQueueFactoryContract<MemoryMailQueueFactory.MemoryCacheableMailQueue> {
MemoryMailQueueFactory memoryMailQueueFactory;
@@ -36,7 +36,7 @@ class MemoryCacheableMailQueueFactoryTest implements MailQueueFactoryContract<Ma
}
@Override
- public MailQueueFactory<ManageableMailQueue> getMailQueueFactory() {
+ public MailQueueFactory<MemoryMailQueueFactory.MemoryCacheableMailQueue> getMailQueueFactory() {
return memoryMailQueueFactory;
}
}
\ No newline at end of file
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 64c0c93..b22d850 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -30,6 +30,7 @@ import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.metrics.api.Metric;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.queue.api.MailQueue;
+import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import org.apache.mailet.Mail;
@@ -39,13 +40,13 @@ import com.rabbitmq.client.Delivery;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.Receiver;
class Dequeuer implements Closeable {
private static final boolean REQUEUE = true;
- private static final int EXECUTION_RATE = 5;
private static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem {
@@ -84,14 +85,14 @@ class Dequeuer implements Closeable {
Dequeuer(MailQueueName name, ReceiverProvider receiverProvider, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader,
MailReferenceSerializer serializer, MetricFactory metricFactory,
- MailQueueView mailQueueView) {
+ MailQueueView mailQueueView, MailQueueFactory.PrefetchCount prefetchCount) {
this.mailLoader = mailLoader;
this.mailReferenceSerializer = serializer;
this.mailQueueView = mailQueueView;
this.dequeueMetric = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString());
this.receiver = receiverProvider.createReceiver();
this.flux = this.receiver
- .consumeManualAck(name.toWorkQueueName().asString(), new ConsumeOptions().qos(EXECUTION_RATE))
+ .consumeManualAck(name.toWorkQueueName().asString(), new ConsumeOptions().qos(prefetchCount.asInt()))
.filter(getResponse -> getResponse.getBody() != null);
}
@@ -101,8 +102,8 @@ class Dequeuer implements Closeable {
}
Flux<? extends MailQueue.MailQueueItem> deQueue() {
- return flux.concatMap(this::loadItem)
- .concatMap(this::filterIfDeleted);
+ return flux.flatMapSequential(response -> loadItem(response).subscribeOn(Schedulers.elastic()))
+ .concatMap(item -> filterIfDeleted(item).subscribeOn(Schedulers.elastic()));
}
private Mono<RabbitMQMailQueueItem> filterIfDeleted(RabbitMQMailQueueItem item) {
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
index 3805e74..fc2f784 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
@@ -93,7 +93,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
this.configuration = configuration;
}
- RabbitMQMailQueue create(MailQueueName mailQueueName) {
+ RabbitMQMailQueue create(MailQueueName mailQueueName, PrefetchCount prefetchCount) {
MailQueueView mailQueueView = mailQueueViewFactory.create(mailQueueName);
mailQueueView.initialize(mailQueueName);
@@ -103,7 +103,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
new Enqueuer(mailQueueName, sender, mimeMessageStore, mailReferenceSerializer,
metricFactory, mailQueueView, clock),
new Dequeuer(mailQueueName, receiverProvider, mailLoader, mailReferenceSerializer,
- metricFactory, mailQueueView),
+ metricFactory, mailQueueView, prefetchCount),
mailQueueView,
decoratorFactory);
@@ -133,15 +133,15 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
}
@Override
- public Optional<RabbitMQMailQueue> getQueue(org.apache.james.queue.api.MailQueueName name) {
- return getQueueFromRabbitServer(MailQueueName.fromString(name.asString()));
+ public Optional<RabbitMQMailQueue> getQueue(org.apache.james.queue.api.MailQueueName name, PrefetchCount count) {
+ return getQueueFromRabbitServer(MailQueueName.fromString(name.asString()), count);
}
@Override
- public RabbitMQMailQueue createQueue(org.apache.james.queue.api.MailQueueName name) {
+ public RabbitMQMailQueue createQueue(org.apache.james.queue.api.MailQueueName name, PrefetchCount count) {
MailQueueName mailQueueName = MailQueueName.fromString(name.asString());
- return getQueueFromRabbitServer(mailQueueName)
- .orElseGet(() -> createQueueIntoRabbitServer(mailQueueName));
+ return getQueueFromRabbitServer(mailQueueName, count)
+ .orElseGet(() -> createQueueIntoRabbitServer(mailQueueName, count));
}
@Override
@@ -152,7 +152,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
.collect(ImmutableSet.toImmutableSet());
}
- private RabbitMQMailQueue createQueueIntoRabbitServer(MailQueueName mailQueueName) {
+ private RabbitMQMailQueue createQueueIntoRabbitServer(MailQueueName mailQueueName, PrefetchCount prefetchCount) {
String exchangeName = mailQueueName.toRabbitExchangeName().asString();
Flux.concat(
sender.declareExchange(ExchangeSpecification.exchange(exchangeName)
@@ -169,13 +169,13 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
.routingKey(EMPTY_ROUTING_KEY)))
.then()
.block();
- return privateFactory.create(mailQueueName);
+ return privateFactory.create(mailQueueName, prefetchCount);
}
- private Optional<RabbitMQMailQueue> getQueueFromRabbitServer(MailQueueName name) {
+ private Optional<RabbitMQMailQueue> getQueueFromRabbitServer(MailQueueName name, PrefetchCount prefetchCount) {
return mqManagementApi.listCreatedMailQueueNames()
.filter(name::equals)
- .map(privateFactory::create)
+ .map(queueName -> privateFactory.create(queueName, prefetchCount))
.findFirst();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org