You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/23 10:41:18 UTC

[02/13] james-project git commit: MAILBOX-376 Refactor GroupConsumerRetry

MAILBOX-376 Refactor GroupConsumerRetry

 - Remove unneeded RetryPublisher inner class
 - Avoid some not needed mono wrapping - which was leading to confusing method names


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/0c64eec9
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/0c64eec9
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/0c64eec9

Branch: refs/heads/master
Commit: 0c64eec98acb283fa577525363dc3aa4ae6e60e9
Parents: c3f16af
Author: Benoit Tellier <bt...@linagora.com>
Authored: Wed Jan 23 10:38:56 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Wed Jan 23 10:38:56 2019 +0700

----------------------------------------------------------------------
 .../mailbox/events/GroupConsumerRetry.java      | 78 +++++++-------------
 1 file changed, 27 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/0c64eec9/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
----------------------------------------------------------------------
diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
index ce2c713..ff10d23 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
@@ -42,9 +42,7 @@ import reactor.rabbitmq.OutboundMessage;
 import reactor.rabbitmq.Sender;
 
 class GroupConsumerRetry {
-
     static class RetryExchangeName {
-
         static RetryExchangeName of(Group group) {
             return new RetryExchangeName(group.asString());
         }
@@ -63,65 +61,21 @@ class GroupConsumerRetry {
         }
     }
 
-    static class RetryPublisher {
-
-        private final Sender sender;
-        private final RetryExchangeName retryExchangeName;
-        private final RetryBackoffConfiguration retryBackoff;
-        private final EventDeadLetters eventDeadLetters;
-        private final GroupRegistration.WorkQueueName queueName;
-
-        RetryPublisher(Sender sender, RetryExchangeName retryExchangeName, RetryBackoffConfiguration retryBackoff,
-                       EventDeadLetters eventDeadLetters, GroupRegistration.WorkQueueName queueName) {
-            this.sender = sender;
-            this.retryExchangeName = retryExchangeName;
-            this.retryBackoff = retryBackoff;
-            this.eventDeadLetters = eventDeadLetters;
-            this.queueName = queueName;
-        }
-
-        Mono<Void> publish(Event event, byte[] eventAsByte, int currentRetryCount) {
-            return Mono.just(currentRetryCount)
-                .flatMap(retryCount -> retryOrStoreToDeadLetter(event, eventAsByte, retryCount));
-        }
-
-        private Mono<Void> retryOrStoreToDeadLetter(Event event, byte[] eventAsByte, int currentRetryCount) {
-            if (currentRetryCount >= retryBackoff.getMaxRetries()) {
-                return eventDeadLetters.store(queueName.getGroup(), event);
-            }
-
-            return sendRetryMessage(event, eventAsByte, currentRetryCount);
-        }
-
-        private Mono<Void> sendRetryMessage(Event event, byte[] eventAsByte, int currentRetryCount) {
-            Mono<OutboundMessage> retryMessage = Mono.just(new OutboundMessage(
-                retryExchangeName.asString(),
-                EMPTY_ROUTING_KEY,
-                new AMQP.BasicProperties.Builder()
-                    .headers(ImmutableMap.of(RETRY_COUNT, currentRetryCount + 1))
-                    .build(),
-                eventAsByte));
-
-            return sender.send(retryMessage)
-                .doOnError(throwable -> LOGGER.error("Exception happens when publishing event of user {} to retry exchange," +
-                    "this event will be lost forever",
-                    event.getUser().asString(), throwable));
-        }
-    }
-
     private static final Logger LOGGER = LoggerFactory.getLogger(GroupConsumerRetry.class);
 
     private final Sender sender;
     private final GroupRegistration.WorkQueueName queueName;
     private final RetryExchangeName retryExchangeName;
-    private final RetryPublisher retryPublisher;
+    private final RetryBackoffConfiguration retryBackoff;
+    private final EventDeadLetters eventDeadLetters;
 
     GroupConsumerRetry(Sender sender, GroupRegistration.WorkQueueName queueName, Group group,
                        RetryBackoffConfiguration retryBackoff, EventDeadLetters eventDeadLetters) {
         this.sender = sender;
         this.queueName = queueName;
         this.retryExchangeName = RetryExchangeName.of(group);
-        this.retryPublisher = new RetryPublisher(sender, retryExchangeName, retryBackoff, eventDeadLetters, queueName);
+        this.retryBackoff = retryBackoff;
+        this.eventDeadLetters = eventDeadLetters;
     }
 
     Mono<Void> createRetryExchange() {
@@ -140,6 +94,28 @@ class GroupConsumerRetry {
         LOGGER.error("Exception happens when handling event {} of user {}",
             event.getEventId().getId().toString(), event.getUser().asString(), throwable);
 
-        return retryPublisher.publish(event, eventAsBytes, currentRetryCount);
+        return retryOrStoreToDeadLetter(event, eventAsBytes, currentRetryCount);
+    }
+
+    private Mono<Void> retryOrStoreToDeadLetter(Event event, byte[] eventAsByte, int currentRetryCount) {
+        if (currentRetryCount >= retryBackoff.getMaxRetries()) {
+            return eventDeadLetters.store(queueName.getGroup(), event);
+        }
+        return sendRetryMessage(event, eventAsByte, currentRetryCount);
+    }
+
+    private Mono<Void> sendRetryMessage(Event event, byte[] eventAsByte, int currentRetryCount) {
+        Mono<OutboundMessage> retryMessage = Mono.just(new OutboundMessage(
+            retryExchangeName.asString(),
+            EMPTY_ROUTING_KEY,
+            new AMQP.BasicProperties.Builder()
+                .headers(ImmutableMap.of(RETRY_COUNT, currentRetryCount + 1))
+                .build(),
+            eventAsByte));
+
+        return sender.send(retryMessage)
+            .doOnError(throwable -> LOGGER.error("Exception happens when publishing event of user {} to retry exchange," +
+                    "this event will be lost forever",
+                event.getUser().asString(), throwable));
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org