You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2020/12/30 03:35:33 UTC

[james-project] 21/29: JAMES-3484 Cassandra mailbox should group copies/moves

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d15aa361e53c65052dff96b76ceb9fe9e3be5222
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Dec 25 13:22:05 2020 +0700

    JAMES-3484 Cassandra mailbox should group copies/moves
    
    On one of my production instances, I notice some copy operations are slow. Moving 60 messages takes around 2 seconds (~33ms per message).
    
    More interestingly, a total of 1042 Cassandra queries is generated! (~17 per messages)
    
    The moves is currently performed on a per message basis, sequencially.
    
    However, by grouping updates together we can:
     - Allocate a single MODSEQ thus saving on ModSeq generation
     - Allocate several UIDs at once by asking for a UID range
     - As we are no longer performing id generation for each message, we can parallelize the message insertion...
     - And the tables indexes (applicable flags, mailbox counters) can be grouped instead of being performed for each messages. Other table indexes updates can be further parallelized yielding further enhancements.
    
    In brief, according to the glowroot capture attached we can expect a 75% performance enhancement by:
     - Cassandra query volume reduction
     - Operation parallelization
    
    We also expect a positive impact on overall Cassandra performances from the above enhancements.
    
    Glowroot capture:
    
    {code:java}
     ASYNC
    Transaction type:
    Web
    Transaction name:
    /jmap
    Start:
    2020-12-22 3:40:57.645 pm (+07:00)
    Duration:
    2,085.1 milliseconds
    Breakdown (Main Thread): 	total (ms) 	count
    http request
    	0.46 	1
    Breakdown (Auxiliary Threads): 	total (ms) 	count
    auxiliary thread
    	2,744.8 	6,959
    jmapMethod
    	1,936.4 	1
    cassandra query
    	50.2 	1,042
    Breakdown (Async Timers): 	total (ms) 	count
    cassandra query
    	3,907.5 	1,042
    JVM Thread Stats (Main Thread)
    CPU time: 0.42 milliseconds
    Blocked time: 0.0 milliseconds
    Waited time: 0.0 milliseconds
    Allocated memory: 18.5 KB
    JVM Thread Stats (Auxiliary Threads)
    CPU time: 489.6 milliseconds
    Blocked time: 0.0 milliseconds
    Waited time: 1,924.0 milliseconds
    Allocated memory: 17.6 MB
    {code}
---
 .../java/org/apache/james/mailbox/MessageUid.java  |  7 +-
 .../cassandra/mail/CassandraIndexTableHandler.java | 59 ++++++++++++++-
 .../cassandra/mail/CassandraMailboxCounterDAO.java |  7 ++
 .../cassandra/mail/CassandraMessageMapper.java     | 84 ++++++++++++++++++++--
 .../cassandra/mail/CassandraUidProvider.java       | 45 ++++++++++--
 .../cassandra/mail/CassandraUidProviderTest.java   | 15 ++++
 .../james/mailbox/store/StoreMessageManager.java   | 33 ++++++---
 .../james/mailbox/store/mail/MessageMapper.java    | 20 +++++-
 8 files changed, 242 insertions(+), 28 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageUid.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageUid.java
