You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/05/26 02:39:51 UTC

[james-project] branch master updated (93a32bb5e7 -> b145cdc8c0)

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


    from 93a32bb5e7 JAMES-3758 Documentation of deleting old messages via Web Admin interface, and companion Expires mailet.
     new b618e81a3d JAMES-3765: Improve some IMAP commands (STORE, COPY, MOVE) performance
     new 31b9d6e746 JAMES-3765: Optimize update deleted messages using Cassandra Batch Statement
     new b05b1d2df1 JAMES-3765: Optimize update RECENT using Cassandra Batch Statement
     new b145cdc8c0 JAMES-3765: Optimize update UNSEEN using Cassandra Batch Statement

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../cassandra/mail/CassandraDeletedMessageDAO.java |  50 +++++++
 .../cassandra/mail/CassandraFirstUnseenDAO.java    |  51 +++++++
 .../cassandra/mail/CassandraIndexTableHandler.java | 162 +++++++++++++--------
 .../cassandra/mail/CassandraMailboxRecentsDAO.java |  50 +++++++
 .../cassandra/mail/CassandraMessageMapper.java     |  17 ++-
 .../mail/CassandraDeletedMessageDAOTest.java       |  16 +-
 .../mail/CassandraFirstUnseenDAOTest.java          |  19 +--
 .../mail/CassandraMailboxRecentDAOTest.java        |  18 +--
 8 files changed, 283 insertions(+), 100 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 04/04: JAMES-3765: Optimize update UNSEEN using Cassandra Batch Statement

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b145cdc8c040280cd379fd514804fdd41ae3f918
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Thu May 12 13:44:33 2022 +0700

    JAMES-3765: Optimize update UNSEEN using Cassandra Batch Statement
---
 .../cassandra/mail/CassandraFirstUnseenDAO.java    | 51 ++++++++++++++++++
 .../cassandra/mail/CassandraIndexTableHandler.java | 63 +++++++++++++---------
 .../cassandra/mail/CassandraMessageMapper.java     |  4 +-
 .../mail/CassandraFirstUnseenDAOTest.java          | 19 +++----
 4 files changed, 98 insertions(+), 39 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
index 6c5c2d91b9..c58499ea4d 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAO.java
@@ -29,19 +29,27 @@ import static org.apache.james.mailbox.cassandra.table.CassandraFirstUnseenTable
 import static org.apache.james.mailbox.cassandra.table.CassandraFirstUnseenTable.TABLE_NAME;
 import static org.apache.james.mailbox.cassandra.table.CassandraFirstUnseenTable.UID;
 
+import java.util.List;
+import java.util.stream.Stream;
+
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 
+import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
+import com.google.common.collect.Lists;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class CassandraFirstUnseenDAO {
+    private static final int BATCH_STATEMENT_WINDOW = 1024;
+    private static final int LOW_CONCURRENCY = 2;
+
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final PreparedStatement addStatement;
     private final PreparedStatement deleteStatement;
@@ -100,12 +108,55 @@ public class CassandraFirstUnseenDAO {
                 .setLong(UID, uid.asLong()));
     }
 
+    public Mono<Void> addUnread(CassandraId mailboxId, List<MessageUid> uids) {
+        if (uids.size() == 1) {
+            return cassandraAsyncExecutor.executeVoid(
+                addStatement.bind()
+                    .setUUID(MAILBOX_ID, mailboxId.asUuid())
+                    .setLong(UID, uids.iterator().next().asLong()));
+        } else {
+            Stream<BatchStatement> batches = Lists.partition(uids, BATCH_STATEMENT_WINDOW)
+                .stream()
+                .map(uidBatch -> {
+                    BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
+                    uidBatch.forEach(uid -> batch.add(addStatement.bind()
+                        .setUUID(MAILBOX_ID, mailboxId.asUuid())
+                        .setLong(UID, uid.asLong())));
+                    return batch;
+                });
+            return Flux.fromStream(batches)
+                .flatMap(cassandraAsyncExecutor::executeVoid, LOW_CONCURRENCY)
+                .then();
+        }
+    }
+
     public Mono<Void> removeUnread(CassandraId cassandraId, MessageUid uid) {
         return cassandraAsyncExecutor.executeVoid(deleteStatement.bind()
             .setUUID(MAILBOX_ID, cassandraId.asUuid())
             .setLong(UID, uid.asLong()));
     }
 
