You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/03 10:27:38 UTC

[james-project] 02/08: [Performance] Rely on reactor for Cassandra max quota definition

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 0f513dd32611a815b332227ba6521f1d2a4f8247
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Apr 1 17:03:39 2020 +0700

    [Performance] Rely on reactor for Cassandra max quota definition
---
 .../quota/CassandraGlobalMaxQuotaDao.java          | 52 +++++++--------
 .../quota/CassandraPerDomainMaxQuotaDao.java       | 49 ++++++--------
 .../quota/CassandraPerUserMaxQuotaDao.java         | 49 ++++++--------
 .../quota/CassandraPerUserMaxQuotaManager.java     | 76 ++++++++++++----------
 4 files changed, 108 insertions(+), 118 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java
index fba2c52..74fd9bf 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraGlobalMaxQuotaDao.java
@@ -25,24 +25,24 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 
-import java.util.Optional;
-
 import javax.inject.Inject;
 
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.core.quota.QuotaCountLimit;
 import org.apache.james.core.quota.QuotaSizeLimit;
 import org.apache.james.mailbox.cassandra.table.CassandraGlobalMaxQuota;
 
 import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.querybuilder.Delete;
 import com.datastax.driver.core.querybuilder.Insert;
 import com.datastax.driver.core.querybuilder.Select;
 