index eb91bd8..46223b2 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageUid.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageUid.java
@@ -72,7 +72,12 @@ public class MessageUid implements Comparable<MessageUid> {
     }
 
     public MessageUid next() {
-        return new MessageUid(uid + 1);
+        return next(1);
+    }
+
+    public MessageUid next(int count) {
+        Preconditions.checkArgument(count > 0);
+        return new MessageUid(uid + count);
     }
 
     public boolean isFirst() {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
index 6894e70..c5d1359 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java
@@ -23,6 +23,7 @@ import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.stream.Stream;
 
 import javax.inject.Inject;
 import javax.mail.Flags;
@@ -88,6 +89,20 @@ public class CassandraIndexTableHandler {
             .then();
     }
 
+    public Mono<Void> updateIndexOnDeleteComposedId(CassandraId mailboxId, Collection<ComposedMessageIdWithMetaData> metaData) {
+        return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
+                Flux.fromIterable(metaData)
+                    .flatMap(message -> updateFirstUnseenOnDelete(mailboxId, message.getFlags(), message.getComposedMessageId().getUid()), DEFAULT_CONCURRENCY),
+                Flux.fromIterable(metaData)
+                    .flatMap(message -> updateRecentOnDelete(mailboxId, message.getComposedMessageId().getUid(), message.getFlags()), DEFAULT_CONCURRENCY),
+                Flux.fromIterable(metaData)
+                    .flatMap(message -> updateDeletedMessageProjectionOnDelete(mailboxId, message.getComposedMessageId().getUid(), message.getFlags()), DEFAULT_CONCURRENCY),
+            decrementCountersOnDeleteFlags(mailboxId, metaData.stream()
+                .map(ComposedMessageIdWithMetaData::getFlags)
+                .collect(Guavate.toImmutableList())))
+            .then();
+    }
+
     private Mono<Void> updateRecentOnDelete(CassandraId mailboxId, MessageUid uid, Flags flags) {
         if (flags.contains(Flags.Flag.RECENT)) {
             return mailboxRecentDAO.removeFromRecent(mailboxId, uid);
@@ -116,6 +131,27 @@ public class CassandraIndexTableHandler {
             .then();
     }
 
+    public Mono<Void> updateIndexOnAdd(Collection<MailboxMessage> messages, CassandraId mailboxId) {
+        int lowConcurrency = 2;
+        ImmutableSet<String> userFlags = messages.stream()
+            .flatMap(message -> Stream.of(message.createFlags().getUserFlags()))
+            .collect(Guavate.toImmutableSet());
+        List<Flags> flags = messages.stream()
+            .flatMap(message -> Stream.of(message.createFlags()))
+            .collect(Guavate.toImmutableList());
+
+        return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE,
+                Flux.fromIterable(messages)
+                    .flatMap(message -> checkDeletedOnAdd(mailboxId, message.createFlags(), message.getUid()), lowConcurrency),
+                Flux.fromIterable(messages)
+                    .flatMap(message -> updateFirstUnseenOnAdd(mailboxId, message.createFlags(), message.getUid()), lowConcurrency),
+                Flux.fromIterable(messages)
+                    .flatMap(message -> addRecentOnSave(mailboxId, message), lowConcurrency),
+                incrementCountersOnSave(mailboxId, flags),
+                applicableFlagDAO.updateApplicableFlags(mailboxId, userFlags))
+            .then();
+    }
+
     public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) {
         return updateIndexOnFlagsUpdate(mailboxId, ImmutableList.of(updatedFlags));
     }
@@ -161,14 +197,19 @@ public class CassandraIndexTableHandler {
     }
 
     private Mono<Void> decrementCountersOnDelete(CassandraId mailboxId, Collection<MessageMetaData> metaData) {
-        long unseenCount = metaData.stream()
+        return decrementCountersOnDeleteFlags(mailboxId, metaData.stream()
             .map(MessageMetaData::getFlags)
-            .filter(flags -> !flags.contains(Flags.Flag.SEEN))
+            .collect(Guavate.toImmutableList()));
+    }
+
+    private Mono<Void> decrementCountersOnDeleteFlags(CassandraId mailboxId, Collection<Flags> flags) {
+        long unseenCount = flags.stream()
+            .filter(flag -> !flag.contains(Flags.Flag.SEEN))
             .count();
 
         return mailboxCounterDAO.remove(MailboxCounters.builder()
             .mailboxId(mailboxId)
-            .count(metaData.size())
+            .count(flags.size())
             .unseen(unseenCount)
             .build());
     }
@@ -180,6 +221,18 @@ public class CassandraIndexTableHandler {
         return mailboxCounterDAO.incrementUnseenAndCount(mailboxId);
     }
 
