You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2019/06/14 16:32:51 UTC

[james-project] branch master updated (db793aa -> 30f6adb)

This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from db793aa  Merge remote-tracking branch 'benoit/mailetContext'
     new 6cfdc93  JAMES-2777 fix too strong expectation about task cancellation
     new 61a58a8  JAMES-2759 Update reactor-rabbitmq to 1.2.0
     new 7220b4e  JAMES-2786 unpause RabbitMQ container after each test to avoid next tests failure
     new 61b6bdc  JAMES-2786 DockerRabbitMQ use boolean instead of atomic boolean
     new cf0a390  JAMES-2786 RabbitMQ EventBus stop consume messages under heavy load
     new edce29a  JAMES-2786 limit the number of thread required for the concurrent stress test
     new 1487aa7  JAMES-2704 Fix randomness of RandomStoring mailet
     new d3908cd  JAMES-2786 Use a channel pool for EventBus
     new 30f6adb  Merge branch 'JAMES-2759-event-bus-stress-test'

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../james/backend/rabbitmq/DockerRabbitMQ.java     | 12 +++++--
 .../events/EventBusConcurrentTestContract.java     | 12 +++----
 .../james/mailbox/events/EventDispatcher.java      |  5 ++-
 .../james/mailbox/events/GroupRegistration.java    |  3 +-
 .../james/mailbox/events/RabbitMQEventBus.java     | 15 +++++++--
 .../james/mailbox/events/RabbitMQEventBusTest.java | 39 ++++++++++++++++++++++
 pom.xml                                            |  4 +--
 .../apache/james/smtp/SmtpRandomStoringTest.java   |  4 +--
 .../james/transport/mailets/RandomStoring.java     | 22 ++++++------
 .../src/main/java/org/apache/james/task/Task.java  |  2 +-
 .../apache/james/task/MemoryTaskManagerTest.java   |  1 +
 11 files changed, 91 insertions(+), 28 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 08/09: JAMES-2777 fix too strong expectation about task cancellation

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 6cfdc93a153a6cd4b178010b7466b318dc76a80f
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Wed Jun 12 16:13:18 2019 +0200

    JAMES-2777 fix too strong expectation about task cancellation
---
 server/task/src/main/java/org/apache/james/task/Task.java               | 2 +-
 .../task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/server/task/src/main/java/org/apache/james/task/Task.java b/server/task/src/main/java/org/apache/james/task/Task.java
