You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/06/16 04:13:32 UTC

[james-project] 04/05: JAMES-3599 Group execution: execute listeners together

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f7a91cfb727d29f16e01570e0a86caa7adc0dfab
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Jun 14 15:15:12 2021 +0700

    JAMES-3599 Group execution: execute listeners together
    
    This minimizes:
    
     - The count of events to deserialize (one for all groups)
     - The count of ACKs to perform
    
    This potentially enables ordering of listener execution in the happy case.
    
    Note that retries are still performed on a per-group basis.
---
 .../rabbitmq/ReactorRabbitMQChannelPool.java       |  18 ++++
 .../org/apache/james/events/GroupRegistration.java |  10 +-
 .../james/events/GroupRegistrationHandler.java     | 101 ++++++++++++++++++++-
 .../org/apache/james/events/RabbitMQEventBus.java  |   4 +-
 .../apache/james/events/RabbitMQEventBusTest.java  |   4 +-
 5 files changed, 129 insertions(+), 8 deletions(-)

diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
index 4e74850..ab6f673 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -276,6 +276,24 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
             .then();
     }
 
+    public Mono<Void> createWorkQueue(QueueSpecification queueSpecification) {
+        Preconditions.checkArgument(queueSpecification.getName() != null, "WorkQueue pattern do not make sense for unnamed queues");
+
+        return Mono.using(this::createSender,
+            managementSender -> managementSender.declareQueue(queueSpecification),
+            Sender::close)
+            .onErrorResume(
+                e -> e instanceof ShutdownSignalException
+                    && e.getMessage().contains("reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue"),
+                e -> {
+                    LOGGER.warn("{} already exists without dead-letter setup. Dead lettered messages to it will be lost. " +
+                            "To solve this, re-create the queue with the x-dead-letter-exchange argument set up.",
+                        queueSpecification.getName());
+                    return Mono.empty();
+                })
+            .then();
+    }
+
     private void invalidateObject(Channel channel) {
         try {
             pool.invalidateObject(channel);
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
index 5b5ce39..c3ce6e5 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
@@ -21,7 +21,6 @@ package org.apache.james.events;
 
 import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
 import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
-import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
 import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
 import static org.apache.james.backends.rabbitmq.Constants.REQUEUE;
 import static org.apache.james.backends.rabbitmq.Constants.deadLetterQueue;
@@ -43,7 +42,6 @@ import reactor.core.Disposable;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 import reactor.rabbitmq.AcknowledgableDelivery;
-import reactor.rabbitmq.BindingSpecification;
 import reactor.rabbitmq.ConsumeOptions;
 import reactor.rabbitmq.QueueSpecification;
 import reactor.rabbitmq.Receiver;
@@ -139,8 +137,7 @@ class GroupRegistration implements Registration {
 
         return deserializeEvent(eventAsBytes)
             .flatMap(event -> delayGenerator.delayIfHaveTo(currentRetryCount)
-                .flatMap(any -> runListener(event))
-                .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable))
+                .flatMap(any -> runListenerReliably(currentRetryCount, event))
                 .then(Mono.<Void>fromRunnable(acknowledgableDelivery::ack).subscribeOn(Schedulers.elastic())))
             .onErrorResume(e -> {
                 LOGGER.error("Unable to process delivery for group {}", group, e);
@@ -150,6 +147,11 @@ class GroupRegistration implements Registration {
             });
     }
 
+    public Mono<Void> runListenerReliably(int currentRetryCount, Event event) {
+        return runListener(event)
+            .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable));
+    }
+
     private Mono<Event> deserializeEvent(byte[] eventAsBytes) {
         return Mono.fromCallable(() -> eventSerializer.asEvent(new String(eventAsBytes, StandardCharsets.UTF_8)))
             .subscribeOn(Schedulers.parallel());
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
index cb224d4..88a2e09 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
@@ -19,16 +19,51 @@
 
 package org.apache.james.events;
 
+import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backends.rabbitmq.Constants.REQUEUE;
+import static org.apache.james.backends.rabbitmq.Constants.deadLetterQueue;
+import static org.apache.james.events.GroupRegistration.DEFAULT_RETRY_COUNT;
+
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.backends.rabbitmq.ReceiverProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.steveash.guavate.Guavate;
+import com.google.common.base.Preconditions;
 
+import reactor.core.Disposable;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.AcknowledgableDelivery;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.ConsumeOptions;
+import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.Receiver;
 import reactor.rabbitmq.Sender;
+import reactor.util.retry.Retry;
 
 class GroupRegistrationHandler {
+    private static final Logger LOGGER = LoggerFactory.getLogger(GroupRegistrationHandler.class);
+
+    private final GroupRegistration.WorkQueueName queueName;
+
+    public static class GroupRegistrationHandlerGroup extends Group {
+
+    }
+
+    static final Group GROUP = new GroupRegistrationHandlerGroup();
+
     private final NamingStrategy namingStrategy;
     private final Map<Group, GroupRegistration> groupRegistrations;
     private final EventSerializer eventSerializer;
@@ -38,10 +73,13 @@ class GroupRegistrationHandler {
     private final RetryBackoffConfiguration retryBackoff;
     private final EventDeadLetters eventDeadLetters;
     private final ListenerExecutor listenerExecutor;
+    private final EventBusId eventBusId;
+    private Optional<Receiver> receiver;
+    private Optional<Disposable> consumer;
 
     GroupRegistrationHandler(NamingStrategy namingStrategy, EventSerializer eventSerializer, ReactorRabbitMQChannelPool channelPool, Sender sender, ReceiverProvider receiverProvider,
                              RetryBackoffConfiguration retryBackoff,
-                             EventDeadLetters eventDeadLetters, ListenerExecutor listenerExecutor) {
+                             EventDeadLetters eventDeadLetters, ListenerExecutor listenerExecutor, EventBusId eventBusId) {
         this.namingStrategy = namingStrategy;
         this.eventSerializer = eventSerializer;
         this.channelPool = channelPool;
@@ -50,7 +88,11 @@ class GroupRegistrationHandler {
         this.retryBackoff = retryBackoff;
         this.eventDeadLetters = eventDeadLetters;
         this.listenerExecutor = listenerExecutor;
+        this.eventBusId = eventBusId;
         this.groupRegistrations = new ConcurrentHashMap<>();
+        this.queueName = namingStrategy.workQueue(GROUP);
+        this.consumer = Optional.empty();
+        this.receiver = Optional.empty();
     }
 
     GroupRegistration retrieveGroupRegistration(Group group) {
@@ -58,11 +100,68 @@ class GroupRegistrationHandler {
             .orElseThrow(() -> new GroupRegistrationNotFound(group));
     }
 
+    public void start() {
+        channelPool.createWorkQueue(
+            QueueSpecification.queue(queueName.asString())
+                .durable(DURABLE)
+                .exclusive(!EXCLUSIVE)
+                .autoDelete(!AUTO_DELETE)
+                .arguments(deadLetterQueue(namingStrategy.deadLetterExchange())),
+            BindingSpecification.binding()
+                .exchange(namingStrategy.exchange())
+                .queue(queueName.asString())
+                .routingKey(EMPTY_ROUTING_KEY))
+            .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()))
+            .block();
+
+        this.receiver = Optional.of(receiverProvider.createReceiver());
+        this.consumer = Optional.of(consumeWorkQueue());
+    }
+
+    private Disposable consumeWorkQueue() {
+        Preconditions.checkState(receiver.isPresent());
+        return receiver.get().consumeManualAck(queueName.asString(), new ConsumeOptions().qos(EventBus.EXECUTION_RATE))
+            .publishOn(Schedulers.parallel())
+            .filter(delivery -> Objects.nonNull(delivery.getBody()))
+            .flatMap(this::deliver, EventBus.EXECUTION_RATE)
+            .subscribeOn(Schedulers.elastic())
+            .subscribe();
+    }
+
+    private Mono<Void> deliver(AcknowledgableDelivery acknowledgableDelivery) {
+        byte[] eventAsBytes = acknowledgableDelivery.getBody();
+
+        return deserializeEvent(eventAsBytes)
+            .flatMapIterable(aa -> groupRegistrations.values()
+                .stream()
+                .map(group -> Pair.of(group, aa))
+                .collect(Guavate.toImmutableList()))
+            .flatMap(event -> event.getLeft().runListenerReliably(DEFAULT_RETRY_COUNT, event.getRight()))
+                .then(Mono.<Void>fromRunnable(acknowledgableDelivery::ack).subscribeOn(Schedulers.elastic()))
+            .then()
+            .onErrorResume(e -> {
+                LOGGER.error("Unable to process delivery for group {}", GROUP, e);
+                return Mono.fromRunnable(() -> acknowledgableDelivery.nack(!REQUEUE))
+                    .subscribeOn(Schedulers.elastic())
+                    .then();
+            });
+    }
+
+    private Mono<Event> deserializeEvent(byte[] eventAsBytes) {
+        return Mono.fromCallable(() -> eventSerializer.asEvent(new String(eventAsBytes, StandardCharsets.UTF_8)))
+            .subscribeOn(Schedulers.parallel());
+    }
+
     void stop() {
         groupRegistrations.values().forEach(GroupRegistration::unregister);
+        consumer.ifPresent(Disposable::dispose);
+        receiver.ifPresent(Receiver::close);
     }
 
     Registration register(EventListener.ReactiveEventListener listener, Group group) {
+        if (groupRegistrations.isEmpty()) {
+            start();
+        }
         return groupRegistrations
             .compute(group, (groupToRegister, oldGroupRegistration) -> {
                 if (oldGroupRegistration != null) {
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
index ad50412..74b092d 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
@@ -83,7 +83,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
 
             LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
             keyRegistrationHandler = new KeyRegistrationHandler(namingStrategy, eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, listenerExecutor, retryBackoff);
-            groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor);
+            groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor, eventBusId);
             eventDispatcher = new EventDispatcher(namingStrategy, eventBusId, eventSerializer, sender, localListenerRegistry, listenerExecutor, eventDeadLetters);
 
             eventDispatcher.start();
@@ -98,7 +98,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
 
             LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
             keyRegistrationHandler = new KeyRegistrationHandler(namingStrategy, eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, listenerExecutor, retryBackoff);
-            groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor);
+            groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor, eventBusId);
             eventDispatcher = new EventDispatcher(namingStrategy, eventBusId, eventSerializer, sender, localListenerRegistry, listenerExecutor, eventDeadLetters);
 
             keyRegistrationHandler.declareQueue();
diff --git a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java
index ed68d5a..525aa76 100644
--- a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java
+++ b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java
@@ -138,7 +138,9 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
         eventBus2.stop();
         eventBus3.stop();
         eventBusWithKeyHandlerNotStarted.stop();
-        ALL_GROUPS.stream()
+        Stream.concat(
+            ALL_GROUPS.stream(),
+            Stream.of(GroupRegistrationHandler.GROUP))
             .map(TEST_NAMING_STRATEGY::workQueue)
             .forEach(queueName -> rabbitMQExtension.getSender().delete(QueueSpecification.queue(queueName.asString())).block());
         rabbitMQExtension.getSender()

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org