You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/06/25 06:50:45 UTC
[james-project] 01/04: JAMES-3265 CassandraMessageMapper should
limit modseq allocation upon flags updates
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit a2dd14bc57eabeb8780556ca7cd24c74c198f751
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jun 24 08:51:37 2020 +0700
JAMES-3265 CassandraMessageMapper should limit modseq allocation upon flags updates
---
.../cassandra/mail/CassandraMessageMapper.java | 6 +++---
.../cassandra/mail/CassandraMessageMapperTest.java | 21 +++++++++++++++++++++
2 files changed, 24 insertions(+), 3 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 82c9e6b..2934984 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -344,9 +344,9 @@ public class CassandraMessageMapper implements MessageMapper {
}
private Mono<FlagsUpdateStageResult> runUpdateStage(CassandraId mailboxId, Flux<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator flagsUpdateCalculator) {
- Mono<ModSeq> newModSeq = computeNewModSeq(mailboxId);
- return toBeUpdated
- .concatMap(metadata -> newModSeq.flatMap(modSeq -> tryFlagsUpdate(flagsUpdateCalculator, modSeq, metadata)))
+ return computeNewModSeq(mailboxId)
+ .flatMapMany(newModSeq -> toBeUpdated
+ .concatMap(metadata -> tryFlagsUpdate(flagsUpdateCalculator, newModSeq, metadata)))
.reduce(FlagsUpdateStageResult.none(), FlagsUpdateStageResult::merge)
.flatMap(result -> updateIndexesForUpdatesResult(mailboxId, result));
}
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
index b97cf66..f115e60 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
@@ -25,13 +25,17 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.util.Iterator;
import java.util.Optional;
+import javax.mail.Flags;
+
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.backends.cassandra.StatementRecorder;
+import org.apache.james.mailbox.MessageManager;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.MessageRange;
+import org.apache.james.mailbox.store.FlagsUpdateCalculator;
import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.mail.model.MapperProvider;
@@ -73,6 +77,23 @@ class CassandraMessageMapperTest extends MessageMapperTest {
.hasSize(limit);
}
+ @Test
+ void updateFlagsShouldLimitModSeqAllocation(CassandraCluster cassandra) throws MailboxException {
+ saveMessages();
+
+ StatementRecorder statementRecorder = new StatementRecorder();
+ cassandra.getConf().recordStatements(statementRecorder);
+
+ messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.ANSWERED), MessageManager.FlagsUpdateMode.REPLACE), MessageRange.all());
+
+ assertThat(statementRecorder.listExecutedStatements())
+ .filteredOn(statement -> statement instanceof BoundStatement)
+ .extracting(BoundStatement.class::cast)
+ .extracting(statement -> statement.preparedStatement().getQueryString())
+ .filteredOn(statementString -> statementString.equals("UPDATE modseq SET nextModseq=:nextModseq WHERE mailboxId=:mailboxId IF nextModseq=:modSeqCondition;"))
+ .hasSize(1);
+ }
+
private void consume(Iterator<MailboxMessage> inMailbox) {
ImmutableList.copyOf(inMailbox);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org