You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/06/16 04:13:32 UTC
[james-project] 04/05: JAMES-3599 Group execution: execute
listeners together
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit f7a91cfb727d29f16e01570e0a86caa7adc0dfab
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Jun 14 15:15:12 2021 +0700
JAMES-3599 Group execution: execute listeners together
This minimizes:
- The count of events to deserialize (one for all groups)
- The count of ACKs to perform
This enables potentially ordering of execution upon the happy case.
Note that retries are still performed on a per-group basis.
---
.../rabbitmq/ReactorRabbitMQChannelPool.java | 18 ++++
.../org/apache/james/events/GroupRegistration.java | 10 +-
.../james/events/GroupRegistrationHandler.java | 101 ++++++++++++++++++++-
.../org/apache/james/events/RabbitMQEventBus.java | 4 +-
.../apache/james/events/RabbitMQEventBusTest.java | 4 +-
5 files changed, 129 insertions(+), 8 deletions(-)
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
index 4e74850..ab6f673 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java
@@ -276,6 +276,24 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable {
.then();
}
+ public Mono<Void> createWorkQueue(QueueSpecification queueSpecification) {
+ Preconditions.checkArgument(queueSpecification.getName() != null, "WorkQueue pattern do not make sense for unnamed queues");
+
+ return Mono.using(this::createSender,
+ managementSender -> managementSender.declareQueue(queueSpecification),
+ Sender::close)
+ .onErrorResume(
+ e -> e instanceof ShutdownSignalException
+ && e.getMessage().contains("reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue"),
+ e -> {
+ LOGGER.warn("{} already exists without dead-letter setup. Dead lettered messages to it will be lost. " +
+ "To solve this, re-create the queue with the x-dead-letter-exchange argument set up.",
+ queueSpecification.getName());
+ return Mono.empty();
+ })
+ .then();
+ }
+
private void invalidateObject(Channel channel) {
try {
pool.invalidateObject(channel);
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
index 5b5ce39..c3ce6e5 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
@@ -21,7 +21,6 @@ package org.apache.james.events;
import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
-import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
import static org.apache.james.backends.rabbitmq.Constants.REQUEUE;
import static org.apache.james.backends.rabbitmq.Constants.deadLetterQueue;
@@ -43,7 +42,6 @@ import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.AcknowledgableDelivery;
-import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.Receiver;
@@ -139,8 +137,7 @@ class GroupRegistration implements Registration {
return deserializeEvent(eventAsBytes)
.flatMap(event -> delayGenerator.delayIfHaveTo(currentRetryCount)
- .flatMap(any -> runListener(event))
- .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable))
+ .flatMap(any -> runListenerReliably(currentRetryCount, event))
.then(Mono.<Void>fromRunnable(acknowledgableDelivery::ack).subscribeOn(Schedulers.elastic())))
.onErrorResume(e -> {
LOGGER.error("Unable to process delivery for group {}", group, e);
@@ -150,6 +147,11 @@ class GroupRegistration implements Registration {
});
}
+ public Mono<Void> runListenerReliably(int currentRetryCount, Event event) {
+ return runListener(event)
+ .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable));
+ }
+
private Mono<Event> deserializeEvent(byte[] eventAsBytes) {
return Mono.fromCallable(() -> eventSerializer.asEvent(new String(eventAsBytes, StandardCharsets.UTF_8)))
.subscribeOn(Schedulers.parallel());
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
index cb224d4..88a2e09 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
@@ -19,16 +19,51 @@
package org.apache.james.events;
+import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backends.rabbitmq.Constants.REQUEUE;
+import static org.apache.james.backends.rabbitmq.Constants.deadLetterQueue;
+import static org.apache.james.events.GroupRegistration.DEFAULT_RETRY_COUNT;
+
+import java.nio.charset.StandardCharsets;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.steveash.guavate.Guavate;
+import com.google.common.base.Preconditions;
+import reactor.core.Disposable;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.AcknowledgableDelivery;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.ConsumeOptions;
+import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.Sender;
+import reactor.util.retry.Retry;
class GroupRegistrationHandler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(GroupRegistrationHandler.class);
+
+ private final GroupRegistration.WorkQueueName queueName;
+
+ public static class GroupRegistrationHandlerGroup extends Group {
+
+ }
+
+ static final Group GROUP = new GroupRegistrationHandlerGroup();
+
private final NamingStrategy namingStrategy;
private final Map<Group, GroupRegistration> groupRegistrations;
private final EventSerializer eventSerializer;
@@ -38,10 +73,13 @@ class GroupRegistrationHandler {
private final RetryBackoffConfiguration retryBackoff;
private final EventDeadLetters eventDeadLetters;
private final ListenerExecutor listenerExecutor;
+ private final EventBusId eventBusId;
+ private Optional<Receiver> receiver;
+ private Optional<Disposable> consumer;
GroupRegistrationHandler(NamingStrategy namingStrategy, EventSerializer eventSerializer, ReactorRabbitMQChannelPool channelPool, Sender sender, ReceiverProvider receiverProvider,
RetryBackoffConfiguration retryBackoff,
- EventDeadLetters eventDeadLetters, ListenerExecutor listenerExecutor) {
+ EventDeadLetters eventDeadLetters, ListenerExecutor listenerExecutor, EventBusId eventBusId) {
this.namingStrategy = namingStrategy;
this.eventSerializer = eventSerializer;
this.channelPool = channelPool;
@@ -50,7 +88,11 @@ class GroupRegistrationHandler {
this.retryBackoff = retryBackoff;
this.eventDeadLetters = eventDeadLetters;
this.listenerExecutor = listenerExecutor;
+ this.eventBusId = eventBusId;
this.groupRegistrations = new ConcurrentHashMap<>();
+ this.queueName = namingStrategy.workQueue(GROUP);
+ this.consumer = Optional.empty();
+ this.receiver = Optional.empty();
}
GroupRegistration retrieveGroupRegistration(Group group) {
@@ -58,11 +100,68 @@ class GroupRegistrationHandler {
.orElseThrow(() -> new GroupRegistrationNotFound(group));
}
+ public void start() {
+ channelPool.createWorkQueue(
+ QueueSpecification.queue(queueName.asString())
+ .durable(DURABLE)
+ .exclusive(!EXCLUSIVE)
+ .autoDelete(!AUTO_DELETE)
+ .arguments(deadLetterQueue(namingStrategy.deadLetterExchange())),
+ BindingSpecification.binding()
+ .exchange(namingStrategy.exchange())
+ .queue(queueName.asString())
+ .routingKey(EMPTY_ROUTING_KEY))
+ .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic()))
+ .block();
+
+ this.receiver = Optional.of(receiverProvider.createReceiver());
+ this.consumer = Optional.of(consumeWorkQueue());
+ }
+
+ private Disposable consumeWorkQueue() {
+ Preconditions.checkState(receiver.isPresent());
+ return receiver.get().consumeManualAck(queueName.asString(), new ConsumeOptions().qos(EventBus.EXECUTION_RATE))
+ .publishOn(Schedulers.parallel())
+ .filter(delivery -> Objects.nonNull(delivery.getBody()))
+ .flatMap(this::deliver, EventBus.EXECUTION_RATE)
+ .subscribeOn(Schedulers.elastic())
+ .subscribe();
+ }
+
+ private Mono<Void> deliver(AcknowledgableDelivery acknowledgableDelivery) {
+ byte[] eventAsBytes = acknowledgableDelivery.getBody();
+
+ return deserializeEvent(eventAsBytes)
+ .flatMapIterable(aa -> groupRegistrations.values()
+ .stream()
+ .map(group -> Pair.of(group, aa))
+ .collect(Guavate.toImmutableList()))
+ .flatMap(event -> event.getLeft().runListenerReliably(DEFAULT_RETRY_COUNT, event.getRight()))
+ .then(Mono.<Void>fromRunnable(acknowledgableDelivery::ack).subscribeOn(Schedulers.elastic()))
+ .then()
+ .onErrorResume(e -> {
+ LOGGER.error("Unable to process delivery for group {}", GROUP, e);
+ return Mono.fromRunnable(() -> acknowledgableDelivery.nack(!REQUEUE))
+ .subscribeOn(Schedulers.elastic())
+ .then();
+ });
+ }
+
+ private Mono<Event> deserializeEvent(byte[] eventAsBytes) {
+ return Mono.fromCallable(() -> eventSerializer.asEvent(new String(eventAsBytes, StandardCharsets.UTF_8)))
+ .subscribeOn(Schedulers.parallel());
+ }
+
void stop() {
groupRegistrations.values().forEach(GroupRegistration::unregister);
+ consumer.ifPresent(Disposable::dispose);
+ receiver.ifPresent(Receiver::close);
}
Registration register(EventListener.ReactiveEventListener listener, Group group) {
+ if (groupRegistrations.isEmpty()) {
+ start();
+ }
return groupRegistrations
.compute(group, (groupToRegister, oldGroupRegistration) -> {
if (oldGroupRegistration != null) {
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
index ad50412..74b092d 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
@@ -83,7 +83,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
keyRegistrationHandler = new KeyRegistrationHandler(namingStrategy, eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, listenerExecutor, retryBackoff);
- groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor);
+ groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor, eventBusId);
eventDispatcher = new EventDispatcher(namingStrategy, eventBusId, eventSerializer, sender, localListenerRegistry, listenerExecutor, eventDeadLetters);
eventDispatcher.start();
@@ -98,7 +98,7 @@ public class RabbitMQEventBus implements EventBus, Startable {
LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
keyRegistrationHandler = new KeyRegistrationHandler(namingStrategy, eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, listenerExecutor, retryBackoff);
- groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor);
+ groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor, eventBusId);
eventDispatcher = new EventDispatcher(namingStrategy, eventBusId, eventSerializer, sender, localListenerRegistry, listenerExecutor, eventDeadLetters);
keyRegistrationHandler.declareQueue();
diff --git a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java
index ed68d5a..525aa76 100644
--- a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java
+++ b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java
@@ -138,7 +138,9 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract,
eventBus2.stop();
eventBus3.stop();
eventBusWithKeyHandlerNotStarted.stop();
- ALL_GROUPS.stream()
+ Stream.concat(
+ ALL_GROUPS.stream(),
+ Stream.of(GroupRegistrationHandler.GROUP))
.map(TEST_NAMING_STRATEGY::workQueue)
.forEach(queueName -> rabbitMQExtension.getSender().delete(QueueSpecification.queue(queueName.asString())).block());
rabbitMQExtension.getSender()
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org