You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/05/26 02:39:52 UTC

[james-project] 01/04: JAMES-3765: Improve some IMAP commands (STORE, COPY, MOVE) performance

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b618e81a3d3e0c59f89aed56c3cb065c8685bab9
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Wed May 11 16:57:24 2022 +0700

    JAMES-3765: Improve some IMAP commands (STORE, COPY, MOVE) performance
---
 .../cassandra/mail/CassandraIndexTableHandler.java | 32 +++++++++++-----------
 .../cassandra/mail/CassandraMessageMapper.java     | 21 ++++++++++----
 2 files changed, 31 insertions(+), 22 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
index a3b5410963..5e2cd75fca 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
@@ -129,8 +129,7 @@ public class CassandraIndexTableHandler {
             .then();
     }
 
-    public Mono<Void> updateIndexOnAdd(Collection<MailboxMessage> messages, CassandraId mailboxId) {
-        int lowConcurrency = 2;
+    public Mono<Void> updateIndexOnAdd(Collection<MailboxMessage> messages, CassandraId mailboxId, int reactorConcurrency) {
         ImmutableSet<String> userFlags = messages.stream()
             .flatMap(message -> Stream.of(message.createFlags().getUserFlags()))
             .collect(ImmutableSet.toImmutableSet());
@@ -140,27 +139,28 @@ public class CassandraIndexTableHandler {
 
         return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
                 Flux.fromIterable(messages)
-                    .flatMap(message -> checkDeletedOnAdd(mailboxId, message.createFlags(), message.getUid()), lowConcurrency),
+                    .flatMap(message -> checkDeletedOnAdd(mailboxId, message.createFlags(), message.getUid()), reactorConcurrency),
                 Flux.fromIterable(messages)
-                    .flatMap(message -> updateFirstUnseenOnAdd(mailboxId, message.createFlags(), message.getUid()), lowConcurrency),
+                    .flatMap(message -> updateFirstUnseenOnAdd(mailboxId, message.createFlags(), message.getUid()), reactorConcurrency),
                 Flux.fromIterable(messages)
-                    .flatMap(message -> addRecentOnSave(mailboxId, message), lowConcurrency),
+                    .flatMap(message -> addRecentOnSave(mailboxId, message), reactorConcurrency),
                 incrementCountersOnSave(mailboxId, flags),
                 applicableFlagDAO.updateApplicableFlags(mailboxId, userFlags))
             .then();
     }
 
     public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
-        return updateIndexOnFlagsUpdate(mailboxId, ImmutableList.of(updatedFlags));
+        int fairConcurrency = 4;
+        return updateIndexOnFlagsUpdate(mailboxId, ImmutableList.of(updatedFlags), fairConcurrency);
     }
 
-    public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+    public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
         return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
                 manageUnseenMessageCountsOnFlagsUpdate(mailboxId, updatedFlags),
-                manageRecentOnFlagsUpdate(mailboxId, updatedFlags),
-                updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags),
+                manageRecentOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency),
+                updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency),
                 manageApplicableFlagsOnFlagsUpdate(mailboxId, updatedFlags),
-                updateDeletedOnFlagsUpdate(mailboxId, updatedFlags))
+                updateDeletedOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency))
             .then();
     }
 
@@ -171,9 +171,9 @@ public class CassandraIndexTableHandler {
                 .collect(ImmutableSet.toImmutableSet()));
     }
 
-    private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+    private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
         return Flux.fromIterable(updatedFlags)
-            .concatMap(flags -> updateDeletedOnFlagsUpdate(mailboxId, flags))
+            .flatMap(flags -> updateDeletedOnFlagsUpdate(mailboxId, flags), reactorConcurrency)
             .then();
     }
 
@@ -261,9 +261,9 @@ public class CassandraIndexTableHandler {
         return Mono.empty();
     }
 
-    private Mono<Void> manageRecentOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+    private Mono<Void> manageRecentOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
         return Flux.fromIterable(updatedFlags)
-            .concatMap(flags -> manageRecentOnFlagsUpdate(mailboxId, flags))
+            .flatMap(flags -> manageRecentOnFlagsUpdate(mailboxId, flags), reactorConcurrency)
             .then();
     }
 
@@ -299,9 +299,9 @@ public class CassandraIndexTableHandler {
         return firstUnseenDAO.removeUnread(mailboxId, uid);
     }
 