index 82b7670..0c79ec0 100644
--- a/server/task/src/main/java/org/apache/james/task/Task.java
+++ b/server/task/src/main/java/org/apache/james/task/Task.java
@@ -74,7 +74,7 @@ public interface Task {
      *
      * @return Return true if fully migrated. Returns false otherwise.
      */
-    Result run();
+    Result run() throws InterruptedException;
 
 
     default String type() {
diff --git a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
index 3533eb3..691bbc9 100644
--- a/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/MemoryTaskManagerTest.java
@@ -92,6 +92,7 @@ class MemoryTaskManagerTest {
         TaskId id = memoryTaskManager.submit(() -> {
             waitForTaskToBeLaunched.countDown();
             await(task1Latch);
+            Thread.sleep(1);
             count.incrementAndGet();
             return Task.Result.COMPLETED;
         });


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 01/09: JAMES-2759 Update reactor-rabbitmq to 1.2.0

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 61a58a8f18c6afdeb14a6f2de50c94497de25251
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri May 10 11:06:02 2019 +0700

    JAMES-2759 Update reactor-rabbitmq to 1.2.0
    
    reactor-rabbitmq is used by the RabbitMQ eventBus and the RabbitMQ queue
    
    Recent 1.2.0 version ships loads of bigfixes that can help us improve build overall stability: https://github.com/reactor/reactor-rabbitmq/releases
    
     - Channels are leaking when CancelCallback is used
     - Allow customization of channel creation for sending
     - Retry automatically on ack/nack failure
     - Enforce queue specification semantics for server-named, non-durable, exclusive, auto-delete queue creation
---
 pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index edc4a31..d4905b0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -670,7 +670,7 @@
             <dependency>
                 <groupId>io.projectreactor</groupId>
                 <artifactId>reactor-bom</artifactId>
-                <version>Californium-SR3</version>
+                <version>Californium-SR9</version>
                 <type>pom</type>
                 <scope>import</scope>
             </dependency>
@@ -2140,7 +2140,7 @@
             <dependency>
                 <groupId>io.projectreactor.rabbitmq</groupId>
                 <artifactId>reactor-rabbitmq</artifactId>
-                <version>1.0.0.RELEASE</version>
+                <version>1.2.0.RELEASE</version>
             </dependency>
             <dependency>
                 <groupId>javax.activation</groupId>


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 09/09: Merge branch 'JAMES-2759-event-bus-stress-test'

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 30f6adb506dcb1b4be9f06539055ad3d4b63e4e8
Merge: 6cfdc93 d3908cd
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Fri Jun 14 18:32:16 2019 +0200

    Merge branch 'JAMES-2759-event-bus-stress-test'

 .../james/backend/rabbitmq/DockerRabbitMQ.java     | 12 +++++--
 .../events/EventBusConcurrentTestContract.java     | 12 +++----
 .../james/mailbox/events/EventDispatcher.java      |  5 ++-
 .../james/mailbox/events/GroupRegistration.java    |  3 +-
 .../james/mailbox/events/RabbitMQEventBus.java     | 15 +++++++--
 .../james/mailbox/events/RabbitMQEventBusTest.java | 39 ++++++++++++++++++++++
 pom.xml                                            |  4 +--
 .../apache/james/smtp/SmtpRandomStoringTest.java   |  4 +--
 .../james/transport/mailets/RandomStoring.java     | 22 ++++++------
 9 files changed, 89 insertions(+), 27 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 07/09: JAMES-2786 Use a channel pool for EventBus

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d3908cda7d7fa8a767432a7019b883ff1fa49548
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Fri Jun 14 10:52:04 2019 +0200

    JAMES-2786 Use a channel pool for EventBus
---
 .../james/mailbox/events/EventBusConcurrentTestContract.java | 12 ++++++------
 .../org/apache/james/mailbox/events/GroupRegistration.java   |  1 +
 .../org/apache/james/mailbox/events/RabbitMQEventBus.java    | 12 +++++++++++-
 3 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java
index 2f6e9cb..738969f 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusConcurrentTestContract.java
@@ -74,7 +74,7 @@ public interface EventBusConcurrentTestContract {
             int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC
 
             ConcurrentTestRunner.builder()
-                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, NO_KEYS))
+                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, NO_KEYS).block())
                 .threadCount(THREAD_COUNT)
                 .operationCount(OPERATION_COUNT)
                 .runSuccessfullyWithin(FIVE_SECONDS);
@@ -97,7 +97,7 @@ public interface EventBusConcurrentTestContract {
             int totalEventBus = 1;
 
             ConcurrentTestRunner.builder()
-                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS))
+                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS).block())
                 .threadCount(THREAD_COUNT)
                 .operationCount(OPERATION_COUNT)
                 .runSuccessfullyWithin(FIVE_SECONDS);
@@ -127,7 +127,7 @@ public interface EventBusConcurrentTestContract {
             int totalEventDeliveredByKeys = totalKeyListenerRegistrations * TOTAL_DISPATCH_OPERATIONS;
 
             ConcurrentTestRunner.builder()
-                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS))
+                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS).block())
                 .threadCount(THREAD_COUNT)
                 .operationCount(OPERATION_COUNT)
                 .runSuccessfullyWithin(FIVE_SECONDS);