+    public Mono<Void> removeUnread(CassandraId mailboxId, List<MessageUid> uids) {
+        if (uids.size() == 1) {
+            return cassandraAsyncExecutor.executeVoid(deleteStatement.bind()
+                .setUUID(MAILBOX_ID, mailboxId.asUuid())
+                .setLong(UID, uids.iterator().next().asLong()));
+        } else {
+            Stream<BatchStatement> batches = Lists.partition(uids, BATCH_STATEMENT_WINDOW)
+                .stream()
+                .map(uidBatch -> {
+                    BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
+                    uidBatch.forEach(uid -> batch.add(deleteStatement.bind()
+                        .setUUID(MAILBOX_ID, mailboxId.asUuid())
+                        .setLong(UID, uid.asLong())));
+                    return batch;
+                });
+            return Flux.fromStream(batches)
+                .flatMap(cassandraAsyncExecutor::executeVoid, LOW_CONCURRENCY)
+                .then();
+        }
+    }
+
     public Mono<Void> removeAll(CassandraId cassandraId) {
         return cassandraAsyncExecutor.executeVoid(deleteAllStatement.bind()
             .setUUID(MAILBOX_ID, cassandraId.asUuid()));
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
index b1a5f45758..b16a020e54 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
@@ -19,8 +19,6 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
-import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
-
 import java.util.Collection;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -78,8 +76,7 @@ public class CassandraIndexTableHandler {
 
     public Mono<Void> updateIndexOnDelete(CassandraId mailboxId, Collection<MessageMetaData> metaData) {
         return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
-                Flux.fromIterable(metaData)
-                    .flatMap(message -> updateFirstUnseenOnDelete(mailboxId, message.getFlags(), message.getUid()), DEFAULT_CONCURRENCY),
+                updateFirstUnseenOnDeleteWithMetadata(mailboxId, metaData),
                 updateRecentOnDeleteWithMetadata(mailboxId, metaData),
                 updateDeletedMessageProjectionOnDeleteWithMetadata(mailboxId, metaData),
                 decrementCountersOnDelete(mailboxId, metaData))
@@ -88,8 +85,7 @@ public class CassandraIndexTableHandler {
 
     public Mono<Void> updateIndexOnDeleteComposedId(CassandraId mailboxId, Collection<ComposedMessageIdWithMetaData> metaData) {
         return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
-                Flux.fromIterable(metaData)
-                    .flatMap(message -> updateFirstUnseenOnDelete(mailboxId, message.getFlags(), message.getComposedMessageId().getUid()), DEFAULT_CONCURRENCY),
+                updateFirstUnseenOnDelete(mailboxId, metaData),
                 updateRecentOnDeleteWithComposeId(mailboxId, metaData),
                 updateDeletedMessageProjectionOnDelete(mailboxId, metaData),
             decrementCountersOnDeleteFlags(mailboxId, metaData.stream()
@@ -142,7 +138,7 @@ public class CassandraIndexTableHandler {
             .then();
     }
 
-    public Mono<Void> updateIndexOnAdd(Collection<MailboxMessage> messages, CassandraId mailboxId, int reactorConcurrency) {
+    public Mono<Void> updateIndexOnAdd(Collection<MailboxMessage> messages, CassandraId mailboxId) {
         ImmutableSet<String> userFlags = messages.stream()
             .flatMap(message -> Stream.of(message.createFlags().getUserFlags()))
             .collect(ImmutableSet.toImmutableSet());
@@ -152,8 +148,7 @@ public class CassandraIndexTableHandler {
 
         return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
                 checkDeletedOnAdd(mailboxId, messages),
-                Flux.fromIterable(messages)
-                    .flatMap(message -> updateFirstUnseenOnAdd(mailboxId, message.createFlags(), message.getUid()), reactorConcurrency),
+                updateFirstUnseenOnAdd(mailboxId, messages),
                 addRecentOnSave(mailboxId, messages),
                 incrementCountersOnSave(mailboxId, flags),
                 applicableFlagDAO.updateApplicableFlags(mailboxId, userFlags))
@@ -161,15 +156,14 @@ public class CassandraIndexTableHandler {
     }
 
     public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
-        int fairConcurrency = 4;
-        return updateIndexOnFlagsUpdate(mailboxId, ImmutableList.of(updatedFlags), fairConcurrency);
+        return updateIndexOnFlagsUpdate(mailboxId, ImmutableList.of(updatedFlags));
     }
 
-    public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
+    public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
         return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
                 manageUnseenMessageCountsOnFlagsUpdate(mailboxId, updatedFlags),
                 manageRecentOnFlagsUpdate(mailboxId, updatedFlags),
-                updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency),
+                updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags),
                 manageApplicableFlagsOnFlagsUpdate(mailboxId, updatedFlags),
                 updateDeletedOnFlagsUpdate(mailboxId, updatedFlags))
             .then();
@@ -303,6 +297,12 @@ public class CassandraIndexTableHandler {
         return firstUnseenDAO.addUnread(mailboxId, uid);
     }
 
+    private Mono<Void> updateFirstUnseenOnAdd(CassandraId mailboxId, Collection<MailboxMessage> mailboxMessages) {
+        return firstUnseenDAO.addUnread(mailboxId, mailboxMessages.stream().filter(mailboxMessage -> !mailboxMessage.createFlags().contains(Flags.Flag.SEEN))
+            .map(MailboxMessage::getUid)
+            .collect(Collectors.toList()));
+    }
+
     private Mono<Void> checkDeletedOnAdd(CassandraId mailboxId, Flags flags, MessageUid uid) {
         if (flags.contains(Flags.Flag.DELETED)) {
             return deletedMessageDAO.addDeleted(mailboxId, uid);
@@ -324,19 +324,32 @@ public class CassandraIndexTableHandler {
         return firstUnseenDAO.removeUnread(mailboxId, uid);
     }
 
-    private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
-        return Flux.fromIterable(updatedFlags)
-            .flatMap(flags -> updateFirstUnseenOnFlagsUpdate(mailboxId, flags), reactorConcurrency)
-            .then();
+    private Mono<Void> updateFirstUnseenOnDeleteWithMetadata(CassandraId mailboxId, Collection<MessageMetaData> metaDatas) {
+        return firstUnseenDAO.removeUnread(mailboxId, metaDatas.stream().filter(metaData -> !metaData.getFlags().contains(Flags.Flag.SEEN))
+            .map(MessageMetaData::getUid)
+            .collect(Collectors.toList()));
     }
 
-    private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
-        if (updatedFlags.isModifiedToUnset(Flags.Flag.SEEN)) {
-            return firstUnseenDAO.addUnread(mailboxId, updatedFlags.getUid());
-        }
-        if (updatedFlags.isModifiedToSet(Flags.Flag.SEEN)) {
-            return firstUnseenDAO.removeUnread(mailboxId, updatedFlags.getUid());
-        }
-        return Mono.empty();
+    private Mono<Void> updateFirstUnseenOnDelete(CassandraId mailboxId, Collection<ComposedMessageIdWithMetaData> composedMessageIdWithMetaData) {
+        return firstUnseenDAO.removeUnread(mailboxId, composedMessageIdWithMetaData.stream().filter(composeId -> !composeId.getFlags().contains(Flags.Flag.SEEN))
+            .map(composeId -> composeId.getComposedMessageId().getUid())
+            .collect(Collectors.toList()));
+    }
+
+    private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+        ImmutableList.Builder<MessageUid> addUnreadUidsBuilder = ImmutableList.builder();
+        ImmutableList.Builder<MessageUid> removeUnreadUidsBuilder = ImmutableList.builder();
+        updatedFlags.forEach(flag -> {
+                if (flag.isModifiedToUnset(Flags.Flag.SEEN)) {
+                    addUnreadUidsBuilder.add(flag.getUid());
+                } else if (flag.isModifiedToSet(Flags.Flag.SEEN)) {
+                    removeUnreadUidsBuilder.add(flag.getUid());
+                }
+            });
+
+        return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
+                firstUnseenDAO.addUnread(mailboxId, addUnreadUidsBuilder.build()),
+                firstUnseenDAO.removeUnread(mailboxId, removeUnreadUidsBuilder.build()))
+            .then();
     }
 }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 06fcdf1994..54b9c18bed 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -521,7 +521,7 @@ public class CassandraMessageMapper implements MessageMapper {
     }
 
     private Mono<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult result) {
-        return indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, result.getSucceeded(), reactorConcurrency)
+        return indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, result.getSucceeded())
             .onErrorResume(e -> {
                 LOGGER.error("Could not update flag indexes for mailboxId {}. This will lead to inconsistencies across Cassandra tables", mailboxId, e);
                 return Mono.empty();
@@ -644,7 +644,7 @@ public class CassandraMessageMapper implements MessageMapper {
             .concatMap(id -> imapUidDAO.insert(id).thenReturn(id))
             .flatMap(id -> messageIdDAO.insert(id)
                 .retryWhen(Retry.backoff(MAX_RETRY, MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF)), reactorConcurrency)
-            .then(indexTableHandler.updateIndexOnAdd(messages, mailboxId, reactorConcurrency));
+            .then(indexTableHandler.updateIndexOnAdd(messages, mailboxId));
     }
 
     private Mono<FlagsUpdateStageResult> tryFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, ModSeq newModSeq, ComposedMessageIdWithMetaData oldMetaData) {
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAOTest.java
index a3a31a07d4..9d7aec0236 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraFirstUnseenDAOTest.java
@@ -22,6 +22,8 @@ package org.apache.james.mailbox.cassandra.mail;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 
+import java.util.List;
+
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.mailbox.MessageUid;
@@ -72,8 +74,7 @@ class CassandraFirstUnseenDAOTest {
 
     @Test
     void removeAllShouldDeleteAllUidEntries() {
-        testee.addUnread(MAILBOX_ID, UID_1).block();
-        testee.addUnread(MAILBOX_ID, UID_2).block();
+        testee.addUnread(MAILBOX_ID, List.of(UID_1, UID_2)).block();
 
         testee.removeAll(MAILBOX_ID).block();
 
@@ -88,9 +89,7 @@ class CassandraFirstUnseenDAOTest {
 
     @Test
     void retrieveFirstUnreadShouldReturnLowestUnreadUid() {
-        testee.addUnread(MAILBOX_ID, UID_1).block();
-
-        testee.addUnread(MAILBOX_ID, UID_2).block();
+        testee.addUnread(MAILBOX_ID, List.of(UID_1, UID_2)).block();
 
         assertThat(testee.retrieveFirstUnread(MAILBOX_ID).block())
             .isEqualByComparingTo(UID_1);
@@ -114,9 +113,7 @@ class CassandraFirstUnseenDAOTest {
 
     @Test
     void retrieveFirstUnreadShouldBeOrderIndependent() {
-        testee.addUnread(MAILBOX_ID, UID_2).block();
-
-        testee.addUnread(MAILBOX_ID, UID_1).block();
+        testee.addUnread(MAILBOX_ID, List.of(UID_1, UID_2)).block();
 
         assertThat(testee.retrieveFirstUnread(MAILBOX_ID).block())
             .isEqualByComparingTo(UID_1);
@@ -152,8 +149,7 @@ class CassandraFirstUnseenDAOTest {
 
     @Test
     void removeUnreadShouldRemoveLastUnread() {
-        testee.addUnread(MAILBOX_ID, UID_1).block();
-        testee.addUnread(MAILBOX_ID, UID_2).block();
+        testee.addUnread(MAILBOX_ID, List.of(UID_1, UID_2)).block();
 
         testee.removeUnread(MAILBOX_ID, UID_2).block();
 
@@ -163,8 +159,7 @@ class CassandraFirstUnseenDAOTest {
 
     @Test
     void removeUnreadShouldHaveNoEffectWhenNotLast() {
-        testee.addUnread(MAILBOX_ID, UID_1).block();
-        testee.addUnread(MAILBOX_ID, UID_2).block();
+        testee.addUnread(MAILBOX_ID, List.of(UID_1, UID_2)).block();
 
         testee.removeUnread(MAILBOX_ID, UID_1).block();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 01/04: JAMES-3765: Improve some IMAP commands (STORE, COPY, MOVE) performance

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b618e81a3d3e0c59f89aed56c3cb065c8685bab9
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Wed May 11 16:57:24 2022 +0700

    JAMES-3765: Improve some IMAP commands (STORE, COPY, MOVE) performance
---
 .../cassandra/mail/CassandraIndexTableHandler.java | 32 +++++++++++-----------
 .../cassandra/mail/CassandraMessageMapper.java     | 21 ++++++++++----
 2 files changed, 31 insertions(+), 22 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
index a3b5410963..5e2cd75fca 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
@@ -129,8 +129,7 @@ public class CassandraIndexTableHandler {
             .then();
     }
 
-    public Mono<Void> updateIndexOnAdd(Collection<MailboxMessage> messages, CassandraId mailboxId) {
-        int lowConcurrency = 2;
+    public Mono<Void> updateIndexOnAdd(Collection<MailboxMessage> messages, CassandraId mailboxId, int reactorConcurrency) {
         ImmutableSet<String> userFlags = messages.stream()
             .flatMap(message -> Stream.of(message.createFlags().getUserFlags()))
             .collect(ImmutableSet.toImmutableSet());
@@ -140,27 +139,28 @@ public class CassandraIndexTableHandler {
 
         return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
                 Flux.fromIterable(messages)
-                    .flatMap(message -> checkDeletedOnAdd(mailboxId, message.createFlags(), message.getUid()), lowConcurrency),
+                    .flatMap(message -> checkDeletedOnAdd(mailboxId, message.createFlags(), message.getUid()), reactorConcurrency),
                 Flux.fromIterable(messages)
-                    .flatMap(message -> updateFirstUnseenOnAdd(mailboxId, message.createFlags(), message.getUid()), lowConcurrency),
+                    .flatMap(message -> updateFirstUnseenOnAdd(mailboxId, message.createFlags(), message.getUid()), reactorConcurrency),
                 Flux.fromIterable(messages)
-                    .flatMap(message -> addRecentOnSave(mailboxId, message), lowConcurrency),
+                    .flatMap(message -> addRecentOnSave(mailboxId, message), reactorConcurrency),
                 incrementCountersOnSave(mailboxId, flags),
                 applicableFlagDAO.updateApplicableFlags(mailboxId, userFlags))
             .then();
     }
 
     public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
-        return updateIndexOnFlagsUpdate(mailboxId, ImmutableList.of(updatedFlags));
+        int fairConcurrency = 4;
+        return updateIndexOnFlagsUpdate(mailboxId, ImmutableList.of(updatedFlags), fairConcurrency);
     }
 
-    public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+    public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
         return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
                 manageUnseenMessageCountsOnFlagsUpdate(mailboxId, updatedFlags),
-                manageRecentOnFlagsUpdate(mailboxId, updatedFlags),
-                updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags),
+                manageRecentOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency),
+                updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency),
                 manageApplicableFlagsOnFlagsUpdate(mailboxId, updatedFlags),
-                updateDeletedOnFlagsUpdate(mailboxId, updatedFlags))
+                updateDeletedOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency))
             .then();
     }
 
@@ -171,9 +171,9 @@ public class CassandraIndexTableHandler {
                 .collect(ImmutableSet.toImmutableSet()));
     }
 
-    private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+    private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
         return Flux.fromIterable(updatedFlags)
-            .concatMap(flags -> updateDeletedOnFlagsUpdate(mailboxId, flags))
+            .flatMap(flags -> updateDeletedOnFlagsUpdate(mailboxId, flags), reactorConcurrency)
             .then();
     }
 
@@ -261,9 +261,9 @@ public class CassandraIndexTableHandler {
         return Mono.empty();
     }
 
-    private Mono<Void> manageRecentOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+    private Mono<Void> manageRecentOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
         return Flux.fromIterable(updatedFlags)
-            .concatMap(flags -> manageRecentOnFlagsUpdate(mailboxId, flags))
+            .flatMap(flags -> manageRecentOnFlagsUpdate(mailboxId, flags), reactorConcurrency)
             .then();
     }
 
@@ -299,9 +299,9 @@ public class CassandraIndexTableHandler {
         return firstUnseenDAO.removeUnread(mailboxId, uid);
     }
 
-    private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+    private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
         return Flux.fromIterable(updatedFlags)
-            .concatMap(flags -> updateFirstUnseenOnFlagsUpdate(mailboxId, flags))
+            .flatMap(flags -> updateFirstUnseenOnFlagsUpdate(mailboxId, flags), reactorConcurrency)
             .then();
     }
 
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 9bb21f4a9e..06fcdf1994 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -107,6 +107,7 @@ public class CassandraMessageMapper implements MessageMapper {
     private final CassandraConfiguration cassandraConfiguration;
     private final RecomputeMailboxCountersService recomputeMailboxCountersService;
     private final SecureRandom secureRandom;
+    private final int reactorConcurrency;
 
     public CassandraMessageMapper(UidProvider uidProvider, ModSeqProvider modSeqProvider,
                                   CassandraAttachmentMapper attachmentMapper,
@@ -133,6 +134,7 @@ public class CassandraMessageMapper implements MessageMapper {
         this.cassandraConfiguration = cassandraConfiguration;
         this.recomputeMailboxCountersService = recomputeMailboxCountersService;
         this.secureRandom = new SecureRandom();
+        this.reactorConcurrency = evaluateReactorConcurrency();
     }
 
     @Override
@@ -218,7 +220,7 @@ public class CassandraMessageMapper implements MessageMapper {
         CassandraId mailboxId = (CassandraId) composedMessageId.getMailboxId();
 
         return Flux.fromIterable(composedMessageIdWithMetaData)
-             .concatMap(this::delete)
+             .flatMap(this::delete, reactorConcurrency)
              .then(indexTableHandler.updateIndexOnDeleteComposedId(mailboxId, composedMessageIdWithMetaData));
     }
 
@@ -508,7 +510,7 @@ public class CassandraMessageMapper implements MessageMapper {
     private Mono<FlagsUpdateStageResult> runUpdateStage(CassandraId mailboxId, Flux<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator flagsUpdateCalculator) {
         return computeNewModSeq(mailboxId)
             .flatMapMany(newModSeq -> toBeUpdated
-            .concatMap(metadata -> tryFlagsUpdate(flagsUpdateCalculator, newModSeq, metadata)))
+            .flatMap(metadata -> tryFlagsUpdate(flagsUpdateCalculator, newModSeq, metadata), reactorConcurrency))
             .reduce(FlagsUpdateStageResult.none(), FlagsUpdateStageResult::merge)
             .flatMap(result -> updateIndexesForUpdatesResult(mailboxId, result));
     }
@@ -519,7 +521,7 @@ public class CassandraMessageMapper implements MessageMapper {
     }
 
     private Mono<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult result) {
-        return indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, result.getSucceeded())
+        return indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, result.getSucceeded(), reactorConcurrency)
             .onErrorResume(e -> {
                 LOGGER.error("Could not update flag indexes for mailboxId {}. This will lead to inconsistencies across Cassandra tables", mailboxId, e);
                 return Mono.empty();
@@ -637,13 +639,12 @@ public class CassandraMessageMapper implements MessageMapper {
     }
 
     private Mono<Void> insertIds(Collection<MailboxMessage> messages, CassandraId mailboxId) {
-        int lowConcurrency = 4;
         return Flux.fromIterable(messages)
             .map(message -> computeId(message, mailboxId))
             .concatMap(id -> imapUidDAO.insert(id).thenReturn(id))
             .flatMap(id -> messageIdDAO.insert(id)
-                .retryWhen(Retry.backoff(MAX_RETRY, MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF)), lowConcurrency)
-            .then(indexTableHandler.updateIndexOnAdd(messages, mailboxId));
+                .retryWhen(Retry.backoff(MAX_RETRY, MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF)), reactorConcurrency)
+            .then(indexTableHandler.updateIndexOnAdd(messages, mailboxId, reactorConcurrency));
     }
 
     private Mono<FlagsUpdateStageResult> tryFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, ModSeq newModSeq, ComposedMessageIdWithMetaData oldMetaData) {
@@ -707,4 +708,12 @@ public class CassandraMessageMapper implements MessageMapper {
                 }
             });
     }