-    private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+    private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
         return Flux.fromIterable(updatedFlags)
-            .concatMap(flags -> updateFirstUnseenOnFlagsUpdate(mailboxId, flags))
+            .flatMap(flags -> updateFirstUnseenOnFlagsUpdate(mailboxId, flags), reactorConcurrency)
             .then();
     }
 
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 9bb21f4a9e..06fcdf1994 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -107,6 +107,7 @@ public class CassandraMessageMapper implements MessageMapper {
     private final CassandraConfiguration cassandraConfiguration;
     private final RecomputeMailboxCountersService recomputeMailboxCountersService;
     private final SecureRandom secureRandom;
+    private final int reactorConcurrency;
 
     public CassandraMessageMapper(UidProvider uidProvider, ModSeqProvider modSeqProvider,
                                   CassandraAttachmentMapper attachmentMapper,
@@ -133,6 +134,7 @@ public class CassandraMessageMapper implements MessageMapper {
         this.cassandraConfiguration = cassandraConfiguration;
         this.recomputeMailboxCountersService = recomputeMailboxCountersService;
         this.secureRandom = new SecureRandom();
+        this.reactorConcurrency = evaluateReactorConcurrency();
     }
 
     @Override
@@ -218,7 +220,7 @@ public class CassandraMessageMapper implements MessageMapper {
         CassandraId mailboxId = (CassandraId) composedMessageId.getMailboxId();
 
         return Flux.fromIterable(composedMessageIdWithMetaData)
-             .concatMap(this::delete)
+             .flatMap(this::delete, reactorConcurrency)
              .then(indexTableHandler.updateIndexOnDeleteComposedId(mailboxId, composedMessageIdWithMetaData));
     }
 
@@ -508,7 +510,7 @@ public class CassandraMessageMapper implements MessageMapper {
     private Mono<FlagsUpdateStageResult> runUpdateStage(CassandraId mailboxId, Flux<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator flagsUpdateCalculator) {
         return computeNewModSeq(mailboxId)
             .flatMapMany(newModSeq -> toBeUpdated
-            .concatMap(metadata -> tryFlagsUpdate(flagsUpdateCalculator, newModSeq, metadata)))
+            .flatMap(metadata -> tryFlagsUpdate(flagsUpdateCalculator, newModSeq, metadata), reactorConcurrency))
             .reduce(FlagsUpdateStageResult.none(), FlagsUpdateStageResult::merge)
             .flatMap(result -> updateIndexesForUpdatesResult(mailboxId, result));
     }
@@ -519,7 +521,7 @@ public class CassandraMessageMapper implements MessageMapper {
     }
 
     private Mono<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult result) {
-        return indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, result.getSucceeded())
+        return indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, result.getSucceeded(), reactorConcurrency)
             .onErrorResume(e -> {
                 LOGGER.error("Could not update flag indexes for mailboxId {}. This will lead to inconsistencies across Cassandra tables", mailboxId, e);
                 return Mono.empty();
@@ -637,13 +639,12 @@ public class CassandraMessageMapper implements MessageMapper {
     }
 
     private Mono<Void> insertIds(Collection<MailboxMessage> messages, CassandraId mailboxId) {
-        int lowConcurrency = 4;
         return Flux.fromIterable(messages)
             .map(message -> computeId(message, mailboxId))
             .concatMap(id -> imapUidDAO.insert(id).thenReturn(id))
             .flatMap(id -> messageIdDAO.insert(id)
-                .retryWhen(Retry.backoff(MAX_RETRY, MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF)), lowConcurrency)
-            .then(indexTableHandler.updateIndexOnAdd(messages, mailboxId));
+                .retryWhen(Retry.backoff(MAX_RETRY, MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF)), reactorConcurrency)
+            .then(indexTableHandler.updateIndexOnAdd(messages, mailboxId, reactorConcurrency));
     }
 
     private Mono<FlagsUpdateStageResult> tryFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, ModSeq newModSeq, ComposedMessageIdWithMetaData oldMetaData) {
@@ -707,4 +708,12 @@ public class CassandraMessageMapper implements MessageMapper {
                 }
             });
     }
+
+    private int evaluateReactorConcurrency() {
+        if (cassandraConfiguration.isMessageWriteStrongConsistency()) {
+            // Disable parallel execution to avoid CAS contention caused by lightweight transactions
+            return 1;
+        }
+        return 4;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org