You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/21 02:56:37 UTC
[james-project] 01/19: JAMES-3138 Reactify CurrentQuotaManager
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e03d38f6032b2e06a70ac4dbf9b487b36ff99de6
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Thu Apr 16 17:36:45 2020 +0700
JAMES-3138 Reactify CurrentQuotaManager
---
.../james/mailbox/quota/CurrentQuotaManager.java | 6 +-
.../quota/CassandraCurrentQuotaManager.java | 36 ++++++------
.../mailbox/jpa/quota/JpaCurrentQuotaManager.java | 66 ++++++++++++----------
.../quota/InMemoryCurrentQuotaManager.java | 43 +++++++-------
.../quota/InMemoryCurrentQuotaManagerTest.java | 10 ++--
.../store/quota/ListeningCurrentQuotaUpdater.java | 53 ++++++++---------
.../store/quota/StoreCurrentQuotaManager.java | 7 ++-
.../mailbox/store/quota/StoreQuotaManager.java | 6 +-
.../quota/ListeningCurrentQuotaUpdaterTest.java | 7 ++-
.../store/quota/StoreCurrentQuotaManagerTest.java | 24 ++++----
.../mailbox/store/quota/StoreQuotaManagerTest.java | 16 ++++--
.../james/webadmin/routes/UserQuotaRoutesTest.java | 4 +-
12 files changed, 149 insertions(+), 129 deletions(-)
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/quota/CurrentQuotaManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/quota/CurrentQuotaManager.java
index 06cc970..f1f3d41 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/quota/CurrentQuotaManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/quota/CurrentQuotaManager.java
@@ -21,16 +21,16 @@ package org.apache.james.mailbox.quota;
import org.apache.james.core.quota.QuotaCountUsage;
import org.apache.james.core.quota.QuotaSizeUsage;
-import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.QuotaRoot;
+import org.reactivestreams.Publisher;
/**
* This interface allows us to get the current value associated to a quota value
*/
public interface CurrentQuotaManager {
- QuotaCountUsage getCurrentMessageCount(QuotaRoot quotaRoot) throws MailboxException;
+ Publisher<QuotaCountUsage> getCurrentMessageCount(QuotaRoot quotaRoot);
- QuotaSizeUsage getCurrentStorage(QuotaRoot quotaRoot) throws MailboxException;
+ Publisher<QuotaSizeUsage> getCurrentStorage(QuotaRoot quotaRoot);
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraCurrentQuotaManager.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraCurrentQuotaManager.java
index 0254999..a4ba974 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraCurrentQuotaManager.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraCurrentQuotaManager.java
@@ -28,6 +28,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
import javax.inject.Inject;
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.core.quota.QuotaCountUsage;
import org.apache.james.core.quota.QuotaSizeUsage;
import org.apache.james.mailbox.cassandra.table.CassandraCurrentQuota;
@@ -36,12 +37,13 @@ import org.apache.james.mailbox.model.QuotaRoot;
import org.apache.james.mailbox.store.quota.StoreCurrentQuotaManager;
import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
+import reactor.core.publisher.Mono;
+
public class CassandraCurrentQuotaManager implements StoreCurrentQuotaManager {
- private final Session session;
+ private final CassandraAsyncExecutor cassandraAsyncExecutor;
private final PreparedStatement increaseStatement;
private final PreparedStatement decreaseStatement;
private final PreparedStatement getCurrentMessageCountStatement;
@@ -49,7 +51,7 @@ public class CassandraCurrentQuotaManager implements StoreCurrentQuotaManager {
@Inject
public CassandraCurrentQuotaManager(Session session) {
- this.session = session;
+ this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
this.increaseStatement = session.prepare(update(CassandraCurrentQuota.TABLE_NAME)
.with(incr(CassandraCurrentQuota.MESSAGE_COUNT, bindMarker()))
.and(incr(CassandraCurrentQuota.STORAGE, bindMarker()))
@@ -67,34 +69,30 @@ public class CassandraCurrentQuotaManager implements StoreCurrentQuotaManager {
}
@Override
- public void increase(QuotaOperation quotaOperation) {
- session.execute(increaseStatement.bind(quotaOperation.count().asLong(),
+ public Mono<Void> increase(QuotaOperation quotaOperation) {
+ return cassandraAsyncExecutor.executeVoid(increaseStatement.bind(quotaOperation.count().asLong(),
quotaOperation.size().asLong(),
quotaOperation.quotaRoot().getValue()));
}
@Override
- public void decrease(QuotaOperation quotaOperation) {
- session.execute(decreaseStatement.bind(quotaOperation.count().asLong(),
+ public Mono<Void> decrease(QuotaOperation quotaOperation) {
+ return cassandraAsyncExecutor.executeVoid(decreaseStatement.bind(quotaOperation.count().asLong(),
quotaOperation.size().asLong(),
quotaOperation.quotaRoot().getValue()));
}
@Override
- public QuotaCountUsage getCurrentMessageCount(QuotaRoot quotaRoot) {
- ResultSet resultSet = session.execute(getCurrentMessageCountStatement.bind(quotaRoot.getValue()));
- if (resultSet.isExhausted()) {
- return QuotaCountUsage.count(0L);
- }
- return QuotaCountUsage.count(resultSet.one().getLong(CassandraCurrentQuota.MESSAGE_COUNT));
+ public Mono<QuotaCountUsage> getCurrentMessageCount(QuotaRoot quotaRoot) {
+ return cassandraAsyncExecutor.executeSingleRow(getCurrentMessageCountStatement.bind(quotaRoot.getValue()))
+ .map(row -> QuotaCountUsage.count(row.getLong(CassandraCurrentQuota.MESSAGE_COUNT)))
+ .defaultIfEmpty(QuotaCountUsage.count(0L));
}
@Override
- public QuotaSizeUsage getCurrentStorage(QuotaRoot quotaRoot) {
- ResultSet resultSet = session.execute(getCurrentStorageStatement.bind(quotaRoot.getValue()));
- if (resultSet.isExhausted()) {
- return QuotaSizeUsage.size(0L);
- }
- return QuotaSizeUsage.size(resultSet.one().getLong(CassandraCurrentQuota.STORAGE));
+ public Mono<QuotaSizeUsage> getCurrentStorage(QuotaRoot quotaRoot) {
+ return cassandraAsyncExecutor.executeSingleRow(getCurrentStorageStatement.bind(quotaRoot.getValue()))
+ .map(row -> QuotaSizeUsage.size(row.getLong(CassandraCurrentQuota.STORAGE)))
+ .defaultIfEmpty(QuotaSizeUsage.size(0L));
}
}
diff --git a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/quota/JpaCurrentQuotaManager.java b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/quota/JpaCurrentQuotaManager.java
index 48ef98a..f56bab0 100644
--- a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/quota/JpaCurrentQuotaManager.java
+++ b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/quota/JpaCurrentQuotaManager.java
@@ -33,6 +33,8 @@ import org.apache.james.mailbox.model.QuotaOperation;
import org.apache.james.mailbox.model.QuotaRoot;
import org.apache.james.mailbox.store.quota.StoreCurrentQuotaManager;
+import reactor.core.publisher.Mono;
+
public class JpaCurrentQuotaManager implements StoreCurrentQuotaManager {
public static final long NO_MESSAGES = 0L;
@@ -48,49 +50,53 @@ public class JpaCurrentQuotaManager implements StoreCurrentQuotaManager {
}
@Override
- public QuotaCountUsage getCurrentMessageCount(QuotaRoot quotaRoot) {
+ public Mono<QuotaCountUsage> getCurrentMessageCount(QuotaRoot quotaRoot) {
EntityManager entityManager = entityManagerFactory.createEntityManager();
- return Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot))
+
+ return Mono.fromCallable(() -> Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot))
.map(JpaCurrentQuota::getMessageCount)
- .orElse(QuotaCountUsage.count(NO_STORED_BYTES));
+ .orElse(QuotaCountUsage.count(NO_STORED_BYTES)));
}
@Override
- public QuotaSizeUsage getCurrentStorage(QuotaRoot quotaRoot) {
+ public Mono<QuotaSizeUsage> getCurrentStorage(QuotaRoot quotaRoot) {
EntityManager entityManager = entityManagerFactory.createEntityManager();
- return Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot))
+
+ return Mono.fromCallable(() -> Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot))
.map(JpaCurrentQuota::getSize)
- .orElse(QuotaSizeUsage.size(NO_STORED_BYTES));
+ .orElse(QuotaSizeUsage.size(NO_STORED_BYTES)));
}
@Override
- public void increase(QuotaOperation quotaOperation) {
- transactionRunner.run(
- entityManager -> {
- QuotaRoot quotaRoot = quotaOperation.quotaRoot();
-
- JpaCurrentQuota jpaCurrentQuota = Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot))
- .orElse(new JpaCurrentQuota(quotaRoot.getValue(), NO_MESSAGES, NO_STORED_BYTES));
-
- entityManager.merge(new JpaCurrentQuota(quotaRoot.getValue(),
- jpaCurrentQuota.getMessageCount().asLong() + quotaOperation.count().asLong(),
- jpaCurrentQuota.getSize().asLong() + quotaOperation.size().asLong()));
- });
+ public Mono<Void> increase(QuotaOperation quotaOperation) {
+ return Mono.fromRunnable(() ->
+ transactionRunner.run(
+ entityManager -> {
+ QuotaRoot quotaRoot = quotaOperation.quotaRoot();
+
+ JpaCurrentQuota jpaCurrentQuota = Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot))
+ .orElse(new JpaCurrentQuota(quotaRoot.getValue(), NO_MESSAGES, NO_STORED_BYTES));
+
+ entityManager.merge(new JpaCurrentQuota(quotaRoot.getValue(),
+ jpaCurrentQuota.getMessageCount().asLong() + quotaOperation.count().asLong(),
+ jpaCurrentQuota.getSize().asLong() + quotaOperation.size().asLong()));
+ }));
}
@Override
- public void decrease(QuotaOperation quotaOperation) {
- transactionRunner.run(
- entityManager -> {
- QuotaRoot quotaRoot = quotaOperation.quotaRoot();
-
- JpaCurrentQuota jpaCurrentQuota = Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot))
- .orElse(new JpaCurrentQuota(quotaRoot.getValue(), NO_MESSAGES, NO_STORED_BYTES));
-
- entityManager.merge(new JpaCurrentQuota(quotaRoot.getValue(),
- jpaCurrentQuota.getMessageCount().asLong() - quotaOperation.count().asLong(),
- jpaCurrentQuota.getSize().asLong() - quotaOperation.size().asLong()));
- });
+ public Mono<Void> decrease(QuotaOperation quotaOperation) {
+ return Mono.fromRunnable(() ->
+ transactionRunner.run(
+ entityManager -> {
+ QuotaRoot quotaRoot = quotaOperation.quotaRoot();
+
+ JpaCurrentQuota jpaCurrentQuota = Optional.ofNullable(retrieveUserQuota(entityManager, quotaRoot))
+ .orElse(new JpaCurrentQuota(quotaRoot.getValue(), NO_MESSAGES, NO_STORED_BYTES));
+
+ entityManager.merge(new JpaCurrentQuota(quotaRoot.getValue(),
+ jpaCurrentQuota.getMessageCount().asLong() - quotaOperation.count().asLong(),
+ jpaCurrentQuota.getSize().asLong() - quotaOperation.size().asLong()));
+ }));
}
private JpaCurrentQuota retrieveUserQuota(EntityManager entityManager, QuotaRoot quotaRoot) {
diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManager.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManager.java
index 6a917e6..fcbe5ff 100644
--- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManager.java
+++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManager.java
@@ -19,7 +19,6 @@
package org.apache.james.mailbox.inmemory.quota;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
@@ -40,6 +39,8 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import reactor.core.publisher.Mono;
+
public class InMemoryCurrentQuotaManager implements StoreCurrentQuotaManager {
private final LoadingCache<QuotaRoot, AtomicReference<CurrentQuotas>> quotaCache;
@@ -55,38 +56,34 @@ public class InMemoryCurrentQuotaManager implements StoreCurrentQuotaManager {
}
@Override
- public void increase(QuotaOperation quotaOperation) throws MailboxException {
- updateQuota(quotaOperation.quotaRoot(), quota -> quota.increase(new CurrentQuotas(quotaOperation.count(), quotaOperation.size())));
+ public Mono<Void> increase(QuotaOperation quotaOperation) {
+ return updateQuota(quotaOperation.quotaRoot(), quota -> quota.increase(new CurrentQuotas(quotaOperation.count(), quotaOperation.size())));
}
@Override
- public void decrease(QuotaOperation quotaOperation) throws MailboxException {
- updateQuota(quotaOperation.quotaRoot(), quota -> quota.decrease(new CurrentQuotas(quotaOperation.count(), quotaOperation.size())));
+ public Mono<Void> decrease(QuotaOperation quotaOperation) {
+ return updateQuota(quotaOperation.quotaRoot(), quota -> quota.decrease(new CurrentQuotas(quotaOperation.count(), quotaOperation.size())));
}
@Override
- public QuotaCountUsage getCurrentMessageCount(QuotaRoot quotaRoot) throws MailboxException {
- try {
- return quotaCache.get(quotaRoot).get().count();
- } catch (ExecutionException e) {
- throw new MailboxException("Exception caught", e);
- }
+ public Mono<QuotaCountUsage> getCurrentMessageCount(QuotaRoot quotaRoot) {
+ return Mono.fromCallable(() -> quotaCache.get(quotaRoot).get().count())
+ .onErrorMap(this::wrapAsMailboxException);
}
@Override
- public QuotaSizeUsage getCurrentStorage(QuotaRoot quotaRoot) throws MailboxException {
- try {
- return quotaCache.get(quotaRoot).get().size();
- } catch (ExecutionException e) {
- throw new MailboxException("Exception caught", e);
- }
+ public Mono<QuotaSizeUsage> getCurrentStorage(QuotaRoot quotaRoot) {
+ return Mono.fromCallable(() -> quotaCache.get(quotaRoot).get().size())
+ .onErrorMap(this::wrapAsMailboxException);
+ }
+
+ private Mono<Void> updateQuota(QuotaRoot quotaRoot, UnaryOperator<CurrentQuotas> quotaFunction) {
+ return Mono.fromCallable(() -> quotaCache.get(quotaRoot).updateAndGet(quotaFunction))
+ .onErrorMap(this::wrapAsMailboxException)
+ .then();
}
- private void updateQuota(QuotaRoot quotaRoot, UnaryOperator<CurrentQuotas> quotaFunction) throws MailboxException {
- try {
- quotaCache.get(quotaRoot).updateAndGet(quotaFunction);
- } catch (ExecutionException e) {
- throw new MailboxException("Exception caught", e);
- }
+ private Throwable wrapAsMailboxException(Throwable throwable) {
+ return new MailboxException("Exception caught", throwable);
}
}
diff --git a/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManagerTest.java b/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManagerTest.java
index 2014706..26e2b1f 100644
--- a/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManagerTest.java
+++ b/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/quota/InMemoryCurrentQuotaManagerTest.java
@@ -56,7 +56,7 @@ class InMemoryCurrentQuotaManagerTest {
when(mockedCurrentQuotaCalculator.recalculateCurrentQuotas(QUOTA_ROOT, null))
.thenReturn(CURRENT_QUOTAS);
- assertThat(testee.getCurrentMessageCount(QUOTA_ROOT)).isEqualTo(QuotaCountUsage.count(18));
+ assertThat(testee.getCurrentMessageCount(QUOTA_ROOT).block()).isEqualTo(QuotaCountUsage.count(18));
}
@Test
@@ -64,7 +64,7 @@ class InMemoryCurrentQuotaManagerTest {
when(mockedCurrentQuotaCalculator.recalculateCurrentQuotas(QUOTA_ROOT, null))
.thenReturn(CURRENT_QUOTAS);
- assertThat(testee.getCurrentStorage(QUOTA_ROOT)).isEqualTo(QuotaSizeUsage.size(512));
+ assertThat(testee.getCurrentStorage(QUOTA_ROOT).block()).isEqualTo(QuotaSizeUsage.size(512));
}
@Test
@@ -73,9 +73,9 @@ class InMemoryCurrentQuotaManagerTest {
.thenReturn(CURRENT_QUOTAS);
QuotaOperation quotaOperation = new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100));
- testee.increase(quotaOperation);
+ testee.increase(quotaOperation).block();
- assertThat(testee.getCurrentMessageCount(QUOTA_ROOT)).isEqualTo(QuotaCountUsage.count(28));
- assertThat(testee.getCurrentStorage(QUOTA_ROOT)).isEqualTo(QuotaSizeUsage.size(612));
+ assertThat(testee.getCurrentMessageCount(QUOTA_ROOT).block()).isEqualTo(QuotaCountUsage.count(28));
+ assertThat(testee.getCurrentStorage(QUOTA_ROOT).block()).isEqualTo(QuotaSizeUsage.size(612));
}
}
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java
index bf96d76..c8bc83a 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java
@@ -40,6 +40,8 @@ import org.apache.james.mailbox.store.event.EventFactory;
import com.github.fge.lambdas.Throwing;
import com.google.common.collect.ImmutableSet;
+import reactor.core.publisher.Mono;
+
public class ListeningCurrentQuotaUpdater implements MailboxListener.GroupMailboxListener, QuotaUpdater {
public static class ListeningCurrentQuotaUpdaterGroup extends Group {
@@ -89,36 +91,34 @@ public class ListeningCurrentQuotaUpdater implements MailboxListener.GroupMailbo
private void handleExpungedEvent(Expunged expunged, QuotaRoot quotaRoot) {
computeQuotaOperation(expunged, quotaRoot).ifPresent(Throwing.<QuotaOperation>consumer(quotaOperation -> {
- currentQuotaManager.decrease(quotaOperation);
-
- eventBus.dispatch(
- EventFactory.quotaUpdated()
- .randomEventId()
- .user(expunged.getUsername())
- .quotaRoot(quotaRoot)
- .quotaCount(quotaManager.getMessageQuota(quotaRoot))
- .quotaSize(quotaManager.getStorageQuota(quotaRoot))
- .instant(Instant.now())
- .build(),
- NO_REGISTRATION_KEYS)
+ currentQuotaManager.decrease(quotaOperation)
+ .then(Mono.defer(Throwing.supplier(() -> eventBus.dispatch(
+ EventFactory.quotaUpdated()
+ .randomEventId()
+ .user(expunged.getUsername())
+ .quotaRoot(quotaRoot)
+ .quotaCount(quotaManager.getMessageQuota(quotaRoot))
+ .quotaSize(quotaManager.getStorageQuota(quotaRoot))
+ .instant(Instant.now())
+ .build(),
+ NO_REGISTRATION_KEYS)).sneakyThrow()))
.block();
}).sneakyThrow());
}
private void handleAddedEvent(Added added, QuotaRoot quotaRoot) {
computeQuotaOperation(added, quotaRoot).ifPresent(Throwing.<QuotaOperation>consumer(quotaOperation -> {
- currentQuotaManager.increase(quotaOperation);
-
- eventBus.dispatch(
- EventFactory.quotaUpdated()
- .randomEventId()
- .user(added.getUsername())
- .quotaRoot(quotaRoot)
- .quotaCount(quotaManager.getMessageQuota(quotaRoot))
- .quotaSize(quotaManager.getStorageQuota(quotaRoot))
- .instant(Instant.now())
- .build(),
- NO_REGISTRATION_KEYS)
+ currentQuotaManager.increase(quotaOperation)
+ .then(Mono.defer(Throwing.supplier(() -> eventBus.dispatch(
+ EventFactory.quotaUpdated()
+ .randomEventId()
+ .user(added.getUsername())
+ .quotaRoot(quotaRoot)
+ .quotaCount(quotaManager.getMessageQuota(quotaRoot))
+ .quotaSize(quotaManager.getStorageQuota(quotaRoot))
+ .instant(Instant.now())
+ .build(),
+ NO_REGISTRATION_KEYS)).sneakyThrow()))
.block();
}).sneakyThrow());
}
@@ -144,8 +144,9 @@ public class ListeningCurrentQuotaUpdater implements MailboxListener.GroupMailbo
boolean mailboxContainedMessages = mailboxDeletionEvent.getDeletedMessageCount().asLong() > 0;
if (mailboxContainedMessages) {
currentQuotaManager.decrease(new QuotaOperation(mailboxDeletionEvent.getQuotaRoot(),
- mailboxDeletionEvent.getDeletedMessageCount(),
- mailboxDeletionEvent.getTotalDeletedSize()));
+ mailboxDeletionEvent.getDeletedMessageCount(),
+ mailboxDeletionEvent.getTotalDeletedSize()))
+ .block();
}
}
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManager.java
index d785f89..5d6cb08 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManager.java
@@ -19,14 +19,15 @@
package org.apache.james.mailbox.store.quota;
-import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.QuotaOperation;
import org.apache.james.mailbox.quota.CurrentQuotaManager;
+import reactor.core.publisher.Mono;
+
public interface StoreCurrentQuotaManager extends CurrentQuotaManager {
- void increase(QuotaOperation quotaOperation) throws MailboxException;
+ Mono<Void> increase(QuotaOperation quotaOperation);
- void decrease(QuotaOperation quotaOperation) throws MailboxException;
+ Mono<Void> decrease(QuotaOperation quotaOperation);
}
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreQuotaManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreQuotaManager.java
index 82d227d..a731e91 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreQuotaManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/StoreQuotaManager.java
@@ -35,6 +35,8 @@ import org.apache.james.mailbox.quota.CurrentQuotaManager;
import org.apache.james.mailbox.quota.MaxQuotaManager;
import org.apache.james.mailbox.quota.QuotaManager;
+import reactor.core.publisher.Mono;
+
/**
* Default implementation for the Quota Manager.
*
@@ -54,7 +56,7 @@ public class StoreQuotaManager implements QuotaManager {
public Quota<QuotaCountLimit, QuotaCountUsage> getMessageQuota(QuotaRoot quotaRoot) throws MailboxException {
Map<Scope, QuotaCountLimit> maxMessageDetails = maxQuotaManager.listMaxMessagesDetails(quotaRoot);
return Quota.<QuotaCountLimit, QuotaCountUsage>builder()
- .used(currentQuotaManager.getCurrentMessageCount(quotaRoot))
+ .used(Mono.from(currentQuotaManager.getCurrentMessageCount(quotaRoot)).block())
.computedLimit(maxQuotaManager.getMaxMessage(maxMessageDetails).orElse(QuotaCountLimit.unlimited()))
.limitsByScope(maxMessageDetails)
.build();
@@ -65,7 +67,7 @@ public class StoreQuotaManager implements QuotaManager {
public Quota<QuotaSizeLimit, QuotaSizeUsage> getStorageQuota(QuotaRoot quotaRoot) throws MailboxException {
Map<Scope, QuotaSizeLimit> maxStorageDetails = maxQuotaManager.listMaxStorageDetails(quotaRoot);
return Quota.<QuotaSizeLimit, QuotaSizeUsage>builder()
- .used(currentQuotaManager.getCurrentStorage(quotaRoot))
+ .used(Mono.from(currentQuotaManager.getCurrentStorage(quotaRoot)).block())
.computedLimit(maxQuotaManager.getMaxStorage(maxStorageDetails).orElse(QuotaSizeLimit.unlimited()))
.limitsByScope(maxStorageDetails)
.build();
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdaterTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdaterTest.java
index 6825773..a43ad52 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdaterTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdaterTest.java
@@ -96,6 +96,7 @@ class ListeningCurrentQuotaUpdaterTest {
when(added.getUids()).thenReturn(Lists.newArrayList(MessageUid.of(36), MessageUid.of(38)));
when(added.getUsername()).thenReturn(USERNAME_BENWA);
when(mockedQuotaRootResolver.getQuotaRoot(eq(MAILBOX_ID))).thenReturn(QUOTA_ROOT);
+ when(mockedCurrentQuotaManager.increase(QUOTA)).thenAnswer(any -> Mono.empty());
testee.event(added);
@@ -111,6 +112,7 @@ class ListeningCurrentQuotaUpdaterTest {
when(expunged.getMailboxId()).thenReturn(MAILBOX_ID);
when(expunged.getUsername()).thenReturn(USERNAME_BENWA);
when(mockedQuotaRootResolver.getQuotaRoot(eq(MAILBOX_ID))).thenReturn(QUOTA_ROOT);
+ when(mockedCurrentQuotaManager.decrease(QUOTA)).thenAnswer(any -> Mono.empty());
testee.event(expunged);
@@ -145,6 +147,8 @@ class ListeningCurrentQuotaUpdaterTest {
@Test
void mailboxDeletionEventShouldDecreaseCurrentQuotaValues() throws Exception {
+ QuotaOperation operation = new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(5));
+
MailboxListener.MailboxDeletion deletion = mock(MailboxListener.MailboxDeletion.class);
when(deletion.getQuotaRoot()).thenReturn(QUOTA_ROOT);
when(deletion.getDeletedMessageCount()).thenReturn(QuotaCountUsage.count(10));
@@ -152,10 +156,11 @@ class ListeningCurrentQuotaUpdaterTest {
when(deletion.getMailboxId()).thenReturn(MAILBOX_ID);
when(deletion.getUsername()).thenReturn(USERNAME_BENWA);
when(mockedQuotaRootResolver.getQuotaRoot(eq(MAILBOX_ID))).thenReturn(QUOTA_ROOT);
+ when(mockedCurrentQuotaManager.decrease(operation)).thenAnswer(any -> Mono.empty());
testee.event(deletion);
- verify(mockedCurrentQuotaManager).decrease(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(5)));
+ verify(mockedCurrentQuotaManager).decrease(operation);
}
@Test
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManagerTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManagerTest.java
index df39dd2..02701fb 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManagerTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreCurrentQuotaManagerTest.java
@@ -30,6 +30,8 @@ import org.apache.james.mailbox.model.QuotaRoot;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+
public abstract class StoreCurrentQuotaManagerTest {
private static final QuotaRoot QUOTA_ROOT = QuotaRoot.quotaRoot("benwa", Optional.empty());
@@ -44,32 +46,32 @@ public abstract class StoreCurrentQuotaManagerTest {
@Test
void getCurrentStorageShouldReturnZeroByDefault() throws Exception {
- assertThat(testee.getCurrentStorage(QUOTA_ROOT)).isEqualTo(QuotaSizeUsage.size(0));
+ assertThat(Mono.from(testee.getCurrentStorage(QUOTA_ROOT)).block()).isEqualTo(QuotaSizeUsage.size(0));
}
@Test
void increaseShouldWork() throws Exception {
- testee.increase(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100)));
+ testee.increase(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100))).block();
- assertThat(testee.getCurrentMessageCount(QUOTA_ROOT)).isEqualTo(QuotaCountUsage.count(10));
- assertThat(testee.getCurrentStorage(QUOTA_ROOT)).isEqualTo(QuotaSizeUsage.size(100));
+ assertThat(Mono.from(testee.getCurrentMessageCount(QUOTA_ROOT)).block()).isEqualTo(QuotaCountUsage.count(10));
+ assertThat(Mono.from(testee.getCurrentStorage(QUOTA_ROOT)).block()).isEqualTo(QuotaSizeUsage.size(100));
}
@Test
void decreaseShouldWork() throws Exception {
- testee.increase(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(20), QuotaSizeUsage.size(200)));
+ testee.increase(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(20), QuotaSizeUsage.size(200))).block();
- testee.decrease(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100)));
+ testee.decrease(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100))).block();
- assertThat(testee.getCurrentMessageCount(QUOTA_ROOT)).isEqualTo(QuotaCountUsage.count(10));
- assertThat(testee.getCurrentStorage(QUOTA_ROOT)).isEqualTo(QuotaSizeUsage.size(100));
+ assertThat(Mono.from(testee.getCurrentMessageCount(QUOTA_ROOT)).block()).isEqualTo(QuotaCountUsage.count(10));
+ assertThat(Mono.from(testee.getCurrentStorage(QUOTA_ROOT)).block()).isEqualTo(QuotaSizeUsage.size(100));
}
@Test
void decreaseShouldNotFailWhenItLeadsToNegativeValues() throws Exception {
- testee.decrease(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100)));
+ testee.decrease(new QuotaOperation(QUOTA_ROOT, QuotaCountUsage.count(10), QuotaSizeUsage.size(100))).block();
- assertThat(testee.getCurrentMessageCount(QUOTA_ROOT)).isEqualTo(QuotaCountUsage.count(-10));
- assertThat(testee.getCurrentStorage(QUOTA_ROOT)).isEqualTo(QuotaSizeUsage.size(-100));
+ assertThat(Mono.from(testee.getCurrentMessageCount(QUOTA_ROOT)).block()).isEqualTo(QuotaCountUsage.count(-10));
+ assertThat(Mono.from(testee.getCurrentStorage(QUOTA_ROOT)).block()).isEqualTo(QuotaSizeUsage.size(-100));
}
}
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreQuotaManagerTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreQuotaManagerTest.java
index fbaba67..205f827 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreQuotaManagerTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/StoreQuotaManagerTest.java
@@ -38,6 +38,8 @@ import org.apache.james.mailbox.quota.MaxQuotaManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+
class StoreQuotaManagerTest {
StoreQuotaManager testee;
@@ -57,7 +59,9 @@ class StoreQuotaManagerTest {
@Test
void getMessageQuotaShouldWorkWithNumericValues() throws Exception {
when(mockedMaxQuotaManager.getMaxMessage(any(Map.class))).thenReturn(Optional.of(QuotaCountLimit.count(360L)));
- when(mockedCurrentQuotaManager.getCurrentMessageCount(quotaRoot)).thenReturn(QuotaCountUsage.count(36L));
+ when(mockedCurrentQuotaManager.getCurrentMessageCount(quotaRoot)).thenAnswer(any ->
+ Mono.fromCallable(() -> QuotaCountUsage.count(36L)));
+
assertThat(testee.getMessageQuota(quotaRoot)).isEqualTo(
Quota.<QuotaCountLimit, QuotaCountUsage>builder().used(QuotaCountUsage.count(36)).computedLimit(QuotaCountLimit.count(360)).build());
}
@@ -66,7 +70,9 @@ class StoreQuotaManagerTest {
@Test
void getStorageQuotaShouldWorkWithNumericValues() throws Exception {
when(mockedMaxQuotaManager.getMaxStorage(any(Map.class))).thenReturn(Optional.of(QuotaSizeLimit.size(360L)));
- when(mockedCurrentQuotaManager.getCurrentStorage(quotaRoot)).thenReturn(QuotaSizeUsage.size(36L));
+ when(mockedCurrentQuotaManager.getCurrentStorage(quotaRoot)).thenAnswer(any ->
+ Mono.fromCallable(() -> QuotaSizeUsage.size(36L)));
+
assertThat(testee.getStorageQuota(quotaRoot)).isEqualTo(
Quota.<QuotaSizeLimit, QuotaSizeUsage>builder().used(QuotaSizeUsage.size(36)).computedLimit(QuotaSizeLimit.size(360)).build());
}
@@ -75,7 +81,8 @@ class StoreQuotaManagerTest {
@Test
void getStorageQuotaShouldCalculateCurrentQuotaWhenUnlimited() throws Exception {
when(mockedMaxQuotaManager.getMaxStorage(any(Map.class))).thenReturn(Optional.of(QuotaSizeLimit.unlimited()));
- when(mockedCurrentQuotaManager.getCurrentStorage(quotaRoot)).thenReturn(QuotaSizeUsage.size(36L));
+ when(mockedCurrentQuotaManager.getCurrentStorage(quotaRoot)).thenAnswer(any ->
+ Mono.fromCallable(() -> QuotaSizeUsage.size(36L)));
assertThat(testee.getStorageQuota(quotaRoot)).isEqualTo(
Quota.<QuotaSizeLimit, QuotaSizeUsage>builder().used(QuotaSizeUsage.size(36)).computedLimit(QuotaSizeLimit.unlimited()).build());
@@ -85,7 +92,8 @@ class StoreQuotaManagerTest {
@Test
void getMessageQuotaShouldCalculateCurrentQuotaWhenUnlimited() throws Exception {
when(mockedMaxQuotaManager.getMaxMessage(any(Map.class))).thenReturn(Optional.of(QuotaCountLimit.unlimited()));
- when(mockedCurrentQuotaManager.getCurrentMessageCount(quotaRoot)).thenReturn(QuotaCountUsage.count(36L));
+ when(mockedCurrentQuotaManager.getCurrentMessageCount(quotaRoot)).thenAnswer(any ->
+ Mono.fromCallable(() -> QuotaCountUsage.count(36L)));
assertThat(testee.getMessageQuota(quotaRoot)).isEqualTo(
Quota.<QuotaCountLimit, QuotaCountUsage>builder().used(QuotaCountUsage.count(36)).computedLimit(QuotaCountLimit.unlimited()).build());
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserQuotaRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserQuotaRoutesTest.java
index e320c5a..d656dcc 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserQuotaRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserQuotaRoutesTest.java
@@ -852,7 +852,7 @@ class UserQuotaRoutesTest {
maxQuotaManager.setMaxStorage(userQuotaRootResolver.forUser(BOB), QuotaSizeLimit.size(80));
maxQuotaManager.setMaxMessage(userQuotaRootResolver.forUser(BOB), QuotaCountLimit.count(100));
- currentQuotaManager.increase(quotaIncrease);
+ currentQuotaManager.increase(quotaIncrease).block();
JsonPath jsonPath =
when()
@@ -881,7 +881,7 @@ class UserQuotaRoutesTest {
maxQuotaManager.setMaxStorage(userQuotaRootResolver.forUser(BOB), QuotaSizeLimit.unlimited());
maxQuotaManager.setMaxMessage(userQuotaRootResolver.forUser(BOB), QuotaCountLimit.unlimited());
- currentQuotaManager.increase(quotaIncrease);
+ currentQuotaManager.increase(quotaIncrease).block();
JsonPath jsonPath =
when()
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org