You are viewing a plain text version of this content. The canonical link for it is not included in this plain text version.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/03 02:04:07 UTC

[james-project] 12/15: JAMES-3290 Reactify MailLoader to handle error propagation

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c6a3ecbcaf0fe115dddd94b35bd10634cf212e1f
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Tue Jun 30 17:44:08 2020 +0200

    JAMES-3290 Reactify MailLoader to handle error propagation
---
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  | 31 +++++--------
 .../apache/james/queue/rabbitmq/MailLoader.java    | 39 +++++++++++------
 .../queue/rabbitmq/RabbitMQMailQueueFactory.java   |  6 +--
 .../james/queue/rabbitmq/MailLoaderTest.java       | 51 ++++++++++++++++++++++
 4 files changed, 91 insertions(+), 36 deletions(-)

diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index b22d850..b13b613 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -24,7 +24,6 @@ import static org.apache.james.queue.api.MailQueue.DEQUEUED_METRIC_NAME_PREFIX;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.function.Consumer;
-import java.util.function.Function;
 
 import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.metrics.api.Metric;
@@ -35,6 +34,7 @@ import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
 import org.apache.mailet.Mail;
 
+import com.github.fge.lambdas.Throwing;
 import com.github.fge.lambdas.consumers.ThrowingConsumer;
 import com.rabbitmq.client.Delivery;
 
@@ -76,14 +76,14 @@ class Dequeuer implements Closeable {
 
     }
 
-    private final Function<MailReferenceDTO, MailWithEnqueueId> mailLoader;
+    private final MailLoader mailLoader;
     private final Metric dequeueMetric;
     private final MailReferenceSerializer mailReferenceSerializer;
     private final MailQueueView mailQueueView;
     private final Receiver receiver;
     private final Flux<AcknowledgableDelivery> flux;
 
