You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text export.
Posted to notifications@james.apache.org by rc...@apache.org on 2023/06/13 02:54:50 UTC

[james-project] branch master updated (7485940e24 -> 25db464544)

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


    from 7485940e24 JAMES-2148 create domain mappings via the CLI (#1587)
     new fc391061a7 [FIX] JPA blocking calls should be moved to bounded elastic
     new 25db464544 JAMES-3911 JPA: Prevent concurrent operations on the same EntityManager

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 mailbox/jpa/pom.xml                                | 11 ++++++
 .../james/mailbox/jpa/mail/JPAMailboxMapper.java   | 20 +++++++---
 .../james/mailbox/jpa/mail/JPAMessageMapper.java   |  6 +++
 .../mailbox/jpa/openjpa/OpenJPAMessageManager.java | 37 ++++++++++++++++-
 .../james/mailbox/store/StoreMailboxManager.java   |  4 +-
 .../james/mailbox/store/StoreMessageManager.java   | 10 ++---
 .../james/mailbox/store/mail/AnnotationMapper.java | 27 +++++++++----
 .../james/mailbox/store/mail/MessageMapper.java    | 46 +++++++++++++---------
 .../james/mailbox/store/mail/ModSeqProvider.java   |  7 +++-
 .../james/mailbox/store/mail/UidProvider.java      |  7 +++-
 .../mailbox/store/user/SubscriptionMapper.java     | 12 ++++--
 mpt/impl/imap-mailbox/jpa/pom.xml                  |  3 ++
 server/apps/jpa-app/pom.xml                        |  3 ++
 .../jpa-app/sample-configuration/jvm.properties    |  3 +-
 server/apps/jpa-smtp-app/pom.xml                   | 13 ++++++
 .../sample-configuration/jvm.properties            |  3 +-
 server/apps/spring-app/pom.xml                     |  3 +-
 17 files changed, 166 insertions(+), 49 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 02/02: JAMES-3911 JPA: Prevent concurrent operations on the same EntityManager

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 25db4645440f6e50c4b471a414e11cff6dcf9c1b
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jun 9 18:41:57 2023 +0700

    JAMES-3911 JPA: Prevent concurrent operations on the same EntityManager
---
 .../mailbox/jpa/openjpa/OpenJPAMessageManager.java | 37 +++++++++++++++++++++-
 .../james/mailbox/store/StoreMailboxManager.java   |  2 +-
 .../james/mailbox/store/StoreMessageManager.java   | 10 +++---
 3 files changed, 42 insertions(+), 7 deletions(-)

diff --git a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/openjpa/OpenJPAMessageManager.java b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/openjpa/OpenJPAMessageManager.java
index 567e8d5ac6..7226fb046a 100644
--- a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/openjpa/OpenJPAMessageManager.java
+++ b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/openjpa/OpenJPAMessageManager.java
@@ -20,14 +20,18 @@
 package org.apache.james.mailbox.jpa.openjpa;
 
 import java.time.Clock;
+import java.util.EnumSet;
 
 import javax.mail.Flags;
 
 import org.apache.james.events.EventBus;
 import org.apache.james.mailbox.MailboxPathLocker;
 import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.Mailbox;
+import org.apache.james.mailbox.model.MailboxACL;
 import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.model.UidValidity;
 import org.apache.james.mailbox.quota.QuotaManager;
 import org.apache.james.mailbox.quota.QuotaRootResolver;
 import org.apache.james.mailbox.store.BatchSizes;
@@ -37,23 +41,35 @@ import org.apache.james.mailbox.store.PreDeletionHooks;
 import org.apache.james.mailbox.store.StoreMailboxManager;
 import org.apache.james.mailbox.store.StoreMessageManager;
 import org.apache.james.mailbox.store.StoreRightManager;
+import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.ThreadIdGuessingAlgorithm;
 import org.apache.james.mailbox.store.search.MessageSearchIndex;
 
+import com.github.fge.lambdas.Throwing;
+
+import reactor.core.publisher.Mono;
+
 /**
  * OpenJPA implementation of Mailbox
  */
 public class OpenJPAMessageManager extends StoreMessageManager {
+    private final MailboxSessionMapperFactory mapperFactory;
+    private final StoreRightManager storeRightManager;
+    private final Mailbox mailbox;
 
     public OpenJPAMessageManager(MailboxSessionMapperFactory mapperFactory,
                                  MessageSearchIndex index, EventBus eventBus,
                                  MailboxPathLocker locker, Mailbox mailbox,
                                  QuotaManager quotaManager, QuotaRootResolver quotaRootResolver,
                                  MessageId.Factory messageIdFactory, BatchSizes batchSizes,
-                                 StoreRightManager storeRightManager, ThreadIdGuessingAlgorithm threadIdGuessingAlgorithm, Clock clock) {
+                                 StoreRightManager storeRightManager, ThreadIdGuessingAlgorithm threadIdGuessingAlgorithm,
+                                 Clock clock) {
         super(StoreMailboxManager.DEFAULT_NO_MESSAGE_CAPABILITIES, mapperFactory, index, eventBus, locker, mailbox,
             quotaManager, quotaRootResolver, batchSizes, storeRightManager, PreDeletionHooks.NO_PRE_DELETION_HOOK,
             new MessageStorer.WithoutAttachment(mapperFactory, messageIdFactory, new OpenJPAMessageFactory(OpenJPAMessageFactory.AdvancedFeature.None), threadIdGuessingAlgorithm, clock));
+        this.storeRightManager = storeRightManager;
+        this.mapperFactory = mapperFactory;
+        this.mailbox = mailbox;
     }
 
     /**
@@ -65,4 +81,23 @@ public class OpenJPAMessageManager extends StoreMessageManager {
         flags.add(Flags.Flag.USER);
         return flags;
     }
+
+    public Mono<MailboxMetaData> getMetaDataReactive(MailboxMetaData.RecentMode recentMode, MailboxSession mailboxSession, EnumSet<MailboxMetaData.Item> items) throws MailboxException {
+        MailboxACL resolvedAcl = getResolvedAcl(mailboxSession);
+        if (!storeRightManager.hasRight(mailbox, MailboxACL.Right.Read, mailboxSession)) {
+            return Mono.just(MailboxMetaData.sensibleInformationFree(resolvedAcl, getMailboxEntity().getUidValidity(), isWriteable(mailboxSession)));
+        }
+        Flags permanentFlags = getPermanentFlags(mailboxSession);
+        UidValidity uidValidity = getMailboxEntity().getUidValidity();
+        MessageMapper messageMapper = mapperFactory.getMessageMapper(mailboxSession);
+
+        return messageMapper.executeReactive(
+            nextUid(messageMapper, items)
+                .flatMap(nextUid -> highestModSeq(messageMapper, items)
+                    .flatMap(highestModSeq -> firstUnseen(messageMapper, items)
+                        .flatMap(Throwing.function(firstUnseen -> recent(recentMode, mailboxSession)
+                            .flatMap(recents -> mailboxCounters(messageMapper, items)
+                                .map(counters -> new MailboxMetaData(recents, permanentFlags, uidValidity, nextUid, highestModSeq, counters.getCount(),
+                                    counters.getUnseen(), firstUnseen.orElse(null), isWriteable(mailboxSession), resolvedAcl))))))));
+    }
 }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index 5c530d907f..e1d272c5f8 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -658,7 +658,7 @@ public class StoreMailboxManager implements MailboxManager {
                 return mailboxId;
             })
             .then(Mono.from(locker.executeReactiveWithLockReactive(from, mapper.findMailboxWithPathLike(query)
-                    .flatMap(sub -> {
+                    .concatMap(sub -> {
                         String subOriginalName = sub.getName();
                         String subNewName = newMailboxPath.getName() + subOriginalName.substring(from.getName().length());
                         MailboxPath fromPath = new MailboxPath(from, subOriginalName);
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index 8ef76f2931..a5fe322689 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -594,14 +594,14 @@ public class StoreMessageManager implements MessageManager {
                 t5.getT4().getUnseen(), t5.getT3().orElse(null), isWriteable(mailboxSession), resolvedAcl)));
     }
 
-    private Mono<ModSeq> highestModSeq(MessageMapper messageMapper, EnumSet<MailboxMetaData.Item> items) {
+    protected Mono<ModSeq> highestModSeq(MessageMapper messageMapper, EnumSet<MailboxMetaData.Item> items) {
         if (items.contains(MailboxMetaData.Item.HighestModSeq)) {
             return messageMapper.getHighestModSeqReactive(mailbox);
         }
         return Mono.just(ModSeq.first());
     }
 
-    private Mono<MessageUid> nextUid(MessageMapper messageMapper, EnumSet<MailboxMetaData.Item> items) {
+    protected Mono<MessageUid> nextUid(MessageMapper messageMapper, EnumSet<MailboxMetaData.Item> items) {
         if (items.contains(MailboxMetaData.Item.NextUid)) {
             return messageMapper.getLastUidReactive(mailbox)
                 .map(optional -> optional
@@ -611,14 +611,14 @@ public class StoreMessageManager implements MessageManager {
         return Mono.just(MessageUid.MIN_VALUE);
     }
 
-    private Mono<Optional<MessageUid>> firstUnseen(MessageMapper messageMapper, EnumSet<MailboxMetaData.Item> items) {
+    protected Mono<Optional<MessageUid>> firstUnseen(MessageMapper messageMapper, EnumSet<MailboxMetaData.Item> items) {
         if (items.contains(MailboxMetaData.Item.FirstUnseen)) {
             return messageMapper.findFirstUnseenMessageUidReactive(getMailboxEntity());
         }
         return Mono.just(Optional.empty());
     }
 
-    private Mono<MailboxCounters> mailboxCounters(MessageMapper messageMapper, EnumSet<MailboxMetaData.Item> items) {
+    protected Mono<MailboxCounters> mailboxCounters(MessageMapper messageMapper, EnumSet<MailboxMetaData.Item> items) {
         if (items.contains(MailboxMetaData.Item.MailboxCounters)) {
             return messageMapper.getMailboxCountersReactive(getMailboxEntity());
         }
@@ -777,7 +777,7 @@ public class StoreMessageManager implements MessageManager {
      * Return a List which holds all uids of recent messages and optional reset
      * the recent flag on the messages for the uids
      */
-    private Mono<List<MessageUid>> recent(RecentMode recentMode, MailboxSession mailboxSession) throws MailboxException {
+    protected Mono<List<MessageUid>> recent(RecentMode recentMode, MailboxSession mailboxSession) throws MailboxException {
         MessageMapper messageMapper = mapperFactory.getMessageMapper(mailboxSession);
 
         switch (recentMode) {


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 01/02: [FIX] JPA blocking calls should be moved to bounded elastic

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit fc391061a70af444fff6f8cd7954189f7a9740ce
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jun 2 14:27:19 2023 +0700

    [FIX] JPA blocking calls should be moved to bounded elastic
---
 mailbox/jpa/pom.xml                                | 11 ++++++
 .../james/mailbox/jpa/mail/JPAMailboxMapper.java   | 20 +++++++---
 .../james/mailbox/jpa/mail/JPAMessageMapper.java   |  6 +++
 .../james/mailbox/store/StoreMailboxManager.java   |  2 +-
 .../james/mailbox/store/mail/AnnotationMapper.java | 27 +++++++++----
 .../james/mailbox/store/mail/MessageMapper.java    | 46 +++++++++++++---------
 .../james/mailbox/store/mail/ModSeqProvider.java   |  7 +++-
 .../james/mailbox/store/mail/UidProvider.java      |  7 +++-
 .../mailbox/store/user/SubscriptionMapper.java     | 12 ++++--
 mpt/impl/imap-mailbox/jpa/pom.xml                  |  3 ++
 server/apps/jpa-app/pom.xml                        |  3 ++
 .../jpa-app/sample-configuration/jvm.properties    |  3 +-
 server/apps/jpa-smtp-app/pom.xml                   | 13 ++++++
 .../sample-configuration/jvm.properties            |  3 +-
 server/apps/spring-app/pom.xml                     |  3 +-
 15 files changed, 124 insertions(+), 42 deletions(-)

diff --git a/mailbox/jpa/pom.xml b/mailbox/jpa/pom.xml
index 729d94687f..b3020ba999 100644
--- a/mailbox/jpa/pom.xml
+++ b/mailbox/jpa/pom.xml
@@ -133,6 +133,17 @@
     </dependencies>
     <build>
         <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <reuseForks>false</reuseForks>
+                    <forkCount>1</forkCount>
+                    <argLine>-Djava.library.path=
+                        -javaagent:"${settings.localRepository}"/org/jacoco/org.jacoco.agent/${jacoco-maven-plugin.version}/org.jacoco.agent-${jacoco-maven-plugin.version}-runtime.jar=destfile=${basedir}/target/jacoco.exec
+                        -Xms512m -Xmx1024m -Dopenjpa.Multithreaded=true</argLine>
+                </configuration>
+            </plugin>
             <plugin>
                 <groupId>org.apache.openjpa</groupId>
                 <artifactId>openjpa-maven-plugin</artifactId>
diff --git a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMailboxMapper.java b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMailboxMapper.java
index f9c9f449a3..f691f5c1c3 100644
--- a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMailboxMapper.java
+++ b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMailboxMapper.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.mailbox.jpa.mail;
 
+import java.util.NoSuchElementException;
+
 import javax.persistence.EntityExistsException;
 import javax.persistence.EntityManagerFactory;
 import javax.persistence.NoResultException;
@@ -48,6 +50,7 @@ import com.google.common.base.Preconditions;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 /**
  * Data access management for mailbox.
@@ -92,7 +95,7 @@ public class JPAMailboxMapper extends JPATransactionalMapper implements MailboxM
                 getEntityManager().persist(persistedMailbox);
 
                 return new Mailbox(mailboxPath, uidValidity, persistedMailbox.getMailboxId());
-            }))
+            }).subscribeOn(Schedulers.boundedElastic()))
             .onErrorMap(PersistenceException.class, e -> new MailboxException("Save of mailbox " + mailboxPath.getName() + " failed", e));
     }
 
@@ -107,7 +110,7 @@ public class JPAMailboxMapper extends JPATransactionalMapper implements MailboxM
 
                 getEntityManager().persist(persistedMailbox);
                 return (MailboxId) persistedMailbox.getMailboxId();
-            }))
+            }).subscribeOn(Schedulers.boundedElastic()))
             .onErrorMap(PersistenceException.class, e -> new MailboxException("Save of mailbox " + mailbox.getName() + " failed", e));
     }
 
@@ -133,12 +136,15 @@ public class JPAMailboxMapper extends JPATransactionalMapper implements MailboxM
             .getSingleResult()
             .toMailbox())
             .onErrorResume(NoResultException.class, e -> Mono.empty())
-            .onErrorResume(PersistenceException.class, e -> Mono.error(new MailboxException("Exception upon JPA execution", e)));
+            .onErrorResume(NoSuchElementException.class, e -> Mono.empty())
+            .onErrorResume(PersistenceException.class, e -> Mono.error(new MailboxException("Exception upon JPA execution", e)))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     @Override
     public Mono<Mailbox> findMailboxById(MailboxId id) {
         return Mono.fromCallable(() -> loadJpaMailbox(id).toMailbox())
+            .subscribeOn(Schedulers.boundedElastic())
             .onErrorMap(PersistenceException.class, e -> new MailboxException("Search of mailbox " + id.serialize() + " failed", e));
     }
 
@@ -161,6 +167,7 @@ public class JPAMailboxMapper extends JPATransactionalMapper implements MailboxM
             JPAMailbox jpaMailbox = getEntityManager().find(JPAMailbox.class, mailboxId.getRawId());
             getEntityManager().remove(jpaMailbox);
         })
+        .subscribeOn(Schedulers.boundedElastic())
         .onErrorMap(PersistenceException.class, e -> new MailboxException("Delete of mailbox " + mailbox + " failed", e))
         .then();
     }
@@ -169,6 +176,7 @@ public class JPAMailboxMapper extends JPATransactionalMapper implements MailboxM
     public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) {
         String pathLike = MailboxExpressionBackwardCompatibility.getPathLike(query);
         return Mono.fromCallable(() -> findMailboxWithPathLikeTypedQuery(query.getFixedNamespace(), query.getFixedUser(), pathLike))
+            .subscribeOn(Schedulers.boundedElastic())
             .flatMapIterable(TypedQuery::getResultList)
             .map(JPAMailbox::toMailbox)
             .filter(query::matches)
@@ -192,6 +200,7 @@ public class JPAMailboxMapper extends JPATransactionalMapper implements MailboxM
                 .setParameter("namespaceParam", mailbox.getNamespace())
                 .setParameter("userParam", mailbox.getUser().asString())
                 .getSingleResult()))
+            .subscribeOn(Schedulers.boundedElastic())
             .filter(numberOfChildMailboxes -> numberOfChildMailboxes > 0)
             .hasElement();
     }
@@ -199,6 +208,7 @@ public class JPAMailboxMapper extends JPATransactionalMapper implements MailboxM
     @Override
     public Flux<Mailbox> list() {
         return Mono.fromCallable(() -> getEntityManager().createNamedQuery("listMailboxes", JPAMailbox.class))
+            .subscribeOn(Schedulers.boundedElastic())
             .flatMapIterable(TypedQuery::getResultList)
             .onErrorMap(PersistenceException.class, e -> new MailboxException("Delete of mailboxes failed", e))
             .map(JPAMailbox::toMailbox);
@@ -211,7 +221,7 @@ public class JPAMailboxMapper extends JPATransactionalMapper implements MailboxM
             MailboxACL newACL = mailbox.getACL().apply(mailboxACLCommand);
             mailbox.setACL(newACL);
             return ACLDiff.computeDiff(oldACL, newACL);
-        });
+        }).subscribeOn(Schedulers.boundedElastic());
     }
 
     @Override
@@ -220,7 +230,7 @@ public class JPAMailboxMapper extends JPATransactionalMapper implements MailboxM
             MailboxACL oldMailboxAcl = mailbox.getACL();
             mailbox.setACL(mailboxACL);
             return ACLDiff.computeDiff(oldMailboxAcl, mailboxACL);
-        });
+        }).subscribeOn(Schedulers.boundedElastic());
     }
 
     @Override
diff --git a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMessageMapper.java b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMessageMapper.java
index da2b5fd2a4..e30765683b 100644
--- a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMessageMapper.java
+++ b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMessageMapper.java
@@ -102,6 +102,12 @@ public class JPAMessageMapper extends JPATransactionalMapper implements MessageM
             .subscribeOn(Schedulers.boundedElastic());
     }
 
+    @Override
+    public Flux<MailboxMessage> findInMailboxReactive(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int limitAsInt) {
+        return Flux.defer(Throwing.supplier(() -> Flux.fromIterable(findAsList(mailbox.getMailboxId(), messageRange, limitAsInt))).sneakyThrow())
+            .subscribeOn(Schedulers.boundedElastic());
+    }
+
     @Override
     public Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange set, FetchType fType, int max)
             throws MailboxException {
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index d69ff57375..5c530d907f 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -578,7 +578,7 @@ public class StoreMailboxManager implements MailboxManager {
             return subscriptionMapper.findSubscriptionsForUserReactive(fromSession.getUser())
                 .collectList()
                 .flatMap(subscriptions -> Flux.fromIterable(renamedResults)
-                    .flatMap(renamedResult -> {
+                    .concatMap(renamedResult -> {
                         Function<Subscription, Mono<Void>> renameFunction = subscription -> subscriptionMapper.deleteReactive(subscription)
                             .then(subscriptionMapper.saveReactive(new Subscription(toSession.getUser(), renamedResult.getDestinationPath().asEscapedString())));
                         Subscription legacySubscription = new Subscription(fromSession.getUser(), renamedResult.getOriginPath().getName());
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AnnotationMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AnnotationMapper.java
index da3146fcc9..53d84bc651 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AnnotationMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AnnotationMapper.java
@@ -30,6 +30,7 @@ import org.reactivestreams.Publisher;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public interface AnnotationMapper extends Mapper {
     /**
@@ -42,7 +43,8 @@ public interface AnnotationMapper extends Mapper {
     List<MailboxAnnotation> getAllAnnotations(MailboxId mailboxId);
 
     default Publisher<MailboxAnnotation> getAllAnnotationsReactive(MailboxId mailboxId) {
-        return Flux.fromIterable(getAllAnnotations(mailboxId));
+        return Flux.fromIterable(getAllAnnotations(mailboxId))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     /**
@@ -56,7 +58,8 @@ public interface AnnotationMapper extends Mapper {
     List<MailboxAnnotation> getAnnotationsByKeys(MailboxId mailboxId, Set<MailboxAnnotationKey> keys);
 
     default Publisher<MailboxAnnotation> getAnnotationsByKeysReactive(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) {
-        return Flux.fromIterable(getAnnotationsByKeys(mailboxId, keys));
+        return Flux.fromIterable(getAnnotationsByKeys(mailboxId, keys))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     /**
@@ -70,7 +73,8 @@ public interface AnnotationMapper extends Mapper {
     List<MailboxAnnotation> getAnnotationsByKeysWithOneDepth(MailboxId mailboxId, Set<MailboxAnnotationKey> keys);
 
     default Publisher<MailboxAnnotation> getAnnotationsByKeysWithOneDepthReactive(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) {
-        return Flux.fromIterable(getAnnotationsByKeysWithOneDepth(mailboxId, keys));
+        return Flux.fromIterable(getAnnotationsByKeysWithOneDepth(mailboxId, keys))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     /**
@@ -84,7 +88,8 @@ public interface AnnotationMapper extends Mapper {
     List<MailboxAnnotation> getAnnotationsByKeysWithAllDepth(MailboxId mailboxId, Set<MailboxAnnotationKey> keys);
 
     default Publisher<MailboxAnnotation> getAnnotationsByKeysWithAllDepthReactive(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) {
-        return Flux.fromIterable(getAnnotationsByKeysWithAllDepth(mailboxId, keys));
+        return Flux.fromIterable(getAnnotationsByKeysWithAllDepth(mailboxId, keys))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     /**
@@ -96,7 +101,9 @@ public interface AnnotationMapper extends Mapper {
     void deleteAnnotation(MailboxId mailboxId, MailboxAnnotationKey key);
 
     default Publisher<Void> deleteAnnotationReactive(MailboxId mailboxId, MailboxAnnotationKey key) {
-        return Mono.fromRunnable(() -> deleteAnnotation(mailboxId, key));
+        return Mono.fromRunnable(() -> deleteAnnotation(mailboxId, key))
+            .subscribeOn(Schedulers.boundedElastic())
+            .then();
     }
 
     /**
@@ -109,7 +116,9 @@ public interface AnnotationMapper extends Mapper {
     void insertAnnotation(MailboxId mailboxId, MailboxAnnotation mailboxAnnotation);
 
     default Publisher<Void> insertAnnotationReactive(MailboxId mailboxId, MailboxAnnotation mailboxAnnotation) {
-        return Mono.fromRunnable(() -> insertAnnotation(mailboxId, mailboxAnnotation));
+        return Mono.fromRunnable(() -> insertAnnotation(mailboxId, mailboxAnnotation))
+            .subscribeOn(Schedulers.boundedElastic())
+            .then();
     }
 
     /**
@@ -122,7 +131,8 @@ public interface AnnotationMapper extends Mapper {
     boolean exist(MailboxId mailboxId, MailboxAnnotation mailboxAnnotation);
 
     default Publisher<Boolean> existReactive(MailboxId mailboxId, MailboxAnnotation mailboxAnnotation) {
-        return Mono.fromCallable(() -> exist(mailboxId, mailboxAnnotation));
+        return Mono.fromCallable(() -> exist(mailboxId, mailboxAnnotation))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     /**
@@ -132,6 +142,7 @@ public interface AnnotationMapper extends Mapper {
     int countAnnotations(MailboxId mailboxId);
 
     default Publisher<Integer> countAnnotationsReactive(MailboxId mailboxId) {
-        return Mono.fromCallable(() -> countAnnotations(mailboxId));
+        return Mono.fromCallable(() -> countAnnotations(mailboxId))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java
index f37068826f..d87250c23e 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java
@@ -51,6 +51,7 @@ import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 /**
  * Maps {@link MailboxMessage} in a {@link org.apache.james.mailbox.MessageManager}. A {@link MessageMapper} has a lifecycle from the start of a request
@@ -97,11 +98,8 @@ public interface MessageMapper extends Mapper {
     List<MessageUid> retrieveMessagesMarkedForDeletion(Mailbox mailbox, MessageRange messageRange) throws MailboxException;
 
     default Flux<MessageUid> retrieveMessagesMarkedForDeletionReactive(Mailbox mailbox, MessageRange messageRange) {
-        try {
-            return Flux.fromIterable(retrieveMessagesMarkedForDeletion(mailbox, messageRange));
-        } catch (MailboxException e) {
-            return Flux.error(e);
-        }
+        return Flux.defer(Throwing.supplier(() -> Flux.fromIterable(retrieveMessagesMarkedForDeletion(mailbox, messageRange))).sneakyThrow())
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     /**
@@ -113,7 +111,8 @@ public interface MessageMapper extends Mapper {
     MailboxCounters getMailboxCounters(Mailbox mailbox) throws MailboxException;
 
     default Mono<MailboxCounters> getMailboxCountersReactive(Mailbox mailbox) {
-        return Mono.fromCallable(() -> getMailboxCounters(mailbox));
+        return Mono.fromCallable(() -> getMailboxCounters(mailbox))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     /**
@@ -128,7 +127,8 @@ public interface MessageMapper extends Mapper {
     Map<MessageUid, MessageMetaData> deleteMessages(Mailbox mailbox, List<MessageUid> uids) throws MailboxException;
 
     default Mono<Map<MessageUid, MessageMetaData>> deleteMessagesReactive(Mailbox mailbox, List<MessageUid> uids) {
-        return Mono.fromCallable(() -> deleteMessages(mailbox, uids));
+        return Mono.fromCallable(() -> deleteMessages(mailbox, uids))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     /**
@@ -137,7 +137,8 @@ public interface MessageMapper extends Mapper {
     MessageUid findFirstUnseenMessageUid(Mailbox mailbox) throws MailboxException;
 
     default Mono<Optional<MessageUid>> findFirstUnseenMessageUidReactive(Mailbox mailbox) {
-        return Mono.fromCallable(() -> Optional.ofNullable(findFirstUnseenMessageUid(mailbox)));
+        return Mono.fromCallable(() -> Optional.ofNullable(findFirstUnseenMessageUid(mailbox)))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     /**
@@ -147,7 +148,8 @@ public interface MessageMapper extends Mapper {
     List<MessageUid> findRecentMessageUidsInMailbox(Mailbox mailbox) throws MailboxException;
 
     default Mono<List<MessageUid>> findRecentMessageUidsInMailboxReactive(Mailbox mailbox) {
-        return Mono.fromCallable(() -> findRecentMessageUidsInMailbox(mailbox));
+        return Mono.fromCallable(() -> findRecentMessageUidsInMailbox(mailbox))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
 
@@ -171,7 +173,8 @@ public interface MessageMapper extends Mapper {
             final MessageRange set) throws MailboxException;
 
     default Mono<List<UpdatedFlags>> updateFlagsReactive(Mailbox mailbox, FlagsUpdateCalculator flagsUpdateCalculator, MessageRange set) {
-        return Mono.fromCallable(() -> ImmutableList.copyOf(updateFlags(mailbox, flagsUpdateCalculator, set)));
+        return Mono.fromCallable(() -> (List<UpdatedFlags>) ImmutableList.copyOf(updateFlags(mailbox, flagsUpdateCalculator, set)))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     default Optional<UpdatedFlags> updateFlags(Mailbox mailbox, MessageUid uid, FlagsUpdateCalculator flagsUpdateCalculator) throws MailboxException {
@@ -193,7 +196,8 @@ public interface MessageMapper extends Mapper {
     }
 
     default Mono<List<UpdatedFlags>> resetRecentReactive(Mailbox mailbox) {
-        return Mono.fromCallable(() -> resetRecent(mailbox));
+        return Mono.fromCallable(() -> resetRecent(mailbox))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     
@@ -213,11 +217,13 @@ public interface MessageMapper extends Mapper {
     }
 
     default Mono<MessageMetaData> copyReactive(Mailbox mailbox, MailboxMessage original) {
-        return Mono.fromCallable(() -> copy(mailbox, original));
+        return Mono.fromCallable(() -> copy(mailbox, original))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     default Mono<List<MessageMetaData>> copyReactive(Mailbox mailbox, List<MailboxMessage> original) {
-        return Mono.fromCallable(() -> copy(mailbox, original));
+        return Mono.fromCallable(() -> copy(mailbox, original))
+            .subscribeOn(Schedulers.boundedElastic());
     }
     
     /**
@@ -236,12 +242,13 @@ public interface MessageMapper extends Mapper {
     }
 
     default Mono<MessageMetaData> moveReactive(Mailbox mailbox, MailboxMessage original) {
-        return Mono.fromCallable(() -> move(mailbox, original));
+        return Mono.fromCallable(() -> move(mailbox, original))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     default Mono<List<MessageMetaData>> moveReactive(Mailbox mailbox, List<MailboxMessage> original) {
         return Flux.fromIterable(original)
-            .flatMap(message -> moveReactive(mailbox, message), ReactorUtils.DEFAULT_CONCURRENCY)
+            .concatMap(message -> moveReactive(mailbox, message), ReactorUtils.DEFAULT_CONCURRENCY)
             .collectList();
     }
 
@@ -252,7 +259,8 @@ public interface MessageMapper extends Mapper {
     Optional<MessageUid> getLastUid(Mailbox mailbox) throws MailboxException;
 
     default Mono<Optional<MessageUid>> getLastUidReactive(Mailbox mailbox) {
-        return Mono.fromCallable(() -> getLastUid(mailbox));
+        return Mono.fromCallable(() -> getLastUid(mailbox))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     /**
@@ -261,13 +269,15 @@ public interface MessageMapper extends Mapper {
     ModSeq getHighestModSeq(Mailbox mailbox) throws MailboxException;
 
     default Mono<ModSeq> getHighestModSeqReactive(Mailbox mailbox) {
-        return Mono.fromCallable(() -> getHighestModSeq(mailbox));
+        return Mono.fromCallable(() -> getHighestModSeq(mailbox))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     Flags getApplicableFlag(Mailbox mailbox) throws MailboxException;
 
     default Mono<Flags> getApplicableFlagReactive(Mailbox mailbox) {
-        return Mono.fromCallable(() -> getApplicableFlag(mailbox));
+        return Mono.fromCallable(() -> getApplicableFlag(mailbox))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     /**
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/ModSeqProvider.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/ModSeqProvider.java
index 84b0a0f76d..f1af829383 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/ModSeqProvider.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/ModSeqProvider.java
@@ -24,6 +24,7 @@ import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxId;
 
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 /**
  * Take care of provide mod-seqences for a given {@link Mailbox}. Be aware that implementations
@@ -55,7 +56,8 @@ public interface ModSeqProvider {
     ModSeq highestModSeq(Mailbox mailbox) throws MailboxException;
 
     default Mono<ModSeq> highestModSeqReactive(Mailbox mailbox) {
-        return Mono.fromCallable(() -> highestModSeq(mailbox));
+        return Mono.fromCallable(() -> highestModSeq(mailbox))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     /**
@@ -64,6 +66,7 @@ public interface ModSeqProvider {
     ModSeq highestModSeq(MailboxId mailboxId) throws MailboxException;
 
     default Mono<ModSeq> nextModSeqReactive(MailboxId mailboxId) {
-        return Mono.fromCallable(() -> nextModSeq(mailboxId));
+        return Mono.fromCallable(() -> nextModSeq(mailboxId))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/UidProvider.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/UidProvider.java
index 18f8a22db0..f4e516772d 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/UidProvider.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/UidProvider.java
@@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 /**
  * Take care of provide uids for a given {@link Mailbox}. Be aware that implementations
@@ -50,13 +51,15 @@ public interface UidProvider {
     Optional<MessageUid> lastUid(Mailbox mailbox) throws MailboxException;
 
     default Mono<Optional<MessageUid>> lastUidReactive(Mailbox mailbox) {
-        return Mono.fromCallable(() -> lastUid(mailbox));
+        return Mono.fromCallable(() -> lastUid(mailbox))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     MessageUid nextUid(MailboxId mailboxId) throws MailboxException;
 
     default Mono<MessageUid> nextUidReactive(MailboxId mailboxId) {
-        return Mono.fromCallable(() -> nextUid(mailboxId));
+        return Mono.fromCallable(() -> nextUid(mailboxId))
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     default Mono<List<MessageUid>> nextUids(MailboxId mailboxId, int count) {
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/user/SubscriptionMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/user/SubscriptionMapper.java
index 6f0f233d95..78b789cd5c 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/user/SubscriptionMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/user/SubscriptionMapper.java
@@ -30,6 +30,7 @@ import com.google.common.base.Functions;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 /**
  * Mapper for {@link Subscription}
@@ -43,7 +44,9 @@ public interface SubscriptionMapper extends Mapper {
     void save(Subscription subscription) throws SubscriptionException;
 
     default Mono<Void> saveReactive(Subscription subscription) {
-        return Mono.fromRunnable(Throwing.runnable(() -> save(subscription)));
+        return Mono.fromRunnable(Throwing.runnable(() -> save(subscription)))
+            .subscribeOn(Schedulers.boundedElastic())
+            .then();
     }
 
     /**
@@ -55,7 +58,8 @@ public interface SubscriptionMapper extends Mapper {
 
     default Flux<Subscription> findSubscriptionsForUserReactive(Username user) {
         return Mono.fromCallable(() -> findSubscriptionsForUser(user))
-            .flatMapIterable(Functions.identity());
+            .flatMapIterable(Functions.identity())
+            .subscribeOn(Schedulers.boundedElastic());
     }
 
     /**
@@ -65,6 +69,8 @@ public interface SubscriptionMapper extends Mapper {
     void delete(Subscription subscription) throws SubscriptionException;
 
     default Mono<Void> deleteReactive(Subscription subscription) {
-        return Mono.fromRunnable(Throwing.runnable(() -> delete(subscription)));
+        return Mono.fromRunnable(Throwing.runnable(() -> delete(subscription)))
+            .subscribeOn(Schedulers.boundedElastic())
+            .then();
     }
 }
\ No newline at end of file
diff --git a/mpt/impl/imap-mailbox/jpa/pom.xml b/mpt/impl/imap-mailbox/jpa/pom.xml
index 9b952c2f0a..795553c714 100644
--- a/mpt/impl/imap-mailbox/jpa/pom.xml
+++ b/mpt/impl/imap-mailbox/jpa/pom.xml
@@ -104,6 +104,9 @@
                 <configuration>
                     <reuseForks>true</reuseForks>
                     <forkCount>1C</forkCount>
+                    <argLine>-Djava.library.path=
+                        -javaagent:"${settings.localRepository}"/org/jacoco/org.jacoco.agent/${jacoco-maven-plugin.version}/org.jacoco.agent-${jacoco-maven-plugin.version}-runtime.jar=destfile=${basedir}/target/jacoco.exec
+                        -Xms512m -Xmx1024m -Dopenjpa.Multithreaded=true</argLine>
                 </configuration>
             </plugin>
         </plugins>
diff --git a/server/apps/jpa-app/pom.xml b/server/apps/jpa-app/pom.xml
index 70ec087051..d45acccc49 100644
--- a/server/apps/jpa-app/pom.xml
+++ b/server/apps/jpa-app/pom.xml
@@ -357,6 +357,9 @@
                     <reuseForks>false</reuseForks>
                     <!-- Speed up a bit things -->
                     <forkCount>1C</forkCount>
+                    <argLine>-Djava.library.path=
+                        -javaagent:"${settings.localRepository}"/org/jacoco/org.jacoco.agent/${jacoco-maven-plugin.version}/org.jacoco.agent-${jacoco-maven-plugin.version}-runtime.jar=destfile=${basedir}/target/jacoco.exec
+                        -Xms512m -Xmx1024m -Dopenjpa.Multithreaded=true</argLine>
                 </configuration>
             </plugin>
             <plugin>
diff --git a/server/apps/jpa-app/sample-configuration/jvm.properties b/server/apps/jpa-app/sample-configuration/jvm.properties
index 99175bbd68..73b964c9b4 100644
--- a/server/apps/jpa-app/sample-configuration/jvm.properties
+++ b/server/apps/jpa-app/sample-configuration/jvm.properties
@@ -49,4 +49,5 @@ james.jmx.credential.generation=true
 
 # Disable Remote Code Execution feature from JMX
 # CF https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/19fb8f93c59dfd791f62d41f332db9e306bc1422/src/java.management/share/classes/com/sun/jmx/remote/security/MBeanServerAccessController.java#L646
-jmx.remote.x.mlet.allow.getMBeansFromURL=false
\ No newline at end of file
+jmx.remote.x.mlet.allow.getMBeansFromURL=false
+openjpa.Multithreaded=true
\ No newline at end of file
diff --git a/server/apps/jpa-smtp-app/pom.xml b/server/apps/jpa-smtp-app/pom.xml
index 2e7dee343c..7e395b905f 100644
--- a/server/apps/jpa-smtp-app/pom.xml
+++ b/server/apps/jpa-smtp-app/pom.xml
@@ -234,6 +234,19 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <!-- Isolation issues with Derby inMemory database -->
+                    <reuseForks>false</reuseForks>
+                    <!-- Speed things up a bit -->
+                    <forkCount>1C</forkCount>
+                    <argLine>-Djava.library.path=
+                        -javaagent:"${settings.localRepository}"/org/jacoco/org.jacoco.agent/${jacoco-maven-plugin.version}/org.jacoco.agent-${jacoco-maven-plugin.version}-runtime.jar=destfile=${basedir}/target/jacoco.exec
+                        -Xms512m -Xmx1024m -Dopenjpa.Multithreaded=true</argLine>
+                </configuration>
+            </plugin>
             <plugin>
                 <groupId>com.google.cloud.tools</groupId>
                 <artifactId>jib-maven-plugin</artifactId>
diff --git a/server/apps/jpa-smtp-app/sample-configuration/jvm.properties b/server/apps/jpa-smtp-app/sample-configuration/jvm.properties
index 99175bbd68..73b964c9b4 100644
--- a/server/apps/jpa-smtp-app/sample-configuration/jvm.properties
+++ b/server/apps/jpa-smtp-app/sample-configuration/jvm.properties
@@ -49,4 +49,5 @@ james.jmx.credential.generation=true
 
 # Disable Remote Code Execution feature from JMX
 # CF https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/19fb8f93c59dfd791f62d41f332db9e306bc1422/src/java.management/share/classes/com/sun/jmx/remote/security/MBeanServerAccessController.java#L646
-jmx.remote.x.mlet.allow.getMBeansFromURL=false
\ No newline at end of file
+jmx.remote.x.mlet.allow.getMBeansFromURL=false
+openjpa.Multithreaded=true
\ No newline at end of file
diff --git a/server/apps/spring-app/pom.xml b/server/apps/spring-app/pom.xml
index 3675babf53..4f684213a4 100644
--- a/server/apps/spring-app/pom.xml
+++ b/server/apps/spring-app/pom.xml
@@ -70,7 +70,8 @@
         <james.system-property1>-Djames.message.usememorycopy=false</james.system-property1>
         <!-- Prevents Logjam (CVE-2015-4000) -->
         <james.system-property2>-Djdk.tls.ephemeralDHKeySize=2048</james.system-property2>
-        <james.system-properties>${james.system-property1} ${james.system-property2}</james.system-properties>
+        <james.system-property3>-Dopenjpa.Multithreaded=true</james.system-property3>
+        <james.system-properties>${james.system-property1} ${james.system-property2} ${james.system-property3}</james.system-properties>
         <!-- this name is used for James's folders on Debian systems and james user -->
         <james.debian.user>apache-james</james.debian.user>
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org