You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/05/18 05:04:52 UTC
[james-project] 02/07: [REFACTORING] JMAP draft setMessages destroy
is now fully reactive
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 2402634b5cb32f11d80dd78e6b996a5aac9a22d1
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat May 15 09:56:42 2021 +0700
[REFACTORING] JMAP draft setMessages destroy is now fully reactive
---
.../org/apache/james/mailbox/MessageIdManager.java | 4 +-
.../james/mailbox/store/StoreMessageIdManager.java | 2 +-
.../methods/SetMessagesDestructionProcessor.java | 74 ++++++++++------------
.../jmap/draft/methods/SetMessagesProcessor.java | 4 +-
4 files changed, 39 insertions(+), 45 deletions(-)
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
index 218819a..272d94c 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
@@ -66,7 +66,7 @@ public interface MessageIdManager {
DeleteResult delete(MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException;
- Publisher<DeleteResult> delete(List<MessageId> messageId, MailboxSession mailboxSession) throws MailboxException;
+ Publisher<DeleteResult> delete(List<MessageId> messageId, MailboxSession mailboxSession);
void setInMailboxes(MessageId messageId, Collection<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException;
@@ -76,7 +76,7 @@ public interface MessageIdManager {
return getMessages(ImmutableList.of(messageId), fetchGroup, mailboxSession);
}
- default DeleteResult delete(MessageId messageId, MailboxSession mailboxSession) throws MailboxException {
+ default DeleteResult delete(MessageId messageId, MailboxSession mailboxSession) {
return Mono.from(delete(ImmutableList.of(messageId), mailboxSession))
.subscribeOn(Schedulers.elastic())
.block();
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 11ca8ff..47d44a0 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -223,7 +223,7 @@ public class StoreMessageIdManager implements MessageIdManager {
}
@Override
- public Mono<DeleteResult> delete(List<MessageId> messageIds, MailboxSession mailboxSession) throws MailboxException {
+ public Mono<DeleteResult> delete(List<MessageId> messageIds, MailboxSession mailboxSession) {
MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
return messageIdMapper.findReactive(messageIds, MessageMapper.FetchType.Metadata)
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java
index 5a90a6b..2564aa7 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java
@@ -22,29 +22,27 @@ package org.apache.james.jmap.draft.methods;
import static org.apache.james.jmap.draft.methods.Method.JMAP_PREFIX;
import java.util.List;
-import java.util.stream.Stream;
import javax.inject.Inject;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.jmap.draft.model.SetError;
import org.apache.james.jmap.draft.model.SetMessagesRequest;
import org.apache.james.jmap.draft.model.SetMessagesResponse;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MessageIdManager;
-import org.apache.james.mailbox.exception.MailboxException;
-import org.apache.james.mailbox.model.DeleteResult;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.metrics.api.MetricFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.github.steveash.guavate.Guavate;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
public class SetMessagesDestructionProcessor implements SetMessagesProcessor {
-
private static final Logger LOGGER = LoggerFactory.getLogger(SetMessagesCreationProcessor.class);
private final MessageIdManager messageIdManager;
@@ -58,44 +56,38 @@ public class SetMessagesDestructionProcessor implements SetMessagesProcessor {
}
@Override
- public SetMessagesResponse process(SetMessagesRequest request, MailboxSession mailboxSession) {
- return metricFactory.decorateSupplierWithTimerMetric(JMAP_PREFIX + "SetMessageDestructionProcessor",
- () -> delete(request.getDestroy(), mailboxSession)
- .reduce(SetMessagesResponse.builder(),
- SetMessagesResponse.Builder::accumulator,
- SetMessagesResponse.Builder::combiner)
- .build());
+ public Mono<SetMessagesResponse> processReactive(SetMessagesRequest request, MailboxSession mailboxSession) {
+ return Mono.from(metricFactory.decoratePublisherWithTimerMetricLogP99(JMAP_PREFIX + "SetMessageDestructionProcessor",
+ delete(request.getDestroy(), mailboxSession)));
}
-
- private Stream<SetMessagesResponse> delete(List<MessageId> toBeDestroyed, MailboxSession mailboxSession) {
- try {
- if (toBeDestroyed.isEmpty()) {
- return Stream.empty();
- }
- DeleteResult deleteResult = Mono.from(messageIdManager.delete(toBeDestroyed, mailboxSession))
- .subscribeOn(Schedulers.elastic())
- .block();
-
- Stream<SetMessagesResponse> destroyed = deleteResult.getDestroyed().stream()
- .map(messageId -> SetMessagesResponse.builder().destroyed(messageId).build());
- Stream<SetMessagesResponse> notFound = deleteResult.getNotFound().stream()
- .map(messageId -> SetMessagesResponse.builder().notDestroyed(messageId,
- SetError.builder()
- .type(SetError.Type.NOT_FOUND)
- .description("The message " + messageId.serialize() + " can't be found")
- .build())
- .build());
- return Stream.concat(destroyed, notFound);
- } catch (MailboxException e) {
- LOGGER.error("An error occurred when deleting a message", e);
- return toBeDestroyed.stream()
- .map(messageId -> SetMessagesResponse.builder().notDestroyed(messageId,
- SetError.builder()
- .type(SetError.Type.ERROR)
- .description("An error occurred while deleting messages " + messageId.serialize())
- .build())
- .build());
+ private Mono<SetMessagesResponse> delete(List<MessageId> toBeDestroyed, MailboxSession mailboxSession) {
+ if (toBeDestroyed.isEmpty()) {
+ return Mono.just(SetMessagesResponse.builder().build());
}
+ return Mono.from(messageIdManager.delete(toBeDestroyed, mailboxSession))
+ .map(deleteResult -> SetMessagesResponse.builder()
+ .destroyed(ImmutableList.copyOf(deleteResult.getDestroyed()))
+ .notDestroyed(deleteResult.getNotFound().stream()
+ .map(messageId -> Pair.of(messageId,
+ SetError.builder()
+ .type(SetError.Type.NOT_FOUND)
+ .description("The message " + messageId.serialize() + " can't be found")
+ .build()))
+ .collect(Guavate.toImmutableMap(Pair::getKey, Pair::getValue)))
+ .build())
+ .onErrorResume(e -> {
+ LOGGER.error("An error occurred when deleting a message", e);
+ return Mono.just(
+ SetMessagesResponse.builder()
+ .notDestroyed(toBeDestroyed.stream()
+ .map(messageId -> Pair.of(messageId,
+ SetError.builder()
+ .type(SetError.Type.ERROR)
+ .description("An error occurred while deleting messages " + messageId.serialize())
+ .build()))
+ .collect(Guavate.toImmutableMap(Pair::getKey, Pair::getValue)))
+ .build());
+ });
}
}
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesProcessor.java
index 787d9f8..0d34019 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesProcessor.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesProcessor.java
@@ -27,7 +27,9 @@ import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public interface SetMessagesProcessor {
- SetMessagesResponse process(SetMessagesRequest request, MailboxSession mailboxSession);
+ default SetMessagesResponse process(SetMessagesRequest request, MailboxSession mailboxSession) {
+ return processReactive(request, mailboxSession).block();
+ }
default Mono<SetMessagesResponse> processReactive(SetMessagesRequest request, MailboxSession mailboxSession) {
return Mono.fromCallable(() -> process(request, mailboxSession))
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org