-    Dequeuer(MailQueueName name, ReceiverProvider receiverProvider, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader,
+    Dequeuer(MailQueueName name, ReceiverProvider receiverProvider, MailLoader mailLoader,
              MailReferenceSerializer serializer, MetricFactory metricFactory,
              MailQueueView mailQueueView, MailQueueFactory.PrefetchCount prefetchCount) {
         this.mailLoader = mailLoader;
@@ -120,13 +120,8 @@ class Dequeuer implements Closeable {
     }
 
     private Mono<RabbitMQMailQueueItem> loadItem(AcknowledgableDelivery response) {
-        try {
-            MailWithEnqueueId mailWithEnqueueId = loadMail(response);
-            ThrowingConsumer<Boolean> ack = ack(response, mailWithEnqueueId);
-            return Mono.just(new RabbitMQMailQueueItem(ack, mailWithEnqueueId));
-        } catch (MailQueue.MailQueueException e) {
-            return Mono.error(e);
-        }
+        return loadMail(response)
+            .map(mailWithEnqueueId -> new RabbitMQMailQueueItem(ack(response, mailWithEnqueueId), mailWithEnqueueId));
     }
 
     private ThrowingConsumer<Boolean> ack(AcknowledgableDelivery response, MailWithEnqueueId mailWithEnqueueId) {
@@ -141,17 +136,15 @@ class Dequeuer implements Closeable {
         };
     }
 
-    private MailWithEnqueueId loadMail(Delivery response) throws MailQueue.MailQueueException {
-        MailReferenceDTO mailDTO = toMailReference(response);
-        return mailLoader.apply(mailDTO);
+    private Mono<MailWithEnqueueId> loadMail(Delivery response) {
+        return toMailReference(response)
+            .flatMap(mailLoader::load);
     }
 
-    private MailReferenceDTO toMailReference(Delivery getResponse) throws MailQueue.MailQueueException {
-        try {
-            return mailReferenceSerializer.read(getResponse.getBody());
-        } catch (IOException e) {
-            throw new MailQueue.MailQueueException("Failed to parse DTO", e);
-        }
+    private Mono<MailReferenceDTO> toMailReference(Delivery getResponse) {
+        return Mono.fromCallable(getResponse::getBody)
+            .map(Throwing.function(mailReferenceSerializer::read).sneakyThrow())
+            .onErrorResume(IOException.class, e -> Mono.error(new MailQueue.MailQueueException("Failed to parse DTO", e)));
     }
 
 }
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java
index a616909..b724e60 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.queue.rabbitmq;
 
+import java.util.function.Function;
+
 import javax.mail.MessagingException;
 import javax.mail.internet.AddressException;
 import javax.mail.internet.MimeMessage;
@@ -29,6 +31,10 @@ import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.mailet.Mail;
 
+import com.github.fge.lambdas.Throwing;
+
+import reactor.core.publisher.Mono;
+
 class MailLoader {
     private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
     private final BlobId.Factory blobIdFactory;
@@ -38,18 +44,25 @@ class MailLoader {
         this.blobIdFactory = blobIdFactory;
     }
 
-    MailWithEnqueueId load(MailReferenceDTO dto) throws MailQueue.MailQueueException {
-        try {
-            MailReference mailReference = dto.toMailReference(blobIdFactory);
-
-            Mail mail = mailReference.getMail();
-            MimeMessage mimeMessage = mimeMessageStore.read(mailReference.getPartsId()).block();
-            mail.setMessage(mimeMessage);
-            return new MailWithEnqueueId(mailReference.getEnqueueId(), mail);
-        } catch (AddressException e) {
-            throw new MailQueue.MailQueueException("Failed to parse mail address", e);
-        } catch (MessagingException e) {
-            throw new MailQueue.MailQueueException("Failed to generate mime message", e);
-        }
+    Mono<MailWithEnqueueId> load(MailReferenceDTO dto) {
+        return Mono.fromCallable(() -> dto.toMailReference(blobIdFactory))
+            .flatMap(mailReference -> buildMail(mailReference)
+                .map(mail -> new MailWithEnqueueId(mailReference.getEnqueueId(), mail)));
+    }
+
+    private Mono<Mail> buildMail(MailReference mailReference) {
+        return mimeMessageStore.read(mailReference.getPartsId())
+            .flatMap(mimeMessage -> buildMailWithMessageReference(mailReference, mimeMessage));
+    }
+
+    private Mono<Mail> buildMailWithMessageReference(MailReference mailReference, MimeMessage mimeMessage) {
+        Function<Mail, Mono<Object>> setMessage = mail ->
+            Mono.fromRunnable(Throwing.runnable(() -> mail.setMessage(mimeMessage)).sneakyThrow())
+                .onErrorResume(AddressException.class, e -> Mono.error(new MailQueue.MailQueueException("Failed to parse mail address", e)))
+                .onErrorResume(MessagingException.class, e -> Mono.error(new MailQueue.MailQueueException("Failed to generate mime message", e)));
+
+        return Mono.just(mailReference.getMail())
+            .flatMap(mail -> setMessage.apply(mail)
+                .thenReturn(mail));
     }
 }
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
index fc2f784..e9b3c32 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
@@ -29,7 +29,6 @@ import static org.apache.james.queue.api.MailQueue.QUEUE_SIZE_METRIC_NAME_PREFIX
 import java.time.Clock;
 import java.util.Optional;
 import java.util.Set;
-import java.util.function.Function;
 
 import javax.inject.Inject;
 import javax.mail.internet.MimeMessage;
@@ -46,7 +45,6 @@ import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
 import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
 
-import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 
@@ -65,7 +63,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
         private final Sender sender;
         private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
         private final MailReferenceSerializer mailReferenceSerializer;
-        private final Function<MailReferenceDTO, MailWithEnqueueId> mailLoader;
+        private final MailLoader mailLoader;
         private final MailQueueView.Factory mailQueueViewFactory;
         private final Clock clock;
         private final MailQueueItemDecoratorFactory decoratorFactory;
@@ -89,7 +87,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
             this.clock = clock;
             this.decoratorFactory = decoratorFactory;
             this.mailReferenceSerializer = new MailReferenceSerializer();
-            this.mailLoader = Throwing.function(new MailLoader(mimeMessageStore, blobIdFactory)::load).sneakyThrow();
+            this.mailLoader = new MailLoader(mimeMessageStore, blobIdFactory);
             this.configuration = configuration;
         }
 
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/MailLoaderTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/MailLoaderTest.java
new file mode 100644
index 0000000..4b406fb
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/MailLoaderTest.java
@@ -0,0 +1,51 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ***************************************************************/
+
+package org.apache.james.queue.rabbitmq;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import javax.mail.internet.MimeMessage;
+
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.api.Store;
+import org.apache.james.blob.mail.MimeMessagePartsId;
+import org.junit.jupiter.api.Test;
+
+import reactor.core.publisher.Mono;
+
+class MailLoaderTest {
+    @Test
+    void storeExceptionShouldBePropagated() {
+        Store<MimeMessage, MimeMessagePartsId> store = mock(Store.class);
+        when(store.read(any())).thenReturn(Mono.error(new RuntimeException("Cassandra problem")));
+        MailReferenceDTO dto = mock(MailReferenceDTO.class);
+        when(dto.toMailReference(any())).thenReturn(mock(MailReference.class));
+        MailLoader loader = new MailLoader(store, new HashBlobId.Factory());
+
+        String result = loader.load(dto)
+            .thenReturn("continued")
+            .onErrorResume(RuntimeException.class, e -> Mono.just("caught"))
+            .block();
+        assertThat(result).isEqualTo("caught");
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org