You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/05/26 02:39:52 UTC
[james-project] 01/04: JAMES-3765: Improve some IMAP commands (STORE, COPY, MOVE) performance
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b618e81a3d3e0c59f89aed56c3cb065c8685bab9
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Wed May 11 16:57:24 2022 +0700
JAMES-3765: Improve some IMAP commands (STORE, COPY, MOVE) performance
---
.../cassandra/mail/CassandraIndexTableHandler.java | 32 +++++++++++-----------
.../cassandra/mail/CassandraMessageMapper.java | 21 ++++++++++----
2 files changed, 31 insertions(+), 22 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
index a3b5410963..5e2cd75fca 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
@@ -129,8 +129,7 @@ public class CassandraIndexTableHandler {
.then();
}
- public Mono<Void> updateIndexOnAdd(Collection<MailboxMessage> messages, CassandraId mailboxId) {
- int lowConcurrency = 2;
+ public Mono<Void> updateIndexOnAdd(Collection<MailboxMessage> messages, CassandraId mailboxId, int reactorConcurrency) {
ImmutableSet<String> userFlags = messages.stream()
.flatMap(message -> Stream.of(message.createFlags().getUserFlags()))
.collect(ImmutableSet.toImmutableSet());
@@ -140,27 +139,28 @@ public class CassandraIndexTableHandler {
return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
Flux.fromIterable(messages)
- .flatMap(message -> checkDeletedOnAdd(mailboxId, message.createFlags(), message.getUid()), lowConcurrency),
+ .flatMap(message -> checkDeletedOnAdd(mailboxId, message.createFlags(), message.getUid()), reactorConcurrency),
Flux.fromIterable(messages)
- .flatMap(message -> updateFirstUnseenOnAdd(mailboxId, message.createFlags(), message.getUid()), lowConcurrency),
+ .flatMap(message -> updateFirstUnseenOnAdd(mailboxId, message.createFlags(), message.getUid()), reactorConcurrency),
Flux.fromIterable(messages)
- .flatMap(message -> addRecentOnSave(mailboxId, message), lowConcurrency),
+ .flatMap(message -> addRecentOnSave(mailboxId, message), reactorConcurrency),
incrementCountersOnSave(mailboxId, flags),
applicableFlagDAO.updateApplicableFlags(mailboxId, userFlags))
.then();
}
public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
- return updateIndexOnFlagsUpdate(mailboxId, ImmutableList.of(updatedFlags));
+ int fairConcurrency = 4;
+ return updateIndexOnFlagsUpdate(mailboxId, ImmutableList.of(updatedFlags), fairConcurrency);
}
- public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+ public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
manageUnseenMessageCountsOnFlagsUpdate(mailboxId, updatedFlags),
- manageRecentOnFlagsUpdate(mailboxId, updatedFlags),
- updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags),
+ manageRecentOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency),
+ updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency),
manageApplicableFlagsOnFlagsUpdate(mailboxId, updatedFlags),
- updateDeletedOnFlagsUpdate(mailboxId, updatedFlags))
+ updateDeletedOnFlagsUpdate(mailboxId, updatedFlags, reactorConcurrency))
.then();
}
@@ -171,9 +171,9 @@ public class CassandraIndexTableHandler {
.collect(ImmutableSet.toImmutableSet()));
}
- private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+ private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
return Flux.fromIterable(updatedFlags)
- .concatMap(flags -> updateDeletedOnFlagsUpdate(mailboxId, flags))
+ .flatMap(flags -> updateDeletedOnFlagsUpdate(mailboxId, flags), reactorConcurrency)
.then();
}
@@ -261,9 +261,9 @@ public class CassandraIndexTableHandler {
return Mono.empty();
}
- private Mono<Void> manageRecentOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+ private Mono<Void> manageRecentOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
return Flux.fromIterable(updatedFlags)
- .concatMap(flags -> manageRecentOnFlagsUpdate(mailboxId, flags))
+ .flatMap(flags -> manageRecentOnFlagsUpdate(mailboxId, flags), reactorConcurrency)
.then();
}
@@ -299,9 +299,9 @@ public class CassandraIndexTableHandler {
return firstUnseenDAO.removeUnread(mailboxId, uid);
}
- private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) {
+ private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags, int reactorConcurrency) {
return Flux.fromIterable(updatedFlags)
- .concatMap(flags -> updateFirstUnseenOnFlagsUpdate(mailboxId, flags))
+ .flatMap(flags -> updateFirstUnseenOnFlagsUpdate(mailboxId, flags), reactorConcurrency)
.then();
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 9bb21f4a9e..06fcdf1994 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -107,6 +107,7 @@ public class CassandraMessageMapper implements MessageMapper {
private final CassandraConfiguration cassandraConfiguration;
private final RecomputeMailboxCountersService recomputeMailboxCountersService;
private final SecureRandom secureRandom;
+ private final int reactorConcurrency;
public CassandraMessageMapper(UidProvider uidProvider, ModSeqProvider modSeqProvider,
CassandraAttachmentMapper attachmentMapper,
@@ -133,6 +134,7 @@ public class CassandraMessageMapper implements MessageMapper {
this.cassandraConfiguration = cassandraConfiguration;
this.recomputeMailboxCountersService = recomputeMailboxCountersService;
this.secureRandom = new SecureRandom();
+ this.reactorConcurrency = evaluateReactorConcurrency();
}
@Override
@@ -218,7 +220,7 @@ public class CassandraMessageMapper implements MessageMapper {
CassandraId mailboxId = (CassandraId) composedMessageId.getMailboxId();
return Flux.fromIterable(composedMessageIdWithMetaData)
- .concatMap(this::delete)
+ .flatMap(this::delete, reactorConcurrency)
.then(indexTableHandler.updateIndexOnDeleteComposedId(mailboxId, composedMessageIdWithMetaData));
}
@@ -508,7 +510,7 @@ public class CassandraMessageMapper implements MessageMapper {
private Mono<FlagsUpdateStageResult> runUpdateStage(CassandraId mailboxId, Flux<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator flagsUpdateCalculator) {
return computeNewModSeq(mailboxId)
.flatMapMany(newModSeq -> toBeUpdated
- .concatMap(metadata -> tryFlagsUpdate(flagsUpdateCalculator, newModSeq, metadata)))
+ .flatMap(metadata -> tryFlagsUpdate(flagsUpdateCalculator, newModSeq, metadata), reactorConcurrency))
.reduce(FlagsUpdateStageResult.none(), FlagsUpdateStageResult::merge)
.flatMap(result -> updateIndexesForUpdatesResult(mailboxId, result));
}
@@ -519,7 +521,7 @@ public class CassandraMessageMapper implements MessageMapper {
}
private Mono<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult result) {
- return indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, result.getSucceeded())
+ return indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, result.getSucceeded(), reactorConcurrency)
.onErrorResume(e -> {
LOGGER.error("Could not update flag indexes for mailboxId {}. This will lead to inconsistencies across Cassandra tables", mailboxId, e);
return Mono.empty();
@@ -637,13 +639,12 @@ public class CassandraMessageMapper implements MessageMapper {
}
private Mono<Void> insertIds(Collection<MailboxMessage> messages, CassandraId mailboxId) {
- int lowConcurrency = 4;
return Flux.fromIterable(messages)
.map(message -> computeId(message, mailboxId))
.concatMap(id -> imapUidDAO.insert(id).thenReturn(id))
.flatMap(id -> messageIdDAO.insert(id)
- .retryWhen(Retry.backoff(MAX_RETRY, MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF)), lowConcurrency)
- .then(indexTableHandler.updateIndexOnAdd(messages, mailboxId));
+ .retryWhen(Retry.backoff(MAX_RETRY, MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF)), reactorConcurrency)
+ .then(indexTableHandler.updateIndexOnAdd(messages, mailboxId, reactorConcurrency));
}
private Mono<FlagsUpdateStageResult> tryFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, ModSeq newModSeq, ComposedMessageIdWithMetaData oldMetaData) {
@@ -707,4 +708,12 @@ public class CassandraMessageMapper implements MessageMapper {
}
});
}
+
+ private int evaluateReactorConcurrency() {
+ if (cassandraConfiguration.isMessageWriteStrongConsistency()) {
+ // Run sequentially to avoid CAS contention caused by lightweight transactions
+ return 1;
+ }
+ return 4;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org