+    private Mono<Void> incrementCountersOnSave(CassandraId mailboxId, Collection<Flags> flags) {
+        long unseenCount = flags.stream()
+            .filter(flag -> !flag.contains(Flags.Flag.SEEN))
+            .count();
+
+        return mailboxCounterDAO.add(MailboxCounters.builder()
+            .mailboxId(mailboxId)
+            .count(flags.size())
+            .unseen(unseenCount)
+            .build());
+    }
+
     private Mono<Void> addRecentOnSave(CassandraId mailboxId, MailboxMessage message) {
         if (message.createFlags().contains(Flags.Flag.RECENT)) {
             return mailboxRecentDAO.addToRecent(mailboxId, message.getUid());
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java
index 460decf..8c04502 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java
@@ -194,6 +194,13 @@ public class CassandraMailboxCounterDAO {
         return cassandraAsyncExecutor.executeVoid(bindWithMailbox(mailboxId, incrementUnseenAndCountStatement));
     }
 
+    public Mono<Void> incrementUnseenAndCount(CassandraId mailboxId, long count, long unseen) {
+        return cassandraAsyncExecutor.executeVoid(
+            bindWithMailbox(mailboxId, addToCounters)
+                .setLong(COUNT, count)
+                .setLong(UNSEEN, unseen));
+    }
+
     private BoundStatement bindWithMailbox(CassandraId mailboxId, PreparedStatement statement) {
         return statement.bind()
             .setUUID(MAILBOX_ID, mailboxId.asUuid());
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 5ced7bf..88c23e6 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -24,6 +24,7 @@ import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import java.security.SecureRandom;
 import java.time.Duration;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
@@ -65,6 +66,7 @@ import org.slf4j.LoggerFactory;
 
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Streams;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -197,6 +199,18 @@ public class CassandraMessageMapper implements MessageMapper {
              .then(indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, mailboxId));
     }
 
+    private Mono<Void> deleteAndHandleIndexUpdates(Collection<ComposedMessageIdWithMetaData> composedMessageIdWithMetaData) {
+        if (composedMessageIdWithMetaData.isEmpty()) {
+            return Mono.empty();
+        }
+        ComposedMessageId composedMessageId = composedMessageIdWithMetaData.iterator().next().getComposedMessageId();
+        CassandraId mailboxId = (CassandraId) composedMessageId.getMailboxId();
+
+        return Flux.fromIterable(composedMessageIdWithMetaData)
+             .concatMap(this::delete)
+             .then(indexTableHandler.updateIndexOnDeleteComposedId(mailboxId, composedMessageIdWithMetaData));
+    }
+
     private Mono<Void> delete(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
         ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId();
         CassandraMessageId messageId = (CassandraMessageId) composedMessageId.getMessageId();
@@ -295,6 +309,18 @@ public class CassandraMessageMapper implements MessageMapper {
     }
 
     @Override
+    public List<MessageMetaData> move(Mailbox mailbox, List<MailboxMessage> original) throws MailboxException {
+        List<ComposedMessageIdWithMetaData> beforeCopy = original.stream()
+            .map(MailboxMessage::getComposedMessageIdWithMetaData)
+            .collect(Guavate.toImmutableList());
+
+        List<MessageMetaData> messageMetaData = copy(mailbox, original);
+        deleteAndHandleIndexUpdates(beforeCopy).block();
+
+        return messageMetaData;
+    }
+
+    @Override
     public ModSeq getHighestModSeq(Mailbox mailbox) throws MailboxException {
         return modSeqProvider.highestModSeq(mailbox);
     }
@@ -312,7 +338,7 @@ public class CassandraMessageMapper implements MessageMapper {
 
     private Mono<MailboxMessage> addUidAndModseq(MailboxMessage message, CassandraId mailboxId) {
         Mono<MessageUid> messageUidMono = uidProvider
-            .nextUid(mailboxId)
+            .nextUids(mailboxId)
             .switchIfEmpty(Mono.error(() -> new MailboxException("Can not find a UID to save " + message.getMessageId() + " in " + mailboxId)));
 
         Mono<ModSeq> nextModSeqMono = modSeqProvider.nextModSeq(mailboxId)
@@ -419,6 +445,15 @@ public class CassandraMessageMapper implements MessageMapper {
         return setInMailbox(mailbox, original);
     }
 
+    public List<MessageMetaData> copy(Mailbox mailbox, List<MailboxMessage> originals) throws MailboxException {
+        return setInMailbox(mailbox, originals.stream()
+            .map(original -> {
+                original.setFlags(new FlagsBuilder().add(original.createFlags()).add(Flag.RECENT).build());
+                return original;
+            })
+            .collect(Guavate.toImmutableList()));
+    }
+
     @Override
     public Optional<MessageUid> getLastUid(Mailbox mailbox) throws MailboxException {
         return uidProvider.lastUid(mailbox);
@@ -441,6 +476,29 @@ public class CassandraMessageMapper implements MessageMapper {
             .map(MailboxMessage::metaData));
     }
 
+    private List<MessageMetaData> setInMailbox(Mailbox mailbox, List<MailboxMessage> messages) throws MailboxException {
+        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
+
+        Mono<List<MessageUid>> uids = uidProvider.nextUids(mailboxId, messages.size());
+        Mono<ModSeq> nextModSeq = modSeqProvider.nextModSeq(mailboxId);
+
+        Mono<List<MailboxMessage>> messagesWithUidAndModSeq = nextModSeq.flatMap(modSeq -> uids.map(uidList -> Pair.of(uidList, modSeq)))
+            .map(pair -> pair.getKey().stream()
+                .map(uid -> Pair.of(uid, pair.getRight())))
+            .map(uidsAndModSeq -> Streams.zip(uidsAndModSeq, messages.stream(),
+                (uidAndModseq, aMessage) -> {
+                    aMessage.setUid(uidAndModseq.getKey());
+                    aMessage.setModSeq((uidAndModseq.getValue()));
+                    return aMessage;
+                }).collect(Guavate.toImmutableList()));
+
+        return block(messagesWithUidAndModSeq
+            .flatMap(list -> insertIds(list, mailboxId).thenReturn(list))
+            .map(list -> list.stream()
+                .map(MailboxMessage::metaData)
+                .collect(Guavate.toImmutableList())));
+    }
+
     private Mono<Void> save(Mailbox mailbox, MailboxMessage message) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
         return messageDAOV3.save(message)
@@ -448,11 +506,7 @@ public class CassandraMessageMapper implements MessageMapper {
     }
 
     private Mono<Void> insertIds(MailboxMessage message, CassandraId mailboxId) {
-        ComposedMessageIdWithMetaData composedMessageIdWithMetaData = ComposedMessageIdWithMetaData.builder()
-                .composedMessageId(new ComposedMessageId(mailboxId, message.getMessageId(), message.getUid()))
-                .flags(message.createFlags())
-                .modSeq(message.getModSeq())
-                .build();
+        ComposedMessageIdWithMetaData composedMessageIdWithMetaData = computeId(message, mailboxId);
         return imapUidDAO.insert(composedMessageIdWithMetaData)
             .then(Flux.merge(
                 messageIdDAO.insert(composedMessageIdWithMetaData)
@@ -461,6 +515,24 @@ public class CassandraMessageMapper implements MessageMapper {
             .then());
     }
 
+    private ComposedMessageIdWithMetaData computeId(MailboxMessage message, CassandraId mailboxId) {
+        ComposedMessageIdWithMetaData composedMessageIdWithMetaData = ComposedMessageIdWithMetaData.builder()
+                .composedMessageId(new ComposedMessageId(mailboxId, message.getMessageId(), message.getUid()))
+                .flags(message.createFlags())
+                .modSeq(message.getModSeq())
+                .build();
+        return composedMessageIdWithMetaData;
+    }
+
+    private Mono<Void> insertIds(Collection<MailboxMessage> messages, CassandraId mailboxId) {
+        int lowConcurrency = 4;
+        return Flux.fromIterable(messages)
+            .map(message -> computeId(message, mailboxId))
+            .concatMap(id -> imapUidDAO.insert(id).thenReturn(id))
+            .flatMap(id -> messageIdDAO.insert(id)
+                .retryWhen(Retry.backoff(MAX_RETRY, MIN_RETRY_BACKOFF).maxBackoff(MAX_RETRY_BACKOFF)), lowConcurrency)
+            .then(indexTableHandler.updateIndexOnAdd(messages, mailboxId));
+    }
 
     private Mono<FlagsUpdateStageResult> tryFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, ModSeq newModSeq, ComposedMessageIdWithMetaData oldMetaData) {
         Flags oldFlags = oldMetaData.getFlags();
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
index da048f0..afc7111 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
@@ -31,7 +31,9 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.
 import static org.apache.james.util.ReactorUtils.publishIfPresent;
 
 import java.time.Duration;
+import java.util.List;
 import java.util.Optional;
+import java.util.stream.LongStream;
 
 import javax.inject.Inject;
 
@@ -48,6 +50,7 @@ import org.apache.james.mailbox.store.mail.UidProvider;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
+import com.github.steveash.guavate.Guavate;
 
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -89,7 +92,7 @@ public class CassandraUidProvider implements UidProvider {
 
     private PreparedStatement prepareInsert(Session session) {
         return session.prepare(insertInto(TABLE_NAME)
-            .value(NEXT_UID, MessageUid.MIN_VALUE.asLong())
+            .value(NEXT_UID, bindMarker(NEXT_UID))
             .value(MAILBOX_ID, bindMarker(MAILBOX_ID))
             .ifNotExists());
     }
@@ -102,12 +105,12 @@ public class CassandraUidProvider implements UidProvider {
     @Override
     public MessageUid nextUid(MailboxId mailboxId) throws MailboxException {
         CassandraId cassandraId = (CassandraId) mailboxId;
-        return nextUid(cassandraId)
+        return nextUids(cassandraId)
             .blockOptional()
             .orElseThrow(() -> new MailboxException("Error during Uid update"));
     }
 
-    public Mono<MessageUid> nextUid(CassandraId cassandraId) {
+    public Mono<MessageUid> nextUids(CassandraId cassandraId) {
         Mono<MessageUid> updateUid = findHighestUid(cassandraId)
             .flatMap(messageUid -> tryUpdateUid(cassandraId, messageUid));
 
@@ -119,6 +122,26 @@ public class CassandraUidProvider implements UidProvider {
             .retryWhen(Retry.backoff(maxUidRetries, firstBackoff).scheduler(Schedulers.elastic()));
     }
 
+    public Mono<List<MessageUid>> nextUids(CassandraId cassandraId, int count) {
+        Mono<List<MessageUid>> updateUid = findHighestUid(cassandraId)
+            .flatMap(messageUid -> tryUpdateUid(cassandraId, messageUid, count)
+                .map(highest -> range(messageUid, highest)));
+
+        Duration firstBackoff = Duration.ofMillis(10);
+        return updateUid
+            .switchIfEmpty(tryInsert(cassandraId, count)
+                .map(highest -> range(MessageUid.MIN_VALUE, highest)))
+            .switchIfEmpty(updateUid)
+            .single()
+            .retryWhen(Retry.backoff(maxUidRetries, firstBackoff).scheduler(Schedulers.elastic()));
+    }
+
+    private List<MessageUid> range(MessageUid lowerExclusive, MessageUid higherInclusive) {
+        return LongStream.range(lowerExclusive.asLong() + 1, higherInclusive.asLong() + 1)
+            .mapToObj(MessageUid::of)
+            .collect(Guavate.toImmutableList());
+    }
+
     @Override
     public Optional<MessageUid> lastUid(Mailbox mailbox) {
         return findHighestUid((CassandraId) mailbox.getMailboxId())
@@ -134,7 +157,11 @@ public class CassandraUidProvider implements UidProvider {
     }
 
     private Mono<MessageUid> tryUpdateUid(CassandraId mailboxId, MessageUid uid) {
-        MessageUid nextUid = uid.next();
+        return tryUpdateUid(mailboxId, uid, 1);
+    }
+
+    private Mono<MessageUid> tryUpdateUid(CassandraId mailboxId, MessageUid uid, int count) {
+        MessageUid nextUid = uid.next(count);
         return executor.executeReturnApplied(
                 updateStatement.bind()
                         .setUUID(MAILBOX_ID, mailboxId.asUuid())
@@ -147,11 +174,21 @@ public class CassandraUidProvider implements UidProvider {
     private Mono<MessageUid> tryInsert(CassandraId mailboxId) {
         return executor.executeReturnApplied(
             insertStatement.bind()
+                .setLong(NEXT_UID, MessageUid.MIN_VALUE.asLong())
                 .setUUID(MAILBOX_ID, mailboxId.asUuid()))
             .map(success -> successToUid(MessageUid.MIN_VALUE, success))
             .handle(publishIfPresent());
     }
 
+    private Mono<MessageUid> tryInsert(CassandraId mailboxId, int count) {
+        return executor.executeReturnApplied(
+            insertStatement.bind()
+                .setLong(NEXT_UID, MessageUid.MIN_VALUE.next(count).asLong())
+                .setUUID(MAILBOX_ID, mailboxId.asUuid()))
+            .map(success -> successToUid(MessageUid.MIN_VALUE.next(count), success))
+            .handle(publishIfPresent());
+    }
+
     private Optional<MessageUid> successToUid(MessageUid uid, Boolean success) {
         if (success) {
             return Optional.of(uid);
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProviderTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProviderTest.java
index 4901b19..f2a7a87 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProviderTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProviderTest.java
@@ -99,4 +99,19 @@ class CassandraUidProviderTest {
 
         assertThat(messageUids).hasSize(nbEntries);
     }
+
+    @Test
+    void nextUidsShouldGenerateUniqueValuesWhenParallelCalls() throws ExecutionException, InterruptedException {
+        int threadCount = 10;
+        int nbOperations = 100;
+
+        ConcurrentSkipListSet<MessageUid> messageUids = new ConcurrentSkipListSet<>();
+        ConcurrentTestRunner.builder()
+                .operation((threadNumber, step) -> messageUids.addAll(uidProvider.nextUids((CassandraId) mailbox.getMailboxId(), 10).block()))
+            .threadCount(threadCount)
+            .operationCount(nbOperations / threadCount)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        assertThat(messageUids).hasSize(nbOperations * 10);
+    }
 }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index e57a6bb..dd2cf83 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -715,31 +715,42 @@ public class StoreMessageManager implements MessageManager {
     }
 
     private Iterator<MessageMetaData> copy(Iterator<MailboxMessage> originalRows, MailboxSession session) throws MailboxException {
+        int batchSize = batchSizes.getCopyBatchSize().orElse(1);
         final List<MessageMetaData> copiedRows = new ArrayList<>();
         final MessageMapper messageMapper = mapperFactory.getMessageMapper(session);
 
+        Iterator<List<MailboxMessage>> groupedOriginalRows = com.google.common.collect.Iterators.partition(originalRows, batchSize);
+
         while (originalRows.hasNext()) {
-            final MailboxMessage originalMessage = originalRows.next();
+            List<MailboxMessage> originalMessages = groupedOriginalRows.next();
+
             new QuotaChecker(quotaManager, quotaRootResolver, mailbox)
-                .tryAddition(1, originalMessage.getFullContentOctets());
-            MessageMetaData data = messageMapper.execute(
-                () -> messageMapper.copy(getMailboxEntity(), originalMessage));
-            copiedRows.add(data);
+                .tryAddition(originalMessages.size(), originalMessages.stream()
+                    .mapToLong(MailboxMessage::getFullContentOctets)
+                    .sum());
+            List<MessageMetaData> data = messageMapper.execute(
+                () -> messageMapper.copy(getMailboxEntity(), originalMessages));
+            copiedRows.addAll(data);
         }
         return copiedRows.iterator();
     }
 
     private MoveResult move(Iterator<MailboxMessage> originalRows, MailboxSession session) throws MailboxException {
+        int batchSize = batchSizes.getMoveBatchSize().orElse(1);
         final List<MessageMetaData> movedRows = new ArrayList<>();
         final List<MessageMetaData> originalRowsCopy = new ArrayList<>();
         final MessageMapper messageMapper = mapperFactory.getMessageMapper(session);
 
-        while (originalRows.hasNext()) {
-            final MailboxMessage originalMessage = originalRows.next();
-            originalRowsCopy.add(originalMessage.metaData());
-            MessageMetaData data = messageMapper.execute(
-                () -> messageMapper.move(getMailboxEntity(), originalMessage));
-            movedRows.add(data);
+        Iterator<List<MailboxMessage>> groupedOriginalRows = com.google.common.collect.Iterators.partition(originalRows, batchSize);
+
+        while (groupedOriginalRows.hasNext()) {
+            List<MailboxMessage> originalMessages = groupedOriginalRows.next();
+            originalRowsCopy.addAll(originalMessages.stream()
+                .map(MailboxMessage::metaData)
+                .collect(Guavate.toImmutableList()));
+            List<MessageMetaData> data = messageMapper.execute(
+                () -> messageMapper.move(getMailboxEntity(), originalMessages));
+            movedRows.addAll(data);
         }
         return new MoveResult(movedRows.iterator(), originalRowsCopy.iterator());
     }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java
index 1d4f6f4..0ea6b0d 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java
@@ -44,6 +44,8 @@ import org.apache.james.mailbox.store.mail.model.Property;
 import org.apache.james.mailbox.store.transaction.Mapper;
 import org.apache.james.util.streams.Iterators;
 
+import com.github.fge.lambdas.Throwing;
+import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
@@ -167,7 +169,13 @@ public interface MessageMapper extends Mapper {
      * @param mailbox the Mailbox to copy to
      * @param original the original to copy
      */
-    MessageMetaData copy(Mailbox mailbox,MailboxMessage original) throws MailboxException;
+    MessageMetaData copy(Mailbox mailbox, MailboxMessage original) throws MailboxException;
+
+    default List<MessageMetaData> copy(Mailbox mailbox, List<MailboxMessage> original) throws MailboxException {
+        return original.stream()
+            .map(Throwing.<MailboxMessage, MessageMetaData>function(message -> copy(mailbox, message)).sneakyThrow())
+            .collect(Guavate.toImmutableList());
+    }
     
     /**
      * Move the given {@link MailboxMessage} to a new mailbox and return the uid of the moved. Be aware that the given uid is just a suggestion for the uid of the moved
@@ -176,8 +184,14 @@ public interface MessageMapper extends Mapper {
      * @param mailbox the Mailbox to move to
      * @param original the original to move
      */
-    MessageMetaData move(Mailbox mailbox,MailboxMessage original) throws MailboxException;
-    
+    MessageMetaData move(Mailbox mailbox, MailboxMessage original) throws MailboxException;
+
+    default List<MessageMetaData> move(Mailbox mailbox, List<MailboxMessage> original) throws MailboxException {
+        return original.stream()
+            .map(Throwing.<MailboxMessage, MessageMetaData>function(message -> move(mailbox, message)).sneakyThrow())
+            .collect(Guavate.toImmutableList());
+    }
+
     
     /**
      * Return the last uid which were used for storing a MailboxMessage in the {@link Mailbox} or null if no


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org