You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/06/25 06:50:47 UTC

[james-project] 03/04: JAMES-3265 Reduce statement count upon CassandraMessageMapper::delete & Flags updates

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a74ca3548cb4675052aa7ce1cd04a742cbf72510
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jun 24 13:18:40 2020 +0700

    JAMES-3265 Reduce statement count upon CassandraMessageMapper::delete & Flags updates
    
    This effectively optimizes IMAP EXPUNGE & STORE operations, dramatically limits counter writes,
    and should prevent unneeded large tombstone range creation for the deletedMessages and recentMessages projections.
---
 .../cassandra/mail/CassandraIndexTableHandler.java | 107 +++++++++++++--
 .../cassandra/mail/CassandraMailboxCounterDAO.java |   4 +-
 .../cassandra/mail/CassandraMessageMapper.java     |  61 +++++----
 .../mail/CassandraIndexTableHandlerTest.java       |   2 +-
 .../cassandra/mail/CassandraMessageMapperTest.java | 145 +++++++++++++++++----
 5 files changed, 253 insertions(+), 66 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
index 8fc3d91..47fd1eb 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
@@ -19,15 +19,23 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
+import java.util.Collection;
+import java.util.List;
+
 import javax.inject.Inject;
 import javax.mail.Flags;
 
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
+import org.apache.james.mailbox.model.MailboxCounters;
+import org.apache.james.mailbox.model.MessageMetaData;
 import org.apache.james.mailbox.model.UpdatedFlags;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.apache.james.util.streams.Iterators;
 
+import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
 import reactor.core.publisher.Flux;
@@ -61,11 +69,39 @@ public class CassandraIndexTableHandler {
         return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
                 updateFirstUnseenOnDelete(mailboxId, composedMessageIdWithMetaData.getFlags(), composedMessageIdWithMetaData.getComposedMessageId().getUid()),
                 mailboxRecentDAO.removeFromRecent(mailboxId, composedMessageIdWithMetaData.getComposedMessageId().getUid()),
-                deletedMessageDAO.removeDeleted(mailboxId, uid),
+                updateDeletedMessageProjectionOnDelete(mailboxId, uid, composedMessageIdWithMetaData.getFlags()),
                 decrementCountersOnDelete(mailboxId, composedMessageIdWithMetaData.getFlags()))
             .then();
     }
 
+    public Mono<Void> updateIndexOnDelete(CassandraId mailboxId, Collection<MessageMetaData> metaData) { // Batch variant: updates the index projections for every deleted message described by metaData.
+        return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE, // the four sources below are merged; errors are delayed until the others have completed
+                Flux.fromIterable(metaData)
+                    .flatMap(message -> updateFirstUnseenOnDelete(mailboxId, message.getFlags(), message.getUid())),
+                Flux.fromIterable(metaData)
+                    .flatMap(message -> updateRecentOnDelete(mailboxId, message.getUid(), message.getFlags())),
+                Flux.fromIterable(metaData)
+                    .flatMap(message -> updateDeletedMessageProjectionOnDelete(mailboxId, message.getUid(), message.getFlags())),
+                decrementCountersOnDelete(mailboxId, metaData)) // counters are adjusted with one call for the whole collection
+            .then();
+    }
+
+    private Mono<Void> updateRecentOnDelete(CassandraId mailboxId, MessageUid uid, Flags flags) { // Removes uid from the recent projection, but only when the deleted message carried RECENT.
+        if (flags.contains(Flags.Flag.RECENT)) {
+            return mailboxRecentDAO.removeFromRecent(mailboxId, uid);
+        }
+
+        return Mono.empty(); // nothing to remove: no statement is issued for messages without RECENT
+    }
+
+    private Mono<Void> updateDeletedMessageProjectionOnDelete(CassandraId mailboxId, MessageUid uid, Flags flags) { // Removes uid from the deleted-messages projection, but only when the message carried DELETED.
+        if (flags.contains(Flags.Flag.DELETED)) {
+            return deletedMessageDAO.removeDeleted(mailboxId, uid);
+        }
+
+        return Mono.empty(); // nothing to remove: no statement is issued for messages without DELETED
+    }
+
     public Mono<Void> updateIndexOnAdd(MailboxMessage message, CassandraId mailboxId) {
         Flags flags = message.createFlags();
 
@@ -79,15 +115,32 @@ public class CassandraIndexTableHandler {
     }
 
     public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
+        return updateIndexOnFlagsUpdate(mailboxId, ImmutableList.of(updatedFlags));
+    }
+
+    public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
         return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
                 manageUnseenMessageCountsOnFlagsUpdate(mailboxId, updatedFlags),
                 manageRecentOnFlagsUpdate(mailboxId, updatedFlags),
                 updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags),
