You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/11/20 07:32:14 UTC

[james-project] 31/41: JAMES-2979 add test to ensure that dequeuing from the mailqueue should be done multiple elements at the same time

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5e582a5eb829ed4b0571bd8e04d0ead48bf3294f
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Thu Nov 14 14:01:11 2019 +0100

    JAMES-2979 add test to ensure that dequeuing from the mailqueue should be done multiple elements at the same time
---
 .../queue/activemq/ActiveMQMailQueueBlobTest.java  |  4 +++
 .../queue/activemq/ActiveMQMailQueueTest.java      |  6 +++-
 .../apache/james/queue/api/MailQueueContract.java  | 35 +++++++++++++++++++++-
 .../apache/james/queue/jms/JMSMailQueueTest.java   |  6 +++-
 4 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java
index 1b021d0..b2c3cea 100644
--- a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java
+++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueBlobTest.java
@@ -27,6 +27,7 @@ import java.io.InputStream;
 import java.time.temporal.ChronoUnit;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.commons.io.FileUtils;
 import org.apache.james.filesystem.api.FileSystem;
@@ -65,6 +66,9 @@ public class ActiveMQMailQueueBlobTest implements DelayedManageableMailQueueCont
     public void setUp(BrokerService broker, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) {
         fileSystem = new MyFileSystem();
         ActiveMQConnectionFactory connectionFactory = createConnectionFactory();
+        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+        prefetchPolicy.setQueuePrefetch(0);
+        connectionFactory.setPrefetchPolicy(prefetchPolicy);
         FileSystemBlobTransferPolicy policy = new FileSystemBlobTransferPolicy();
         policy.setFileSystem(fileSystem);
         policy.setDefaultUploadUrl(BASE_DIR);
diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java
index acfdbfb..609cef1 100644
--- a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java
+++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueTest.java
@@ -21,6 +21,7 @@ package org.apache.james.queue.activemq;
 import javax.jms.ConnectionFactory;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.james.metrics.api.GaugeRegistry;
 import org.apache.james.metrics.api.MetricFactory;
@@ -51,7 +52,10 @@ public class ActiveMQMailQueueTest implements DelayedManageableMailQueueContract
 
     @BeforeEach
     public void setUp(BrokerService broker, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) {
-        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
+        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+        prefetchPolicy.setQueuePrefetch(0);
+        connectionFactory.setPrefetchPolicy(prefetchPolicy);
         RawMailQueueItemDecoratorFactory mailQueueItemDecoratorFactory = new RawMailQueueItemDecoratorFactory();
         MetricFactory metricFactory = metricTestSystem.getMetricFactory();
         GaugeRegistry gaugeRegistry = metricTestSystem.getSpyGaugeRegistry();
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
index f1d57c8..cec9783 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import javax.mail.internet.MimeMessage;
@@ -49,12 +50,12 @@ import org.apache.mailet.Attribute;
 import org.apache.mailet.Mail;
 import org.apache.mailet.PerRecipientHeaders;
 import org.apache.mailet.base.test.FakeMail;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 
 import com.github.fge.lambdas.Throwing;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Strings;
-
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -510,6 +511,38 @@ public interface MailQueueContract {
             .hasSize(totalDequeuedMessages);
     }
 
+    @Test
+    default void dequeueShouldBeConcurrent() { // enqueues 1000 mails, then checks they are all handed out within one minute
+        MailQueue testee = getMailQueue();
+        int NB_MAILS = 1000;
+        IntStream.range(0, NB_MAILS)
+            .forEach(Throwing.intConsumer(i -> testee.enQueue(defaultMail()
+                .name("name" + i)
+                .build())));
+        // Each mail is recorded here as soon as its item is received, i.e. before the item is marked done.
+        ConcurrentLinkedDeque<Mail> dequeuedMails = new ConcurrentLinkedDeque<>();
+        // Every item is held for 100 ms before done(true); the work runs on Schedulers.elastic() threads.
+        Flux.from(testee.deQueue())
+            .flatMap(item -> Mono.defer(() -> {
+                    dequeuedMails.add(item.getMail());
+                    try {
+                        Thread.sleep(100);
+                        item.done(true);
+                        return Mono.empty();
+                    } catch (MailQueue.MailQueueException | InterruptedException e) {
+                        return Mono.error(e);
+                    }
+                }).subscribeOn(Schedulers.elastic()), 1000 // flatMap concurrency argument: up to 1000 inner Monos in flight
+            )
+            .subscribeOn(Schedulers.newSingle("foo"))
+            .subscribe();
+        // 1000 mails x 100 ms is ~100 s if handled strictly one after another, so the one-minute limit presumably only holds when items overlap.
+        Awaitility.await()
+            .atMost(org.awaitility.Duration.ONE_MINUTE)
+            .until(() -> dequeuedMails.size() >= NB_MAILS);
+
+    }
+
     class SerializableAttribute implements Serializable {
         private final String value;
 
diff --git a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java
index 8f92851..f436a1d 100644
--- a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java
+++ b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java
@@ -22,6 +22,7 @@ package org.apache.james.queue.jms;
 import javax.jms.ConnectionFactory;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.james.metrics.api.GaugeRegistry;
 import org.apache.james.metrics.api.MetricFactory;
@@ -47,7 +48,10 @@ public class JMSMailQueueTest implements DelayedManageableMailQueueContract, Pri
 
     @BeforeEach
     void setUp(BrokerService broker, MailQueueMetricExtension.MailQueueMetricTestSystem metricTestSystem) {
-        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
+        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+        prefetchPolicy.setQueuePrefetch(0);
+        connectionFactory.setPrefetchPolicy(prefetchPolicy);
         RawMailQueueItemDecoratorFactory mailQueueItemDecoratorFactory = new RawMailQueueItemDecoratorFactory();
         MetricFactory metricFactory = metricTestSystem.getMetricFactory();
         GaugeRegistry gaugeRegistry = metricTestSystem.getSpyGaugeRegistry();


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org