You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/21 02:56:37 UTC

[james-project] 01/19: JAMES-3138 Reactify CurrentQuotaManager

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e03d38f6032b2e06a70ac4dbf9b487b36ff99de6
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Thu Apr 16 17:36:45 2020 +0700

    JAMES-3138 Reactify CurrentQuotaManager
---
 .../james/mailbox/quota/CurrentQuotaManager.java   |  6 +-
 .../quota/CassandraCurrentQuotaManager.java        | 36 ++++++------
 .../mailbox/jpa/quota/JpaCurrentQuotaManager.java  | 66 ++++++++++++----------
 .../quota/InMemoryCurrentQuotaManager.java         | 43 +++++++-------
 .../quota/InMemoryCurrentQuotaManagerTest.java     | 10 ++--
 .../store/quota/ListeningCurrentQuotaUpdater.java  | 53 ++++++++---------
 .../store/quota/StoreCurrentQuotaManager.java      |  7 ++-
 .../mailbox/store/quota/StoreQuotaManager.java     |  6 +-
 .../quota/ListeningCurrentQuotaUpdaterTest.java    |  7 ++-
 .../store/quota/StoreCurrentQuotaManagerTest.java  | 24 ++++----
 .../mailbox/store/quota/StoreQuotaManagerTest.java | 16 ++++--
 .../james/webadmin/routes/UserQuotaRoutesTest.java |  4 +-
 12 files changed, 149 insertions(+), 129 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/quota/CurrentQuotaManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/quota/CurrentQuotaManager.java
index 06cc970..f1f3d41 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/quota/CurrentQuotaManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/quota/CurrentQuotaManager.java
@@ -21,16 +21,16 @@ package org.apache.james.mailbox.quota;
 
 import org.apache.james.core.quota.QuotaCountUsage;
 import org.apache.james.core.quota.QuotaSizeUsage;
-import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.QuotaRoot;
+import org.reactivestreams.Publisher;
 
 /**
  * This interface allows us to get the current value associated to a quota value
  */
 public interface CurrentQuotaManager {
 
-    QuotaCountUsage getCurrentMessageCount(QuotaRoot quotaRoot) throws MailboxException;
+    Publisher<QuotaCountUsage> getCurrentMessageCount(QuotaRoot quotaRoot);
 
-    QuotaSizeUsage getCurrentStorage(QuotaRoot quotaRoot) throws MailboxException;
+    Publisher<QuotaSizeUsage> getCurrentStorage(QuotaRoot quotaRoot);
 
 }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraCurrentQuotaManager.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraCurrentQuotaManager.java
index 0254999..a4ba974 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraCurrentQuotaManager.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraCurrentQuotaManager.java
@@ -28,6 +28,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
 
 import javax.inject.Inject;
 
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.core.quota.QuotaCountUsage;
 import org.apache.james.core.quota.QuotaSizeUsage;
 import org.apache.james.mailbox.cassandra.table.CassandraCurrentQuota;
@@ -36,12 +37,13 @@ import org.apache.james.mailbox.model.QuotaRoot;
 import org.apache.james.mailbox.store.quota.StoreCurrentQuotaManager;
 
 import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
 