@@ -159,7 +159,7 @@ public interface EventBusConcurrentTestContract {
             int totalGlobalRegistrations = 3; // GroupA + GroupB + GroupC
 
             ConcurrentTestRunner.builder()
-                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, NO_KEYS))
+                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, NO_KEYS).block())
                 .threadCount(THREAD_COUNT)
                 .operationCount(OPERATION_COUNT)
                 .runSuccessfullyWithin(FIVE_SECONDS);
@@ -187,7 +187,7 @@ public interface EventBusConcurrentTestContract {
             int totalEventBus = 2; // eventBus1 + eventBus2
 
             ConcurrentTestRunner.builder()
-                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS))
+                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS).block())
                 .threadCount(THREAD_COUNT)
                 .operationCount(OPERATION_COUNT)
                 .runSuccessfullyWithin(FIVE_SECONDS);
@@ -223,7 +223,7 @@ public interface EventBusConcurrentTestContract {
             int totalEventDeliveredByKeys = totalKeyListenerRegistrations * totalEventBus * TOTAL_DISPATCH_OPERATIONS;
 
             ConcurrentTestRunner.builder()
-                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS))
+                .operation((threadNumber, operationNumber) -> eventBus().dispatch(EVENT, ALL_KEYS).block())
                 .threadCount(THREAD_COUNT)
                 .operationCount(OPERATION_COUNT)
                 .runSuccessfullyWithin(FIVE_SECONDS);
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index 5164c2d..bbdd46b 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -140,6 +140,7 @@ class GroupRegistration implements Registration {
         int currentRetryCount = getRetryCount(acknowledgableDelivery);
 
         return delayGenerator.delayIfHaveTo(currentRetryCount)
+            .publishOn(Schedulers.elastic())
             .flatMap(any -> Mono.fromRunnable(Throwing.runnable(() -> runListener(event))))
             .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable))
             .then(Mono.fromRunnable(acknowledgableDelivery::ack))
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 8ec96d6..784baab 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -34,11 +34,15 @@ import com.google.common.base.Preconditions;
 import com.rabbitmq.client.Connection;
 
 import reactor.core.publisher.Mono;
+import reactor.rabbitmq.ChannelPool;
+import reactor.rabbitmq.ChannelPoolFactory;
+import reactor.rabbitmq.ChannelPoolOptions;
 import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.Sender;
 import reactor.rabbitmq.SenderOptions;
 
 public class RabbitMQEventBus implements EventBus, Startable {
+    private static final int MAX_CHANNELS_NUMBER = 5;
     private static final String NOT_RUNNING_ERROR_MESSAGE = "Event Bus is not running";
     static final String MAILBOX_EVENT = "mailboxEvent";
     static final String MAILBOX_EVENT_EXCHANGE_NAME = MAILBOX_EVENT + "-exchange";
@@ -54,6 +58,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
 
     private volatile boolean isRunning;
     private volatile boolean isStopping;
+    private ChannelPool channelPool;
     private GroupRegistrationHandler groupRegistrationHandler;
     private KeyRegistrationHandler keyRegistrationHandler;
     EventDispatcher eventDispatcher;
@@ -77,7 +82,11 @@ public class RabbitMQEventBus implements EventBus, Startable {
 
     public void start() {
         if (!isRunning && !isStopping) {
-            sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono)
+            this.channelPool = ChannelPoolFactory.createChannelPool(
+                    connectionMono,
+                    new ChannelPoolOptions().maxCacheSize(MAX_CHANNELS_NUMBER)
+            );
+            sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono).channelPool(channelPool)
                 .resourceManagementChannelMono(connectionMono.map(Throwing.function(Connection::createChannel))));
             LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
             keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, connectionMono, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor);
@@ -97,6 +106,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
             isRunning = false;
             groupRegistrationHandler.stop();
             keyRegistrationHandler.stop();
+            channelPool.close();
             sender.close();
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 02/09: JAMES-2786 unpause RabbitMQ container after each test to avoid next tests failure

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 7220b4e66b166fe455d3955951348ac8553676a1
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Fri Jun 7 15:42:55 2019 +0200

    JAMES-2786 unpause RabbitMQ container after each test to avoid next tests failure