-                applicableFlagDAO.updateApplicableFlags(mailboxId, ImmutableSet.copyOf(updatedFlags.userFlagIterator())),
+                manageApplicableFlagsOnFlagsUpdate(mailboxId, updatedFlags),
                 updateDeletedOnFlagsUpdate(mailboxId, updatedFlags))
             .then();
     }
 
+    private Mono<Void> manageApplicableFlagsOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) { // Registers the user flags of all updates with a single applicableFlagDAO call.
+        return applicableFlagDAO.updateApplicableFlags(mailboxId,
+            updatedFlags.stream()
+                .flatMap(flags -> Iterators.toStream(flags.userFlagIterator())) // flatten the user flags of each update
+                .collect(Guavate.toImmutableSet())); // collected into one immutable set covering all updates
+    }
+
+    private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) { // List overload: applies the single-update variant to each element.
+        return Flux.fromIterable(updatedFlags)
+            .concatMap(flags -> updateDeletedOnFlagsUpdate(mailboxId, flags)) // concatMap: one update at a time, in list order
+            .then();
+    }
+
     private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
         if (updatedFlags.isModifiedToSet(Flags.Flag.DELETED)) {
             return deletedMessageDAO.addDeleted(mailboxId, updatedFlags.getUid());
@@ -105,6 +158,19 @@ public class CassandraIndexTableHandler {
         return mailboxCounterDAO.decrementUnseenAndCount(mailboxId);
     }
 
+    private Mono<Void> decrementCountersOnDelete(CassandraId mailboxId, Collection<MessageMetaData> metaData) { // Batch variant: subtracts metaData.size() from the message count and the number of unread messages from the unseen count.
+        long unseenCount = metaData.stream() // only messages lacking SEEN contributed to the unseen counter (see incrementCountersOnSave)
+            .map(MessageMetaData::getFlags)
+            .filter(flags -> !flags.contains(Flags.Flag.SEEN))
+            .count();
+
+        return mailboxCounterDAO.remove(MailboxCounters.builder() // one counter write for the whole batch
+            .mailboxId(mailboxId)
+            .count(metaData.size())
+            .unseen(unseenCount)
+            .build());
+    }
+
     private Mono<Void> incrementCountersOnSave(CassandraId mailboxId, Flags flags) {
         if (flags.contains(Flags.Flag.SEEN)) {
             return mailboxCounterDAO.incrementCount(mailboxId);
@@ -119,16 +185,35 @@ public class CassandraIndexTableHandler {
         return Mono.empty();
     }
 
-    private Mono<Void> manageUnseenMessageCountsOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
-        if (updatedFlags.isModifiedToUnset(Flags.Flag.SEEN)) {
-            return mailboxCounterDAO.incrementUnseen(mailboxId);
-        }
-        if (updatedFlags.isModifiedToSet(Flags.Flag.SEEN)) {
-            return mailboxCounterDAO.decrementUnseen(mailboxId);
+    private Mono<Void> manageUnseenMessageCountsOnFlagsUpdate(CassandraId mailboxId,  List<UpdatedFlags> updatedFlags) {
+        int sum = updatedFlags.stream()
+            .mapToInt(flags -> {
+                if (flags.isModifiedToUnset(Flags.Flag.SEEN)) {
+                    return 1;
+                }
+                if (flags.isModifiedToSet(Flags.Flag.SEEN)) {
+                    return -1;
+                }
+                return 0;
+            })
+            .sum();
+
+        if (sum != 0) {
+            return mailboxCounterDAO.add(MailboxCounters.builder()
+                .mailboxId(mailboxId)
+                .count(0)
+                .unseen(sum)
+                .build());
         }
         return Mono.empty();
     }
 
+    private Mono<Void> manageRecentOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) { // List overload: applies the single-update variant to each element.
+        return Flux.fromIterable(updatedFlags)
+            .concatMap(flags -> manageRecentOnFlagsUpdate(mailboxId, flags)) // concatMap: one update at a time, in list order
+            .then();
+    }
+
     private Mono<Void> manageRecentOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
         if (updatedFlags.isModifiedToUnset(Flags.Flag.RECENT)) {
             return mailboxRecentDAO.removeFromRecent(mailboxId, updatedFlags.getUid());
@@ -161,6 +246,12 @@ public class CassandraIndexTableHandler {
         return firstUnseenDAO.removeUnread(mailboxId, uid);
     }
 
+    private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) { // List overload: applies the single-update variant to each element.
+        return Flux.fromIterable(updatedFlags)
+            .concatMap(flags -> updateFirstUnseenOnFlagsUpdate(mailboxId, flags)) // concatMap: one update at a time, in list order
+            .then();
+    }
+
     private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
         if (updatedFlags.isModifiedToUnset(Flags.Flag.SEEN)) {
             return firstUnseenDAO.addUnread(mailboxId, updatedFlags.getUid());
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java
index 80ae4ba..460decf 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java
@@ -136,7 +136,7 @@ public class CassandraMailboxCounterDAO {
             .build();
     }
 
-    private Mono<Void> add(MailboxCounters counters) {
+    public Mono<Void> add(MailboxCounters counters) {
         CassandraId mailboxId = (CassandraId) counters.getMailboxId();
         return cassandraAsyncExecutor.executeVoid(
             bindWithMailbox(mailboxId, addToCounters)
@@ -144,7 +144,7 @@ public class CassandraMailboxCounterDAO {
                 .setLong(UNSEEN, counters.getUnseen()));
     }
 
-    private Mono<Void> remove(MailboxCounters counters) {
+    public Mono<Void> remove(MailboxCounters counters) {
         CassandraId mailboxId = (CassandraId) counters.getMailboxId();
         return cassandraAsyncExecutor.executeVoid(
             bindWithMailbox(mailboxId, removeToCounters)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 2934984..9fbf32c 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -64,6 +64,7 @@ import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 import reactor.util.retry.Retry;
 
 public class CassandraMessageMapper implements MessageMapper {
@@ -139,25 +140,30 @@ public class CassandraMessageMapper implements MessageMapper {
 
     @Override
     public void delete(Mailbox mailbox, MailboxMessage message) {
-        deleteAsFuture(message)
+        ComposedMessageIdWithMetaData metaData = message.getComposedMessageIdWithMetaData();
+
+        deleteAndHandleIndexUpdates(metaData)
             .block();
     }
 
-    private Mono<Void> deleteAsFuture(MailboxMessage message) {
-        ComposedMessageIdWithMetaData composedMessageIdWithMetaData = message.getComposedMessageIdWithMetaData();
+    private Mono<Void> deleteAndHandleIndexUpdates(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
+        ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId();
+        CassandraId mailboxId = (CassandraId) composedMessageId.getMailboxId();
 
-        return deleteUsingMailboxId(composedMessageIdWithMetaData);
+        return delete(composedMessageIdWithMetaData)
+             .then(indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, mailboxId));
     }
 
-    private Mono<Void> deleteUsingMailboxId(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
+    private Mono<Void> delete(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
         ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId();
         CassandraMessageId messageId = (CassandraMessageId) composedMessageId.getMessageId();
         CassandraId mailboxId = (CassandraId) composedMessageId.getMailboxId();
         MessageUid uid = composedMessageId.getUid();
+
         return Flux.merge(
                 imapUidDAO.delete(messageId, mailboxId),
                 messageIdDAO.delete(mailboxId, uid))
-            .then(indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, mailboxId));
+                .then();
     }
 
     @Override
@@ -211,25 +217,20 @@ public class CassandraMessageMapper implements MessageMapper {
     public Map<MessageUid, MessageMetaData> deleteMessages(Mailbox mailbox, List<MessageUid> uids) {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
-        return Flux.fromStream(uids.stream())
-            .flatMap(messageUid -> expungeOne(mailboxId, messageUid), cassandraConfiguration.getExpungeChunkSize())
-            .collect(Guavate.<SimpleMailboxMessage, MessageUid, MessageMetaData>toImmutableMap(MailboxMessage::getUid, MailboxMessage::metaData))
+        return Flux.fromIterable(MessageRange.toRanges(uids))
+            .concatMap(range -> messageIdDAO.retrieveMessages(mailboxId, range, Limit.unlimited()))
+            .flatMap(this::expungeOne, cassandraConfiguration.getExpungeChunkSize())
+            .collect(Guavate.toImmutableMap(MailboxMessage::getUid, MailboxMessage::metaData))
+            .flatMap(messageMap -> indexTableHandler.updateIndexOnDelete(mailboxId, messageMap.values())
+                .thenReturn(messageMap))
+            .subscribeOn(Schedulers.elastic())
             .block();
     }
 
-    private Mono<SimpleMailboxMessage> expungeOne(CassandraId mailboxId, MessageUid messageUid) {
-        return retrieveComposedId(mailboxId, messageUid)
-            .flatMap(idWithMetadata -> deleteUsingMailboxId(idWithMetadata).thenReturn(idWithMetadata))
-            .flatMap(idWithMetadata -> messageDAO.retrieveMessage(idWithMetadata, FetchType.Metadata)
-                .map(pair -> pair.toMailboxMessage(idWithMetadata, ImmutableList.of())));
-    }
-
-    private Mono<ComposedMessageIdWithMetaData> retrieveComposedId(CassandraId mailboxId, MessageUid uid) {
-        return messageIdDAO.retrieve(mailboxId, uid)
-            .handle((t, sink) ->
-                t.ifPresentOrElse(
-                    sink::next,
-                    () -> LOGGER.warn("Could not retrieve message {} {}", mailboxId, uid)));
+    private Mono<SimpleMailboxMessage> expungeOne(ComposedMessageIdWithMetaData metaData) {
+        return delete(metaData)
+            .then(messageDAO.retrieveMessage(metaData, FetchType.Metadata)
+                .map(pair -> pair.toMailboxMessage(metaData, ImmutableList.of())));
     }
 
     @Override
@@ -237,7 +238,7 @@ public class CassandraMessageMapper implements MessageMapper {
         ComposedMessageIdWithMetaData composedMessageIdWithMetaData = original.getComposedMessageIdWithMetaData();
 
         MessageMetaData messageMetaData = copy(destinationMailbox, original);
-        deleteUsingMailboxId(composedMessageIdWithMetaData).block();
+        deleteAndHandleIndexUpdates(composedMessageIdWithMetaData).block();
 
         return messageMetaData;
     }
@@ -357,14 +358,12 @@ public class CassandraMessageMapper implements MessageMapper {
     }
 
     private Mono<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult result) {
-        return Flux.fromIterable(result.getSucceeded())
-            .flatMap(Throwing
-                .function((UpdatedFlags updatedFlags) -> indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags))
-                .fallbackTo(failedIndex -> {
-                    LOGGER.error("Could not update flag indexes for mailboxId {} UID {}. This will lead to inconsistencies across Cassandra tables", mailboxId, failedIndex.getUid());
-                    return Mono.empty();
-                }))
-            .then(Mono.just(result));
+        return indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, result.getSucceeded())
+            .onErrorResume(e -> {
+                LOGGER.error("Could not update flag indexes for mailboxId {}. This will lead to inconsistencies across Cassandra tables", mailboxId, e);
+                return Mono.empty();
+            })
+            .thenReturn(result);
     }
 
     @Override
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java
index ae090d1..beb08d5 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java
@@ -286,7 +286,7 @@ class CassandraIndexTableHandlerTest {
 
         testee.updateIndexOnDelete(new ComposedMessageIdWithMetaData(
                 new ComposedMessageId(MAILBOX_ID, CASSANDRA_MESSAGE_ID, MESSAGE_UID),
-                new Flags(),
+                new Flags(Flags.Flag.DELETED),
                 MODSEQ),
             MAILBOX_ID).block();
 
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
index c3c8bc3..8581377 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
@@ -59,39 +59,136 @@ class CassandraMessageMapperTest extends MessageMapperTest {
         return new CassandraMapperProvider(cassandraCluster.getCassandraCluster());
     }
 
-    @Test
-    void findInMailboxLimitShouldLimitProjectionReadCassandraQueries(CassandraCluster cassandra) throws MailboxException {
-        saveMessages();
+    @Nested
+    class StatementLimitationTests {
+        @Test
+        void deleteMessagesShouldGroupMessageReads(CassandraCluster cassandra) throws MailboxException {
+            saveMessages();
 
-        StatementRecorder statementRecorder = new StatementRecorder();
-        cassandra.getConf().recordStatements(statementRecorder);
+            StatementRecorder statementRecorder = new StatementRecorder();
+            cassandra.getConf().recordStatements(statementRecorder);
+            cassandra.getConf().printStatements();
 
-        int limit = 2;
-        consume(messageMapper.findInMailbox(benwaInboxMailbox, MessageRange.all(), FetchType.Full, limit));
+            messageMapper.deleteMessages(benwaInboxMailbox, ImmutableList.of(message1.getUid(), message2.getUid(), message3.getUid()));
 
+            assertThat(statementRecorder.listExecutedStatements(Selector.preparedStatementStartingWith(
+                "SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen," +
+                    "flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId AND ")))
+                .hasSize(1);
+        }
 
-        assertThat(statementRecorder.listExecutedStatements(Selector.preparedStatement(
-            "SELECT messageId,internalDate,bodyStartOctet,fullContentOctets,bodyOctets,bodyContent,headerContent,textualLineCount,properties,attachments " +
-                "FROM messageV2 WHERE messageId=:messageId;")))
-            .hasSize(limit);
-    }
+        @Test
+        void deleteMessagesShouldGroupCounterUpdates(CassandraCluster cassandra) throws MailboxException {
+            saveMessages();
 
-    @Test
-    void updateFlagsShouldLimitModSeqAllocation(CassandraCluster cassandra) throws MailboxException {
-        saveMessages();
+            StatementRecorder statementRecorder = new StatementRecorder();
+            cassandra.getConf().recordStatements(statementRecorder);
+            cassandra.getConf().printStatements();
 
-        StatementRecorder statementRecorder = new StatementRecorder();
-        cassandra.getConf().recordStatements(statementRecorder);
+            messageMapper.deleteMessages(benwaInboxMailbox, ImmutableList.of(message1.getUid(), message2.getUid(), message3.getUid()));
 
-        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.ANSWERED), MessageManager.FlagsUpdateMode.REPLACE), MessageRange.all());
+            assertThat(statementRecorder.listExecutedStatements(
+                Selector.preparedStatementStartingWith("UPDATE mailboxCounters SET ")))
+                .hasSize(1);
+        }
 
-        assertThat(statementRecorder.listExecutedStatements(Selector.preparedStatement(
-            "UPDATE modseq SET nextModseq=:nextModseq WHERE mailboxId=:mailboxId IF nextModseq=:modSeqCondition;")))
-            .hasSize(1);
-    }
+        @Test
+        void deleteMessagesShouldNotDeleteMessageNotMarkedAsDeletedInDeletedProjection(CassandraCluster cassandra) throws MailboxException {
+            saveMessages();
 
-    private void consume(Iterator<MailboxMessage> inMailbox) {
-        ImmutableList.copyOf(inMailbox);
+            StatementRecorder statementRecorder = new StatementRecorder();
+            cassandra.getConf().recordStatements(statementRecorder);
+            cassandra.getConf().printStatements();
+
+            messageMapper.deleteMessages(benwaInboxMailbox, ImmutableList.of(message1.getUid(), message2.getUid(), message3.getUid()));
+
+            assertThat(statementRecorder.listExecutedStatements(
+                Selector.preparedStatement("DELETE FROM messageDeleted WHERE mailboxId=:mailboxId AND uid=:uid;")))
+                .isEmpty();
+        }
+
+        @Test
+        void deleteMessagesShouldNotDeleteMessageNotMarkedAsRecentInRecentProjection(CassandraCluster cassandra) throws MailboxException {
+            saveMessages();
+
+            StatementRecorder statementRecorder = new StatementRecorder();
+            cassandra.getConf().recordStatements(statementRecorder);
+            cassandra.getConf().printStatements();
+
+            messageMapper.deleteMessages(benwaInboxMailbox, ImmutableList.of(message1.getUid(), message2.getUid(), message3.getUid()));
+
+            assertThat(statementRecorder.listExecutedStatements(
+                Selector.preparedStatement("DELETE FROM messageDeleted WHERE mailboxId=:mailboxId AND uid=:uid;"))) // NOTE(review): same messageDeleted statement as the preceding test; presumably this should match the DELETE on the recent projection table instead — confirm against the mailboxRecentDAO statement
+                .isEmpty();
+        }
+
+        @Test
+        void deleteMessagesShouldNotDeleteMessageNotMarkedAsUnSeenInFirstUnseenProjection(CassandraCluster cassandra) throws MailboxException {
+            saveMessages();
+            FlagsUpdateCalculator markAsRead = new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), MessageManager.FlagsUpdateMode.ADD);
+            messageMapper.updateFlags(benwaInboxMailbox, markAsRead, MessageRange.all());
+
+            StatementRecorder statementRecorder = new StatementRecorder();
+            cassandra.getConf().recordStatements(statementRecorder);
+            cassandra.getConf().printStatements();
+
+            messageMapper.deleteMessages(benwaInboxMailbox, ImmutableList.of(message1.getUid(), message2.getUid(), message3.getUid()));
+
+            assertThat(statementRecorder.listExecutedStatements(
+                Selector.preparedStatement("DELETE FROM firstUnseen WHERE mailboxId=:mailboxId AND uid=:uid;")))
+                .isEmpty();
+        }
+
+        @Test
+        void updateFlagsShouldUpdateMailboxCountersOnce(CassandraCluster cassandra) throws MailboxException {
+            saveMessages();
+
+            StatementRecorder statementRecorder = new StatementRecorder();
+            cassandra.getConf().recordStatements(statementRecorder);
+            cassandra.getConf().printStatements();
+
+            messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), MessageManager.FlagsUpdateMode.REPLACE), MessageRange.all());
+
+
+            assertThat(statementRecorder.listExecutedStatements(Selector.preparedStatementStartingWith(
+                "UPDATE mailboxCounters SET ")))
+                .hasSize(1);
+        }
+
+        @Test
+        void findInMailboxLimitShouldLimitProjectionReadCassandraQueries(CassandraCluster cassandra) throws MailboxException {
+            saveMessages();
+
+            StatementRecorder statementRecorder = new StatementRecorder();
+            cassandra.getConf().recordStatements(statementRecorder);
+
+            int limit = 2;
+            consume(messageMapper.findInMailbox(benwaInboxMailbox, MessageRange.all(), FetchType.Full, limit));
+
+
+            assertThat(statementRecorder.listExecutedStatements(Selector.preparedStatement(
+                "SELECT messageId,internalDate,bodyStartOctet,fullContentOctets,bodyOctets,bodyContent,headerContent,textualLineCount,properties,attachments " +
+                    "FROM messageV2 WHERE messageId=:messageId;")))
+                .hasSize(limit);
+        }
+
+        @Test
+        void updateFlagsShouldLimitModSeqAllocation(CassandraCluster cassandra) throws MailboxException {
+            saveMessages();
+
+            StatementRecorder statementRecorder = new StatementRecorder();
+            cassandra.getConf().recordStatements(statementRecorder);
+
+            messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.ANSWERED), MessageManager.FlagsUpdateMode.REPLACE), MessageRange.all());
+
+            assertThat(statementRecorder.listExecutedStatements(Selector.preparedStatement(
+                "UPDATE modseq SET nextModseq=:nextModseq WHERE mailboxId=:mailboxId IF nextModseq=:modSeqCondition;")))
+                .hasSize(1);
+        }
+
+        private void consume(Iterator<MailboxMessage> inMailbox) {
+            ImmutableList.copyOf(inMailbox);
+        }
     }
 
     @Nested


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org