You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/03 02:04:07 UTC
[james-project] 12/15: JAMES-3290 Reactify MailLoader to handle
error propagation
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit c6a3ecbcaf0fe115dddd94b35bd10634cf212e1f
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Tue Jun 30 17:44:08 2020 +0200
JAMES-3290 Reactify MailLoader to handle error propagation
---
.../org/apache/james/queue/rabbitmq/Dequeuer.java | 31 +++++--------
.../apache/james/queue/rabbitmq/MailLoader.java | 39 +++++++++++------
.../queue/rabbitmq/RabbitMQMailQueueFactory.java | 6 +--
.../james/queue/rabbitmq/MailLoaderTest.java | 51 ++++++++++++++++++++++
4 files changed, 91 insertions(+), 36 deletions(-)
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index b22d850..b13b613 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -24,7 +24,6 @@ import static org.apache.james.queue.api.MailQueue.DEQUEUED_METRIC_NAME_PREFIX;
import java.io.Closeable;
import java.io.IOException;
import java.util.function.Consumer;
-import java.util.function.Function;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.metrics.api.Metric;
@@ -35,6 +34,7 @@ import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import org.apache.mailet.Mail;
+import com.github.fge.lambdas.Throwing;
import com.github.fge.lambdas.consumers.ThrowingConsumer;
import com.rabbitmq.client.Delivery;
@@ -76,14 +76,14 @@ class Dequeuer implements Closeable {
}
- private final Function<MailReferenceDTO, MailWithEnqueueId> mailLoader;
+ private final MailLoader mailLoader;
private final Metric dequeueMetric;
private final MailReferenceSerializer mailReferenceSerializer;
private final MailQueueView mailQueueView;
private final Receiver receiver;
private final Flux<AcknowledgableDelivery> flux;
- Dequeuer(MailQueueName name, ReceiverProvider receiverProvider, Function<MailReferenceDTO, MailWithEnqueueId> mailLoader,
+ Dequeuer(MailQueueName name, ReceiverProvider receiverProvider, MailLoader mailLoader,
MailReferenceSerializer serializer, MetricFactory metricFactory,
MailQueueView mailQueueView, MailQueueFactory.PrefetchCount prefetchCount) {
this.mailLoader = mailLoader;
@@ -120,13 +120,8 @@ class Dequeuer implements Closeable {
}
private Mono<RabbitMQMailQueueItem> loadItem(AcknowledgableDelivery response) {
- try {
- MailWithEnqueueId mailWithEnqueueId = loadMail(response);
- ThrowingConsumer<Boolean> ack = ack(response, mailWithEnqueueId);
- return Mono.just(new RabbitMQMailQueueItem(ack, mailWithEnqueueId));
- } catch (MailQueue.MailQueueException e) {
- return Mono.error(e);
- }
+ return loadMail(response)
+ .map(mailWithEnqueueId -> new RabbitMQMailQueueItem(ack(response, mailWithEnqueueId), mailWithEnqueueId));
}
private ThrowingConsumer<Boolean> ack(AcknowledgableDelivery response, MailWithEnqueueId mailWithEnqueueId) {
@@ -141,17 +136,15 @@ class Dequeuer implements Closeable {
};
}
- private MailWithEnqueueId loadMail(Delivery response) throws MailQueue.MailQueueException {
- MailReferenceDTO mailDTO = toMailReference(response);
- return mailLoader.apply(mailDTO);
+ private Mono<MailWithEnqueueId> loadMail(Delivery response) {
+ return toMailReference(response)
+ .flatMap(mailLoader::load);
}
- private MailReferenceDTO toMailReference(Delivery getResponse) throws MailQueue.MailQueueException {
- try {
- return mailReferenceSerializer.read(getResponse.getBody());
- } catch (IOException e) {
- throw new MailQueue.MailQueueException("Failed to parse DTO", e);
- }
+ private Mono<MailReferenceDTO> toMailReference(Delivery getResponse) {
+ return Mono.fromCallable(getResponse::getBody)
+ .map(Throwing.function(mailReferenceSerializer::read).sneakyThrow())
+ .onErrorResume(IOException.class, e -> Mono.error(new MailQueue.MailQueueException("Failed to parse DTO", e)));
}
}
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java
index a616909..b724e60 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailLoader.java
@@ -19,6 +19,8 @@
package org.apache.james.queue.rabbitmq;
+import java.util.function.Function;
+
import javax.mail.MessagingException;
import javax.mail.internet.AddressException;
import javax.mail.internet.MimeMessage;
@@ -29,6 +31,10 @@ import org.apache.james.blob.mail.MimeMessagePartsId;
import org.apache.james.queue.api.MailQueue;
import org.apache.mailet.Mail;
+import com.github.fge.lambdas.Throwing;
+
+import reactor.core.publisher.Mono;
+
class MailLoader {
private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
private final BlobId.Factory blobIdFactory;
@@ -38,18 +44,25 @@ class MailLoader {
this.blobIdFactory = blobIdFactory;
}
- MailWithEnqueueId load(MailReferenceDTO dto) throws MailQueue.MailQueueException {
- try {
- MailReference mailReference = dto.toMailReference(blobIdFactory);
-
- Mail mail = mailReference.getMail();
- MimeMessage mimeMessage = mimeMessageStore.read(mailReference.getPartsId()).block();
- mail.setMessage(mimeMessage);
- return new MailWithEnqueueId(mailReference.getEnqueueId(), mail);
- } catch (AddressException e) {
- throw new MailQueue.MailQueueException("Failed to parse mail address", e);
- } catch (MessagingException e) {
- throw new MailQueue.MailQueueException("Failed to generate mime message", e);
- }
+ Mono<MailWithEnqueueId> load(MailReferenceDTO dto) {
+ return Mono.fromCallable(() -> dto.toMailReference(blobIdFactory))
+ .flatMap(mailReference -> buildMail(mailReference)
+ .map(mail -> new MailWithEnqueueId(mailReference.getEnqueueId(), mail)));
+ }
+
+ private Mono<Mail> buildMail(MailReference mailReference) {
+ return mimeMessageStore.read(mailReference.getPartsId())
+ .flatMap(mimeMessage -> buildMailWithMessageReference(mailReference, mimeMessage));
+ }
+
+ private Mono<Mail> buildMailWithMessageReference(MailReference mailReference, MimeMessage mimeMessage) {
+ Function<Mail, Mono<Object>> setMessage = mail ->
+ Mono.fromRunnable(Throwing.runnable(() -> mail.setMessage(mimeMessage)).sneakyThrow())
+ .onErrorResume(AddressException.class, e -> Mono.error(new MailQueue.MailQueueException("Failed to parse mail address", e)))
+ .onErrorResume(MessagingException.class, e -> Mono.error(new MailQueue.MailQueueException("Failed to generate mime message", e)));
+
+ return Mono.just(mailReference.getMail())
+ .flatMap(mail -> setMessage.apply(mail)
+ .thenReturn(mail));
}
}
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
index fc2f784..e9b3c32 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
@@ -29,7 +29,6 @@ import static org.apache.james.queue.api.MailQueue.QUEUE_SIZE_METRIC_NAME_PREFIX
import java.time.Clock;
import java.util.Optional;
import java.util.Set;
-import java.util.function.Function;
import javax.inject.Inject;
import javax.mail.internet.MimeMessage;
@@ -46,7 +45,6 @@ import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
-import com.github.fge.lambdas.Throwing;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
@@ -65,7 +63,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
private final Sender sender;
private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore;
private final MailReferenceSerializer mailReferenceSerializer;
- private final Function<MailReferenceDTO, MailWithEnqueueId> mailLoader;
+ private final MailLoader mailLoader;
private final MailQueueView.Factory mailQueueViewFactory;
private final Clock clock;
private final MailQueueItemDecoratorFactory decoratorFactory;
@@ -89,7 +87,7 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
this.clock = clock;
this.decoratorFactory = decoratorFactory;
this.mailReferenceSerializer = new MailReferenceSerializer();
- this.mailLoader = Throwing.function(new MailLoader(mimeMessageStore, blobIdFactory)::load).sneakyThrow();
+ this.mailLoader = new MailLoader(mimeMessageStore, blobIdFactory);
this.configuration = configuration;
}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/MailLoaderTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/MailLoaderTest.java
new file mode 100644
index 0000000..4b406fb
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/MailLoaderTest.java
@@ -0,0 +1,51 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ***************************************************************/
+
+package org.apache.james.queue.rabbitmq;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import javax.mail.internet.MimeMessage;
+
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.api.Store;
+import org.apache.james.blob.mail.MimeMessagePartsId;
+import org.junit.jupiter.api.Test;
+
+import reactor.core.publisher.Mono;
+
+class MailLoaderTest {
+ @Test
+ void storeExceptionShouldBePropagated() {
+ Store<MimeMessage, MimeMessagePartsId> store = mock(Store.class);
+ when(store.read(any())).thenReturn(Mono.error(new RuntimeException("Cassandra problem")));
+ MailReferenceDTO dto = mock(MailReferenceDTO.class);
+ when(dto.toMailReference(any())).thenReturn(mock(MailReference.class));
+ MailLoader loader = new MailLoader(store, new HashBlobId.Factory());
+
+ String result = loader.load(dto)
+ .thenReturn("continued")
+ .onErrorResume(RuntimeException.class, e -> Mono.just("caught"))
+ .block();
+ assertThat(result).isEqualTo("caught");
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org