You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2017/02/21 10:05:14 UTC
[2/2] james-project git commit: JAMES-1874 Also parallelize in
mailbox checking while doing GetMessages
JAMES-1874 Also parallelize in mailbox checking while doing GetMessages
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/322d5797
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/322d5797
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/322d5797
Branch: refs/heads/master
Commit: 322d5797ac045a8bce369bc94ffd10dcecc43f42
Parents: b714e31
Author: Benoit Tellier <bt...@linagora.com>
Authored: Mon Feb 20 17:57:25 2017 +0700
Committer: benwa <bt...@linagora.com>
Committed: Tue Feb 21 16:56:45 2017 +0700
----------------------------------------------------------------------
.../CassandraMailboxSessionMapperFactory.java | 2 +-
.../mail/CassandraMessageIdMapper.java | 33 +++++++++++++-------
2 files changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/322d5797/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index 07e51e7..782a4e2 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -104,7 +104,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
@Override
public MessageIdMapper createMessageIdMapper(MailboxSession mailboxSession) throws MailboxException {
- return new CassandraMessageIdMapper(getMailboxMapper(mailboxSession), getAttachmentMapper(mailboxSession),
+ return new CassandraMessageIdMapper(getMailboxMapper(mailboxSession), mailboxDAO, getAttachmentMapper(mailboxSession),
imapUidDAO, messageIdDAO, messageDAO, indexTableHandler, modSeqProvider, mailboxSession);
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/322d5797/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 984e2f3..17768cb 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -51,6 +51,7 @@ import org.apache.james.mailbox.store.mail.ModSeqProvider;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
import org.apache.james.util.CompletableFutureUtil;
+import org.apache.james.util.OptionalConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,6 +66,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageIdMapper.class);
private final MailboxMapper mailboxMapper;
+ private final CassandraMailboxDAO mailboxDAO;
private final AttachmentMapper attachmentMapper;
private final CassandraMessageIdToImapUidDAO imapUidDAO;
private final CassandraMessageIdDAO messageIdDAO;
@@ -73,10 +75,11 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
private final ModSeqProvider modSeqProvider;
private final MailboxSession mailboxSession;
- public CassandraMessageIdMapper(MailboxMapper mailboxMapper, AttachmentMapper attachmentMapper,
+ public CassandraMessageIdMapper(MailboxMapper mailboxMapper, CassandraMailboxDAO mailboxDAO, AttachmentMapper attachmentMapper,
CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO, CassandraMessageDAO messageDAO,
CassandraIndexTableHandler indexTableHandler, ModSeqProvider modSeqProvider, MailboxSession mailboxSession) {
this.mailboxMapper = mailboxMapper;
+ this.mailboxDAO = mailboxDAO;
this.attachmentMapper = attachmentMapper;
this.imapUidDAO = imapUidDAO;
this.messageIdDAO = messageIdDAO;
@@ -98,21 +101,29 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
.map(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty())))
.thenApply(stream -> stream.flatMap(Function.identity()))
.thenApply(stream -> stream.collect(Guavate.toImmutableList()))
- .thenCompose(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Optional.empty())).join()
- .filter(pair -> mailboxExists(pair.getLeft()))
+ .thenCompose(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Optional.empty()))
+ .thenCompose(stream -> CompletableFutureUtil.allOf(
+ stream.map(pair -> mailboxExists(pair.getLeft())
+ .thenApply(b -> Optional.of(pair).filter(any -> b)))))
+ .join()
+ .flatMap(OptionalConverter::toStream)
.map(loadAttachments(fetchType))
.map(toMailboxMessages())
.sorted(Comparator.comparing(MailboxMessage::getUid));
}
- private boolean mailboxExists(CassandraMessageDAO.MessageWithoutAttachment messageWithoutAttachment) {
- try {
- mailboxMapper.findMailboxById(messageWithoutAttachment.getMailboxId());
- return true;
- } catch (MailboxException e) {
- LOGGER.info("Mailbox {} have been deleted but message {} is still attached to it.", messageWithoutAttachment.getMailboxId(), messageWithoutAttachment.getMessageId());
- return false;
- }
+ private CompletableFuture<Boolean> mailboxExists(CassandraMessageDAO.MessageWithoutAttachment messageWithoutAttachment) {
+ CassandraId cassandraId = (CassandraId) messageWithoutAttachment.getMailboxId();
+ return mailboxDAO.retrieveMailbox(cassandraId)
+ .thenApply(optional -> {
+ if (!optional.isPresent()) {
+ LOGGER.info("Mailbox {} have been deleted but message {} is still attached to it.",
+ cassandraId,
+ messageWithoutAttachment.getMessageId());
+ return false;
+ }
+ return true;
+ });
}
private Function<Pair<CassandraMessageDAO.MessageWithoutAttachment, Stream<CassandraMessageDAO.MessageAttachmentRepresentation>>, Pair<CassandraMessageDAO.MessageWithoutAttachment, Stream<MessageAttachment>>> loadAttachments(FetchType fetchType) {
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org