+
+    private int evaluateReactorConcurrency() {
+        if (cassandraConfiguration.isMessageWriteStrongConsistency()) {
+            // Prevent parallel execution to prevent CAS contention because of LightWeight transactions
+            return 1;
+        }
+        return 4;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 02/04: JAMES-3765: Optimize update deleted messages using Cassandra Batch Statement

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 31b9d6e746de34ce3365790d5dc3c3680ead9717
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Thu May 12 12:18:03 2022 +0700

    JAMES-3765: Optimize update deleted messages using Cassandra Batch Statement
---
 .../cassandra/mail/CassandraDeletedMessageDAO.java | 50 +++++++++++++++++++
 .../cassandra/mail/CassandraIndexTableHandler.java | 57 ++++++++++++++--------
 .../mail/CassandraDeletedMessageDAOTest.java       | 16 ++----
 3 files changed, 91 insertions(+), 32 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
index 69e301b6b4..01abf65271 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAO.java
@@ -30,6 +30,9 @@ import static org.apache.james.mailbox.cassandra.table.CassandraDeletedMessageTa
 import static org.apache.james.mailbox.cassandra.table.CassandraDeletedMessageTable.TABLE_NAME;
 import static org.apache.james.mailbox.cassandra.table.CassandraDeletedMessageTable.UID;
 
+import java.util.List;
+import java.util.stream.Stream;
+
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
@@ -37,9 +40,11 @@ import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.model.MessageRange;
 
+import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+import com.google.common.collect.Lists;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -47,6 +52,8 @@ import reactor.core.publisher.Mono;
 public class CassandraDeletedMessageDAO {
     private static final String UID_TO = "uid_to";
     private static final String UID_FROM = "uid_from";
+    private static final int BATCH_STATEMENT_WINDOW = 1024;
+    private static final int LOW_CONCURRENCY = 2;
 
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final PreparedStatement addStatement;
@@ -123,12 +130,55 @@ public class CassandraDeletedMessageDAO {
                 .setLong(UID, uid.asLong()));
     }
 
+    public Mono<Void> addDeleted(CassandraId cassandraId, List<MessageUid> uids) {
+        if (uids.size() == 1) {
+            return cassandraAsyncExecutor.executeVoid(
+                addStatement.bind()
+                    .setUUID(MAILBOX_ID, cassandraId.asUuid())
+                    .setLong(UID, uids.iterator().next().asLong()));
+        } else {
+            Stream<BatchStatement> batches = Lists.partition(uids, BATCH_STATEMENT_WINDOW)
+                .stream()
+                .map(uidBatch -> {
+                    BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
+                    uidBatch.forEach(uid -> batch.add(addStatement.bind()
+                        .setUUID(MAILBOX_ID, cassandraId.asUuid())
+                        .setLong(UID, uid.asLong())));
+                    return batch;
+                });
+            return Flux.fromStream(batches)
+                .flatMap(cassandraAsyncExecutor::executeVoid, LOW_CONCURRENCY)
+                .then();
+        }
+    }
+
     public Mono<Void> removeDeleted(CassandraId cassandraId, MessageUid uid) {
         return cassandraAsyncExecutor.executeVoid(deleteStatement.bind()
             .setUUID(MAILBOX_ID, cassandraId.asUuid())
             .setLong(UID, uid.asLong()));
     }
 
+    public Mono<Void> removeDeleted(CassandraId cassandraId, List<MessageUid> uids) {
+        if (uids.size() == 1) {
+            return cassandraAsyncExecutor.executeVoid(deleteStatement.bind()
+                .setUUID(MAILBOX_ID, cassandraId.asUuid())
+                .setLong(UID, uids.iterator().next().asLong()));
+        } else {
+            Stream<BatchStatement> batches = Lists.partition(uids, BATCH_STATEMENT_WINDOW)
+                .stream()
+                .map(uidBatch -> {
+                    BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
+                    uidBatch.forEach(uid -> batch.add(deleteStatement.bind()
+                        .setUUID(MAILBOX_ID, cassandraId.asUuid())
+                        .setLong(UID, uid.asLong())));
+                    return batch;
+                });
+            return Flux.fromStream(batches)
+                .flatMap(cassandraAsyncExecutor::executeVoid, LOW_CONCURRENCY)
+                .then();
+        }
+    }
+
     public Mono<Void> removeAll(CassandraId cassandraId) {
         return cassandraAsyncExecutor.executeVoid(deleteAllStatement.bind()
             .setUUID(MAILBOX_ID, cassandraId.asUuid()));
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
index 5e2cd75fca..27682cc714 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
@@ -23,6 +23,7 @@ import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import javax.inject.Inject;
@@ -81,8 +82,7 @@ public class CassandraIndexTableHandler {
                     .flatMap(message -> updateFirstUnseenOnDelete(mailboxId, message.getFlags(), message.getUid()), DEFAULT_CONCURRENCY),
                 Flux.fromIterable(metaData)
                     .flatMap(message -> updateRecentOnDelete(mailboxId, message.getUid(), message.getFlags()), DEFAULT_CONCURRENCY),
-                Flux.fromIterable(metaData)
-                    .flatMap(message -> updateDeletedMessageProjectionOnDelete(mailboxId, message.getUid(), message.getFlags()), DEFAULT_CONCURRENCY),
+                updateDeletedMessageProjectionOnDeleteWithMetadata(mailboxId, metaData),
                 decrementCountersOnDelete(mailboxId, metaData))
             .then();
     }
@@ -93,8 +93,7 @@ public class CassandraIndexTableHandler {
                     .flatMap(message -> updateFirstUnseenOnDelete(mailboxId, message.getFlags(), message.getComposedMessageId().getUid()), DEFAULT_CONCURRENCY),
                 Flux.fromIterable(metaData)
                     .flatMap(message -> updateRecentOnDelete(mailboxId, message.getComposedMessageId().getUid(), message.getFlags()), DEFAULT_CONCURRENCY),
-                Flux.fromIterable(metaData)
-                    .flatMap(message -> updateDeletedMessageProjectionOnDelete(mailboxId, message.getComposedMessageId().getUid(), message.getFlags()), DEFAULT_CONCURRENCY),
+                updateDeletedMessageProjectionOnDelete(mailboxId, metaData),
             decrementCountersOnDeleteFlags(mailboxId, metaData.stream()
                 .map(ComposedMessageIdWithMetaData::getFlags)
                 .collect(ImmutableList.toImmutableList())))
@@ -117,6 +116,18 @@ public class CassandraIndexTableHandler {
         return Mono.empty();
     }
 
+    private Mono<Void> updateDeletedMessageProjectionOnDelete(CassandraId mailboxId, Collection<ComposedMessageIdWithMetaData> metaDatas) {
+        return deletedMessageDAO.removeDeleted(mailboxId, metaDatas.stream().filter(composedId -> composedId.getFlags().contains(Flags.Flag.DELETED))
+            .map(composedId -> composedId.getComposedMessageId().getUid())
+            .collect(Collectors.toList()));
+    }
+
+    private Mono<Void> updateDeletedMessageProjectionOnDeleteWithMetadata(CassandraId mailboxId, Collection<MessageMetaData> metaDatas) {
+        return deletedMessageDAO.removeDeleted(mailboxId, metaDatas.stream().filter(metaData -> metaData.getFlags().contains(Flags.Flag.DELETED))
+            .map(MessageMetaData::getUid)
+            .collect(Collectors.toList()));
+    }
+
     public Mono<Void> updateIndexOnAdd(MailboxMessage message, CassandraId mailboxId) {
         Flags flags = message.createFlags();
 
@@ -138,8 +149,7 @@ public class CassandraIndexTableHandler {
             .collect(ImmutableList.toImmutableList());
 
         return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
-                Flux.fromIterable(messages)
-                    .flatMap(message -> checkDeletedOnAdd(mailboxId, message.createFlags(), message.getUid()), reactorConcurrency),
+                checkDeletedOnAdd(mailboxId, messages),
                 Flux.fromIterable(messages)
                     .flatMap(message -> updateFirstUnseenOnAdd(mailboxId, message.createFlags(), message.getUid()), reactorConcurrency),
                 Flux.fromIterable(messages)
@@ -160,7 +170,7 @@ public class CassandraIndexTableHandler {
                 manageRecentOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency),
                 updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency),
                 manageApplicableFlagsOnFlagsUpdate(mailboxId, updatedFlags),
-                updateDeletedOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency))
+                updateDeletedOnFlagsUpdate(mailboxId, updatedFlags))
             .then();
     }
 
