You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/06/25 06:50:47 UTC
[james-project] 03/04: JAMES-3265 Reduce statement count upon
CassandraMessageMapper::delete & Flags updates
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit a74ca3548cb4675052aa7ce1cd04a742cbf72510
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jun 24 13:18:40 2020 +0700
JAMES-3265 Reduce statement count upon CassandraMessageMapper::delete & Flags updates
This effectively optimizes IMAP EXPUNGE & STORE operations, dramatically limits counter writes,
and should prevent the creation of unneeded large tombstone ranges for the deletedMessages and recentMessages projections.
---
.../cassandra/mail/CassandraIndexTableHandler.java | 107 +++++++++++++--
.../cassandra/mail/CassandraMailboxCounterDAO.java | 4 +-
.../cassandra/mail/CassandraMessageMapper.java | 61 +++++----
.../mail/CassandraIndexTableHandlerTest.java | 2 +-
.../cassandra/mail/CassandraMessageMapperTest.java | 145 +++++++++++++++++----
5 files changed, 253 insertions(+), 66 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
index 8fc3d91..47fd1eb 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
@@ -19,15 +19,23 @@
package org.apache.james.mailbox.cassandra.mail;
+import java.util.Collection;
+import java.util.List;
+
import javax.inject.Inject;
import javax.mail.Flags;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
+import org.apache.james.mailbox.model.MailboxCounters;
+import org.apache.james.mailbox.model.MessageMetaData;
import org.apache.james.mailbox.model.UpdatedFlags;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.apache.james.util.streams.Iterators;
+import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import reactor.core.publisher.Flux;
@@ -61,11 +69,39 @@ public class CassandraIndexTableHandler {
return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
updateFirstUnseenOnDelete(mailboxId, composedMessageIdWithMetaData.getFlags(), composedMessageIdWithMetaData.getComposedMessageId().getUid()),
mailboxRecentDAO.removeFromRecent(mailboxId, composedMessageIdWithMetaData.getComposedMessageId().getUid()),
- deletedMessageDAO.removeDeleted(mailboxId, uid),
+ updateDeletedMessageProjectionOnDelete(mailboxId, uid, composedMessageIdWithMetaData.getFlags()),
decrementCountersOnDelete(mailboxId, composedMessageIdWithMetaData.getFlags()))
.then();
}
+    /**
+     * Updates the index projections after a batch of messages was removed from a mailbox.
+     *
+     * The first-unseen, recent and deleted projections are updated message by message,
+     * whereas the mailbox counters are decremented through a single call covering the
+     * whole batch. The branches are merged with {@code mergeDelayError}.
+     *
+     * @param mailboxId mailbox the messages were removed from
+     * @param metaData metadata (uid and flags are read) of the removed messages
+     * @return a Mono completing when the merged updates complete
+     */
+    public Mono<Void> updateIndexOnDelete(CassandraId mailboxId, Collection<MessageMetaData> metaData) {
+        // Cold source: each branch below subscribes to it independently.
+        Flux<MessageMetaData> removedMessages = Flux.fromIterable(metaData);
+
+        return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
+                removedMessages
+                    .flatMap(removed -> updateFirstUnseenOnDelete(mailboxId, removed.getFlags(), removed.getUid())),
+                removedMessages
+                    .flatMap(removed -> updateRecentOnDelete(mailboxId, removed.getUid(), removed.getFlags())),
+                removedMessages
+                    .flatMap(removed -> updateDeletedMessageProjectionOnDelete(mailboxId, removed.getUid(), removed.getFlags())),
+                decrementCountersOnDelete(mailboxId, metaData))
+            .then();
+    }
+
+    /**
+     * Removes a deleted message from the recent projection, but only when the message
+     * carried the RECENT flag; otherwise no DAO call is issued.
+     *
+     * @param mailboxId mailbox the message belonged to
+     * @param uid uid of the deleted message
+     * @param flags flags the message had when it was deleted
+     * @return the DAO removal, or an empty Mono when the message was not recent
+     */
+    private Mono<Void> updateRecentOnDelete(CassandraId mailboxId, MessageUid uid, Flags flags) {
+        boolean wasRecent = flags.contains(Flags.Flag.RECENT);
+        if (!wasRecent) {
+            return Mono.empty();
+        }
+
+        return mailboxRecentDAO.removeFromRecent(mailboxId, uid);
+    }
+
+    /**
+     * Removes a deleted message from the deleted-messages projection, but only when the
+     * message carried the DELETED flag; otherwise no DAO call is issued.
+     *
+     * @param mailboxId mailbox the message belonged to
+     * @param uid uid of the deleted message
+     * @param flags flags the message had when it was deleted
+     * @return the DAO removal, or an empty Mono when the message was not flagged DELETED
+     */
+    private Mono<Void> updateDeletedMessageProjectionOnDelete(CassandraId mailboxId, MessageUid uid, Flags flags) {
+        boolean wasMarkedDeleted = flags.contains(Flags.Flag.DELETED);
+        if (!wasMarkedDeleted) {
+            return Mono.empty();
+        }
+
+        return deletedMessageDAO.removeDeleted(mailboxId, uid);
+    }
+
public Mono<Void> updateIndexOnAdd(MailboxMessage message, CassandraId mailboxId) {
Flags flags = message.createFlags();
@@ -79,15 +115,32 @@ public class CassandraIndexTableHandler {
}
public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
+ return updateIndexOnFlagsUpdate(mailboxId, ImmutableList.of(updatedFlags));
+ }
+
+ public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
manageUnseenMessageCountsOnFlagsUpdate(mailboxId, updatedFlags),
manageRecentOnFlagsUpdate(mailboxId, updatedFlags),
updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags),
- applicableFlagDAO.updateApplicableFlags(mailboxId, ImmutableSet.copyOf(updatedFlags.userFlagIterator())),
+ manageApplicableFlagsOnFlagsUpdate(mailboxId, updatedFlags),
updateDeletedOnFlagsUpdate(mailboxId, updatedFlags))
.then();
}
+    /**
+     * Gathers the user flags of every flag update into one set and forwards it to the
+     * applicable-flags DAO in a single call.
+     *
+     * @param mailboxId mailbox whose applicable flags are updated
+     * @param updatedFlags flag updates whose user flags are collected
+     * @return the Mono returned by the applicable-flags DAO
+     */
+    private Mono<Void> manageApplicableFlagsOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+        return applicableFlagDAO.updateApplicableFlags(mailboxId,
+            updatedFlags.stream()
+                .map(UpdatedFlags::userFlagIterator)
+                .flatMap(userFlags -> Iterators.toStream(userFlags))
+                .collect(Guavate.toImmutableSet()));
+    }
+
+    /**
+     * Applies the single-message deleted-projection update to each flag update,
+     * chaining them with {@code concatMap}.
+     *
+     * @param mailboxId mailbox the updates apply to
+     * @param updatedFlags flag updates to process
+     * @return a Mono completing once every update has been processed
+     */
+    private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+        Flux<UpdatedFlags> updates = Flux.fromIterable(updatedFlags);
+
+        return updates
+            .concatMap(update -> updateDeletedOnFlagsUpdate(mailboxId, update))
+            .then();
+    }
+
private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
if (updatedFlags.isModifiedToSet(Flags.Flag.DELETED)) {
return deletedMessageDAO.addDeleted(mailboxId, updatedFlags.getUid());
@@ -105,6 +158,19 @@ public class CassandraIndexTableHandler {
return mailboxCounterDAO.decrementUnseenAndCount(mailboxId);
}
+    /**
+     * Decrements the mailbox counters with a single DAO call for a whole batch of
+     * removed messages.
+     *
+     * The message count is reduced by the batch size. The unseen count is reduced by
+     * the number of removed messages that do NOT carry the SEEN flag: only those
+     * messages were accounted for in the unseen counter (see incrementCountersOnSave,
+     * which only increments the count for SEEN messages).
+     *
+     * @param mailboxId mailbox the messages were removed from
+     * @param metaData metadata of the removed messages
+     * @return the Mono returned by the counter DAO
+     */
+    private Mono<Void> decrementCountersOnDelete(CassandraId mailboxId, Collection<MessageMetaData> metaData) {
+        // Messages already SEEN never contributed to the unseen counter, so they must not be subtracted from it.
+        long unseenCount = metaData.stream()
+            .map(MessageMetaData::getFlags)
+            .filter(flags -> !flags.contains(Flags.Flag.SEEN))
+            .count();
+
+        return mailboxCounterDAO.remove(MailboxCounters.builder()
+            .mailboxId(mailboxId)
+            .count(metaData.size())
+            .unseen(unseenCount)
+            .build());
+    }
+
private Mono<Void> incrementCountersOnSave(CassandraId mailboxId, Flags flags) {
if (flags.contains(Flags.Flag.SEEN)) {
return mailboxCounterDAO.incrementCount(mailboxId);
@@ -119,16 +185,35 @@ public class CassandraIndexTableHandler {
return Mono.empty();
}
- private Mono<Void> manageUnseenMessageCountsOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
- if (updatedFlags.isModifiedToUnset(Flags.Flag.SEEN)) {
- return mailboxCounterDAO.incrementUnseen(mailboxId);
- }
- if (updatedFlags.isModifiedToSet(Flags.Flag.SEEN)) {
- return mailboxCounterDAO.decrementUnseen(mailboxId);
+ private Mono<Void> manageUnseenMessageCountsOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+ int sum = updatedFlags.stream()
+ .mapToInt(flags -> {
+ if (flags.isModifiedToUnset(Flags.Flag.SEEN)) {
+ return 1;
+ }
+ if (flags.isModifiedToSet(Flags.Flag.SEEN)) {
+ return -1;
+ }
+ return 0;
+ })
+ .sum();
+
+ if (sum != 0) {
+ return mailboxCounterDAO.add(MailboxCounters.builder()
+ .mailboxId(mailboxId)
+ .count(0)
+ .unseen(sum)
+ .build());
}
return Mono.empty();
}
+    /**
+     * Applies the single-message recent-projection update to each flag update,
+     * chaining them with {@code concatMap}.
+     *
+     * @param mailboxId mailbox the updates apply to
+     * @param updatedFlags flag updates to process
+     * @return a Mono completing once every update has been processed
+     */
+    private Mono<Void> manageRecentOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+        Flux<UpdatedFlags> updates = Flux.fromIterable(updatedFlags);
+
+        return updates
+            .concatMap(update -> manageRecentOnFlagsUpdate(mailboxId, update))
+            .then();
+    }
+
private Mono<Void> manageRecentOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
if (updatedFlags.isModifiedToUnset(Flags.Flag.RECENT)) {
return mailboxRecentDAO.removeFromRecent(mailboxId, updatedFlags.getUid());
@@ -161,6 +246,12 @@ public class CassandraIndexTableHandler {
return firstUnseenDAO.removeUnread(mailboxId, uid);
}
+    /**
+     * Applies the single-message first-unseen update to each flag update,
+     * chaining them with {@code concatMap}.
+     *
+     * @param mailboxId mailbox the updates apply to
+     * @param updatedFlags flag updates to process
+     * @return a Mono completing once every update has been processed
+     */
+    private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+        Flux<UpdatedFlags> updates = Flux.fromIterable(updatedFlags);
+
+        return updates
+            .concatMap(update -> updateFirstUnseenOnFlagsUpdate(mailboxId, update))
+            .then();
+    }
+
private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
if (updatedFlags.isModifiedToUnset(Flags.Flag.SEEN)) {
return firstUnseenDAO.addUnread(mailboxId, updatedFlags.getUid());
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java
index 80ae4ba..460decf 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java
@@ -136,7 +136,7 @@ public class CassandraMailboxCounterDAO {
.build();
}
- private Mono<Void> add(MailboxCounters counters) {
+ public Mono<Void> add(MailboxCounters counters) {
CassandraId mailboxId = (CassandraId) counters.getMailboxId();
return cassandraAsyncExecutor.executeVoid(
bindWithMailbox(mailboxId, addToCounters)
@@ -144,7 +144,7 @@ public class CassandraMailboxCounterDAO {
.setLong(UNSEEN, counters.getUnseen()));
}
- private Mono<Void> remove(MailboxCounters counters) {
+ public Mono<Void> remove(MailboxCounters counters) {
CassandraId mailboxId = (CassandraId) counters.getMailboxId();
return cassandraAsyncExecutor.executeVoid(
bindWithMailbox(mailboxId, removeToCounters)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 2934984..9fbf32c 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -64,6 +64,7 @@ import com.google.common.collect.ImmutableList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
public class CassandraMessageMapper implements MessageMapper {
@@ -139,25 +140,30 @@ public class CassandraMessageMapper implements MessageMapper {
@Override
public void delete(Mailbox mailbox, MailboxMessage message) {
- deleteAsFuture(message)
+ ComposedMessageIdWithMetaData metaData = message.getComposedMessageIdWithMetaData();
+
+ deleteAndHandleIndexUpdates(metaData)
.block();
}
- private Mono<Void> deleteAsFuture(MailboxMessage message) {
- ComposedMessageIdWithMetaData composedMessageIdWithMetaData = message.getComposedMessageIdWithMetaData();
+ private Mono<Void> deleteAndHandleIndexUpdates(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
+ ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId();
+ CassandraId mailboxId = (CassandraId) composedMessageId.getMailboxId();
- return deleteUsingMailboxId(composedMessageIdWithMetaData);
+ return delete(composedMessageIdWithMetaData)
+ .then(indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, mailboxId));
}
- private Mono<Void> deleteUsingMailboxId(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
+ private Mono<Void> delete(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId();
CassandraMessageId messageId = (CassandraMessageId) composedMessageId.getMessageId();
CassandraId mailboxId = (CassandraId) composedMessageId.getMailboxId();
MessageUid uid = composedMessageId.getUid();
+
return Flux.merge(
imapUidDAO.delete(messageId, mailboxId),
messageIdDAO.delete(mailboxId, uid))
- .then(indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, mailboxId));
+ .then();
}
@Override
@@ -211,25 +217,20 @@ public class CassandraMessageMapper implements MessageMapper {
public Map<MessageUid, MessageMetaData> deleteMessages(Mailbox mailbox, List<MessageUid> uids) {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
- return Flux.fromStream(uids.stream())
- .flatMap(messageUid -> expungeOne(mailboxId, messageUid), cassandraConfiguration.getExpungeChunkSize())
- .collect(Guavate.<SimpleMailboxMessage, MessageUid, MessageMetaData>toImmutableMap(MailboxMessage::getUid, MailboxMessage::metaData))
+ return Flux.fromIterable(MessageRange.toRanges(uids))
+ .concatMap(range -> messageIdDAO.retrieveMessages(mailboxId, range, Limit.unlimited()))
+ .flatMap(this::expungeOne, cassandraConfiguration.getExpungeChunkSize())
+ .collect(Guavate.toImmutableMap(MailboxMessage::getUid, MailboxMessage::metaData))
+ .flatMap(messageMap -> indexTableHandler.updateIndexOnDelete(mailboxId, messageMap.values())
+ .thenReturn(messageMap))
+ .subscribeOn(Schedulers.elastic())
.block();
}
- private Mono<SimpleMailboxMessage> expungeOne(CassandraId mailboxId, MessageUid messageUid) {
- return retrieveComposedId(mailboxId, messageUid)
- .flatMap(idWithMetadata -> deleteUsingMailboxId(idWithMetadata).thenReturn(idWithMetadata))
- .flatMap(idWithMetadata -> messageDAO.retrieveMessage(idWithMetadata, FetchType.Metadata)
- .map(pair -> pair.toMailboxMessage(idWithMetadata, ImmutableList.of())));
- }
-
- private Mono<ComposedMessageIdWithMetaData> retrieveComposedId(CassandraId mailboxId, MessageUid uid) {
- return messageIdDAO.retrieve(mailboxId, uid)
- .handle((t, sink) ->
- t.ifPresentOrElse(
- sink::next,
- () -> LOGGER.warn("Could not retrieve message {} {}", mailboxId, uid)));
+ private Mono<SimpleMailboxMessage> expungeOne(ComposedMessageIdWithMetaData metaData) {
+ return delete(metaData)
+ .then(messageDAO.retrieveMessage(metaData, FetchType.Metadata)
+ .map(pair -> pair.toMailboxMessage(metaData, ImmutableList.of())));
}
@Override
@@ -237,7 +238,7 @@ public class CassandraMessageMapper implements MessageMapper {
ComposedMessageIdWithMetaData composedMessageIdWithMetaData = original.getComposedMessageIdWithMetaData();
MessageMetaData messageMetaData = copy(destinationMailbox, original);
- deleteUsingMailboxId(composedMessageIdWithMetaData).block();
+ deleteAndHandleIndexUpdates(composedMessageIdWithMetaData).block();
return messageMetaData;
}
@@ -357,14 +358,12 @@ public class CassandraMessageMapper implements MessageMapper {
}
private Mono<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult result) {
- return Flux.fromIterable(result.getSucceeded())
- .flatMap(Throwing
- .function((UpdatedFlags updatedFlags) -> indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags))
- .fallbackTo(failedIndex -> {
- LOGGER.error("Could not update flag indexes for mailboxId {} UID {}. This will lead to inconsistencies across Cassandra tables", mailboxId, failedIndex.getUid());
- return Mono.empty();
- }))
- .then(Mono.just(result));
+ return indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, result.getSucceeded())
+ .onErrorResume(e -> {
+ LOGGER.error("Could not update flag indexes for mailboxId {}. This will lead to inconsistencies across Cassandra tables", mailboxId, e);
+ return Mono.empty();
+ })
+ .thenReturn(result);
}
@Override
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java
index ae090d1..beb08d5 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java
@@ -286,7 +286,7 @@ class CassandraIndexTableHandlerTest {
testee.updateIndexOnDelete(new ComposedMessageIdWithMetaData(
new ComposedMessageId(MAILBOX_ID, CASSANDRA_MESSAGE_ID, MESSAGE_UID),
- new Flags(),
+ new Flags(Flags.Flag.DELETED),
MODSEQ),
MAILBOX_ID).block();
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
index c3c8bc3..8581377 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
@@ -59,39 +59,136 @@ class CassandraMessageMapperTest extends MessageMapperTest {
return new CassandraMapperProvider(cassandraCluster.getCassandraCluster());
}
- @Test
- void findInMailboxLimitShouldLimitProjectionReadCassandraQueries(CassandraCluster cassandra) throws MailboxException {
- saveMessages();
+ @Nested
+ class StatementLimitationTests {
+ @Test
+ void deleteMessagesShouldGroupMessageReads(CassandraCluster cassandra) throws MailboxException {
+ saveMessages();
- StatementRecorder statementRecorder = new StatementRecorder();
- cassandra.getConf().recordStatements(statementRecorder);
+ StatementRecorder statementRecorder = new StatementRecorder();
+ cassandra.getConf().recordStatements(statementRecorder);
+ cassandra.getConf().printStatements();
- int limit = 2;
- consume(messageMapper.findInMailbox(benwaInboxMailbox, MessageRange.all(), FetchType.Full, limit));
+ messageMapper.deleteMessages(benwaInboxMailbox, ImmutableList.of(message1.getUid(), message2.getUid(), message3.getUid()));
+ assertThat(statementRecorder.listExecutedStatements(Selector.preparedStatementStartingWith(
+ "SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen," +
+ "flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId AND ")))
+ .hasSize(1);
+ }
- assertThat(statementRecorder.listExecutedStatements(Selector.preparedStatement(
- "SELECT messageId,internalDate,bodyStartOctet,fullContentOctets,bodyOctets,bodyContent,headerContent,textualLineCount,properties,attachments " +
- "FROM messageV2 WHERE messageId=:messageId;")))
- .hasSize(limit);
- }
+ @Test
+ void deleteMessagesShouldGroupCounterUpdates(CassandraCluster cassandra) throws MailboxException {
+ saveMessages();
- @Test
- void updateFlagsShouldLimitModSeqAllocation(CassandraCluster cassandra) throws MailboxException {
- saveMessages();
+ StatementRecorder statementRecorder = new StatementRecorder();
+ cassandra.getConf().recordStatements(statementRecorder);
+ cassandra.getConf().printStatements();
- StatementRecorder statementRecorder = new StatementRecorder();
- cassandra.getConf().recordStatements(statementRecorder);
+ messageMapper.deleteMessages(benwaInboxMailbox, ImmutableList.of(message1.getUid(), message2.getUid(), message3.getUid()));
- messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.ANSWERED), MessageManager.FlagsUpdateMode.REPLACE), MessageRange.all());
+ assertThat(statementRecorder.listExecutedStatements(
+ Selector.preparedStatementStartingWith("UPDATE mailboxCounters SET ")))
+ .hasSize(1);
+ }
- assertThat(statementRecorder.listExecutedStatements(Selector.preparedStatement(
- "UPDATE modseq SET nextModseq=:nextModseq WHERE mailboxId=:mailboxId IF nextModseq=:modSeqCondition;")))
- .hasSize(1);
- }
+ @Test
+ void deleteMessagesShouldNotDeleteMessageNotMarkedAsDeletedInDeletedProjection(CassandraCluster cassandra) throws MailboxException {
+ saveMessages();
- private void consume(Iterator<MailboxMessage> inMailbox) {
- ImmutableList.copyOf(inMailbox);
+ StatementRecorder statementRecorder = new StatementRecorder();
+ cassandra.getConf().recordStatements(statementRecorder);
+ cassandra.getConf().printStatements();
+
+ messageMapper.deleteMessages(benwaInboxMailbox, ImmutableList.of(message1.getUid(), message2.getUid(), message3.getUid()));
+
+ assertThat(statementRecorder.listExecutedStatements(
+ Selector.preparedStatement("DELETE FROM messageDeleted WHERE mailboxId=:mailboxId AND uid=:uid;")))
+ .isEmpty();
+ }
+
+        /**
+         * Deleting messages that are not flagged RECENT must not issue any delete
+         * against the recent-messages projection.
+         */
+        @Test
+        void deleteMessagesShouldNotDeleteMessageNotMarkedAsRecentInRecentProjection(CassandraCluster cassandra) throws MailboxException {
+            saveMessages();
+
+            StatementRecorder statementRecorder = new StatementRecorder();
+            cassandra.getConf().recordStatements(statementRecorder);
+            cassandra.getConf().printStatements();
+
+            messageMapper.deleteMessages(benwaInboxMailbox, ImmutableList.of(message1.getUid(), message2.getUid(), message3.getUid()));
+
+            // The previous assertion targeted messageDeleted (copy-paste of the test above),
+            // so the recent projection was never checked.
+            // NOTE(review): "mailboxRecents" is presumed to be the recent projection table name - confirm against CassandraMailboxRecentsTable.
+            assertThat(statementRecorder.listExecutedStatements(
+                Selector.preparedStatementStartingWith("DELETE FROM mailboxRecents ")))
+                .isEmpty();
+        }
+
+ @Test
+ void deleteMessagesShouldNotDeleteMessageNotMarkedAsUnSeenInFirstUnseenProjection(CassandraCluster cassandra) throws MailboxException {
+ saveMessages();
+ FlagsUpdateCalculator markAsRead = new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), MessageManager.FlagsUpdateMode.ADD);
+ messageMapper.updateFlags(benwaInboxMailbox, markAsRead, MessageRange.all());
+
+ StatementRecorder statementRecorder = new StatementRecorder();
+ cassandra.getConf().recordStatements(statementRecorder);
+ cassandra.getConf().printStatements();
+
+ messageMapper.deleteMessages(benwaInboxMailbox, ImmutableList.of(message1.getUid(), message2.getUid(), message3.getUid()));
+
+ assertThat(statementRecorder.listExecutedStatements(
+ Selector.preparedStatement("DELETE FROM firstUnseen WHERE mailboxId=:mailboxId AND uid=:uid;")))
+ .isEmpty();
+ }
+
+ @Test
+ void updateFlagsShouldUpdateMailboxCountersOnce(CassandraCluster cassandra) throws MailboxException {
+ saveMessages();
+
+ StatementRecorder statementRecorder = new StatementRecorder();
+ cassandra.getConf().recordStatements(statementRecorder);
+ cassandra.getConf().printStatements();
+
+ messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), MessageManager.FlagsUpdateMode.REPLACE), MessageRange.all());
+
+
+ assertThat(statementRecorder.listExecutedStatements(Selector.preparedStatementStartingWith(
+ "UPDATE mailboxCounters SET ")))
+ .hasSize(1);
+ }
+
+ @Test
+ void findInMailboxLimitShouldLimitProjectionReadCassandraQueries(CassandraCluster cassandra) throws MailboxException {
+ saveMessages();
+
+ StatementRecorder statementRecorder = new StatementRecorder();
+ cassandra.getConf().recordStatements(statementRecorder);
+
+ int limit = 2;
+ consume(messageMapper.findInMailbox(benwaInboxMailbox, MessageRange.all(), FetchType.Full, limit));
+
+
+ assertThat(statementRecorder.listExecutedStatements(Selector.preparedStatement(
+ "SELECT messageId,internalDate,bodyStartOctet,fullContentOctets,bodyOctets,bodyContent,headerContent,textualLineCount,properties,attachments " +
+ "FROM messageV2 WHERE messageId=:messageId;")))
+ .hasSize(limit);
+ }
+
+ @Test
+ void updateFlagsShouldLimitModSeqAllocation(CassandraCluster cassandra) throws MailboxException {
+ saveMessages();
+
+ StatementRecorder statementRecorder = new StatementRecorder();
+ cassandra.getConf().recordStatements(statementRecorder);
+
+ messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.ANSWERED), MessageManager.FlagsUpdateMode.REPLACE), MessageRange.all());
+
+ assertThat(statementRecorder.listExecutedStatements(Selector.preparedStatement(
+ "UPDATE modseq SET nextModseq=:nextModseq WHERE mailboxId=:mailboxId IF nextModseq=:modSeqCondition;")))
+ .hasSize(1);
+ }
+
+ private void consume(Iterator<MailboxMessage> inMailbox) {
+ ImmutableList.copyOf(inMailbox);
+ }
}
@Nested
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org