+import reactor.core.publisher.Mono;
+
 public class CassandraGlobalMaxQuotaDao {
 
-    private final Session session;
+    private final CassandraAsyncExecutor queryExecutor;
     private final PreparedStatement setGlobalMaxStorageStatement;
     private final PreparedStatement setGlobalMaxMessageStatement;
     private final PreparedStatement getGlobalMaxStatement;
@@ -50,7 +50,7 @@ public class CassandraGlobalMaxQuotaDao {
 
     @Inject
     public CassandraGlobalMaxQuotaDao(Session session) {
-        this.session = session;
+        this.queryExecutor = new CassandraAsyncExecutor(session);
         this.getGlobalMaxStatement = session.prepare(getGlobalMaxStatement());
         this.setGlobalMaxMessageStatement = session.prepare(setGlobalMaxMessageStatement());
         this.setGlobalMaxStorageStatement = session.prepare(setGlobalMaxStorageStatement());
@@ -81,39 +81,33 @@ public class CassandraGlobalMaxQuotaDao {
             .where(eq(CassandraGlobalMaxQuota.TYPE, bindMarker(CassandraGlobalMaxQuota.TYPE)));
     }
 
-    public void setGlobalMaxStorage(QuotaSizeLimit globalMaxStorage) {
-        session.execute(setGlobalMaxStorageStatement.bind(QuotaCodec.quotaValueToLong(globalMaxStorage)));
+    Mono<Void> setGlobalMaxStorage(QuotaSizeLimit globalMaxStorage) {
+        return queryExecutor.executeVoid(setGlobalMaxStorageStatement.bind(QuotaCodec.quotaValueToLong(globalMaxStorage)));
     }
 
-    public void setGlobalMaxMessage(QuotaCountLimit globalMaxMessageCount) {
-        session.execute(setGlobalMaxMessageStatement.bind(QuotaCodec.quotaValueToLong(globalMaxMessageCount)));
+    Mono<Void> setGlobalMaxMessage(QuotaCountLimit globalMaxMessageCount) {
+        return queryExecutor.executeVoid(setGlobalMaxMessageStatement.bind(QuotaCodec.quotaValueToLong(globalMaxMessageCount)));
     }
 
-    public Optional<QuotaSizeLimit> getGlobalMaxStorage() {
-        ResultSet resultSet = session.execute(getGlobalMaxStatement.bind()
-            .setString(CassandraGlobalMaxQuota.TYPE, CassandraGlobalMaxQuota.STORAGE));
-        if (resultSet.isExhausted()) {
-            return Optional.empty();
-        }
-        Long maxStorage = resultSet.one().get(CassandraGlobalMaxQuota.VALUE, Long.class);
-        return QuotaCodec.longToQuotaSize(maxStorage);
+    Mono<QuotaSizeLimit> getGlobalMaxStorage() {
+        return queryExecutor.executeSingleRow(getGlobalMaxStatement.bind()
+                .setString(CassandraGlobalMaxQuota.TYPE, CassandraGlobalMaxQuota.STORAGE))
+            .flatMap(row -> Mono.justOrEmpty(row.get(CassandraGlobalMaxQuota.VALUE, Long.class)))
+            .flatMap(maxStorage -> Mono.justOrEmpty(QuotaCodec.longToQuotaSize(maxStorage)));
     }
 
-    public Optional<QuotaCountLimit> getGlobalMaxMessage() {
-        ResultSet resultSet = session.execute(getGlobalMaxStatement.bind()
-            .setString(CassandraGlobalMaxQuota.TYPE, CassandraGlobalMaxQuota.MESSAGE));
-        if (resultSet.isExhausted()) {
-            return Optional.empty();
-        }
-        Long maxMessages = resultSet.one().get(CassandraGlobalMaxQuota.VALUE, Long.class);
-        return QuotaCodec.longToQuotaCount(maxMessages);
+    Mono<QuotaCountLimit> getGlobalMaxMessage() {
+        return queryExecutor.executeSingleRow(getGlobalMaxStatement.bind()
+            .setString(CassandraGlobalMaxQuota.TYPE, CassandraGlobalMaxQuota.MESSAGE))
+            .flatMap(row -> Mono.justOrEmpty(row.get(CassandraGlobalMaxQuota.VALUE, Long.class)))
+            .flatMap(maxMessages -> Mono.justOrEmpty(QuotaCodec.longToQuotaCount(maxMessages)));
     }
 
-    public void removeGlobaltMaxStorage() {
-        session.execute(removeGlobalMaxQuotaStatement.bind(CassandraGlobalMaxQuota.STORAGE));
+    Mono<Void> removeGlobaltMaxStorage() {
+        return queryExecutor.executeVoid(removeGlobalMaxQuotaStatement.bind(CassandraGlobalMaxQuota.STORAGE));
     }
 
-    public void removeGlobalMaxMessage() {
-        session.execute(removeGlobalMaxQuotaStatement.bind(CassandraGlobalMaxQuota.MESSAGE));
+    Mono<Void> removeGlobalMaxMessage() {
+        return queryExecutor.executeVoid(removeGlobalMaxQuotaStatement.bind(CassandraGlobalMaxQuota.MESSAGE));
     }
 }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java
index cccbdc2..cb49d3a 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerDomainMaxQuotaDao.java
@@ -25,25 +25,25 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 
-import java.util.Optional;
-
 import javax.inject.Inject;
 
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.core.Domain;
 import org.apache.james.core.quota.QuotaCountLimit;
 import org.apache.james.core.quota.QuotaSizeLimit;
 import org.apache.james.mailbox.cassandra.table.CassandraDomainMaxQuota;
 
 import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.querybuilder.Delete;
 import com.datastax.driver.core.querybuilder.Insert;
 import com.datastax.driver.core.querybuilder.Select;
 
+import reactor.core.publisher.Mono;
+
 public class CassandraPerDomainMaxQuotaDao {
 
-    private final Session session;
+    private final CassandraAsyncExecutor queryExecutor;
     private final PreparedStatement setMaxStorageStatement;
     private final PreparedStatement setMaxMessageStatement;
     private final PreparedStatement getMaxStorageStatement;
@@ -53,7 +53,7 @@ public class CassandraPerDomainMaxQuotaDao {
 
     @Inject
     public CassandraPerDomainMaxQuotaDao(Session session) {
-        this.session = session;
+        this.queryExecutor = new CassandraAsyncExecutor(session);
         this.setMaxStorageStatement = session.prepare(setMaxStorageStatement());
         this.setMaxMessageStatement = session.prepare(setMaxMessageStatement());
         this.getMaxStorageStatement = session.prepare(getMaxStorageStatement());
@@ -98,38 +98,31 @@ public class CassandraPerDomainMaxQuotaDao {
             .value(CassandraDomainMaxQuota.STORAGE, bindMarker());
     }
 
-    public void setMaxStorage(Domain domain, QuotaSizeLimit maxStorageQuota) {
-        session.execute(setMaxStorageStatement.bind(domain.asString(), QuotaCodec.quotaValueToLong(maxStorageQuota)));
+    Mono<Void> setMaxStorage(Domain domain, QuotaSizeLimit maxStorageQuota) {
+        return queryExecutor.executeVoid(setMaxStorageStatement.bind(domain.asString(), QuotaCodec.quotaValueToLong(maxStorageQuota)));
     }
 
-    public void setMaxMessage(Domain domain, QuotaCountLimit maxMessageCount) {
-        session.execute(setMaxMessageStatement.bind(domain.asString(), QuotaCodec.quotaValueToLong(maxMessageCount)));
+    Mono<Void> setMaxMessage(Domain domain, QuotaCountLimit maxMessageCount) {
+        return queryExecutor.executeVoid(setMaxMessageStatement.bind(domain.asString(), QuotaCodec.quotaValueToLong(maxMessageCount)));
     }
 
-    public Optional<QuotaSizeLimit> getMaxStorage(Domain domain) {
-        ResultSet resultSet = session.execute(getMaxStorageStatement.bind(domain.asString()));
-        if (resultSet.isExhausted()) {
-            return Optional.empty();
-        }
-        Long maxStorage = resultSet.one().get(CassandraDomainMaxQuota.STORAGE, Long.class);
-        return QuotaCodec.longToQuotaSize(maxStorage);
+    Mono<QuotaSizeLimit> getMaxStorage(Domain domain) {
+        return queryExecutor.executeSingleRow(getMaxStorageStatement.bind(domain.asString()))
+            .flatMap(row -> Mono.justOrEmpty(row.get(CassandraDomainMaxQuota.STORAGE, Long.class)))
+            .flatMap(maxStorage -> Mono.justOrEmpty(QuotaCodec.longToQuotaSize(maxStorage)));
     }
 
-    public Optional<QuotaCountLimit> getMaxMessage(Domain domain) {
-        ResultSet resultSet = session.execute(getMaxMessageStatement.bind(domain.asString()));
-        if (resultSet.isExhausted()) {
-            return Optional.empty();
-        }
-        Long maxMessages = resultSet.one().get(CassandraDomainMaxQuota.MESSAGE_COUNT, Long.class);
-        return QuotaCodec.longToQuotaCount(maxMessages);
+    Mono<QuotaCountLimit> getMaxMessage(Domain domain) {
+        return queryExecutor.executeSingleRow(getMaxMessageStatement.bind(domain.asString()))
+            .flatMap(row -> Mono.justOrEmpty(row.get(CassandraDomainMaxQuota.MESSAGE_COUNT, Long.class)))
+            .flatMap(maxMessages -> Mono.justOrEmpty(QuotaCodec.longToQuotaCount(maxMessages)));
     }
 
-    public void removeMaxMessage(Domain domain) {
-        session.execute(removeMaxMessageStatement.bind(domain.asString()));
+    Mono<Void> removeMaxMessage(Domain domain) {
+        return queryExecutor.executeVoid(removeMaxMessageStatement.bind(domain.asString()));
     }
 
-    public void removeMaxStorage(Domain domain) {
-        session.execute(removeMaxStorageStatement.bind(domain.asString()));
+    Mono<Void> removeMaxStorage(Domain domain) {
+        return queryExecutor.executeVoid(removeMaxStorageStatement.bind(domain.asString()));
     }
-
 }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java
index 52beb42..cff560d 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaDao.java
@@ -25,25 +25,25 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 
-import java.util.Optional;
-
 import javax.inject.Inject;
 
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.core.quota.QuotaCountLimit;
 import org.apache.james.core.quota.QuotaSizeLimit;
 import org.apache.james.mailbox.cassandra.table.CassandraMaxQuota;
 import org.apache.james.mailbox.model.QuotaRoot;
 
 import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.querybuilder.Delete;
 import com.datastax.driver.core.querybuilder.Insert;
 import com.datastax.driver.core.querybuilder.Select;
 
+import reactor.core.publisher.Mono;
+
 public class CassandraPerUserMaxQuotaDao {
 
-    private final Session session;
+    private final CassandraAsyncExecutor queryExecutor;
     private final PreparedStatement setMaxStorageStatement;
     private final PreparedStatement setMaxMessageStatement;
     private final PreparedStatement getMaxStorageStatement;
@@ -53,7 +53,7 @@ public class CassandraPerUserMaxQuotaDao {
 
     @Inject
     public CassandraPerUserMaxQuotaDao(Session session) {
-        this.session = session;
+        this.queryExecutor = new CassandraAsyncExecutor(session);
         this.setMaxStorageStatement = session.prepare(setMaxStorageStatement());
         this.setMaxMessageStatement = session.prepare(setMaxMessageStatement());
         this.getMaxStorageStatement = session.prepare(getMaxStorageStatement());
@@ -98,38 +98,31 @@ public class CassandraPerUserMaxQuotaDao {
             .value(CassandraMaxQuota.STORAGE, bindMarker());
     }
 
-    public void setMaxStorage(QuotaRoot quotaRoot, QuotaSizeLimit maxStorageQuota) {
-        session.execute(setMaxStorageStatement.bind(quotaRoot.getValue(), QuotaCodec.quotaValueToLong(maxStorageQuota)));
+    Mono<Void> setMaxStorage(QuotaRoot quotaRoot, QuotaSizeLimit maxStorageQuota) {
+        return queryExecutor.executeVoid(setMaxStorageStatement.bind(quotaRoot.getValue(), QuotaCodec.quotaValueToLong(maxStorageQuota)));
     }
 
-    public void setMaxMessage(QuotaRoot quotaRoot, QuotaCountLimit maxMessageCount) {
-        session.execute(setMaxMessageStatement.bind(quotaRoot.getValue(), QuotaCodec.quotaValueToLong(maxMessageCount)));
+    Mono<Void> setMaxMessage(QuotaRoot quotaRoot, QuotaCountLimit maxMessageCount) {
+        return queryExecutor.executeVoid(setMaxMessageStatement.bind(quotaRoot.getValue(), QuotaCodec.quotaValueToLong(maxMessageCount)));
     }
 
-    public Optional<QuotaSizeLimit> getMaxStorage(QuotaRoot quotaRoot) {
-        ResultSet resultSet = session.execute(getMaxStorageStatement.bind(quotaRoot.getValue()));
-        if (resultSet.isExhausted()) {
-            return Optional.empty();
-        }
-        Long maxStorage = resultSet.one().get(CassandraMaxQuota.STORAGE, Long.class);
-        return QuotaCodec.longToQuotaSize(maxStorage);
+    Mono<QuotaSizeLimit> getMaxStorage(QuotaRoot quotaRoot) {
+        return queryExecutor.executeSingleRow(getMaxStorageStatement.bind(quotaRoot.getValue()))
+            .flatMap(row -> Mono.justOrEmpty(row.get(CassandraMaxQuota.STORAGE, Long.class)))
+            .flatMap(maxStorage -> Mono.justOrEmpty(QuotaCodec.longToQuotaSize(maxStorage)));
     }
 
-    public Optional<QuotaCountLimit> getMaxMessage(QuotaRoot quotaRoot) {
-        ResultSet resultSet = session.execute(getMaxMessageStatement.bind(quotaRoot.getValue()));
-        if (resultSet.isExhausted()) {
-            return Optional.empty();
-        }
-        Long maxMessages = resultSet.one().get(CassandraMaxQuota.MESSAGE_COUNT, Long.class);
-        return QuotaCodec.longToQuotaCount(maxMessages);
+    Mono<QuotaCountLimit> getMaxMessage(QuotaRoot quotaRoot) {
+        return queryExecutor.executeSingleRow(getMaxMessageStatement.bind(quotaRoot.getValue()))
+            .flatMap(row -> Mono.justOrEmpty(row.get(CassandraMaxQuota.MESSAGE_COUNT, Long.class)))
+            .flatMap(maxMessages -> Mono.justOrEmpty(QuotaCodec.longToQuotaCount(maxMessages)));
     }
 
-    public void removeMaxMessage(QuotaRoot quotaRoot) {
-        session.execute(removeMaxMessageStatement.bind(quotaRoot.getValue()));
+    Mono<Void> removeMaxMessage(QuotaRoot quotaRoot) {
+        return queryExecutor.executeVoid(removeMaxMessageStatement.bind(quotaRoot.getValue()));
     }
 
-    public void removeMaxStorage(QuotaRoot quotaRoot) {
-        session.execute(removeMaxStorageStatement.bind(quotaRoot.getValue()));
+    Mono<Void> removeMaxStorage(QuotaRoot quotaRoot) {
+        return queryExecutor.executeVoid(removeMaxStorageStatement.bind(quotaRoot.getValue()));
     }
-
 }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaManager.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaManager.java
index 671e707..12d90c1 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaManager.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/quota/CassandraPerUserMaxQuotaManager.java
@@ -21,8 +21,6 @@ package org.apache.james.mailbox.cassandra.quota;
 
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.Function;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
@@ -34,9 +32,11 @@ import org.apache.james.mailbox.model.Quota;
 import org.apache.james.mailbox.model.QuotaRoot;
 import org.apache.james.mailbox.quota.MaxQuotaManager;
 
-import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 public class CassandraPerUserMaxQuotaManager implements MaxQuotaManager {
 
     private final CassandraPerUserMaxQuotaDao perUserQuota;
@@ -54,103 +54,113 @@ public class CassandraPerUserMaxQuotaManager implements MaxQuotaManager {
 
     @Override
     public void setMaxStorage(QuotaRoot quotaRoot, QuotaSizeLimit maxStorageQuota) {
-        perUserQuota.setMaxStorage(quotaRoot, maxStorageQuota);
+        perUserQuota.setMaxStorage(quotaRoot, maxStorageQuota).block();
     }
 
     @Override
     public void setMaxMessage(QuotaRoot quotaRoot, QuotaCountLimit maxMessageCount) {
-        perUserQuota.setMaxMessage(quotaRoot, maxMessageCount);
+        perUserQuota.setMaxMessage(quotaRoot, maxMessageCount).block();
     }
 
     @Override
     public void setDomainMaxMessage(Domain domain, QuotaCountLimit count) {
-        perDomainQuota.setMaxMessage(domain, count);
+        perDomainQuota.setMaxMessage(domain, count).block();
     }
 
     @Override
     public void setDomainMaxStorage(Domain domain, QuotaSizeLimit size) {
-        perDomainQuota.setMaxStorage(domain, size);
+        perDomainQuota.setMaxStorage(domain, size).block();
     }
 
     @Override
     public void removeDomainMaxMessage(Domain domain) {
-        perDomainQuota.removeMaxMessage(domain);
+        perDomainQuota.removeMaxMessage(domain).block();
     }
 
     @Override
     public void removeDomainMaxStorage(Domain domain) {
-        perDomainQuota.removeMaxStorage(domain);
+        perDomainQuota.removeMaxStorage(domain).block();
     }
 
     @Override
     public Optional<QuotaCountLimit> getDomainMaxMessage(Domain domain) {
-        return perDomainQuota.getMaxMessage(domain);
+        return perDomainQuota.getMaxMessage(domain).blockOptional();
     }
 
     @Override
     public Optional<QuotaSizeLimit> getDomainMaxStorage(Domain domain) {
-        return perDomainQuota.getMaxStorage(domain);
+        return perDomainQuota.getMaxStorage(domain).blockOptional();
     }
 
     @Override
     public void removeMaxMessage(QuotaRoot quotaRoot) {
-        perUserQuota.removeMaxMessage(quotaRoot);
+        perUserQuota.removeMaxMessage(quotaRoot).block();
     }
 
     @Override
     public void removeMaxStorage(QuotaRoot quotaRoot) {
-        perUserQuota.removeMaxStorage(quotaRoot);
+        perUserQuota.removeMaxStorage(quotaRoot).block();
     }
 
     @Override
     public void setGlobalMaxStorage(QuotaSizeLimit globalMaxStorage) {
-        globalQuota.setGlobalMaxStorage(globalMaxStorage);
+        globalQuota.setGlobalMaxStorage(globalMaxStorage).block();
     }
 
     @Override
     public void removeGlobalMaxStorage() {
-        globalQuota.removeGlobaltMaxStorage();
+        globalQuota.removeGlobaltMaxStorage().block();
     }
 
     @Override
     public void setGlobalMaxMessage(QuotaCountLimit globalMaxMessageCount) {
-        globalQuota.setGlobalMaxMessage(globalMaxMessageCount);
+        globalQuota.setGlobalMaxMessage(globalMaxMessageCount).block();
     }
 
     @Override
     public void removeGlobalMaxMessage() {
-        globalQuota.removeGlobalMaxMessage();
+        globalQuota.removeGlobalMaxMessage().block();
     }
 
     @Override
     public Optional<QuotaSizeLimit> getGlobalMaxStorage() {
-        return globalQuota.getGlobalMaxStorage();
+        return globalQuota.getGlobalMaxStorage().blockOptional();
     }
 
     @Override
     public Optional<QuotaCountLimit> getGlobalMaxMessage() {
-        return globalQuota.getGlobalMaxMessage();
+        return globalQuota.getGlobalMaxMessage().blockOptional();
     }
 
     @Override
     public Map<Quota.Scope, QuotaCountLimit> listMaxMessagesDetails(QuotaRoot quotaRoot) {
-        Function<Domain, Optional<QuotaCountLimit>> domainQuotaSupplier = Throwing.function(this::getDomainMaxMessage).sneakyThrow();
-        return Stream.of(
-                Pair.of(Quota.Scope.User, perUserQuota.getMaxMessage(quotaRoot)),
-                Pair.of(Quota.Scope.Domain, quotaRoot.getDomain().flatMap(domainQuotaSupplier)),
-                Pair.of(Quota.Scope.Global, globalQuota.getGlobalMaxMessage()))
-            .filter(pair -> pair.getValue().isPresent())
-            .collect(Guavate.toImmutableMap(Pair::getKey, value -> value.getValue().get()));
+        return Flux.merge(
+                perUserQuota.getMaxMessage(quotaRoot)
+                    .map(limit -> Pair.of(Quota.Scope.User, limit)),
+                Mono.justOrEmpty(quotaRoot.getDomain())
+                    .flatMap(perDomainQuota::getMaxMessage)
+                    .map(limit -> Pair.of(Quota.Scope.Domain, limit)),
+                globalQuota.getGlobalMaxMessage()
+                    .map(limit -> Pair.of(Quota.Scope.Global, limit)))
+            .collect(Guavate.toImmutableMap(
+                Pair::getKey,
+                Pair::getValue))
+            .block();
     }
 
     @Override
     public Map<Quota.Scope, QuotaSizeLimit> listMaxStorageDetails(QuotaRoot quotaRoot) {
-        Function<Domain, Optional<QuotaSizeLimit>> domainQuotaSupplier = Throwing.function(this::getDomainMaxStorage).sneakyThrow();
-        return Stream.of(
-                Pair.of(Quota.Scope.User, perUserQuota.getMaxStorage(quotaRoot)),
-                Pair.of(Quota.Scope.Domain, quotaRoot.getDomain().flatMap(domainQuotaSupplier)),
-                Pair.of(Quota.Scope.Global, globalQuota.getGlobalMaxStorage()))
-            .filter(pair -> pair.getValue().isPresent())
-            .collect(Guavate.toImmutableMap(Pair::getKey, value -> value.getValue().get()));
+        return Flux.merge(
+                perUserQuota.getMaxStorage(quotaRoot)
+                    .map(limit -> Pair.of(Quota.Scope.User, limit)),
+                Mono.justOrEmpty(quotaRoot.getDomain())
+                    .flatMap(perDomainQuota::getMaxStorage)
+                    .map(limit -> Pair.of(Quota.Scope.Domain, limit)),
+                globalQuota.getGlobalMaxStorage()
+                    .map(limit -> Pair.of(Quota.Scope.Global, limit)))
+            .collect(Guavate.toImmutableMap(
+                Pair::getKey,
+                Pair::getValue))
+            .block();
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org