You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/11/20 07:32:14 UTC
[james-project] 31/41: JAMES-2979 add test to ensure that dequeuing
from the mailqueue should be done multiple elements at the same time
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5e582a5eb829ed4b0571bd8e04d0ead48bf3294f
Author: RĂ©mi KOWALSKI <rk...@linagora.com>
AuthorDate: Thu Nov 14 14:01:11 2019 +0100
JAMES-2979 add test to ensure that dequeuing from the mailqueue should be done multiple elements at the same time
---
.../queue/activemq/ActiveMQMailQueueBlobTest.java | 4 +++
.../queue/activemq/ActiveMQMailQueueTest.java | 6 +++-
.../apache/james/queue/api/MailQueueContract.java | 35 +++++++++++++++++++++-
.../apache/james/queue/jms/JMSMailQueueTest.java | 6 +++-
4 files changed, 48 insertions(+), 3 deletions(-)
diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java
index 1b021d0..b2c3cea 100644
--- a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java
+++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java
@@ -27,6 +27,7 @@ import java.io.InputStream;
import java.time.temporal.ChronoUnit;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.commons.io.FileUtils;
import org.apache.james.filesystem.api.FileSystem;
@@ -65,6 +66,9 @@ public class ActiveMQMailQueueBlobTest implements DelayedManageableMailQueueCont
public void setUp(BrokerService broker, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) {
fileSystem = new MyFileSystem();
ActiveMQConnectionFactory connectionFactory = createConnectionFactory();
+ ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+ prefetchPolicy.setQueuePrefetch(0);
+ connectionFactory.setPrefetchPolicy(prefetchPolicy);
FileSystemBlobTransferPolicy policy = new FileSystemBlobTransferPolicy();
policy.setFileSystem(fileSystem);
policy.setDefaultUploadUrl(BASE_DIR);
diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java
index acfdbfb..609cef1 100644
--- a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java
+++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java
@@ -21,6 +21,7 @@ package org.apache.james.queue.activemq;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.james.metrics.api.GaugeRegistry;
import org.apache.james.metrics.api.MetricFactory;
@@ -51,7 +52,10 @@ public class ActiveMQMailQueueTest implements DelayedManageableMailQueueContract
@BeforeEach
public void setUp(BrokerService broker, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) {
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
+ ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+ prefetchPolicy.setQueuePrefetch(0);
+ connectionFactory.setPrefetchPolicy(prefetchPolicy);
RawMailQueueItemDecoratorFactory mailQueueItemDecoratorFactory = new RawMailQueueItemDecoratorFactory();
MetricFactory metricFactory = metricTestSystem.getMetricFactory();
GaugeRegistry gaugeRegistry = metricTestSystem.getSpyGaugeRegistry();
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
index f1d57c8..cec9783 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.mail.internet.MimeMessage;
@@ -49,12 +50,12 @@ import org.apache.mailet.Attribute;
import org.apache.mailet.Mail;
import org.apache.mailet.PerRecipientHeaders;
import org.apache.mailet.base.test.FakeMail;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import com.github.fge.lambdas.Throwing;
import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
-
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@@ -510,6 +511,38 @@ public interface MailQueueContract {
.hasSize(totalDequeuedMessages);
}
+ @Test
+ default void dequeueShouldBeConcurrent() {
+ MailQueue testee = getMailQueue();
+ int NB_MAILS = 1000;
+ IntStream.range(0, NB_MAILS)
+ .forEach(Throwing.intConsumer(i -> testee.enQueue(defaultMail()
+ .name("name" + i)
+ .build())));
+
+ ConcurrentLinkedDeque<Mail> dequeuedMails = new ConcurrentLinkedDeque<>();
+
+ Flux.from(testee.deQueue())
+ .flatMap(item -> Mono.defer(() -> {
+ dequeuedMails.add(item.getMail());
+ try {
+ Thread.sleep(100);
+ item.done(true);
+ return Mono.empty();
+ } catch (MailQueue.MailQueueException | InterruptedException e) {
+ return Mono.error(e);
+ }
+ }).subscribeOn(Schedulers.elastic()), 1000
+ )
+ .subscribeOn(Schedulers.newSingle("foo"))
+ .subscribe();
+
+ Awaitility.await()
+ .atMost(org.awaitility.Duration.ONE_MINUTE)
+ .until(() -> dequeuedMails.size() >= NB_MAILS);
+
+ }
+
class SerializableAttribute implements Serializable {
private final String value;
diff --git a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java
index 8f92851..f436a1d 100644
--- a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java
+++ b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java
@@ -22,6 +22,7 @@ package org.apache.james.queue.jms;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.james.metrics.api.GaugeRegistry;
import org.apache.james.metrics.api.MetricFactory;
@@ -47,7 +48,10 @@ public class JMSMailQueueTest implements DelayedManageableMailQueueContract, Pri
@BeforeEach
void setUp(BrokerService broker, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) {
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
+ ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+ prefetchPolicy.setQueuePrefetch(0);
+ connectionFactory.setPrefetchPolicy(prefetchPolicy);
RawMailQueueItemDecoratorFactory mailQueueItemDecoratorFactory = new RawMailQueueItemDecoratorFactory();
MetricFactory metricFactory = metricTestSystem.getMetricFactory();
GaugeRegistry gaugeRegistry = metricTestSystem.getSpyGaugeRegistry();
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org