You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2020/10/14 02:31:30 UTC
[james-project] 04/22: JAMES-3277 Avoid a double message read upon
CassandraMessageIdMapper::setFlags
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 8b8183153070fedf984954d12f14a30aea16e491
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Oct 12 12:20:27 2020 +0700
JAMES-3277 Avoid a double message read upon CassandraMessageIdMapper::setFlags
The not-found case was handled via an extra read, but we can instead rely on smarter
error management on the reads of updateFlags to detect it.
---
.../cassandra/mail/CassandraMessageIdMapper.java | 25 +++++----------------
.../mail/CassandraMessageIdMapperTest.java | 26 ++++++++++++++++++++++
2 files changed, 31 insertions(+), 20 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index b0293d1..771c276 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -101,7 +101,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
@Override
public Flux<MailboxMessage> findReactive(Collection<MessageId> messageIds, FetchType fetchType) {
- return Flux.fromStream(messageIds.stream())
+ return Flux.fromIterable(messageIds)
.flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()), cassandraConfiguration.getMessageReadChunkSize())
.flatMap(composedMessageId -> messageDAO.retrieveMessage(composedMessageId, fetchType)
.map(messageRepresentation -> Pair.of(composedMessageId, messageRepresentation)), cassandraConfiguration.getMessageReadChunkSize())
@@ -220,21 +220,14 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
return Flux.fromIterable(mailboxIds)
.distinct()
.map(mailboxId -> (CassandraId) mailboxId)
- .filterWhen(mailboxId -> haveMetaData(messageId, mailboxId))
.concatMap(mailboxId -> flagsUpdateWithRetry(newState, updateMode, mailboxId, messageId))
.flatMap(this::updateCounts)
.collect(Guavate.toImmutableListMultimap(Pair::getLeft, Pair::getRight))
.block();
}
- private Mono<Boolean> haveMetaData(MessageId messageId, CassandraId mailboxId) {
- return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(mailboxId))
- .hasElements();
- }
-
private Flux<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
- return Mono.defer(() -> tryFlagsUpdate(newState, updateMode, mailboxId, messageId))
- .single()
+ return Mono.defer(() -> updateFlags(mailboxId, messageId, newState, updateMode))
.retry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry())
.onErrorResume(MailboxDeleteDuringUpdateException.class, e -> {
LOGGER.info("Mailbox {} was deleted during flag update", mailboxId);
@@ -261,16 +254,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
.thenReturn(pair);
}
- private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> tryFlagsUpdate(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
- try {
- return updateFlags(mailboxId, messageId, newState, updateMode);
- } catch (MailboxException e) {
- LOGGER.error("Error while updating flags on mailbox: {}", mailboxId);
- return Mono.empty();
- }
- }
-
- private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
+ private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) {
CassandraId cassandraId = (CassandraId) mailboxId;
return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId))
.flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId))
@@ -300,6 +284,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
return imapUidDAO.updateMetadata(newComposedId, oldComposedId.getModSeq())
.filter(FunctionalUtils.identityPredicate())
.flatMap(any -> messageIdDAO.updateMetadata(newComposedId)
- .thenReturn(Pair.of(oldComposedId.getFlags(), newComposedId)));
+ .thenReturn(Pair.of(oldComposedId.getFlags(), newComposedId)))
+ .single();
}
}
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java
index c196c71..fae47aa 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java
@@ -85,6 +85,32 @@ class CassandraMessageIdMapperTest extends MessageIdMapperTest {
.containsOnly(message1, message2, message3, message4);
}
+ @Test
+ void setFlagsShouldMinimizeMessageReads(CassandraCluster cassandra) throws Exception {
+ CassandraMessageId.Factory messageIdFactory = new CassandraMessageId.Factory();
+ CassandraMailboxSessionMapperFactory mapperFactory = TestCassandraMailboxSessionMapperFactory.forTests(
+ cassandraCluster.getCassandraCluster(),
+ messageIdFactory,
+ CassandraConfiguration.builder()
+ .messageReadChunkSize(3)
+ .build());
+
+ saveMessages();
+
+ StatementRecorder statementRecorder = new StatementRecorder();
+ cassandra.getConf().recordStatements(statementRecorder);
+
+ mapperFactory.getMessageIdMapper(MAILBOX_SESSION).setFlags(message1.getMessageId(),
+ ImmutableList.of(message1.getMailboxId()),
+ new Flags(Flags.Flag.DELETED),
+ MessageManager.FlagsUpdateMode.REPLACE);
+
+ assertThat(statementRecorder.listExecutedStatements(
+ StatementRecorder.Selector.preparedStatementStartingWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted," +
+ "flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM imapUidTable")))
+ .hasSize(1);
+ }
+
@Nested
class FailureTest {
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org