You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2017/02/21 10:05:14 UTC

[2/2] james-project git commit: JAMES-1874 Also parallelize in mailbox checking while doing GetMessages

JAMES-1874 Also parallelize in mailbox checking while doing GetMessages


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/322d5797
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/322d5797
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/322d5797

Branch: refs/heads/master
Commit: 322d5797ac045a8bce369bc94ffd10dcecc43f42
Parents: b714e31
Author: Benoit Tellier <bt...@linagora.com>
Authored: Mon Feb 20 17:57:25 2017 +0700
Committer: benwa <bt...@linagora.com>
Committed: Tue Feb 21 16:56:45 2017 +0700

----------------------------------------------------------------------
 .../CassandraMailboxSessionMapperFactory.java   |  2 +-
 .../mail/CassandraMessageIdMapper.java          | 33 +++++++++++++-------
 2 files changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/322d5797/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index 07e51e7..782a4e2 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -104,7 +104,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
 
     @Override
     public MessageIdMapper createMessageIdMapper(MailboxSession mailboxSession) throws MailboxException {
-        return new CassandraMessageIdMapper(getMailboxMapper(mailboxSession), getAttachmentMapper(mailboxSession),
+        return new CassandraMessageIdMapper(getMailboxMapper(mailboxSession), mailboxDAO, getAttachmentMapper(mailboxSession),
                 imapUidDAO, messageIdDAO, messageDAO, indexTableHandler, modSeqProvider, mailboxSession);
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/322d5797/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 984e2f3..17768cb 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -51,6 +51,7 @@ import org.apache.james.mailbox.store.mail.ModSeqProvider;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
 import org.apache.james.util.CompletableFutureUtil;
+import org.apache.james.util.OptionalConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,6 +66,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageIdMapper.class);
 
     private final MailboxMapper mailboxMapper;
+    private final CassandraMailboxDAO mailboxDAO;
     private final AttachmentMapper attachmentMapper;
     private final CassandraMessageIdToImapUidDAO imapUidDAO;
     private final CassandraMessageIdDAO messageIdDAO;
@@ -73,10 +75,11 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     private final ModSeqProvider modSeqProvider;
     private final MailboxSession mailboxSession;
 
-    public CassandraMessageIdMapper(MailboxMapper mailboxMapper, AttachmentMapper attachmentMapper,
+    public CassandraMessageIdMapper(MailboxMapper mailboxMapper, CassandraMailboxDAO mailboxDAO, AttachmentMapper attachmentMapper,
                                     CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO, CassandraMessageDAO messageDAO,
                                     CassandraIndexTableHandler indexTableHandler, ModSeqProvider modSeqProvider, MailboxSession mailboxSession) {
         this.mailboxMapper = mailboxMapper;
+        this.mailboxDAO = mailboxDAO;
         this.attachmentMapper = attachmentMapper;
         this.imapUidDAO = imapUidDAO;
         this.messageIdDAO = messageIdDAO;
@@ -98,21 +101,29 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
                 .map(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty())))
             .thenApply(stream -> stream.flatMap(Function.identity()))
             .thenApply(stream -> stream.collect(Guavate.toImmutableList()))
-            .thenCompose(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Optional.empty())).join()
-            .filter(pair -> mailboxExists(pair.getLeft()))
+            .thenCompose(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Optional.empty()))
+            .thenCompose(stream -> CompletableFutureUtil.allOf(
+                stream.map(pair -> mailboxExists(pair.getLeft())
+                    .thenApply(b -> Optional.of(pair).filter(any -> b)))))
+            .join()
+            .flatMap(OptionalConverter::toStream)
             .map(loadAttachments(fetchType))
             .map(toMailboxMessages())
             .sorted(Comparator.comparing(MailboxMessage::getUid));
     }
 
-    private boolean mailboxExists(CassandraMessageDAO.MessageWithoutAttachment messageWithoutAttachment) {
-        try {
-            mailboxMapper.findMailboxById(messageWithoutAttachment.getMailboxId());
-            return true;
-        } catch (MailboxException e) {
-            LOGGER.info("Mailbox {} have been deleted but message {} is still attached to it.", messageWithoutAttachment.getMailboxId(), messageWithoutAttachment.getMessageId());
-            return false;
-        }
+    private CompletableFuture<Boolean> mailboxExists(CassandraMessageDAO.MessageWithoutAttachment messageWithoutAttachment) {
+        CassandraId cassandraId = (CassandraId) messageWithoutAttachment.getMailboxId();
+        return mailboxDAO.retrieveMailbox(cassandraId)
+            .thenApply(optional -> {
+                if (!optional.isPresent()) {
+                    LOGGER.info("Mailbox {} have been deleted but message {} is still attached to it.",
+                        cassandraId,
+                        messageWithoutAttachment.getMessageId());
+                    return false;
+                }
+                return true;
+            });
     }
 
     private Function<Pair<CassandraMessageDAO.MessageWithoutAttachment, Stream<CassandraMessageDAO.MessageAttachmentRepresentation>>, Pair<CassandraMessageDAO.MessageWithoutAttachment, Stream<MessageAttachment>>> loadAttachments(FetchType fetchType) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org