You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2020/10/14 02:31:31 UTC

[james-project] 05/22: JAMES-3277 SetMessagesUpdateProcessor: read all metadata upfront

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 926464152a3d8098bce65c5474008dd692c24d31
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Oct 12 12:25:19 2020 +0700

    JAMES-3277 SetMessagesUpdateProcessor: read all metadata upfront
    
     - Reading the messagesV2 metadata level is not required and can thus be skipped
     - messageMetadata mimics the recent MessageManager API
     - More importantly, this allows grouping the rights management, which is then performed only once, upon retrieving messages.
---
 .../org/apache/james/mailbox/MessageIdManager.java |  6 +++++
 .../cassandra/mail/CassandraMessageIdMapper.java   |  6 +++++
 .../mail/CassandraMessageIdMapperTest.java         |  4 +++
 .../inmemory/mail/InMemoryMessageIdMapper.java     | 16 +++++++++++
 .../james/mailbox/store/StoreMessageIdManager.java | 17 +++++++++---
 .../james/mailbox/store/mail/MessageIdMapper.java  |  4 +++
 .../james/jmap/draft/methods/ReferenceUpdater.java |  9 +++----
 .../draft/methods/SetMessagesUpdateProcessor.java  | 31 +++++++++++++++-------
 8 files changed, 74 insertions(+), 19 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
index 92895c4..26a53cb 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
@@ -27,6 +27,7 @@ import javax.mail.Flags;
 
 import org.apache.james.mailbox.MessageManager.FlagsUpdateMode;
 import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 import org.apache.james.mailbox.model.DeleteResult;
 import org.apache.james.mailbox.model.FetchGroup;
 import org.apache.james.mailbox.model.MailboxId;
@@ -39,6 +40,11 @@ import com.google.common.collect.ImmutableList;
 import reactor.core.publisher.Flux;
 
 public interface MessageIdManager {
+    default Publisher<ComposedMessageIdWithMetaData> messageMetadata(MessageId id, MailboxSession session) {
+        return messagesMetadata(ImmutableList.of(id), session);
+    }
+
+    Publisher<ComposedMessageIdWithMetaData> messagesMetadata(Collection<MessageId> id, MailboxSession session);
 
     Set<MessageId> accessibleMessages(Collection<MessageId> messageIds, final MailboxSession mailboxSession) throws MailboxException;
 
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 771c276..7c09008 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -47,6 +47,7 @@ import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
 import org.apache.james.mailbox.store.mail.ModSeqProvider;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.util.FunctionalUtils;
+import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -110,6 +111,11 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
             .flatMap(this::keepMessageIfMailboxExists);
     }
 
