You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/05/18 05:04:52 UTC

[james-project] 02/07: [REFACTORING] JMAP draft setMessages destroy is now fully reactive

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 2402634b5cb32f11d80dd78e6b996a5aac9a22d1
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat May 15 09:56:42 2021 +0700

    [REFACTORING] JMAP draft setMessages destroy is now fully reactive
---
 .../org/apache/james/mailbox/MessageIdManager.java |  4 +-
 .../james/mailbox/store/StoreMessageIdManager.java |  2 +-
 .../methods/SetMessagesDestructionProcessor.java   | 74 ++++++++++------------
 .../jmap/draft/methods/SetMessagesProcessor.java   |  4 +-
 4 files changed, 39 insertions(+), 45 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
index 218819a..272d94c 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
@@ -66,7 +66,7 @@ public interface MessageIdManager {
 
     DeleteResult delete(MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException;
 
-    Publisher<DeleteResult> delete(List<MessageId> messageId, MailboxSession mailboxSession) throws MailboxException;
+    Publisher<DeleteResult> delete(List<MessageId> messageId, MailboxSession mailboxSession);
 
     void setInMailboxes(MessageId messageId, Collection<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException;
 
@@ -76,7 +76,7 @@ public interface MessageIdManager {
         return getMessages(ImmutableList.of(messageId), fetchGroup, mailboxSession);
     }
 
-    default DeleteResult delete(MessageId messageId, MailboxSession mailboxSession) throws MailboxException {
+    default DeleteResult delete(MessageId messageId, MailboxSession mailboxSession) {
         return Mono.from(delete(ImmutableList.of(messageId), mailboxSession))
             .subscribeOn(Schedulers.elastic())
             .block();
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 11ca8ff..47d44a0 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -223,7 +223,7 @@ public class StoreMessageIdManager implements MessageIdManager {
     }
 
     @Override
-    public Mono<DeleteResult> delete(List<MessageId> messageIds, MailboxSession mailboxSession) throws MailboxException {
+    public Mono<DeleteResult> delete(List<MessageId> messageIds, MailboxSession mailboxSession) {
         MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
 
         return messageIdMapper.findReactive(messageIds, MessageMapper.FetchType.Metadata)
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java
index 5a90a6b..2564aa7 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java
@@ -22,29 +22,27 @@ package org.apache.james.jmap.draft.methods;
 import static org.apache.james.jmap.draft.methods.Method.JMAP_PREFIX;
 
 import java.util.List;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.jmap.draft.model.SetError;
 import org.apache.james.jmap.draft.model.SetMessagesRequest;
 import org.apache.james.jmap.draft.model.SetMessagesResponse;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageIdManager;
-import org.apache.james.mailbox.exception.MailboxException;
-import org.apache.james.mailbox.model.DeleteResult;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.metrics.api.MetricFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
 
 public class SetMessagesDestructionProcessor implements SetMessagesProcessor {
-
     private static final Logger LOGGER = LoggerFactory.getLogger(SetMessagesCreationProcessor.class);
 
     private final MessageIdManager messageIdManager;
@@ -58,44 +56,38 @@ public class SetMessagesDestructionProcessor implements SetMessagesProcessor {
     }
 
     @Override
-    public SetMessagesResponse process(SetMessagesRequest request, MailboxSession mailboxSession) {
-        return metricFactory.decorateSupplierWithTimerMetric(JMAP_PREFIX + "SetMessageDestructionProcessor",
-            () -> delete(request.getDestroy(), mailboxSession)
-                .reduce(SetMessagesResponse.builder(),
-                    SetMessagesResponse.Builder::accumulator,
-                    SetMessagesResponse.Builder::combiner)
-                .build());
+    public Mono<SetMessagesResponse> processReactive(SetMessagesRequest request, MailboxSession mailboxSession) {
+        return Mono.from(metricFactory.decoratePublisherWithTimerMetricLogP99(JMAP_PREFIX + "SetMessageDestructionProcessor",
+            delete(request.getDestroy(), mailboxSession)));
     }
 
-
-    private Stream<SetMessagesResponse> delete(List<MessageId> toBeDestroyed, MailboxSession mailboxSession) {
-        try {
-            if (toBeDestroyed.isEmpty()) {
-                return Stream.empty();
-            }
-            DeleteResult deleteResult = Mono.from(messageIdManager.delete(toBeDestroyed, mailboxSession))
-                .subscribeOn(Schedulers.elastic())
-                .block();
-
-            Stream<SetMessagesResponse> destroyed = deleteResult.getDestroyed().stream()
-                .map(messageId -> SetMessagesResponse.builder().destroyed(messageId).build());
-            Stream<SetMessagesResponse> notFound = deleteResult.getNotFound().stream()
-                .map(messageId -> SetMessagesResponse.builder().notDestroyed(messageId,
-                    SetError.builder()
-                        .type(SetError.Type.NOT_FOUND)
-                        .description("The message " + messageId.serialize() + " can't be found")
-                        .build())
-                    .build());
-            return Stream.concat(destroyed, notFound);
-        } catch (MailboxException e) {
-            LOGGER.error("An error occurred when deleting a message", e);
-            return toBeDestroyed.stream()
-                .map(messageId -> SetMessagesResponse.builder().notDestroyed(messageId,
-                    SetError.builder()
-                        .type(SetError.Type.ERROR)
-                        .description("An error occurred while deleting messages " + messageId.serialize())
-                        .build())
-                    .build());
+    private Mono<SetMessagesResponse> delete(List<MessageId> toBeDestroyed, MailboxSession mailboxSession) {
+        if (toBeDestroyed.isEmpty()) {
+            return Mono.just(SetMessagesResponse.builder().build());
         }
+        return Mono.from(messageIdManager.delete(toBeDestroyed, mailboxSession))
+            .map(deleteResult -> SetMessagesResponse.builder()
+                .destroyed(ImmutableList.copyOf(deleteResult.getDestroyed()))
+                .notDestroyed(deleteResult.getNotFound().stream()
+                    .map(messageId -> Pair.of(messageId,
+                        SetError.builder()
+                            .type(SetError.Type.NOT_FOUND)
+                            .description("The message " + messageId.serialize() + " can't be found")
+                            .build()))
+                    .collect(Guavate.toImmutableMap(Pair::getKey, Pair::getValue)))
+                .build())
+            .onErrorResume(e -> {
+                LOGGER.error("An error occurred when deleting a message", e);
+                return Mono.just(
+                    SetMessagesResponse.builder()
+                        .notDestroyed(toBeDestroyed.stream()
+                            .map(messageId -> Pair.of(messageId,
+                                SetError.builder()
+                                    .type(SetError.Type.ERROR)
+                                    .description("An error occurred while deleting messages " + messageId.serialize())
+                                    .build()))
+                            .collect(Guavate.toImmutableMap(Pair::getKey, Pair::getValue)))
+                        .build());
+            });
     }
 }
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesProcessor.java
index 787d9f8..0d34019 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesProcessor.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesProcessor.java
@@ -27,7 +27,9 @@ import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 public interface SetMessagesProcessor {
-    SetMessagesResponse process(SetMessagesRequest request, MailboxSession mailboxSession);
+    default SetMessagesResponse process(SetMessagesRequest request, MailboxSession mailboxSession) {
+        return processReactive(request, mailboxSession).block();
+    }
 
     default Mono<SetMessagesResponse> processReactive(SetMessagesRequest request, MailboxSession mailboxSession) {
         return Mono.fromCallable(() -> process(request, mailboxSession))

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org