---
 .../java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java    | 8 +++++++-
 .../org/apache/james/mailbox/events/RabbitMQEventBusTest.java     | 5 +++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java
index 9843fd3..fa278a8 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java
@@ -25,6 +25,7 @@ import java.net.URISyntaxException;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.http.client.utils.URIBuilder;
 import org.apache.james.util.docker.Images;
@@ -66,6 +67,7 @@ public class DockerRabbitMQ {
     private final String nodeName;
     private final String rabbitHostName;
     private final String hostNameSuffix;
+    private final AtomicBoolean paused;
 
     public static DockerRabbitMQ withCookieAndHostName(String hostNamePrefix, String clusterIdentity, String erlangCookie, Network network) {
         return new DockerRabbitMQ(Optional.ofNullable(hostNamePrefix), Optional.ofNullable(clusterIdentity), Optional.ofNullable(erlangCookie), Optional.of(network));
@@ -77,6 +79,7 @@ public class DockerRabbitMQ {
 
     @SuppressWarnings("resource")
     private DockerRabbitMQ(Optional<String> hostNamePrefix, Optional<String> clusterIdentity, Optional<String> erlangCookie, Optional<Network> net) {
+        paused = new AtomicBoolean(false);
         this.hostNameSuffix = clusterIdentity.orElse(UUID.randomUUID().toString());
         this.rabbitHostName = hostName(hostNamePrefix);
         this.container = new GenericContainer<>(Images.RABBITMQ)
@@ -237,10 +240,13 @@ public class DockerRabbitMQ {
 
     public void pause() {
         DockerClientFactory.instance().client().pauseContainerCmd(container.getContainerId()).exec();
+        paused.set(true);
     }
 
     public void unpause() {
-        DockerClientFactory.instance().client().unpauseContainerCmd(container.getContainerId()).exec();
+        if (paused.compareAndSet(true, false)) {
+            DockerClientFactory.instance().client().unpauseContainerCmd(container.getContainerId()).exec();
+        }
     }
 
     public RabbitMQConnectionFactory createRabbitConnectionFactory() throws URISyntaxException {
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index 49f0f15..66ae6d3 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -299,6 +299,11 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
             rabbitManagementAPI = rabbitMQExtension.managementAPI();
         }
 
+        @AfterEach
+        void tearDown() {
+            rabbitMQExtension.getRabbitMQ().unpause();
+        }
+
         @Nested
         class SingleEventBus {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 06/09: JAMES-2704 Fix randomness of RandomStoring mailet

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1487aa7c3e3b46bd36a6b003aabcacbcc86a341b
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Thu Jun 13 14:53:11 2019 +0200

    JAMES-2704 Fix randomness of RandomStoring mailet
---
 .../apache/james/smtp/SmtpRandomStoringTest.java   |  4 ++--
 .../james/transport/mailets/RandomStoring.java     | 22 +++++++++++-----------
 2 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/server/mailet/integration-testing/src/test/java/org/apache/james/smtp/SmtpRandomStoringTest.java b/server/mailet/integration-testing/src/test/java/org/apache/james/smtp/SmtpRandomStoringTest.java
index 81f8958..08b6ab5 100644
--- a/server/mailet/integration-testing/src/test/java/org/apache/james/smtp/SmtpRandomStoringTest.java
+++ b/server/mailet/integration-testing/src/test/java/org/apache/james/smtp/SmtpRandomStoringTest.java
@@ -142,7 +142,7 @@ public class SmtpRandomStoringTest {
         SMTPMessageSender authenticatedSmtpConnection = messageSender.connect(LOCALHOST_IP, jamesServer.getProbe(SmtpGuiceProbe.class).getSmtpPort())
                 .authenticate(FROM, PASSWORD);
 
-        IntStream.range(1, numberOfMails)
+        IntStream.range(0, numberOfMails)
             .forEach(Throwing.intConsumer(index ->
                 authenticatedSmtpConnection
                     .sendMessage(buildMail("Message " + index))).sneakyThrow());
@@ -155,7 +155,7 @@ public class SmtpRandomStoringTest {
         awaitAtMostTenSeconds
             .untilAsserted(() -> checkMailboxesHaveBeenFilled(connections, numberOfMails));
 
-        connections.forEach(Throwing.consumer(IMAPMessageReader::close));
+        connections.forEach(Throwing.consumer(IMAPMessageReader::close).sneakyThrow());
     }
 
     private IMAPMessageReader createIMAPConnection(String username) {
diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java
index 783fb56..eabcf60 100644
--- a/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java
+++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/mailets/RandomStoring.java
@@ -21,12 +21,10 @@ package org.apache.james.transport.mailets;
 
 import java.time.Duration;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
+import java.util.function.Supplier;
 import java.util.stream.Stream;
 import javax.inject.Inject;
 import javax.mail.MessagingException;
@@ -59,13 +57,13 @@ public class RandomStoring extends GenericMailet {
     private final Mono<List<ReroutingInfos>> reroutingInfos;
     private final UsersRepository usersRepository;
     private final MailboxManager mailboxManager;
-    private final Iterator<Integer> randomRecipientsNumbers;
+    private final Supplier<Integer> randomRecipientsNumbers;
 
     @Inject
     public RandomStoring(UsersRepository usersRepository, MailboxManager mailboxManager) {
         this.usersRepository = usersRepository;
         this.mailboxManager = mailboxManager;
-        this.randomRecipientsNumbers = ThreadLocalRandom.current().ints(MIN_NUMBER_OF_RECIPIENTS, MAX_NUMBER_OF_RECIPIENTS + 1).boxed().iterator();
+        this.randomRecipientsNumbers = () -> ThreadLocalRandom.current().nextInt(MIN_NUMBER_OF_RECIPIENTS, MAX_NUMBER_OF_RECIPIENTS + 1);
         this.reroutingInfos = Mono.fromCallable(this::retrieveReroutingInfos).cache(CACHE_DURATION);
     }
 
@@ -93,11 +91,14 @@ public class RandomStoring extends GenericMailet {
 
     private Collection<ReroutingInfos> generateRandomMailboxes() {
         List<ReroutingInfos> reroutingInfos = this.reroutingInfos.block();
-        Collections.shuffle(reroutingInfos);
 
-        return reroutingInfos
-            .stream()
-            .limit(randomRecipientsNumbers.next())
+        // Replaces Collections.shuffle() which has a too poor statistical distribution
+        return ThreadLocalRandom
+            .current()
+            .ints(0, reroutingInfos.size())
+            .mapToObj(reroutingInfos::get)
+            .distinct()
+            .limit(randomRecipientsNumbers.get())
             .collect(Guavate.toImmutableSet());
     }
 
@@ -105,8 +106,7 @@ public class RandomStoring extends GenericMailet {
         return Streams.stream(usersRepository.list())
             .map(User::fromUsername)
             .flatMap(this::buildReRoutingInfos)
-            .distinct()
-            .collect(Collectors.toList());
+            .collect(Guavate.toImmutableList());
     }
 
     private Stream<ReroutingInfos> buildReRoutingInfos(User user) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 03/09: JAMES-2786 DockerRabbitMQ use boolean instead of atomic boolean

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 61b6bdcfc2b2d8dbb9f7bb3cee5c5c9e1a404502
Author: Tran Tien Duc <dt...@linagora.com>
AuthorDate: Wed Jun 12 10:38:13 2019 +0700

    JAMES-2786 DockerRabbitMQ use boolean instead of atomic boolean
---
 .../org/apache/james/backend/rabbitmq/DockerRabbitMQ.java  | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java
index fa278a8..e786d6c 100644
--- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java
+++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/DockerRabbitMQ.java
@@ -25,7 +25,6 @@ import java.net.URISyntaxException;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.http.client.utils.URIBuilder;
 import org.apache.james.util.docker.Images;
@@ -67,7 +66,7 @@ public class DockerRabbitMQ {
     private final String nodeName;
     private final String rabbitHostName;
     private final String hostNameSuffix;
-    private final AtomicBoolean paused;
+    private boolean paused;
 
     public static DockerRabbitMQ withCookieAndHostName(String hostNamePrefix, String clusterIdentity, String erlangCookie, Network network) {
         return new DockerRabbitMQ(Optional.ofNullable(hostNamePrefix), Optional.ofNullable(clusterIdentity), Optional.ofNullable(erlangCookie), Optional.of(network));
@@ -79,7 +78,7 @@ public class DockerRabbitMQ {
 
     @SuppressWarnings("resource")
     private DockerRabbitMQ(Optional<String> hostNamePrefix, Optional<String> clusterIdentity, Optional<String> erlangCookie, Optional<Network> net) {
-        paused = new AtomicBoolean(false);
+        paused = false;
         this.hostNameSuffix = clusterIdentity.orElse(UUID.randomUUID().toString());
         this.rabbitHostName = hostName(hostNamePrefix);
         this.container = new GenericContainer<>(Images.RABBITMQ)
@@ -239,13 +238,16 @@ public class DockerRabbitMQ {
     }
 
     public void pause() {
-        DockerClientFactory.instance().client().pauseContainerCmd(container.getContainerId()).exec();
-        paused.set(true);
+        if (!paused) {
+            DockerClientFactory.instance().client().pauseContainerCmd(container.getContainerId()).exec();
+            paused = true;
+        }
     }
 
     public void unpause() {
-        if (paused.compareAndSet(true, false)) {
+        if (paused) {
             DockerClientFactory.instance().client().unpauseContainerCmd(container.getContainerId()).exec();
+            paused = false;
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 05/09: JAMES-2786 limit the number of thread required for the concurrent stress test

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit edce29a4a9154ba0f92e2f0d4bc14bf5f7b6ff2c
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Wed Jun 12 18:16:06 2019 +0200

    JAMES-2786 limit the number of thread required for the concurrent stress test
---
 .../main/java/org/apache/james/mailbox/events/EventDispatcher.java  | 6 ++----
 .../java/org/apache/james/mailbox/events/GroupRegistration.java     | 2 +-
 .../java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java  | 3 +--
 3 files changed, 4 insertions(+), 7 deletions(-)

diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
index 5cb24aa..8f31102 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -82,6 +82,7 @@ class EventDispatcher {
             .concat(
                 dispatchToLocalListeners(event, keys),
                 dispatchToRemoteListeners(serializeEvent(event), keys))
+            .subscribeOn(Schedulers.elastic())
             .then()
             .doOnSuccess(any -> dispatchCount.incrementAndGet())
             .subscribeWith(MonoProcessor.create());
@@ -89,7 +90,6 @@ class EventDispatcher {
 
     private Mono<Void> dispatchToLocalListeners(Event event, Set<RegistrationKey> keys) {
         return Flux.fromIterable(keys)
-            .subscribeOn(Schedulers.elastic())
             .flatMap(key -> localListenerRegistry.getLocalMailboxListeners(key)
                 .map(listener -> Tuples.of(key, listener)))
             .filter(pair -> pair.getT2().getExecutionMode().equals(MailboxListener.ExecutionMode.SYNCHRONOUS))
@@ -127,9 +127,7 @@ class EventDispatcher {
         Stream<OutboundMessage> outboundMessages = routingKeys
             .map(routingKey -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, routingKey.asString(), basicProperties, serializedEvent));
 
-        return sender.send(Flux.fromStream(outboundMessages))
-            .publishOn(Schedulers.elastic())
-            .doOnError(th -> th.printStackTrace());
+        return sender.send(Flux.fromStream(outboundMessages));
     }
 
     private byte[] serializeEvent(Event event) {
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index 985da0a..5164c2d 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -140,7 +140,7 @@ class GroupRegistration implements Registration {
         int currentRetryCount = getRetryCount(acknowledgableDelivery);
 
         return delayGenerator.delayIfHaveTo(currentRetryCount)
-            .flatMap(any -> Mono.fromRunnable(Throwing.runnable(() -> runListener(event))).publishOn(Schedulers.elastic()))
+            .flatMap(any -> Mono.fromRunnable(Throwing.runnable(() -> runListener(event))))
             .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable))
             .then(Mono.fromRunnable(acknowledgableDelivery::ack))
             .subscribeWith(MonoProcessor.create())
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index f94b960..eb285d0 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -186,7 +186,6 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
     class ConcurrentTest implements EventBusConcurrentTestContract.MultiEventBusConcurrentContract,
         EventBusConcurrentTestContract.SingleEventBusConcurrentContract {
 
-        @Disabled("consuming too many threads")
         @Test
         void rabbitMQEventBusCannotHandleHugeDispatchingOperations() throws Exception {
             EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
@@ -199,7 +198,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
             int totalDispatchOperations = threadCount * operationCount;
             eventBus = (RabbitMQEventBus) eventBus();
             ConcurrentTestRunner.builder()
-                .operation((threadNumber, operationNumber) -> eventBus.dispatch(EVENT, NO_KEYS))
+                .operation((threadNumber, operationNumber) -> eventBus.dispatch(EVENT, NO_KEYS).block())
                 .threadCount(threadCount)
                 .operationCount(operationCount)
                 .runSuccessfullyWithin(Duration.ofMinutes(10));


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 04/09: JAMES-2786 RabbitMQ EventBus stop consume messages under heavy load

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit cf0a390afd8b62daf7719c2b9233d03f34c0f9aa
Author: Tran Tien Duc <dt...@linagora.com>
AuthorDate: Wed Jun 5 15:16:56 2019 +0700

    JAMES-2786 RabbitMQ EventBus stop consume messages under heavy load
---
 .../james/mailbox/events/EventDispatcher.java      |  7 ++++-
 .../james/mailbox/events/RabbitMQEventBus.java     |  3 +-
 .../james/mailbox/events/RabbitMQEventBusTest.java | 35 ++++++++++++++++++++++
 3 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
index 09c777e..5cb24aa 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/EventDispatcher.java
@@ -26,6 +26,7 @@ import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXC
 
 import java.nio.charset.StandardCharsets;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
 
 import org.apache.james.event.json.EventSerializer;
@@ -57,6 +58,7 @@ class EventDispatcher {
     private final LocalListenerRegistry localListenerRegistry;
     private final AMQP.BasicProperties basicProperties;
     private final MailboxListenerExecutor mailboxListenerExecutor;
+    final AtomicInteger dispatchCount = new AtomicInteger();
 
     EventDispatcher(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) {
         this.eventSerializer = eventSerializer;
@@ -81,6 +83,7 @@ class EventDispatcher {
                 dispatchToLocalListeners(event, keys),
                 dispatchToRemoteListeners(serializeEvent(event), keys))
             .then()
+            .doOnSuccess(any -> dispatchCount.incrementAndGet())
             .subscribeWith(MonoProcessor.create());
     }
 
@@ -124,7 +127,9 @@ class EventDispatcher {
         Stream<OutboundMessage> outboundMessages = routingKeys
             .map(routingKey -> new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, routingKey.asString(), basicProperties, serializedEvent));
 
-        return sender.send(Flux.fromStream(outboundMessages));
+        return sender.send(Flux.fromStream(outboundMessages))
+            .publishOn(Schedulers.elastic())
+            .doOnError(th -> th.printStackTrace());
     }
 
     private byte[] serializeEvent(Event event) {
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
index 041ded3..8ec96d6 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java
@@ -32,6 +32,7 @@ import org.apache.james.metrics.api.MetricFactory;
 import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Preconditions;
 import com.rabbitmq.client.Connection;
+
 import reactor.core.publisher.Mono;
 import reactor.rabbitmq.RabbitFlux;
 import reactor.rabbitmq.Sender;
@@ -55,7 +56,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
     private volatile boolean isStopping;
     private GroupRegistrationHandler groupRegistrationHandler;
     private KeyRegistrationHandler keyRegistrationHandler;
-    private EventDispatcher eventDispatcher;
+    EventDispatcher eventDispatcher;
     private Sender sender;
 
     @Inject
diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index 66ae6d3..f94b960 100644
--- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -19,12 +19,15 @@
 
 package org.apache.james.mailbox.events;
 
+import static com.jayway.awaitility.Awaitility.await;
 import static org.apache.james.backend.rabbitmq.Constants.AUTO_DELETE;
 import static org.apache.james.backend.rabbitmq.Constants.DIRECT_EXCHANGE;
 import static org.apache.james.backend.rabbitmq.Constants.DURABLE;
 import static org.apache.james.backend.rabbitmq.Constants.EMPTY_ROUTING_KEY;
 import static org.apache.james.backend.rabbitmq.Constants.EXCLUSIVE;
 import static org.apache.james.backend.rabbitmq.Constants.NO_ARGUMENTS;
+import static org.apache.james.mailbox.events.EventBusConcurrentTestContract.newCountingListener;
+import static org.apache.james.mailbox.events.EventBusConcurrentTestContract.totalEventsReceived;
 import static org.apache.james.mailbox.events.EventBusTestFixture.ALL_GROUPS;
 import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT;
 import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A;
@@ -47,6 +50,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.Closeable;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.james.backend.rabbitmq.RabbitMQExtension;
@@ -70,6 +74,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.mockito.stubbing.Answer;
 
+import com.google.common.collect.ImmutableList;
 import com.rabbitmq.client.Connection;
 
 import reactor.core.publisher.Mono;
@@ -181,6 +186,36 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
     class ConcurrentTest implements EventBusConcurrentTestContract.MultiEventBusConcurrentContract,
         EventBusConcurrentTestContract.SingleEventBusConcurrentContract {
 
+        @Disabled("consuming too many threads")
+        @Test
+        void rabbitMQEventBusCannotHandleHugeDispatchingOperations() throws Exception {
+            EventBusTestFixture.MailboxListenerCountingSuccessfulExecution countingListener1 = newCountingListener();
+
+            eventBus().register(countingListener1, new EventBusTestFixture.GroupA());
+            int totalGlobalRegistrations = 1; // GroupA + GroupB + GroupC
+
+            int threadCount = 10;
+            int operationCount = 10000;
+            int totalDispatchOperations = threadCount * operationCount;
+            eventBus = (RabbitMQEventBus) eventBus();
+            ConcurrentTestRunner.builder()
+                .operation((threadNumber, operationNumber) -> eventBus.dispatch(EVENT, NO_KEYS))
+                .threadCount(threadCount)
+                .operationCount(operationCount)
+                .runSuccessfullyWithin(Duration.ofMinutes(10));
+
+            // there is a moment when RabbitMQ EventBus consumed amount of messages, then it will stop to consume more
+            await()
+                .pollInterval(com.jayway.awaitility.Duration.FIVE_SECONDS)
+                .timeout(com.jayway.awaitility.Duration.TEN_MINUTES).until(() -> {
+                    int totalEventsReceived = totalEventsReceived(ImmutableList.of(countingListener1));
+                    System.out.println("event received: " + totalEventsReceived);
+                    System.out.println("dispatching count: " + eventBus.eventDispatcher.dispatchCount.get());
+                    assertThat(totalEventsReceived)
+                        .isEqualTo(totalGlobalRegistrations * totalDispatchOperations);
+                });
+        }
+
         @Override
         public EventBus eventBus3() {
             return eventBus3;


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org