You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/06/25 06:50:45 UTC

[james-project] 01/04: JAMES-3265 CassandraMessageMapper should limit modseq allocation upon flags updates

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a2dd14bc57eabeb8780556ca7cd24c74c198f751
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jun 24 08:51:37 2020 +0700

    JAMES-3265 CassandraMessageMapper should limit modseq allocation upon flags updates
---
 .../cassandra/mail/CassandraMessageMapper.java      |  6 +++---
 .../cassandra/mail/CassandraMessageMapperTest.java  | 21 +++++++++++++++++++++
 2 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 82c9e6b..2934984 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -344,9 +344,9 @@ public class CassandraMessageMapper implements MessageMapper {
     }
 
     private Mono<FlagsUpdateStageResult> runUpdateStage(CassandraId mailboxId, Flux<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator flagsUpdateCalculator) {
-        Mono<ModSeq> newModSeq = computeNewModSeq(mailboxId);
-        return toBeUpdated
-            .concatMap(metadata -> newModSeq.flatMap(modSeq -> tryFlagsUpdate(flagsUpdateCalculator, modSeq, metadata)))
+        return computeNewModSeq(mailboxId)
+            .flatMapMany(newModSeq -> toBeUpdated
+            .concatMap(metadata -> tryFlagsUpdate(flagsUpdateCalculator, newModSeq, metadata)))
             .reduce(FlagsUpdateStageResult.none(), FlagsUpdateStageResult::merge)
             .flatMap(result -> updateIndexesForUpdatesResult(mailboxId, result));
     }
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
index b97cf66..f115e60 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
@@ -25,13 +25,17 @@ import static org.assertj.core.api.Assertions.assertThat;
 import java.util.Iterator;
 import java.util.Optional;
 
+import javax.mail.Flags;
+
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.StatementRecorder;
+import org.apache.james.mailbox.MessageManager;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.MessageRange;
+import org.apache.james.mailbox.store.FlagsUpdateCalculator;
 import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.MapperProvider;
@@ -73,6 +77,23 @@ class CassandraMessageMapperTest extends MessageMapperTest {
             .hasSize(limit);
     }
 
+    @Test
+    void updateFlagsShouldLimitModSeqAllocation(CassandraCluster cassandra) throws MailboxException {
+        saveMessages();
+
+        StatementRecorder statementRecorder = new StatementRecorder();
+        cassandra.getConf().recordStatements(statementRecorder);
+
+        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.ANSWERED), MessageManager.FlagsUpdateMode.REPLACE), MessageRange.all());
+
+        assertThat(statementRecorder.listExecutedStatements())
+            .filteredOn(statement -> statement instanceof BoundStatement)
+            .extracting(BoundStatement.class::cast)
+            .extracting(statement -> statement.preparedStatement().getQueryString())
+            .filteredOn(statementString -> statementString.equals("UPDATE modseq SET nextModseq=:nextModseq WHERE mailboxId=:mailboxId IF nextModseq=:modSeqCondition;"))
+            .hasSize(1);
+    }
+
     private void consume(Iterator<MailboxMessage> inMailbox) {
         ImmutableList.copyOf(inMailbox);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org