@@ -171,20 +181,21 @@ public class CassandraIndexTableHandler {
                 .collect(ImmutableSet.toImmutableSet()));
     }
 
-    private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
-        return Flux.fromIterable(updatedFlags)
-            .flatMap(flags -> updateDeletedOnFlagsUpdate(mailboxId, flags), reactorConcurrency)
-            .then();
-    }
+    private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+        ImmutableList.Builder<MessageUid> addDeletedUidsBuilder = ImmutableList.builder();
+        ImmutableList.Builder<MessageUid> removeDeletedUidsBuilder = ImmutableList.builder();
+        updatedFlags.forEach(flag -> {
+                if (flag.isModifiedToSet(Flags.Flag.DELETED)) {
+                    addDeletedUidsBuilder.add(flag.getUid());
+                } else if (flag.isModifiedToUnset(Flags.Flag.DELETED)) {
+                    removeDeletedUidsBuilder.add(flag.getUid());
+                }
+            });
 
-    private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
-        if (updatedFlags.isModifiedToSet(Flags.Flag.DELETED)) {
-            return deletedMessageDAO.addDeleted(mailboxId, updatedFlags.getUid());
-        } else if (updatedFlags.isModifiedToUnset(Flags.Flag.DELETED)) {
-            return deletedMessageDAO.removeDeleted(mailboxId, updatedFlags.getUid());
-        } else {
-            return Mono.empty();
-        }
+        return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
+            deletedMessageDAO.addDeleted(mailboxId, addDeletedUidsBuilder.build()),
+            deletedMessageDAO.removeDeleted(mailboxId, removeDeletedUidsBuilder.build()))
+            .then();
     }
 
     private Mono<Void> decrementCountersOnDelete(CassandraId mailboxId, Flags flags) {
@@ -292,6 +303,12 @@ public class CassandraIndexTableHandler {
         return Mono.empty();
     }
 
+    private Mono<Void> checkDeletedOnAdd(CassandraId mailboxId, Collection<MailboxMessage> mailboxMessages) {
+        return deletedMessageDAO.addDeleted(mailboxId, mailboxMessages.stream().filter(message -> message.createFlags().contains(Flags.Flag.DELETED))
+            .map(MailboxMessage::getUid)
+            .collect(Collectors.toList()));
+    }
+
     private Mono<Void> updateFirstUnseenOnDelete(CassandraId mailboxId, Flags flags, MessageUid uid) {
         if (flags.contains(Flags.Flag.SEEN)) {
             return Mono.empty();
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAOTest.java
index 87cc6a40c6..2bb4f3b790 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraDeletedMessageDAOTest.java
@@ -66,8 +66,7 @@ class CassandraDeletedMessageDAOTest {
 
     @Test
     void addDeletedMessageShouldThenBeReportedAsDeletedMessage() {
-        testee.addDeleted(MAILBOX_ID, UID_1).block();
-        testee.addDeleted(MAILBOX_ID, UID_2).block();
+        testee.addDeleted(MAILBOX_ID, List.of(UID_1, UID_2)).block();
 
         List<MessageUid> result = testee.retrieveDeletedMessage(MAILBOX_ID, MessageRange.all())
                 .collectList()
@@ -78,8 +77,7 @@ class CassandraDeletedMessageDAOTest {
 
     @Test
     void retrieveDeletedMessageShouldNotReturnDeletedEntries() {
-        testee.addDeleted(MAILBOX_ID, UID_1).block();
-        testee.addDeleted(MAILBOX_ID, UID_2).block();
+        testee.addDeleted(MAILBOX_ID, List.of(UID_1, UID_2)).block();
 
         testee.removeAll(MAILBOX_ID).block();
 
@@ -122,8 +120,7 @@ class CassandraDeletedMessageDAOTest {
 
     @Test
     void removeDeletedMessageShouldNotAffectOtherMessage() {
-        testee.addDeleted(MAILBOX_ID, UID_2).block();
-        testee.addDeleted(MAILBOX_ID, UID_1).block();
+        testee.addDeleted(MAILBOX_ID, List.of(UID_2, UID_1)).block();
 
         testee.removeDeleted(MAILBOX_ID, UID_1).block();
 
@@ -150,12 +147,7 @@ class CassandraDeletedMessageDAOTest {
     }
 
     private void addMessageForRetrieveTest() {
-        testee.addDeleted(MAILBOX_ID, UID_1).block();
-        testee.addDeleted(MAILBOX_ID, UID_2).block();
-        testee.addDeleted(MAILBOX_ID, UID_3).block();
-        testee.addDeleted(MAILBOX_ID, UID_4).block();
-        testee.addDeleted(MAILBOX_ID, UID_7).block();
-        testee.addDeleted(MAILBOX_ID, UID_8).block();
+        testee.addDeleted(MAILBOX_ID, List.of(UID_1, UID_2, UID_3, UID_4, UID_7, UID_8)).block();
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 03/04: JAMES-3765: Optimize update RECENT using Cassandra Batch Statement

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b05b1d2df16d09ad6bdc8c082103285873bc74ac
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Thu May 12 13:14:18 2022 +0700

    JAMES-3765: Optimize update RECENT using Cassandra Batch Statement
---
 .../cassandra/mail/CassandraIndexTableHandler.java | 58 ++++++++++++----------
 .../cassandra/mail/CassandraMailboxRecentsDAO.java | 50 +++++++++++++++++++
 .../mail/CassandraMailboxRecentDAOTest.java        | 18 +++----
 3 files changed, 91 insertions(+), 35 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
index 27682cc714..b1a5f45758 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
@@ -80,8 +80,7 @@ public class CassandraIndexTableHandler {
         return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
                 Flux.fromIterable(metaData)
                     .flatMap(message -> updateFirstUnseenOnDelete(mailboxId, message.getFlags(), message.getUid()), DEFAULT_CONCURRENCY),
-                Flux.fromIterable(metaData)
-                    .flatMap(message -> updateRecentOnDelete(mailboxId, message.getUid(), message.getFlags()), DEFAULT_CONCURRENCY),
+                updateRecentOnDeleteWithMetadata(mailboxId, metaData),
                 updateDeletedMessageProjectionOnDeleteWithMetadata(mailboxId, metaData),
                 decrementCountersOnDelete(mailboxId, metaData))
             .then();
@@ -91,8 +90,7 @@ public class CassandraIndexTableHandler {
         return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
                 Flux.fromIterable(metaData)
                     .flatMap(message -> updateFirstUnseenOnDelete(mailboxId, message.getFlags(), message.getComposedMessageId().getUid()), DEFAULT_CONCURRENCY),
-                Flux.fromIterable(metaData)
-                    .flatMap(message -> updateRecentOnDelete(mailboxId, message.getComposedMessageId().getUid(), message.getFlags()), DEFAULT_CONCURRENCY),
+                updateRecentOnDeleteWithComposeId(mailboxId, metaData),
                 updateDeletedMessageProjectionOnDelete(mailboxId, metaData),
             decrementCountersOnDeleteFlags(mailboxId, metaData.stream()
                 .map(ComposedMessageIdWithMetaData::getFlags)
@@ -100,12 +98,16 @@ public class CassandraIndexTableHandler {
             .then();
     }
 
-    private Mono<Void> updateRecentOnDelete(CassandraId mailboxId, MessageUid uid, Flags flags) {
-        if (flags.contains(Flags.Flag.RECENT)) {
-            return mailboxRecentDAO.removeFromRecent(mailboxId, uid);
-        }
+    private Mono<Void> updateRecentOnDeleteWithMetadata(CassandraId mailboxId, Collection<MessageMetaData> metaDatas) {
+        return mailboxRecentDAO.removeFromRecent(mailboxId, metaDatas.stream().filter(metaData -> metaData.getFlags().contains(Flags.Flag.RECENT))
+            .map(MessageMetaData::getUid)
+            .collect(Collectors.toList()));
+    }
 
-        return Mono.empty();
+    private Mono<Void> updateRecentOnDeleteWithComposeId(CassandraId mailboxId, Collection<ComposedMessageIdWithMetaData> composedMessageIdWithMetaDatas) {
+        return mailboxRecentDAO.removeFromRecent(mailboxId, composedMessageIdWithMetaDatas.stream().filter(composedId -> composedId.getFlags().contains(Flags.Flag.RECENT))
+            .map(composedId -> composedId.getComposedMessageId().getUid())
+            .collect(Collectors.toList()));
     }
 
     private Mono<Void> updateDeletedMessageProjectionOnDelete(CassandraId mailboxId, MessageUid uid, Flags flags) {
@@ -152,8 +154,7 @@ public class CassandraIndexTableHandler {
                 checkDeletedOnAdd(mailboxId, messages),
                 Flux.fromIterable(messages)
                     .flatMap(message -> updateFirstUnseenOnAdd(mailboxId, message.createFlags(), message.getUid()), reactorConcurrency),
-                Flux.fromIterable(messages)
-                    .flatMap(message -> addRecentOnSave(mailboxId, message), reactorConcurrency),
+                addRecentOnSave(mailboxId, messages),
                 incrementCountersOnSave(mailboxId, flags),
                 applicableFlagDAO.updateApplicableFlags(mailboxId, userFlags))
             .then();
@@ -167,7 +168,7 @@ public class CassandraIndexTableHandler {
     public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
         return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
                 manageUnseenMessageCountsOnFlagsUpdate(mailboxId, updatedFlags),
-                manageRecentOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency),
+                manageRecentOnFlagsUpdate(mailboxId, updatedFlags),
                 updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency),
                 manageApplicableFlagsOnFlagsUpdate(mailboxId, updatedFlags),
                 updateDeletedOnFlagsUpdate(mailboxId, updatedFlags))
@@ -249,6 +250,12 @@ public class CassandraIndexTableHandler {
         return Mono.empty();
     }
 
+    private Mono<Void> addRecentOnSave(CassandraId mailboxId, Collection<MailboxMessage> messages) {
+        return mailboxRecentDAO.addToRecent(mailboxId, messages.stream().filter(message -> message.createFlags().contains(Flags.Flag.RECENT))
+            .map(MailboxMessage::getUid)
+            .collect(Collectors.toList()));
+    }
+
     private Mono<Void> manageUnseenMessageCountsOnFlagsUpdate(CassandraId mailboxId,  List<UpdatedFlags> updatedFlags) {
         int sum = updatedFlags.stream()
             .mapToInt(flags -> {
@@ -272,20 +279,21 @@ public class CassandraIndexTableHandler {
         return Mono.empty();
     }
 
-    private Mono<Void> manageRecentOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
-        return Flux.fromIterable(updatedFlags)
-            .flatMap(flags -> manageRecentOnFlagsUpdate(mailboxId, flags), reactorConcurrency)
-            .then();
-    }
+    private Mono<Void> manageRecentOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+        ImmutableList.Builder<MessageUid> addRecentUidsBuilder = ImmutableList.builder();
+        ImmutableList.Builder<MessageUid> removeRecentUidsBuilder = ImmutableList.builder();
+        updatedFlags.forEach(flag -> {
+                if (flag.isModifiedToSet(Flags.Flag.RECENT)) {
+                    addRecentUidsBuilder.add(flag.getUid());
+                } else if (flag.isModifiedToUnset(Flags.Flag.RECENT)) {
+                    removeRecentUidsBuilder.add(flag.getUid());
+                }
+            });
 
-    private Mono<Void> manageRecentOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
-        if (updatedFlags.isModifiedToUnset(Flags.Flag.RECENT)) {
-            return mailboxRecentDAO.removeFromRecent(mailboxId, updatedFlags.getUid());
-        }
-        if (updatedFlags.isModifiedToSet(Flags.Flag.RECENT)) {
-            return mailboxRecentDAO.addToRecent(mailboxId, updatedFlags.getUid());
-        }
-        return Mono.empty();
+        return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
+                mailboxRecentDAO.removeFromRecent(mailboxId, removeRecentUidsBuilder.build()),
+                mailboxRecentDAO.addToRecent(mailboxId, addRecentUidsBuilder.build()))
+            .then();
     }
 
     private Mono<Void> updateFirstUnseenOnAdd(CassandraId mailboxId, Flags flags, MessageUid uid) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
index d7d59bf557..1858af3e4d 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
@@ -24,6 +24,9 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 
+import java.util.List;
+import java.util.stream.Stream;
+
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
@@ -31,15 +34,20 @@ import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.table.CassandraMailboxRecentsTable;
 
+import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.common.collect.Lists;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class CassandraMailboxRecentsDAO {
+    private static final int BATCH_STATEMENT_WINDOW = 1024;
+    private static final int LOW_CONCURRENCY = 2;
+
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final PreparedStatement readStatement;
     private final PreparedStatement deleteStatement;
@@ -101,6 +109,27 @@ public class CassandraMailboxRecentsDAO {
             .setLong(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID, messageUid.asLong()));
     }
 
+    public Mono<Void> removeFromRecent(CassandraId mailboxId, List<MessageUid> uids) {
+        if (uids.size() == 1) {
+            return cassandraAsyncExecutor.executeVoid(deleteStatement.bind()
+                .setUUID(CassandraMailboxRecentsTable.MAILBOX_ID, mailboxId.asUuid())
+                .setLong(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID, uids.iterator().next().asLong()));
+        } else {
+            Stream<BatchStatement> batches = Lists.partition(uids, BATCH_STATEMENT_WINDOW)
+                .stream()
+                .map(uidBatch -> {
+                    BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
+                    uidBatch.forEach(uid -> batch.add(deleteStatement.bind()
+                        .setUUID(CassandraMailboxRecentsTable.MAILBOX_ID, mailboxId.asUuid())
+                        .setLong(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID, uid.asLong())));
+                    return batch;
+                });
+            return Flux.fromStream(batches)
+                .flatMap(cassandraAsyncExecutor::executeVoid, LOW_CONCURRENCY)
+                .then();
+        }
+    }
+
     public Mono<Void> delete(CassandraId mailboxId) {
         return cassandraAsyncExecutor.executeVoid(deleteAllStatement.bind()
             .setUUID(CassandraMailboxRecentsTable.MAILBOX_ID, mailboxId.asUuid()));
@@ -111,4 +140,25 @@ public class CassandraMailboxRecentsDAO {
             .setUUID(CassandraMailboxRecentsTable.MAILBOX_ID, mailboxId.asUuid())
             .setLong(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID, messageUid.asLong()));
     }
+
+    public Mono<Void> addToRecent(CassandraId mailboxId, List<MessageUid> uids) {
+        if (uids.size() == 1) {
+            return cassandraAsyncExecutor.executeVoid(addStatement.bind()
+                .setUUID(CassandraMailboxRecentsTable.MAILBOX_ID, mailboxId.asUuid())
+                .setLong(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID, uids.iterator().next().asLong()));
+        } else {
+            Stream<BatchStatement> batches = Lists.partition(uids, BATCH_STATEMENT_WINDOW)
+                .stream()
+                .map(uidBatch -> {
+                    BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
+                    uidBatch.forEach(uid -> batch.add(addStatement.bind()
+                        .setUUID(CassandraMailboxRecentsTable.MAILBOX_ID, mailboxId.asUuid())
+                        .setLong(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID, uid.asLong())));
+                    return batch;
+                });
+            return Flux.fromStream(batches)
+                .flatMap(cassandraAsyncExecutor::executeVoid, LOW_CONCURRENCY)
+                .then();
+        }
+    }
 }
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentDAOTest.java
index e11bf822a6..b95302224c 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentDAOTest.java
@@ -22,6 +22,8 @@ package org.apache.james.mailbox.cassandra.mail;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 
+import java.util.List;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
@@ -80,8 +82,7 @@ class CassandraMailboxRecentDAOTest {
 
     @Test
     void getRecentMessageUidsInMailboxShouldNotReturnDeletedItems() {
-        testee.addToRecent(CASSANDRA_ID, UID1).block();
-        testee.addToRecent(CASSANDRA_ID, UID2).block();
+        testee.addToRecent(CASSANDRA_ID, List.of(UID1, UID2)).block();
 
         testee.delete(CASSANDRA_ID).block();
 
@@ -108,9 +109,7 @@ class CassandraMailboxRecentDAOTest {
 
     @Test
     void addToRecentShouldAddUidWhenNotEmpty() {
-        testee.addToRecent(CASSANDRA_ID, UID1).block();
-
-        testee.addToRecent(CASSANDRA_ID, UID2).block();
+        testee.addToRecent(CASSANDRA_ID, List.of(UID1, UID2)).block();
 
         assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID)
                 .collectList()
@@ -120,8 +119,7 @@ class CassandraMailboxRecentDAOTest {
 
     @Test
     void removeFromRecentShouldOnlyRemoveUidWhenNotEmpty() {
-        testee.addToRecent(CASSANDRA_ID, UID1).block();
-        testee.addToRecent(CASSANDRA_ID, UID2).block();
+        testee.addToRecent(CASSANDRA_ID, List.of(UID1, UID2)).block();
 
         testee.removeFromRecent(CASSANDRA_ID, UID2).block();
 
@@ -146,9 +144,9 @@ class CassandraMailboxRecentDAOTest {
     void getRecentMessageUidsInMailboxShouldNotTimeoutWhenOverPagingLimit() {
         int pageSize = 5000;
         int size = pageSize + 1000;
-        IntStream.range(0, size)
-            .parallel()
-            .forEach(i -> testee.addToRecent(CASSANDRA_ID, MessageUid.of(i + 1)).block());
+
+        testee.addToRecent(CASSANDRA_ID, IntStream.range(0, size)
+            .mapToObj(i -> MessageUid.of(i + 1)).collect(Collectors.toList())).block();
 
         assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID)
                 .collectList()


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org