+import reactor.core.publisher.Mono;
+
 public class CassandraCurrentQuotaManager implements StoreCurrentQuotaManager {
 
-    private final Session session;
+    private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final PreparedStatement increaseStatement;
     private final PreparedStatement decreaseStatement;
     private final PreparedStatement getCurrentMessageCountStatement;
@@ -49,7 +51,7 @@ public class CassandraCurrentQuotaManager implements StoreCurrentQuotaManager {
 
     @Inject
     public CassandraCurrentQuotaManager(Session session) {
-        this.session = session;
+        this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         this.increaseStatement = session.prepare(update(CassandraCurrentQuota.TABLE_NAME)
             .with(incr(CassandraCurrentQuota.MESSAGE_COUNT, bindMarker()))
             .and(incr(CassandraCurrentQuota.STORAGE, bindMarker()))
@@ -67,34 +69,30 @@ public class CassandraCurrentQuotaManager implements StoreCurrentQuotaManager {
     }
 
     @Override
-    public void increase(QuotaOperation quotaOperation) {
-        session.execute(increaseStatement.bind(quotaOperation.count().asLong(),
+    public Mono<Void> increase(QuotaOperation quotaOperation) {
+        return cassandraAsyncExecutor.executeVoid(increaseStatement.bind(quotaOperation.count().asLong(),
             quotaOperation.size().asLong(),
             quotaOperation.quotaRoot().getValue()));
     }
 
     @Override
-    public void decrease(QuotaOperation quotaOperation) {
-        session.execute(decreaseStatement.bind(quotaOperation.count().asLong(),
+    public Mono<Void> decrease(QuotaOperation quotaOperation) {
+        return cassandraAsyncExecutor.executeVoid(decreaseStatement.bind(quotaOperation.count().asLong(),
             quotaOperation.size().asLong(),
             quotaOperation.quotaRoot().getValue()));
     }
 
     @Override
-    public QuotaCountUsage getCurrentMessageCount(QuotaRoot quotaRoot) {
-        ResultSet resultSet = session.execute(getCurrentMessageCountStatement.bind(quotaRoot.getValue()));
-        if (resultSet.isExhausted()) {
-            return QuotaCountUsage.count(0L);
-        }
-        return QuotaCountUsage.count(resultSet.one().getLong(CassandraCurrentQuota.MESSAGE_COUNT));
+    public Mono<QuotaCountUsage> getCurrentMessageCount(QuotaRoot quotaRoot) {
+        return cassandraAsyncExecutor.executeSingleRow(getCurrentMessageCountStatement.bind(quotaRoot.getValue()))
+            .map(row -> QuotaCountUsage.count(row.getLong(CassandraCurrentQuota.MESSAGE_COUNT)))
+            .defaultIfEmpty(QuotaCountUsage.count(0L));
     }
 
     @Override
-    public QuotaSizeUsage getCurrentStorage(QuotaRoot quotaRoot) {
-        ResultSet resultSet = session.execute(getCurrentStorageStatement.bind(quotaRoot.getValue()));
-        if (resultSet.isExhausted()) {
-            return QuotaSizeUsage.size(0L);
-        }
-        return QuotaSizeUsage.size(resultSet.one().getLong(CassandraCurrentQuota.STORAGE));
+    public Mono<QuotaSizeUsage> getCurrentStorage(QuotaRoot quotaRoot) {
+        return cassandraAsyncExecutor.executeSingleRow(getCurrentStorageStatement.bind(quotaRoot.getValue()))
+            .map(row -> QuotaSizeUsage.size(row.getLong(CassandraCurrentQuota.STORAGE)))
+            .defaultIfEmpty(QuotaSizeUsage.size(0L));
     }
 }
diff --git a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/quota/JpaCurrentQuotaManager.java b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/quota/JpaCurrentQuotaManager.java
index 48ef98a..f56bab0 100644
--- a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/quota/JpaCurrentQuotaManager.java
+++ b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/quota/JpaCurrentQuotaManager.java
@@ -33,6 +33,8 @@ import org.apache.james.mailbox.model.QuotaOperation;
 import org.apache.james.mailbox.model.QuotaRoot;
 import org.apache.james.mailbox.store.quota.StoreCurrentQuotaManager;
 
+import reactor.core.publisher.Mono;
+
 public class JpaCurrentQuotaManager implements StoreCurrentQuotaManager {
 
     public static final long NO_MESSAGES = 0L;
@@ -48,49 +50,53 @@ public class JpaCurrentQuotaManager implements StoreCurrentQuotaManager {
     }
 
     @Override
-    public QuotaCountUsage getCurrentMessageCount(QuotaRoot quotaRoot) {
+    public Mono<QuotaCountUsage> getCurrentMessageCount(QuotaRoot quotaRoot) {
         EntityManager entityManager = entityManagerFactory.createEntityManager();
-        return Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot))
+
+        return Mono.fromCallable(() -> Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot))
             .map(JpaCurrentQuota::getMessageCount)
-            .orElse(QuotaCountUsage.count(NO_STORED_BYTES));
+            .orElse(QuotaCountUsage.count(NO_STORED_BYTES)));
     }
 
     @Override
-    public QuotaSizeUsage getCurrentStorage(QuotaRoot quotaRoot) {
+    public Mono<QuotaSizeUsage> getCurrentStorage(QuotaRoot quotaRoot) {
         EntityManager entityManager = entityManagerFactory.createEntityManager();
-        return Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot))
+
+        return Mono.fromCallable(() -> Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot))
             .map(JpaCurrentQuota::getSize)
-            .orElse(QuotaSizeUsage.size(NO_STORED_BYTES));
+            .orElse(QuotaSizeUsage.size(NO_STORED_BYTES)));
     }
 
     @Override
-    public void increase(QuotaOperation quotaOperation) {
-        transactionRunner.run(
-            entityManager -> {
-                QuotaRoot quotaRoot = quotaOperation.quotaRoot();
-
-                JpaCurrentQuota jpaCurrentQuota = Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot))
-                    .orElse(new JpaCurrentQuota(quotaRoot.getValue(), NO_MESSAGES, NO_STORED_BYTES));
-
-                entityManager.merge(new JpaCurrentQuota(quotaRoot.getValue(),
-                    jpaCurrentQuota.getMessageCount().asLong() + quotaOperation.count().asLong(),
-                    jpaCurrentQuota.getSize().asLong() + quotaOperation.size().asLong()));
-            });
+    public Mono<Void> increase(QuotaOperation quotaOperation) {
+        return Mono.fromRunnable(() ->
+            transactionRunner.run(
+                entityManager -> {
+                    QuotaRoot quotaRoot = quotaOperation.quotaRoot();
+
+                    JpaCurrentQuota jpaCurrentQuota = Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot))
+                        .orElse(new JpaCurrentQuota(quotaRoot.getValue(), NO_MESSAGES, NO_STORED_BYTES));
+
+                    entityManager.merge(new JpaCurrentQuota(quotaRoot.getValue(),
+                        jpaCurrentQuota.getMessageCount().asLong() + quotaOperation.count().asLong(),
+                        jpaCurrentQuota.getSize().asLong() + quotaOperation.size().asLong()));
+                }));
     }
 
     @Override
-    public void decrease(QuotaOperation quotaOperation) {
-        transactionRunner.run(
-            entityManager -> {
-                QuotaRoot quotaRoot = quotaOperation.quotaRoot();
-
-                JpaCurrentQuota jpaCurrentQuota = Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot))
-                    .orElse(new JpaCurrentQuota(quotaRoot.getValue(), NO_MESSAGES, NO_STORED_BYTES));
-
-                entityManager.merge(new JpaCurrentQuota(quotaRoot.getValue(),
-                    jpaCurrentQuota.getMessageCount().asLong() - quotaOperation.count().asLong(),
-                    jpaCurrentQuota.getSize().asLong() - quotaOperation.size().asLong()));
-            });
+    public Mono<Void> decrease(QuotaOperation quotaOperation) {
+        return Mono.fromRunnable(() ->
+            transactionRunner.run(
+                entityManager -> {
+                    QuotaRoot quotaRoot = quotaOperation.quotaRoot();
+
+                    JpaCurrentQuota jpaCurrentQuota = Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot))
+                        .orElse(new JpaCurrentQuota(quotaRoot.getValue(), NO_MESSAGES, NO_STORED_BYTES));
+
+                    entityManager.merge(new JpaCurrentQuota(quotaRoot.getValue(),
+                        jpaCurrentQuota.getMessageCount().asLong() - quotaOperation.count().asLong(),
+                        jpaCurrentQuota.getSize().asLong() - quotaOperation.size().asLong()));
+                }));
     }
 
     private JpaCurrentQuota retrieveUserQuota(EntityManager entityManager, QuotaRoot quotaRoot) {
diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManager.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManager.java
index 6a917e6..fcbe5ff 100644
--- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManager.java
+++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManager.java
@@ -19,7 +19,6 @@
 
 package org.apache.james.mailbox.inmemory.quota;
 
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.UnaryOperator;
 
@@ -40,6 +39,8 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 
+import reactor.core.publisher.Mono;
+
 public class InMemoryCurrentQuotaManager implements StoreCurrentQuotaManager {
 
     private final LoadingCache<QuotaRoot, AtomicReference<CurrentQuotas>> quotaCache;
@@ -55,38 +56,34 @@ public class InMemoryCurrentQuotaManager implements StoreCurrentQuotaManager {
     }
 
     @Override
-    public void increase(QuotaOperation quotaOperation) throws MailboxException {
-        updateQuota(quotaOperation.quotaRoot(), quota -> quota.increase(new CurrentQuotas(quotaOperation.count(), quotaOperation.size())));
+    public Mono<Void> increase(QuotaOperation quotaOperation) {
+        return updateQuota(quotaOperation.quotaRoot(), quota -> quota.increase(new CurrentQuotas(quotaOperation.count(), quotaOperation.size())));
     }
 
     @Override
-    public void decrease(QuotaOperation quotaOperation) throws MailboxException {
-        updateQuota(quotaOperation.quotaRoot(), quota -> quota.decrease(new CurrentQuotas(quotaOperation.count(), quotaOperation.size())));
+    public Mono<Void> decrease(QuotaOperation quotaOperation) {
+        return updateQuota(quotaOperation.quotaRoot(), quota -> quota.decrease(new CurrentQuotas(quotaOperation.count(), quotaOperation.size())));
     }
 
     @Override
-    public QuotaCountUsage getCurrentMessageCount(QuotaRoot quotaRoot) throws MailboxException {
-        try {
-            return quotaCache.get(quotaRoot).get().count();
-        } catch (ExecutionException e) {
-            throw new MailboxException("Exception caught", e);
-        }
+    public Mono<QuotaCountUsage> getCurrentMessageCount(QuotaRoot quotaRoot) {
+        return Mono.fromCallable(() -> quotaCache.get(quotaRoot).get().count())
+            .onErrorMap(this::wrapAsMailboxException);
     }
 
     @Override
-    public QuotaSizeUsage getCurrentStorage(QuotaRoot quotaRoot) throws MailboxException {
-        try {
-            return quotaCache.get(quotaRoot).get().size();
-        } catch (ExecutionException e) {
-            throw new MailboxException("Exception caught", e);
-        }
+    public Mono<QuotaSizeUsage> getCurrentStorage(QuotaRoot quotaRoot) {
+        return Mono.fromCallable(() -> quotaCache.get(quotaRoot).get().size())
+            .onErrorMap(this::wrapAsMailboxException);
+    }
+
+    private Mono<Void> updateQuota(QuotaRoot quotaRoot, UnaryOperator<CurrentQuotas> quotaFunction) {
+        return Mono.fromCallable(() -> quotaCache.get(quotaRoot).updateAndGet(quotaFunction))
+            .onErrorMap(this::wrapAsMailboxException)
+            .then();
     }
 
-    private void updateQuota(QuotaRoot quotaRoot, UnaryOperator<CurrentQuotas> quotaFunction) throws MailboxException {
-        try {
-            quotaCache.get(quotaRoot).updateAndGet(quotaFunction);
-        } catch (ExecutionException e) {
-            throw new MailboxException("Exception caught", e);
-        }
+    private Throwable wrapAsMailboxException(Throwable throwable) {
+        return new MailboxException("Exception caught", throwable);
     }
 }
diff --git a/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManagerTest.java b/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManagerTest.java
index 2014706..26e2b1f 100644
--- a/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManagerTest.java
+++ b/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManagerTest.java
@@ -56,7 +56,7 @@ class InMemoryCurrentQuotaManagerTest {
         when(mockedCurrentQuotaCalculator.recalculateCurrentQuotas(QUOTA_ROOT, null))
             .thenReturn(CURRENT_QUOTAS);
 
-        assertThat(testee.getCurrentMessageCount(QUOTA_ROOT)).isEqualTo(QuotaCountUsage.count(18));
+        assertThat(testee.getCurrentMessageCount(QUOTA_ROOT).block()).isEqualTo(QuotaCountUsage.count(18));
     }
 
     @Test
@@ -64,7 +64,7 @@ class InMemoryCurrentQuotaManagerTest {
         when(mockedCurrentQuotaCalculator.recalculateCurrentQuotas(QUOTA_ROOT, null))
             .thenReturn(CURRENT_QUOTAS);
 
-        assertThat(testee.getCurrentStorage(QUOTA_ROOT)).isEqualTo(QuotaSizeUsage.size(512));
+        assertThat(testee.getCurrentStorage(QUOTA_ROOT).block()).isEqualTo(QuotaSizeUsage.size(512));
     }
 
     @Test
@@ -73,9 +73,9 @@ class InMemoryCurrentQuotaManagerTest {
             .thenReturn(CURRENT_QUOTAS);
 
         QuotaOperation quotaOperation = new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100));
-        testee.increase(quotaOperation);
+        testee.increase(quotaOperation).block();
 
-        assertThat(testee.getCurrentMessageCount(QUOTA_ROOT)).isEqualTo(QuotaCountUsage.count(28));
-        assertThat(testee.getCurrentStorage(QUOTA_ROOT)).isEqualTo(QuotaSizeUsage.size(612));
+        assertThat(testee.getCurrentMessageCount(QUOTA_ROOT).block()).isEqualTo(QuotaCountUsage.count(28));
+        assertThat(testee.getCurrentStorage(QUOTA_ROOT).block()).isEqualTo(QuotaSizeUsage.size(612));
     }
 }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java
index bf96d76..c8bc83a 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java
@@ -40,6 +40,8 @@ import org.apache.james.mailbox.store.event.EventFactory;
 import com.github.fge.lambdas.Throwing;
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Mono;
+
 public class ListeningCurrentQuotaUpdater implements MailboxListener.GroupMailboxListener, QuotaUpdater {
     public static class ListeningCurrentQuotaUpdaterGroup extends Group {
 
@@ -89,36 +91,34 @@ public class ListeningCurrentQuotaUpdater implements MailboxListener.GroupMailbo
 
     private void handleExpungedEvent(Expunged expunged, QuotaRoot quotaRoot) {
         computeQuotaOperation(expunged, quotaRoot).ifPresent(Throwing.<QuotaOperation>consumer(quotaOperation -> {
-            currentQuotaManager.decrease(quotaOperation);
-
-            eventBus.dispatch(
-                EventFactory.quotaUpdated()
-                    .randomEventId()
-                    .user(expunged.getUsername())
-                    .quotaRoot(quotaRoot)
-                    .quotaCount(quotaManager.getMessageQuota(quotaRoot))
-                    .quotaSize(quotaManager.getStorageQuota(quotaRoot))
-                    .instant(Instant.now())
-                    .build(),
-                NO_REGISTRATION_KEYS)
+            currentQuotaManager.decrease(quotaOperation)
+                .then(Mono.defer(Throwing.supplier(() -> eventBus.dispatch(
+                    EventFactory.quotaUpdated()
+                        .randomEventId()
+                        .user(expunged.getUsername())
+                        .quotaRoot(quotaRoot)
+                        .quotaCount(quotaManager.getMessageQuota(quotaRoot))
+                        .quotaSize(quotaManager.getStorageQuota(quotaRoot))
+                        .instant(Instant.now())
+                        .build(),
+                    NO_REGISTRATION_KEYS)).sneakyThrow()))
                 .block();
         }).sneakyThrow());
     }
 
     private void handleAddedEvent(Added added, QuotaRoot quotaRoot) {
         computeQuotaOperation(added, quotaRoot).ifPresent(Throwing.<QuotaOperation>consumer(quotaOperation -> {
-            currentQuotaManager.increase(quotaOperation);
-
-            eventBus.dispatch(
-                EventFactory.quotaUpdated()
-                    .randomEventId()
-                    .user(added.getUsername())
-                    .quotaRoot(quotaRoot)
-                    .quotaCount(quotaManager.getMessageQuota(quotaRoot))
-                    .quotaSize(quotaManager.getStorageQuota(quotaRoot))
-                    .instant(Instant.now())
-                    .build(),
-                NO_REGISTRATION_KEYS)
+            currentQuotaManager.increase(quotaOperation)
+                .then(Mono.defer(Throwing.supplier(() -> eventBus.dispatch(
+                    EventFactory.quotaUpdated()
+                        .randomEventId()
+                        .user(added.getUsername())
+                        .quotaRoot(quotaRoot)
+                        .quotaCount(quotaManager.getMessageQuota(quotaRoot))
+                        .quotaSize(quotaManager.getStorageQuota(quotaRoot))
+                        .instant(Instant.now())
+                        .build(),
+                    NO_REGISTRATION_KEYS)).sneakyThrow()))
                 .block();
         }).sneakyThrow());
     }
@@ -144,8 +144,9 @@ public class ListeningCurrentQuotaUpdater implements MailboxListener.GroupMailbo
         boolean mailboxContainedMessages = mailboxDeletionEvent.getDeletedMessageCount().asLong() > 0;
         if (mailboxContainedMessages) {
             currentQuotaManager.decrease(new QuotaOperation(mailboxDeletionEvent.getQuotaRoot(),
-                mailboxDeletionEvent.getDeletedMessageCount(),
-                mailboxDeletionEvent.getTotalDeletedSize()));
+                    mailboxDeletionEvent.getDeletedMessageCount(),
+                    mailboxDeletionEvent.getTotalDeletedSize()))
+                .block();
         }
     }
 
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManager.java
index d785f89..5d6cb08 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManager.java
@@ -19,14 +19,15 @@
 
 package org.apache.james.mailbox.store.quota;
 
-import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.QuotaOperation;
 import org.apache.james.mailbox.quota.CurrentQuotaManager;
 
+import reactor.core.publisher.Mono;
+
 public interface StoreCurrentQuotaManager extends CurrentQuotaManager {
 
-    void increase(QuotaOperation quotaOperation) throws MailboxException;
+    Mono<Void> increase(QuotaOperation quotaOperation);
 
-    void decrease(QuotaOperation quotaOperation) throws MailboxException;
+    Mono<Void> decrease(QuotaOperation quotaOperation);
 
 }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreQuotaManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreQuotaManager.java
index 82d227d..a731e91 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreQuotaManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreQuotaManager.java
@@ -35,6 +35,8 @@ import org.apache.james.mailbox.quota.CurrentQuotaManager;
 import org.apache.james.mailbox.quota.MaxQuotaManager;
 import org.apache.james.mailbox.quota.QuotaManager;
 
+import reactor.core.publisher.Mono;
+
 /**
  * Default implementation for the Quota Manager.
  *
@@ -54,7 +56,7 @@ public class StoreQuotaManager implements QuotaManager {
     public Quota<QuotaCountLimit, QuotaCountUsage> getMessageQuota(QuotaRoot quotaRoot) throws MailboxException {
         Map<Scope, QuotaCountLimit> maxMessageDetails = maxQuotaManager.listMaxMessagesDetails(quotaRoot);
         return Quota.<QuotaCountLimit, QuotaCountUsage>builder()
-            .used(currentQuotaManager.getCurrentMessageCount(quotaRoot))
+            .used(Mono.from(currentQuotaManager.getCurrentMessageCount(quotaRoot)).block())
             .computedLimit(maxQuotaManager.getMaxMessage(maxMessageDetails).orElse(QuotaCountLimit.unlimited()))
             .limitsByScope(maxMessageDetails)
             .build();
@@ -65,7 +67,7 @@ public class StoreQuotaManager implements QuotaManager {
     public Quota<QuotaSizeLimit, QuotaSizeUsage> getStorageQuota(QuotaRoot quotaRoot) throws MailboxException {
         Map<Scope, QuotaSizeLimit> maxStorageDetails = maxQuotaManager.listMaxStorageDetails(quotaRoot);
         return Quota.<QuotaSizeLimit, QuotaSizeUsage>builder()
-            .used(currentQuotaManager.getCurrentStorage(quotaRoot))
+            .used(Mono.from(currentQuotaManager.getCurrentStorage(quotaRoot)).block())
             .computedLimit(maxQuotaManager.getMaxStorage(maxStorageDetails).orElse(QuotaSizeLimit.unlimited()))
             .limitsByScope(maxStorageDetails)
             .build();
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdaterTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdaterTest.java
index 6825773..a43ad52 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdaterTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdaterTest.java
@@ -96,6 +96,7 @@ class ListeningCurrentQuotaUpdaterTest {
         when(added.getUids()).thenReturn(Lists.newArrayList(MessageUid.of(36), MessageUid.of(38)));
         when(added.getUsername()).thenReturn(USERNAME_BENWA);
         when(mockedQuotaRootResolver.getQuotaRoot(eq(MAILBOX_ID))).thenReturn(QUOTA_ROOT);
+        when(mockedCurrentQuotaManager.increase(QUOTA)).thenAnswer(any -> Mono.empty());
 
         testee.event(added);
 
@@ -111,6 +112,7 @@ class ListeningCurrentQuotaUpdaterTest {
         when(expunged.getMailboxId()).thenReturn(MAILBOX_ID);
         when(expunged.getUsername()).thenReturn(USERNAME_BENWA);
         when(mockedQuotaRootResolver.getQuotaRoot(eq(MAILBOX_ID))).thenReturn(QUOTA_ROOT);
+        when(mockedCurrentQuotaManager.decrease(QUOTA)).thenAnswer(any -> Mono.empty());
 
         testee.event(expunged);
 
@@ -145,6 +147,8 @@ class ListeningCurrentQuotaUpdaterTest {
 
     @Test
     void mailboxDeletionEventShouldDecreaseCurrentQuotaValues() throws Exception {
+        QuotaOperation operation = new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(5));
+
         MailboxListener.MailboxDeletion deletion = mock(MailboxListener.MailboxDeletion.class);
         when(deletion.getQuotaRoot()).thenReturn(QUOTA_ROOT);
         when(deletion.getDeletedMessageCount()).thenReturn(QuotaCountUsage.count(10));
@@ -152,10 +156,11 @@ class ListeningCurrentQuotaUpdaterTest {
         when(deletion.getMailboxId()).thenReturn(MAILBOX_ID);
         when(deletion.getUsername()).thenReturn(USERNAME_BENWA);
         when(mockedQuotaRootResolver.getQuotaRoot(eq(MAILBOX_ID))).thenReturn(QUOTA_ROOT);
+        when(mockedCurrentQuotaManager.decrease(operation)).thenAnswer(any -> Mono.empty());
 
         testee.event(deletion);
 
-        verify(mockedCurrentQuotaManager).decrease(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(5)));
+        verify(mockedCurrentQuotaManager).decrease(operation);
     }
 
     @Test
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManagerTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManagerTest.java
index df39dd2..02701fb 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManagerTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManagerTest.java
@@ -30,6 +30,8 @@ import org.apache.james.mailbox.model.QuotaRoot;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import reactor.core.publisher.Mono;
+
 public abstract class StoreCurrentQuotaManagerTest {
     private static final QuotaRoot QUOTA_ROOT = QuotaRoot.quotaRoot("benwa", Optional.empty());
     
@@ -44,32 +46,32 @@ public abstract class StoreCurrentQuotaManagerTest {
 
     @Test
     void getCurrentStorageShouldReturnZeroByDefault() throws Exception {
-        assertThat(testee.getCurrentStorage(QUOTA_ROOT)).isEqualTo(QuotaSizeUsage.size(0));
+        assertThat(Mono.from(testee.getCurrentStorage(QUOTA_ROOT)).block()).isEqualTo(QuotaSizeUsage.size(0));
     }
 
     @Test
     void increaseShouldWork() throws Exception {
-        testee.increase(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100)));
+        testee.increase(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100))).block();
 
-        assertThat(testee.getCurrentMessageCount(QUOTA_ROOT)).isEqualTo(QuotaCountUsage.count(10));
-        assertThat(testee.getCurrentStorage(QUOTA_ROOT)).isEqualTo(QuotaSizeUsage.size(100));
+        assertThat(Mono.from(testee.getCurrentMessageCount(QUOTA_ROOT)).block()).isEqualTo(QuotaCountUsage.count(10));
+        assertThat(Mono.from(testee.getCurrentStorage(QUOTA_ROOT)).block()).isEqualTo(QuotaSizeUsage.size(100));
     }
 
     @Test
     void decreaseShouldWork() throws Exception {
-        testee.increase(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(20), QuotaSizeUsage.size(200)));
+        testee.increase(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(20), QuotaSizeUsage.size(200))).block();
 
-        testee.decrease(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100)));
+        testee.decrease(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100))).block();
 
-        assertThat(testee.getCurrentMessageCount(QUOTA_ROOT)).isEqualTo(QuotaCountUsage.count(10));
-        assertThat(testee.getCurrentStorage(QUOTA_ROOT)).isEqualTo(QuotaSizeUsage.size(100));
+        assertThat(Mono.from(testee.getCurrentMessageCount(QUOTA_ROOT)).block()).isEqualTo(QuotaCountUsage.count(10));
+        assertThat(Mono.from(testee.getCurrentStorage(QUOTA_ROOT)).block()).isEqualTo(QuotaSizeUsage.size(100));
     }
 
     @Test
     void decreaseShouldNotFailWhenItLeadsToNegativeValues() throws Exception {
-        testee.decrease(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100)));
+        testee.decrease(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100))).block();
 
-        assertThat(testee.getCurrentMessageCount(QUOTA_ROOT)).isEqualTo(QuotaCountUsage.count(-10));
-        assertThat(testee.getCurrentStorage(QUOTA_ROOT)).isEqualTo(QuotaSizeUsage.size(-100));
+        assertThat(Mono.from(testee.getCurrentMessageCount(QUOTA_ROOT)).block()).isEqualTo(QuotaCountUsage.count(-10));
+        assertThat(Mono.from(testee.getCurrentStorage(QUOTA_ROOT)).block()).isEqualTo(QuotaSizeUsage.size(-100));
     }
 }
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreQuotaManagerTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreQuotaManagerTest.java
index fbaba67..205f827 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreQuotaManagerTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreQuotaManagerTest.java
@@ -38,6 +38,8 @@ import org.apache.james.mailbox.quota.MaxQuotaManager;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import reactor.core.publisher.Mono;
+
 class StoreQuotaManagerTest {
 
     StoreQuotaManager testee;
@@ -57,7 +59,9 @@ class StoreQuotaManagerTest {
     @Test
     void getMessageQuotaShouldWorkWithNumericValues() throws Exception {
         when(mockedMaxQuotaManager.getMaxMessage(any(Map.class))).thenReturn(Optional.of(QuotaCountLimit.count(360L)));
-        when(mockedCurrentQuotaManager.getCurrentMessageCount(quotaRoot)).thenReturn(QuotaCountUsage.count(36L));
+        when(mockedCurrentQuotaManager.getCurrentMessageCount(quotaRoot)).thenAnswer(any ->
+            Mono.fromCallable(() -> QuotaCountUsage.count(36L)));
+
         assertThat(testee.getMessageQuota(quotaRoot)).isEqualTo(
             Quota.<QuotaCountLimit, QuotaCountUsage>builder().used(QuotaCountUsage.count(36)).computedLimit(QuotaCountLimit.count(360)).build());
     }
@@ -66,7 +70,9 @@ class StoreQuotaManagerTest {
     @Test
     void getStorageQuotaShouldWorkWithNumericValues() throws Exception {
         when(mockedMaxQuotaManager.getMaxStorage(any(Map.class))).thenReturn(Optional.of(QuotaSizeLimit.size(360L)));
-        when(mockedCurrentQuotaManager.getCurrentStorage(quotaRoot)).thenReturn(QuotaSizeUsage.size(36L));
+        when(mockedCurrentQuotaManager.getCurrentStorage(quotaRoot)).thenAnswer(any ->
+            Mono.fromCallable(() -> QuotaSizeUsage.size(36L)));
+
         assertThat(testee.getStorageQuota(quotaRoot)).isEqualTo(
             Quota.<QuotaSizeLimit, QuotaSizeUsage>builder().used(QuotaSizeUsage.size(36)).computedLimit(QuotaSizeLimit.size(360)).build());
     }
@@ -75,7 +81,8 @@ class StoreQuotaManagerTest {
     @Test
     void getStorageQuotaShouldCalculateCurrentQuotaWhenUnlimited() throws Exception {
         when(mockedMaxQuotaManager.getMaxStorage(any(Map.class))).thenReturn(Optional.of(QuotaSizeLimit.unlimited()));
-        when(mockedCurrentQuotaManager.getCurrentStorage(quotaRoot)).thenReturn(QuotaSizeUsage.size(36L));
+        when(mockedCurrentQuotaManager.getCurrentStorage(quotaRoot)).thenAnswer(any ->
+            Mono.fromCallable(() -> QuotaSizeUsage.size(36L)));
 
         assertThat(testee.getStorageQuota(quotaRoot)).isEqualTo(
             Quota.<QuotaSizeLimit, QuotaSizeUsage>builder().used(QuotaSizeUsage.size(36)).computedLimit(QuotaSizeLimit.unlimited()).build());
@@ -85,7 +92,8 @@ class StoreQuotaManagerTest {
     @Test
     void getMessageQuotaShouldCalculateCurrentQuotaWhenUnlimited() throws Exception {
         when(mockedMaxQuotaManager.getMaxMessage(any(Map.class))).thenReturn(Optional.of(QuotaCountLimit.unlimited()));
-        when(mockedCurrentQuotaManager.getCurrentMessageCount(quotaRoot)).thenReturn(QuotaCountUsage.count(36L));
+        when(mockedCurrentQuotaManager.getCurrentMessageCount(quotaRoot)).thenAnswer(any ->
+            Mono.fromCallable(() -> QuotaCountUsage.count(36L)));
 
         assertThat(testee.getMessageQuota(quotaRoot)).isEqualTo(
             Quota.<QuotaCountLimit, QuotaCountUsage>builder().used(QuotaCountUsage.count(36)).computedLimit(QuotaCountLimit.unlimited()).build());
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserQuotaRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserQuotaRoutesTest.java
index e320c5a..d656dcc 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserQuotaRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserQuotaRoutesTest.java
@@ -852,7 +852,7 @@ class UserQuotaRoutesTest {
 
             maxQuotaManager.setMaxStorage(userQuotaRootResolver.forUser(BOB), QuotaSizeLimit.size(80));
             maxQuotaManager.setMaxMessage(userQuotaRootResolver.forUser(BOB), QuotaCountLimit.count(100));
-            currentQuotaManager.increase(quotaIncrease);
+            currentQuotaManager.increase(quotaIncrease).block();
 
             JsonPath jsonPath =
                 when()
@@ -881,7 +881,7 @@ class UserQuotaRoutesTest {
 
             maxQuotaManager.setMaxStorage(userQuotaRootResolver.forUser(BOB), QuotaSizeLimit.unlimited());
             maxQuotaManager.setMaxMessage(userQuotaRootResolver.forUser(BOB), QuotaCountLimit.unlimited());
-            currentQuotaManager.increase(quotaIncrease);
+            currentQuotaManager.increase(quotaIncrease).block();
 
             JsonPath jsonPath =
                 when()


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org