You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/23 10:41:18 UTC
[02/13] james-project git commit: MAILBOX-376 Refactor GroupConsumerRetry
MAILBOX-376 Refactor GroupConsumerRetry
- Remove unneeded RetryPublisher inner class
- Avoid some unneeded Mono wrapping, which was leading to confusing method names
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/0c64eec9
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/0c64eec9
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/0c64eec9
Branch: refs/heads/master
Commit: 0c64eec98acb283fa577525363dc3aa4ae6e60e9
Parents: c3f16af
Author: Benoit Tellier <bt...@linagora.com>
Authored: Wed Jan 23 10:38:56 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Wed Jan 23 10:38:56 2019 +0700
----------------------------------------------------------------------
.../mailbox/events/GroupConsumerRetry.java | 78 +++++++-------------
1 file changed, 27 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/0c64eec9/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
index ce2c713..ff10d23 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
@@ -42,9 +42,7 @@ import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.Sender;
class GroupConsumerRetry {
-
static class RetryExchangeName {
-
static RetryExchangeName of(Group group) {
return new RetryExchangeName(group.asString());
}
@@ -63,65 +61,21 @@ class GroupConsumerRetry {
}
}
- static class RetryPublisher {
-
- private final Sender sender;
- private final RetryExchangeName retryExchangeName;
- private final RetryBackoffConfiguration retryBackoff;
- private final EventDeadLetters eventDeadLetters;
- private final GroupRegistration.WorkQueueName queueName;
-
- RetryPublisher(Sender sender, RetryExchangeName retryExchangeName, RetryBackoffConfiguration retryBackoff,
- EventDeadLetters eventDeadLetters, GroupRegistration.WorkQueueName queueName) {
- this.sender = sender;
- this.retryExchangeName = retryExchangeName;
- this.retryBackoff = retryBackoff;
- this.eventDeadLetters = eventDeadLetters;
- this.queueName = queueName;
- }
-
- Mono<Void> publish(Event event, byte[] eventAsByte, int currentRetryCount) {
- return Mono.just(currentRetryCount)
- .flatMap(retryCount -> retryOrStoreToDeadLetter(event, eventAsByte, retryCount));
- }
-
- private Mono<Void> retryOrStoreToDeadLetter(Event event, byte[] eventAsByte, int currentRetryCount) {
- if (currentRetryCount >= retryBackoff.getMaxRetries()) {
- return eventDeadLetters.store(queueName.getGroup(), event);
- }
-
- return sendRetryMessage(event, eventAsByte, currentRetryCount);
- }
-
- private Mono<Void> sendRetryMessage(Event event, byte[] eventAsByte, int currentRetryCount) {
- Mono<OutboundMessage> retryMessage = Mono.just(new OutboundMessage(
- retryExchangeName.asString(),
- EMPTY_ROUTING_KEY,
- new AMQP.BasicProperties.Builder()
- .headers(ImmutableMap.of(RETRY_COUNT, currentRetryCount + 1))
- .build(),
- eventAsByte));
-
- return sender.send(retryMessage)
- .doOnError(throwable -> LOGGER.error("Exception happens when publishing event of user {} to retry exchange," +
- "this event will be lost forever",
- event.getUser().asString(), throwable));
- }
- }
-
private static final Logger LOGGER = LoggerFactory.getLogger(GroupConsumerRetry.class);
private final Sender sender;
private final GroupRegistration.WorkQueueName queueName;
private final RetryExchangeName retryExchangeName;
- private final RetryPublisher retryPublisher;
+ private final RetryBackoffConfiguration retryBackoff;
+ private final EventDeadLetters eventDeadLetters;
GroupConsumerRetry(Sender sender, GroupRegistration.WorkQueueName queueName, Group group,
RetryBackoffConfiguration retryBackoff, EventDeadLetters eventDeadLetters) {
this.sender = sender;
this.queueName = queueName;
this.retryExchangeName = RetryExchangeName.of(group);
- this.retryPublisher = new RetryPublisher(sender, retryExchangeName, retryBackoff, eventDeadLetters, queueName);
+ this.retryBackoff = retryBackoff;
+ this.eventDeadLetters = eventDeadLetters;
}
Mono<Void> createRetryExchange() {
@@ -140,6 +94,28 @@ class GroupConsumerRetry {
LOGGER.error("Exception happens when handling event {} of user {}",
event.getEventId().getId().toString(), event.getUser().asString(), throwable);
- return retryPublisher.publish(event, eventAsBytes, currentRetryCount);
+ return retryOrStoreToDeadLetter(event, eventAsBytes, currentRetryCount);
+ }
+
+ private Mono<Void> retryOrStoreToDeadLetter(Event event, byte[] eventAsByte, int currentRetryCount) {
+ if (currentRetryCount >= retryBackoff.getMaxRetries()) {
+ return eventDeadLetters.store(queueName.getGroup(), event);
+ }
+ return sendRetryMessage(event, eventAsByte, currentRetryCount);
+ }
+
+ private Mono<Void> sendRetryMessage(Event event, byte[] eventAsByte, int currentRetryCount) {
+ Mono<OutboundMessage> retryMessage = Mono.just(new OutboundMessage(
+ retryExchangeName.asString(),
+ EMPTY_ROUTING_KEY,
+ new AMQP.BasicProperties.Builder()
+ .headers(ImmutableMap.of(RETRY_COUNT, currentRetryCount + 1))
+ .build(),
+ eventAsByte));
+
+ return sender.send(retryMessage)
+ .doOnError(throwable -> LOGGER.error("Exception happens when publishing event of user {} to retry exchange," +
+ "this event will be lost forever",
+ event.getUser().asString(), throwable));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org