+    @Override
+    public Publisher<ComposedMessageIdWithMetaData> findMetadata(MessageId messageId) {
+        return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty());
+    }
+
     private Flux<MailboxMessage> keepMessageIfMailboxExists(GroupedFlux<MailboxId, MailboxMessage> groupedFlux) {
         CassandraId cassandraId = (CassandraId) groupedFlux.key();
         return mailboxDAO.retrieveMailbox(cassandraId)
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java
index fae47aa..fb69a32 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java
@@ -25,12 +25,16 @@ import static org.assertj.core.api.Assertions.assertThat;
 import java.util.List;
 import java.util.Optional;
 
+import javax.mail.Flags;
+
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.StatementRecorder;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MailboxSessionUtil;
+import org.apache.james.mailbox.MessageManager;
 import org.apache.james.mailbox.cassandra.CassandraMailboxSessionMapperFactory;
 import org.apache.james.mailbox.cassandra.TestCassandraMailboxSessionMapperFactory;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
index c16968b..f01a0e2 100644
--- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
+++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
@@ -30,6 +30,8 @@ import javax.mail.Flags;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.mailbox.MessageManager.FlagsUpdateMode;
 import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.model.ComposedMessageId;
+import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MessageId;
@@ -41,6 +43,7 @@ import org.apache.james.mailbox.store.mail.MailboxMapper;
 import org.apache.james.mailbox.store.mail.MessageIdMapper;
 import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.reactivestreams.Publisher;
 
 import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
@@ -66,6 +69,19 @@ public class InMemoryMessageIdMapper implements MessageIdMapper {
     }
 
     @Override
+    public Publisher<ComposedMessageIdWithMetaData> findMetadata(MessageId messageId) {
+        // Every message of every mailbox is listed, so restrict the result to the requested
+        // messageId; without this filter, metadata of unrelated messages would be returned.
+        return mailboxMapper.list()
+            .flatMap(mailbox -> messageMapper.findInMailboxReactive(mailbox, MessageRange.all(), MessageMapper.FetchType.Full, UNLIMITED))
+            .filter(message -> message.getMessageId().equals(messageId))
+            .map(message -> new ComposedMessageIdWithMetaData(
+                new ComposedMessageId(message.getMailboxId(), message.getMessageId(), message.getUid()),
+                message.createFlags(),
+                message.getModSeq()));
+    }
+
+    @Override
     public Flux<MailboxMessage> findReactive(Collection<MessageId> messageIds, MessageMapper.FetchType fetchType) {
         return mailboxMapper.list()
             .flatMap(mailbox -> messageMapper.findInMailboxReactive(mailbox, MessageRange.all(), fetchType, UNLIMITED))
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index e84d747..1a29de9 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -43,6 +43,7 @@ import org.apache.james.mailbox.events.MailboxIdRegistrationKey;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.exception.MailboxNotFoundException;
 import org.apache.james.mailbox.extension.PreDeletionHook;
+import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 import org.apache.james.mailbox.model.DeleteResult;
 import org.apache.james.mailbox.model.FetchGroup;
 import org.apache.james.mailbox.model.Mailbox;
@@ -69,6 +70,7 @@ import org.apache.james.mailbox.store.mail.model.Message;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
 import org.apache.james.mailbox.store.quota.QuotaChecker;
 import org.apache.james.util.FunctionalUtils;
+import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -158,6 +160,17 @@ public class StoreMessageIdManager implements MessageIdManager {
             .map(Throwing.function(messageResultConverter(fetchGroup)).sneakyThrow());
     }
 
+    @Override
+    public Publisher<ComposedMessageIdWithMetaData> messagesMetadata(Collection<MessageId> ids, MailboxSession session) {
+        MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(session);
+        int concurrency = 4;
+        return Flux.fromIterable(ids).flatMap(id -> Flux.from(messageIdMapper.findMetadata(id)), concurrency)
+            .collectMultimap(metaData -> metaData.getComposedMessageId().getMailboxId()) // not groupBy: groups failing the rights check are never drained and can stall it
+            .flatMapIterable(byMailbox -> byMailbox.entrySet())
+            .filterWhen(entry -> hasRightsOnMailboxReactive(session, Right.Read).apply(entry.getKey()))
+            .flatMapIterable(entry -> entry.getValue());
+    }
+
     private ImmutableSet<MailboxId> getAllowedMailboxIds(MailboxSession mailboxSession, List<MailboxMessage> messageList, Right... rights) throws MailboxException {
         return MailboxReactorUtils.block(Flux.fromIterable(messageList)
             .map(MailboxMessage::getMailboxId)
@@ -432,10 +445,6 @@ public class StoreMessageIdManager implements MessageIdManager {
         return mailboxMessage -> mailboxIds.contains(mailboxMessage.getMailboxId());
     }
 
-    private Function<MailboxMessage, Mono<Boolean>> hasRightsOn(MailboxSession session, Right... rights) {
-        return hasRightsOnMailboxReactive(session, rights).compose(MailboxMessage::getMailboxId);
-    }
-
     private Function<MailboxId, Mono<Boolean>> hasRightsOnMailboxReactive(MailboxSession session, Right... rights) {
         return mailboxId -> Mono.from(rightManager.myRights(mailboxId, session))
             .map(myRights -> myRights.contains(rights))
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
index 6e422e2..5191435 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
@@ -26,12 +26,14 @@ import javax.mail.Flags;
 import org.apache.james.mailbox.MessageManager;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.exception.MailboxNotFoundException;
+import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.UpdatedFlags;
 import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.reactivestreams.Publisher;
 
 import com.google.common.collect.Multimap;
 
@@ -42,6 +44,8 @@ public interface MessageIdMapper {
 
     List<MailboxMessage> find(Collection<MessageId> messageIds, FetchType fetchType);
 
+    Publisher<ComposedMessageIdWithMetaData> findMetadata(MessageId messageId);
+
     default Flux<MailboxMessage> findReactive(Collection<MessageId> messageIds, FetchType fetchType) {
         return Flux.fromIterable(find(messageIds, fetchType));
     }
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java
index 2e48cd4..17c946a 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java
@@ -32,12 +32,10 @@ import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageIdManager;
 import org.apache.james.mailbox.MessageManager.FlagsUpdateMode;
 import org.apache.james.mailbox.exception.MailboxException;
-import org.apache.james.mailbox.model.FetchGroup;
 import org.apache.james.mailbox.model.Header;
 import org.apache.james.mailbox.model.Headers;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MessageId;
-import org.apache.james.mailbox.model.MessageResult;
 import org.apache.james.mailbox.model.MultimailboxesSearchQuery;
 import org.apache.james.mailbox.model.SearchQuery;
 import org.apache.james.util.streams.Iterators;
@@ -96,9 +94,10 @@ public class ReferenceUpdater {
             .collectList().block();
         try {
             MessageId reference = Iterables.getOnlyElement(references);
-            List<MailboxId> mailboxIds = messageIdManager.getMessage(reference, FetchGroup.MINIMAL, session).stream()
-                .map(MessageResult::getMailboxId)
-                .collect(Guavate.toImmutableList());
+            List<MailboxId> mailboxIds = Flux.from(messageIdManager.messageMetadata(reference, session))
+                .map(metaData -> metaData.getComposedMessageId().getMailboxId())
+                .collect(Guavate.toImmutableList())
+                .block();
             messageIdManager.setFlags(flag, FlagsUpdateMode.ADD, reference, mailboxIds, session);
         } catch (NoSuchElementException e) {
             logger.info("Unable to find a message with this Mime Message Id: " + messageId);
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
index 82d423e..2c74d40 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java
@@ -24,6 +24,7 @@ import static org.apache.james.jmap.draft.methods.Method.JMAP_PREFIX;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -54,6 +55,7 @@ import org.apache.james.mailbox.Role;
 import org.apache.james.mailbox.SystemMailboxesProvider;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.exception.OverQuotaException;
+import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 import org.apache.james.mailbox.model.FetchGroup;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxId.Factory;
@@ -72,6 +74,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
 
 import io.vavr.control.Try;
 import reactor.core.publisher.Flux;
@@ -135,9 +138,15 @@ public class SetMessagesUpdateProcessor implements SetMessagesProcessor {
     }
 
     private void prepareResponse(SetMessagesRequest request, MailboxSession mailboxSession, SetMessagesResponse.Builder responseBuilder, Set<MailboxId> outboxes) {
-        request.buildUpdatePatches(updatePatchConverter).forEach((id, patch) -> {
+        Map<MessageId, UpdateMessagePatch> patches = request.buildUpdatePatches(updatePatchConverter);
+
+        Multimap<MessageId, ComposedMessageIdWithMetaData> messages = Flux.from(messageIdManager.messagesMetadata(patches.keySet(), mailboxSession))
+            .collect(Guavate.toImmutableListMultimap(metaData -> metaData.getComposedMessageId().getMessageId()))
+            .block();
+
+        patches.forEach((id, patch) -> {
                 if (patch.isValid()) {
-                    update(outboxes, id, patch, mailboxSession, responseBuilder);
+                    update(outboxes, id, patch, mailboxSession, responseBuilder, messages);
                 } else {
                     handleInvalidRequest(responseBuilder, id, patch.getValidationErrors(), patch);
                 }
@@ -146,9 +155,11 @@ public class SetMessagesUpdateProcessor implements SetMessagesProcessor {
     }
 
     private void update(Set<MailboxId> outboxes, MessageId messageId, UpdateMessagePatch updateMessagePatch, MailboxSession mailboxSession,
-                        SetMessagesResponse.Builder builder) {
+                        SetMessagesResponse.Builder builder, Multimap<MessageId, ComposedMessageIdWithMetaData> metadata) {
         try {
-            List<MessageResult> messages = messageIdManager.getMessage(messageId, FetchGroup.MINIMAL, mailboxSession);
+            List<ComposedMessageIdWithMetaData> messages = Optional.ofNullable(metadata.get(messageId))
+                .map(ImmutableList::copyOf)
+                .orElse(ImmutableList.of());
             assertValidUpdate(messages, updateMessagePatch, outboxes);
 
             if (messages.isEmpty()) {
@@ -223,16 +234,16 @@ public class SetMessagesUpdateProcessor implements SetMessagesProcessor {
         }
     }
 
-    private void assertValidUpdate(List<MessageResult> messagesToBeUpdated,
+    private void assertValidUpdate(List<ComposedMessageIdWithMetaData> messagesToBeUpdated,
                                    UpdateMessagePatch updateMessagePatch,
                                    Set<MailboxId> outboxMailboxes) {
         ImmutableList<MailboxId> previousMailboxes = messagesToBeUpdated.stream()
-            .map(MessageResult::getMailboxId)
+            .map(metaData -> metaData.getComposedMessageId().getMailboxId())
             .collect(Guavate.toImmutableList());
         List<MailboxId> targetMailboxes = getTargetedMailboxes(previousMailboxes, updateMessagePatch);
 
         boolean isDraft = messagesToBeUpdated.stream()
-            .map(MessageResult::getFlags)
+            .map(ComposedMessageIdWithMetaData::getFlags)
             .map(Keywords.lenientFactory()::fromFlags)
             .reduce(new KeywordsCombiner())
             .orElse(Keywords.DEFAULT_VALUE)
@@ -294,12 +305,12 @@ public class SetMessagesUpdateProcessor implements SetMessagesProcessor {
             .collect(Guavate.toImmutableSet());
     }
 
-    private Stream<MailboxException> updateFlags(MessageId messageId, UpdateMessagePatch updateMessagePatch, MailboxSession mailboxSession, MessageResult messageResult) {
+    private Stream<MailboxException> updateFlags(MessageId messageId, UpdateMessagePatch updateMessagePatch, MailboxSession mailboxSession, ComposedMessageIdWithMetaData message) {
         try {
             if (!updateMessagePatch.isFlagsIdentity()) {
                 messageIdManager.setFlags(
-                    updateMessagePatch.applyToState(messageResult.getFlags()),
-                    FlagsUpdateMode.REPLACE, messageId, ImmutableList.of(messageResult.getMailboxId()), mailboxSession);
+                    updateMessagePatch.applyToState(message.getFlags()),
+                    FlagsUpdateMode.REPLACE, messageId, ImmutableList.of(message.getComposedMessageId().getMailboxId()), mailboxSession);
             }
             return Stream.of();
         } catch (MailboxException e) {


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org