Posted to notifications@james.apache.org by rc...@apache.org on 2021/06/16 04:13:28 UTC

[james-project] branch master updated (dda8b7a -> 4daea75)

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from dda8b7a  [REFACTORING] Replacement should not require regex in AuthCmdHandler
     new e217e64  JAMES-3599 GroupContract::groupDeliveryShouldNotExceedRate is subject to data races
     new 8b501cc  JAMES-3599 GroupContract::registerShouldNotDispatchPastEventsForGroups needs a grace period
     new c6e63d9  JAMES-3599 GroupRegistration : nack is blocking
     new f7a91cf  JAMES-3599 Group execution: execute listeners together
     new 4daea75  JAMES-3599 Upgrade instructions

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../rabbitmq/ReactorRabbitMQChannelPool.java       |  18 ++++
 .../org/apache/james/events/GroupContract.java     |  10 +-
 .../org/apache/james/events/GroupRegistration.java |  20 ++--
 .../james/events/GroupRegistrationHandler.java     | 101 ++++++++++++++++++++-
 .../org/apache/james/events/RabbitMQEventBus.java  |   4 +-
 .../apache/james/events/RabbitMQEventBusTest.java  |   4 +-
 upgrade-instructions.md                            |  22 ++++-
 7 files changed, 159 insertions(+), 20 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 05/05: JAMES-3599 Upgrade instructions

Posted by rc...@apache.org.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4daea75aa686e5b6db86420f985a86b63ab6ab9c
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Jun 14 15:28:13 2021 +0700

    JAMES-3599 Upgrade instructions
---
 upgrade-instructions.md | 22 +++++++++++++++++++++-
 1 file changed, 21 insertions(+), 1 deletion(-)

diff --git a/upgrade-instructions.md b/upgrade-instructions.md
index 1969894..46fae99 100644
--- a/upgrade-instructions.md
+++ b/upgrade-instructions.md
@@ -17,7 +17,27 @@ Changes to apply between 3.5.x and 3.6.x will be reported here.
 Change list:
 
  - [Drop Cassandra schema version prior version 8](#drop-cassandra-schema-version-prior-version-8)
- - [Adopt UnboundID as a LDAP library](#drop-cassandra-schema-version-prior-version-8)
+ - [Adopt UnboundID as a LDAP library](#adopt-unboundid-as-a-ldap-library)
+ - [Review the architecture of the RabbitMQ event bus](#review-the-architecture-of-the-rabbitmq-event-bus)
+ 
+### Review the architecture of the RabbitMQ event bus
+
+Date 14/06/2021
+
+JIRA: https://issues.apache.org/jira/projects/JAMES/issues/JAMES-3599
+
+Impacted products: Distributed James server
+
+We now group listener execution whenever possible. This minimizes:
+                                                    
+ - The count of events to deserialize (one for all groups)
+ - The count of ACKs to perform
+
+Note that retries are still performed on a per-group basis.
+
+After a rolling upgrade, one needs to unbind group queues from the primary exchange. Group queues can be identified by
+their `mailboxEvent-workQueue-` prefix, and the primary exchange is named `mailboxEvent-exchange`. These operations can
+easily be performed via the RabbitMQ management web interface.
  
 ### Adopt UnboundID as a LDAP library
 



[james-project] 04/05: JAMES-3599 Group execution: execute listeners together

Posted by rc...@apache.org.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f7a91cfb727d29f16e01570e0a86caa7adc0dfab
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Jun 14 15:15:12 2021 +0700

    JAMES-3599 Group execution: execute listeners together
    
    This minimizes:
    
     - The count of events to deserialize (one for all groups)
     - The count of ACKs to perform
    
    This potentially enables ordering of execution in the happy case.
    
    Note that retries are still performed on a per-group basis.
---
 .../rabbitmq/ReactorRabbitMQChannelPool.java       |  18 ++++
 .../org/apache/james/events/GroupRegistration.java |  10 +-
 .../james/events/GroupRegistrationHandler.java     | 101 ++++++++++++++++++++-
 .../org/apache/james/events/RabbitMQEventBus.java  |   4 +-
 .../apache/james/events/RabbitMQEventBusTest.java  |   4 +-
 5 files changed, 129 insertions(+), 8 deletions(-)

diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
index 4e74850..ab6f673 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -276,6 +276,24 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
             .then();
     }
 
+    public Mono<Void> createWorkQueue(QueueSpecification queueSpecification) {
+        Preconditions.checkArgument(queueSpecification.getName() != null, "WorkQueue pattern do not make sense for unnamed queues");
+
+        return Mono.using(this::createSender,
+            managementSender -> managementSender.declareQueue(queueSpecification),
+            Sender::close)
+            .onErrorResume(
+                e -> e instanceof ShutdownSignalException
+                    && e.getMessage().contains("reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue"),
+                e -> {
+                    LOGGER.warn("{} already exists without dead-letter setup. Dead lettered messages to it will be lost. " +
+                            "To solve this, re-create the queue with the x-dead-letter-exchange argument set up.",
+                        queueSpecification.getName());
+                    return Mono.empty();
+                })
+            .then();
+    }
+
     private void invalidateObject(Channel channel) {
         try {
             pool.invalidateObject(channel);
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
index 5b5ce39..c3ce6e5 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
@@ -21,7 +21,6 @@ package org.apache.james.events;
 
 import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
 import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
-import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
 import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
 import static org.apache.james.backends.rabbitmq.Constants.REQUEUE;
 import static org.apache.james.backends.rabbitmq.Constants.deadLetterQueue;
@@ -43,7 +42,6 @@ import reactor.core.Disposable;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 import reactor.rabbitmq.AcknowledgableDelivery;
-import reactor.rabbitmq.BindingSpecification;
 import reactor.rabbitmq.ConsumeOptions;
 import reactor.rabbitmq.QueueSpecification;
 import reactor.rabbitmq.Receiver;
@@ -139,8 +137,7 @@ class GroupRegistration implements Registration {
 
         return deserializeEvent(eventAsBytes)
             .flatMap(event -> delayGenerator.delayIfHaveTo(currentRetryCount)
-                .flatMap(any -> runListener(event))
-                .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable))
+                .flatMap(any -> runListenerReliably(currentRetryCount, event))
                 .then(Mono.<Void>fromRunnable(acknowledgableDelivery::ack).subscribeOn(Schedulers.elastic())))
             .onErrorResume(e -> {
                 LOGGER.error("Unable to process delivery for group {}", group, e);
@@ -150,6 +147,11 @@ class GroupRegistration implements Registration {
             });
     }
 
+    public Mono<Void> runListenerReliably(int currentRetryCount, Event event) {
+        return runListener(event)
+            .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable));
+    }
+
     private Mono<Event> deserializeEvent(byte[] eventAsBytes) {
         return Mono.fromCallable(() -> eventSerializer.asEvent(new String(eventAsBytes, StandardCharsets.UTF_8)))
             .subscribeOn(Schedulers.parallel());
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
index cb224d4..88a2e09 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
@@ -19,16 +19,51 @@
 
 package org.apache.james.events;
 
+import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backends.rabbitmq.Constants.REQUEUE;
+import static org.apache.james.backends.rabbitmq.Constants.deadLetterQueue;
+import static org.apache.james.events.GroupRegistration.DEFAULT_RETRY_COUNT;
+
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.backends.rabbitmq.ReceiverProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.steveash.guavate.Guavate;
+import com.google.common.base.Preconditions;
 
+import reactor.core.Disposable;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.AcknowledgableDelivery;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.ConsumeOptions;
+import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.Receiver;
 import reactor.rabbitmq.Sender;
+import reactor.util.retry.Retry;
 
 class GroupRegistrationHandler {
+    private static final Logger LOGGER = LoggerFactory.getLogger(GroupRegistrationHandler.class);
+
+    private final GroupRegistration.WorkQueueName queueName;
+
+    public static class GroupRegistrationHandlerGroup extends Group {
+
+    }
+
+    static final Group GROUP = new GroupRegistrationHandlerGroup();
+
     private final NamingStrategy namingStrategy;
     private final Map<Group, GroupRegistration> groupRegistrations;
     private final EventSerializer eventSerializer;
@@ -38,10 +73,13 @@ class GroupRegistrationHandler {
     private final RetryBackoffConfiguration retryBackoff;
     private final EventDeadLetters eventDeadLetters;
     private final ListenerExecutor listenerExecutor;
+    private final EventBusId eventBusId;
+    private Optional<Receiver> receiver;
+    private Optional<Disposable> consumer;
 
     GroupRegistrationHandler(NamingStrategy namingStrategy, EventSerializer eventSerializer, ReactorRabbitMQChannelPool channelPool, Sender sender, ReceiverProvider receiverProvider,
                              RetryBackoffConfiguration retryBackoff,
-                             EventDeadLetters eventDeadLetters, ListenerExecutor listenerExecutor) {
+                             EventDeadLetters eventDeadLetters, ListenerExecutor listenerExecutor, EventBusId eventBusId) {
         this.namingStrategy = namingStrategy;
         this.eventSerializer = eventSerializer;
         this.channelPool = channelPool;
@@ -50,7 +88,11 @@ class GroupRegistrationHandler {
         this.retryBackoff = retryBackoff;
         this.eventDeadLetters = eventDeadLetters;
         this.listenerExecutor = listenerExecutor;
+        this.eventBusId = eventBusId;
         this.groupRegistrations = new ConcurrentHashMap<>();
+        this.queueName = namingStrategy.workQueue(GROUP);
+        this.consumer = Optional.empty();
+        this.receiver = Optional.empty();
     }
 
     GroupRegistration retrieveGroupRegistration(Group group) {
@@ -58,11 +100,68 @@ class GroupRegistrationHandler {
             .orElseThrow(() -> new GroupRegistrationNotFound(group));
     }
 
+    public void start() {
+        channelPool.createWorkQueue(
+            QueueSpecification.queue(queueName.asString())
+                .durable(DURABLE)
+                .exclusive(!EXCLUSIVE)
+                .autoDelete(!AUTO_DELETE)
+                .arguments(deadLetterQueue(namingStrategy.deadLetterExchange())),
+            BindingSpecification.binding()
+                .exchange(namingStrategy.exchange())
+                .queue(queueName.asString())
+                .routingKey(EMPTY_ROUTING_KEY))
+            .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()))
+            .block();
+
+        this.receiver = Optional.of(receiverProvider.createReceiver());
+        this.consumer = Optional.of(consumeWorkQueue());
+    }
+
+    private Disposable consumeWorkQueue() {
+        Preconditions.checkState(receiver.isPresent());
+        return receiver.get().consumeManualAck(queueName.asString(), new ConsumeOptions().qos(EventBus.EXECUTION_RATE))
+            .publishOn(Schedulers.parallel())
+            .filter(delivery -> Objects.nonNull(delivery.getBody()))
+            .flatMap(this::deliver, EventBus.EXECUTION_RATE)
+            .subscribeOn(Schedulers.elastic())
+            .subscribe();
+    }
+
+    private Mono<Void> deliver(AcknowledgableDelivery acknowledgableDelivery) {
+        byte[] eventAsBytes = acknowledgableDelivery.getBody();
+
+        return deserializeEvent(eventAsBytes)
+            .flatMapIterable(aa -> groupRegistrations.values()
+                .stream()
+                .map(group -> Pair.of(group, aa))
+                .collect(Guavate.toImmutableList()))
+            .flatMap(event -> event.getLeft().runListenerReliably(DEFAULT_RETRY_COUNT, event.getRight()))
+                .then(Mono.<Void>fromRunnable(acknowledgableDelivery::ack).subscribeOn(Schedulers.elastic()))
+            .then()
+            .onErrorResume(e -> {
+                LOGGER.error("Unable to process delivery for group {}", GROUP, e);
+                return Mono.fromRunnable(() -> acknowledgableDelivery.nack(!REQUEUE))
+                    .subscribeOn(Schedulers.elastic())
+                    .then();
+            });
+    }
+
+    private Mono<Event> deserializeEvent(byte[] eventAsBytes) {
+        return Mono.fromCallable(() -> eventSerializer.asEvent(new String(eventAsBytes, StandardCharsets.UTF_8)))
+            .subscribeOn(Schedulers.parallel());
+    }
+
     void stop() {
         groupRegistrations.values().forEach(GroupRegistration::unregister);
+        consumer.ifPresent(Disposable::dispose);
+        receiver.ifPresent(Receiver::close);
     }
 
     Registration register(EventListener.ReactiveEventListener listener, Group group) {
+        if (groupRegistrations.isEmpty()) {
+            start();
+        }
         return groupRegistrations
             .compute(group, (groupToRegister, oldGroupRegistration) -> {
                 if (oldGroupRegistration != null) {
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
index ad50412..74b092d 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
@@ -83,7 +83,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
 
             LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
             keyRegistrationHandler = new KeyRegistrationHandler(namingStrategy, eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, listenerExecutor, retryBackoff);
-            groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor);
+            groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor, eventBusId);
             eventDispatcher = new EventDispatcher(namingStrategy, eventBusId, eventSerializer, sender, localListenerRegistry, listenerExecutor, eventDeadLetters);
 
             eventDispatcher.start();
@@ -98,7 +98,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
 
             LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
             keyRegistrationHandler = new KeyRegistrationHandler(namingStrategy, eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, listenerExecutor, retryBackoff);
-            groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor);
+            groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor, eventBusId);
             eventDispatcher = new EventDispatcher(namingStrategy, eventBusId, eventSerializer, sender, localListenerRegistry, listenerExecutor, eventDeadLetters);
 
             keyRegistrationHandler.declareQueue();
diff --git a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java
index ed68d5a..525aa76 100644
--- a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java
+++ b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java
@@ -138,7 +138,9 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
         eventBus2.stop();
         eventBus3.stop();
         eventBusWithKeyHandlerNotStarted.stop();
-        ALL_GROUPS.stream()
+        Stream.concat(
+            ALL_GROUPS.stream(),
+            Stream.of(GroupRegistrationHandler.GROUP))
             .map(TEST_NAMING_STRATEGY::workQueue)
             .forEach(queueName -> rabbitMQExtension.getSender().delete(QueueSpecification.queue(queueName.asString())).block());
         rabbitMQExtension.getSender()
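
The idea behind this commit (one deserialization and one ACK per delivery, with failures still handled per group) can be illustrated without RabbitMQ or Reactor. The following is a simplified, synchronous sketch using only the JDK. None of its types are James APIs; the class, its fields and the group names are hypothetical, and the retry is modelled by merely recording which group failed.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified model of grouped delivery; none of these types are James APIs.
final class GroupedDeliverySketch {
    int deserializations = 0;
    int acks = 0;
    final List<String> retriedGroups = new ArrayList<>();
    private final Map<String, Consumer<String>> listenersByGroup = new LinkedHashMap<>();

    void register(String group, Consumer<String> listener) {
        listenersByGroup.put(group, listener);
    }

    // One delivery: deserialize once, run every group's listener, acknowledge once.
    void deliver(byte[] body) {
        String event = deserialize(body);
        listenersByGroup.forEach((group, listener) -> runReliably(group, listener, event));
        acks++;
    }

    private String deserialize(byte[] body) {
        deserializations++;
        return new String(body, StandardCharsets.UTF_8);
    }

    // A failing listener only triggers the retry path of its own group.
    private void runReliably(String group, Consumer<String> listener, String event) {
        try {
            listener.accept(event);
        } catch (RuntimeException e) {
            retriedGroups.add(group);
        }
    }

    public static void main(String[] args) {
        GroupedDeliverySketch bus = new GroupedDeliverySketch();
        bus.register("groupA", event -> System.out.println("A handled " + event));
        bus.register("groupB", event -> { throw new IllegalStateException("B failed"); });
        bus.deliver("event-1".getBytes(StandardCharsets.UTF_8));
        System.out.println("deserializations=" + bus.deserializations
            + " acks=" + bus.acks + " retriedGroups=" + bus.retriedGroups);
    }
}
```

In the diff above the same roles are played by `deserializeEvent`, `runListenerReliably` (which delegates to the per-group `retryHandler`) and the single `acknowledgableDelivery::ack` at the end of `deliver`.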



[james-project] 01/05: JAMES-3599 GroupContract::groupDeliveryShouldNotExceedRate is subject to data races

Posted by rc...@apache.org.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e217e641a74fbee2fa5cccbe4013ad3929971975
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Jun 14 15:06:56 2021 +0700

    JAMES-3599 GroupContract::groupDeliveryShouldNotExceedRate is subject to data races
    
    An assertion mixes two atomic integers, so the combined check
    is not atomic
---
 .../api/src/test/java/org/apache/james/events/GroupContract.java | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java b/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java
index 01eda24..69ab252 100644
--- a/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java
+++ b/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java
@@ -65,7 +65,7 @@ public interface GroupContract {
         default void groupDeliveryShouldNotExceedRate() {
             int eventCount = 50;
             AtomicInteger nbCalls = new AtomicInteger(0);
-            AtomicInteger finishedExecutions = new AtomicInteger(0);
+            AtomicInteger inFlight = new AtomicInteger(0);
             AtomicBoolean rateExceeded = new AtomicBoolean(false);
 
             eventBus().register(new EventListener.GroupEventListener() {
@@ -81,13 +81,12 @@ public interface GroupContract {
 
                 @Override
                 public void event(Event event) throws Exception {
-                    if (nbCalls.get() - finishedExecutions.get() > EventBus.EXECUTION_RATE) {
+                    if (inFlight.incrementAndGet() > EventBus.EXECUTION_RATE) {
                         rateExceeded.set(true);
                     }
                     nbCalls.incrementAndGet();
                     Thread.sleep(Duration.ofMillis(20).toMillis());
-                    finishedExecutions.incrementAndGet();
-
+                    inFlight.decrementAndGet();
                 }
             }, GROUP_A);
 
@@ -95,7 +94,7 @@ public interface GroupContract {
                 .forEach(i -> eventBus().dispatch(EVENT, NO_KEYS).block());
 
             getSpeedProfile().shortWaitCondition().atMost(TEN_MINUTES)
-                .untilAsserted(() -> assertThat(finishedExecutions.get()).isEqualTo(eventCount));
+                .untilAsserted(() -> assertThat(nbCalls.get()).isEqualTo(eventCount));
             assertThat(rateExceeded).isFalse();
         }
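
The fix replaces a value computed from two separately read counters with a single counter whose `incrementAndGet` result already includes the current call. A minimal stand-alone sketch of that pattern follows, using only the JDK. The class and method names are hypothetical, and a fixed-size thread pool stands in for the event bus as the thing that bounds concurrency.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the single in-flight counter used by the fixed test.
final class InFlightCounterSketch {
    // Runs taskCount tasks on a pool of `limit` threads and reports whether more than
    // `limit` tasks were ever observed in flight at the same time.
    static boolean rateExceeded(int taskCount, int limit) {
        AtomicInteger inFlight = new AtomicInteger(0);
        AtomicBoolean exceeded = new AtomicBoolean(false);
        ExecutorService pool = Executors.newFixedThreadPool(limit);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                // One atomic read-modify-write: the returned value includes this task.
                if (inFlight.incrementAndGet() > limit) {
                    exceeded.set(true);
                }
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.decrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        }
        return exceeded.get();
    }

    public static void main(String[] args) {
        System.out.println("rate exceeded: " + rateExceeded(50, 10));
    }
}
```

Because the comparison uses the value returned by `incrementAndGet`, no other thread can change the counter between reading it and checking it, which is what the subtraction of two independent `get()` calls could not guarantee.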
 



[james-project] 03/05: JAMES-3599 GroupRegistration : nack is blocking

Posted by rc...@apache.org.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c6e63d908bca438411f5c7db6b05e990356b2325
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Jun 14 15:09:37 2021 +0700

    JAMES-3599 GroupRegistration : nack is blocking
    
    Thus it should be scheduled on the elastic scheduler
---
 .../main/java/org/apache/james/events/GroupRegistration.java   | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
index 57839ef..5b5ce39 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
@@ -121,11 +121,7 @@ class GroupRegistration implements Registration {
                 .durable(DURABLE)
                 .exclusive(!EXCLUSIVE)
                 .autoDelete(!AUTO_DELETE)
-                .arguments(deadLetterQueue(namingStrategy.deadLetterExchange())),
-            BindingSpecification.binding()
-                .exchange(namingStrategy.exchange())
-                .queue(queueName.asString())
-                .routingKey(EMPTY_ROUTING_KEY));
+                .arguments(deadLetterQueue(namingStrategy.deadLetterExchange())));
     }
 
     private Disposable consumeWorkQueue() {
@@ -148,7 +144,9 @@ class GroupRegistration implements Registration {
                 .then(Mono.<Void>fromRunnable(acknowledgableDelivery::ack).subscribeOn(Schedulers.elastic())))
             .onErrorResume(e -> {
                 LOGGER.error("Unable to process delivery for group {}", group, e);
-                return Mono.fromRunnable(() -> acknowledgableDelivery.nack(!REQUEUE));
+                return Mono.fromRunnable(() -> acknowledgableDelivery.nack(!REQUEUE))
+                    .subscribeOn(Schedulers.elastic())
+                    .then();
             });
     }
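
The commit moves the blocking `nack` call off the calling thread with Reactor's `subscribeOn(Schedulers.elastic())`. The sketch below shows the same general idea with JDK classes only, so it is an analogue rather than the Reactor code from the diff. `CompletableFuture` and a cached thread pool replace `Mono` and the elastic scheduler; the class name, method name and thread name are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical JDK-only analogue of offloading a blocking call to a dedicated pool.
final class BlockingCallOffloadSketch {
    // Runs the blocking action on the given pool and completes with the name
    // of the thread that actually executed it.
    static CompletableFuture<String> offload(Runnable blockingAction, ExecutorService blockingPool) {
        return CompletableFuture.supplyAsync(() -> {
            blockingAction.run();
            return Thread.currentThread().getName();
        }, blockingPool);
    }

    private static void simulateBlockingNack() {
        try {
            Thread.sleep(50); // stands in for a blocking broker call
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ExecutorService blockingPool =
            Executors.newCachedThreadPool(r -> new Thread(r, "blocking-worker"));
        try {
            String runner = offload(BlockingCallOffloadSketch::simulateBlockingNack, blockingPool).join();
            System.out.println("caller=" + Thread.currentThread().getName() + " runner=" + runner);
        } finally {
            blockingPool.shutdown();
        }
    }
}
```

The point of the design is that the thread driving event processing only schedules the call and is free again immediately, while a thread from a pool meant for blocking work waits on the broker.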
 



[james-project] 02/05: JAMES-3599 GroupContract::registerShouldNotDispatchPastEventsForGroups needs a grace period

Posted by rc...@apache.org.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 8b501cc8216a93b218c2753ff49eaf82bfbb184d
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Jun 14 15:08:31 2021 +0700

    JAMES-3599 GroupContract::registerShouldNotDispatchPastEventsForGroups needs a grace period
    
    Otherwise the registration might be completed while the
    message is in flight, causing a data race.
---
 event-bus/api/src/test/java/org/apache/james/events/GroupContract.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java b/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java
index 69ab252..f477006 100644
--- a/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java
+++ b/event-bus/api/src/test/java/org/apache/james/events/GroupContract.java
@@ -174,6 +174,7 @@ public interface GroupContract {
             EventListener listener = EventBusTestFixture.newListener();
 
             eventBus().dispatch(EVENT, NO_KEYS).block();
+            Thread.sleep(100);
 
             eventBus().register(listener, GROUP_A);
 
