You are viewing a plain-text version of this content; the canonical link to the original was not preserved in this copy.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/05/05 07:51:02 UTC

[james-project] 03/07: JAMES-3155 ListeningCurrentQuotaUpdater can be reactive

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9183c5f83bb009d14b8ad3f1af3ca0b513bbf29d
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat Apr 25 16:58:54 2020 +0700

    JAMES-3155 ListeningCurrentQuotaUpdater can be reactive
---
 .../store/quota/ListeningCurrentQuotaUpdater.java  | 99 ++++++++++++----------
 .../modules/mailbox/CassandraQuotaModule.java      |  2 +-
 .../james/modules/mailbox/JpaQuotaModule.java      |  2 +-
 .../james/modules/mailbox/MemoryQuotaModule.java   |  2 +-
 4 files changed, 58 insertions(+), 47 deletions(-)

diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java
index ac76eb6..f564242 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java
@@ -19,11 +19,13 @@
 package org.apache.james.mailbox.store.quota;
 
 import java.time.Instant;
-import java.util.Optional;
 
 import javax.inject.Inject;
 
+import org.apache.james.core.Username;
+import org.apache.james.core.quota.QuotaCountLimit;
 import org.apache.james.core.quota.QuotaCountUsage;
+import org.apache.james.core.quota.QuotaSizeLimit;
 import org.apache.james.core.quota.QuotaSizeUsage;
 import org.apache.james.mailbox.events.Event;
 import org.apache.james.mailbox.events.EventBus;
@@ -31,19 +33,22 @@ import org.apache.james.mailbox.events.Group;
 import org.apache.james.mailbox.events.MailboxListener;
 import org.apache.james.mailbox.events.RegistrationKey;
 import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.model.Quota;
 import org.apache.james.mailbox.model.QuotaOperation;
 import org.apache.james.mailbox.model.QuotaRoot;
 import org.apache.james.mailbox.quota.CurrentQuotaManager;
 import org.apache.james.mailbox.quota.QuotaManager;
 import org.apache.james.mailbox.quota.QuotaRootResolver;
 import org.apache.james.mailbox.store.event.EventFactory;
+import org.reactivestreams.Publisher;
 
-import com.github.fge.lambdas.Throwing;
 import com.google.common.collect.ImmutableSet;
 
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.util.function.Tuple2;
 
-public class ListeningCurrentQuotaUpdater implements MailboxListener.GroupMailboxListener, QuotaUpdater {
+public class ListeningCurrentQuotaUpdater implements MailboxListener.ReactiveGroupMailboxListener, QuotaUpdater {
     public static class ListeningCurrentQuotaUpdaterGroup extends Group {
 
     }
@@ -75,63 +80,69 @@ public class ListeningCurrentQuotaUpdater implements MailboxListener.GroupMailbo
     }
 
     @Override
-    public void event(Event event) throws MailboxException {
+    public Publisher<Void> reactiveEvent(Event event) {
+        try {
             if (event instanceof Added) {
                 Added addedEvent = (Added) event;
                 QuotaRoot quotaRoot = quotaRootResolver.getQuotaRoot(addedEvent.getMailboxId());
-                handleAddedEvent(addedEvent, quotaRoot);
+                return handleAddedEvent(addedEvent, quotaRoot);
             } else if (event instanceof Expunged) {
                 Expunged expungedEvent = (Expunged) event;
                 QuotaRoot quotaRoot = quotaRootResolver.getQuotaRoot(expungedEvent.getMailboxId());
-                handleExpungedEvent(expungedEvent, quotaRoot);
+                return handleExpungedEvent(expungedEvent, quotaRoot);
             } else if (event instanceof MailboxDeletion) {
                 MailboxDeletion mailboxDeletionEvent = (MailboxDeletion) event;
-                handleMailboxDeletionEvent(mailboxDeletionEvent);
+                return handleMailboxDeletionEvent(mailboxDeletionEvent);
             }
+            return Mono.empty();
+        } catch (MailboxException e) {
+            return Mono.error(e);
+        }
+    }
+
+    private Mono<Void> handleExpungedEvent(Expunged expunged, QuotaRoot quotaRoot) {
+        return computeQuotaOperation(expunged, quotaRoot)
+            .flatMap(quotaOperation ->
+                Mono.from(currentQuotaManager.decrease(quotaOperation))
+                    .then(dispatchNewQuota(quotaRoot, expunged.getUsername())));
     }
 
-    private void handleExpungedEvent(Expunged expunged, QuotaRoot quotaRoot) {
-        computeQuotaOperation(expunged, quotaRoot).ifPresent(Throwing.<QuotaOperation>consumer(quotaOperation -> {
-            Mono.from(currentQuotaManager.decrease(quotaOperation))
-                .then(Mono.defer(Throwing.supplier(() -> eventBus.dispatch(
-                    EventFactory.quotaUpdated()
-                        .randomEventId()
-                        .user(expunged.getUsername())
-                        .quotaRoot(quotaRoot)
-                        .quotaCount(quotaManager.getMessageQuota(quotaRoot))
-                        .quotaSize(quotaManager.getStorageQuota(quotaRoot))
-                        .instant(Instant.now())
-                        .build(),
-                    NO_REGISTRATION_KEYS)).sneakyThrow()))
-                .block();
-        }).sneakyThrow());
+    private Mono<Void> handleAddedEvent(Added added, QuotaRoot quotaRoot) {
+        return computeQuotaOperation(added, quotaRoot)
+            .flatMap(quotaOperation ->
+                Mono.from(currentQuotaManager.increase(quotaOperation))
+                    .then(dispatchNewQuota(quotaRoot, added.getUsername())));
     }
 
-    private void handleAddedEvent(Added added, QuotaRoot quotaRoot) {
-        computeQuotaOperation(added, quotaRoot).ifPresent(Throwing.<QuotaOperation>consumer(quotaOperation -> {
-            Mono.from(currentQuotaManager.increase(quotaOperation))
-                .then(Mono.defer(Throwing.supplier(() -> eventBus.dispatch(
-                    EventFactory.quotaUpdated()
-                        .randomEventId()
-                        .user(added.getUsername())
-                        .quotaRoot(quotaRoot)
-                        .quotaCount(quotaManager.getMessageQuota(quotaRoot))
-                        .quotaSize(quotaManager.getStorageQuota(quotaRoot))
-                        .instant(Instant.now())
-                        .build(),
-                    NO_REGISTRATION_KEYS)).sneakyThrow()))
-                .block();
-        }).sneakyThrow());
+    private Mono<Void> dispatchNewQuota(QuotaRoot quotaRoot, Username username) {
+        Mono<Quota<QuotaCountLimit, QuotaCountUsage>> messageQuota = Mono.fromCallable(() -> quotaManager.getMessageQuota(quotaRoot));
+        Mono<Quota<QuotaSizeLimit, QuotaSizeUsage>> storageQuota = Mono.fromCallable(() -> quotaManager.getStorageQuota(quotaRoot));
+
+        Mono<Tuple2<Quota<QuotaCountLimit, QuotaCountUsage>, Quota<QuotaSizeLimit, QuotaSizeUsage>>> quotasMono =
+            messageQuota.zipWith(storageQuota)
+                .subscribeOn(Schedulers.elastic());
+
+        return quotasMono
+            .flatMap(quotas -> eventBus.dispatch(
+                EventFactory.quotaUpdated()
+                    .randomEventId()
+                    .user(username)
+                    .quotaRoot(quotaRoot)
+                    .quotaCount(quotas.getT1())
+                    .quotaSize(quotas.getT2())
+                    .instant(Instant.now())
+                    .build(),
+                NO_REGISTRATION_KEYS));
     }
 
-    private Optional<QuotaOperation> computeQuotaOperation(MetaDataHoldingEvent metaDataHoldingEvent, QuotaRoot quotaRoot) {
+    private Mono<QuotaOperation> computeQuotaOperation(MetaDataHoldingEvent metaDataHoldingEvent, QuotaRoot quotaRoot) {
         long size = totalSize(metaDataHoldingEvent);
         long count = Integer.toUnsignedLong(metaDataHoldingEvent.getUids().size());
 
         if (count != 0 && size != 0) {
-            return Optional.of(new QuotaOperation(quotaRoot, QuotaCountUsage.count(count), QuotaSizeUsage.size(size)));
+            return Mono.just(new QuotaOperation(quotaRoot, QuotaCountUsage.count(count), QuotaSizeUsage.size(size)));
         }
-        return Optional.empty();
+        return Mono.empty();
     }
 
     private long totalSize(MetaDataHoldingEvent metaDataHoldingEvent) {
@@ -141,14 +152,14 @@ public class ListeningCurrentQuotaUpdater implements MailboxListener.GroupMailbo
             .sum();
     }
 
-    private void handleMailboxDeletionEvent(MailboxDeletion mailboxDeletionEvent) throws MailboxException {
+    private Mono<Void> handleMailboxDeletionEvent(MailboxDeletion mailboxDeletionEvent) throws MailboxException {
         boolean mailboxContainedMessages = mailboxDeletionEvent.getDeletedMessageCount().asLong() > 0;
         if (mailboxContainedMessages) {
-            Mono.from(currentQuotaManager.decrease(new QuotaOperation(mailboxDeletionEvent.getQuotaRoot(),
+            return Mono.from(currentQuotaManager.decrease(new QuotaOperation(mailboxDeletionEvent.getQuotaRoot(),
                     mailboxDeletionEvent.getDeletedMessageCount(),
-                    mailboxDeletionEvent.getTotalDeletedSize())))
-                .block();
+                    mailboxDeletionEvent.getTotalDeletedSize())));
         }
+        return Mono.empty();
     }
 
 }
\ No newline at end of file
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraQuotaModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraQuotaModule.java
index 61e7f8f..5d0e42b 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraQuotaModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraQuotaModule.java
@@ -65,7 +65,7 @@ public class CassandraQuotaModule extends AbstractModule {
 
         bind(ListeningCurrentQuotaUpdater.class).in(Scopes.SINGLETON);
         bind(QuotaUpdater.class).to(ListeningCurrentQuotaUpdater.class);
-        Multibinder.newSetBinder(binder(), MailboxListener.GroupMailboxListener.class)
+        Multibinder.newSetBinder(binder(), MailboxListener.ReactiveGroupMailboxListener.class)
             .addBinding()
             .to(ListeningCurrentQuotaUpdater.class);
     }
diff --git a/server/container/guice/jpa-guice/src/main/java/org/apache/james/modules/mailbox/JpaQuotaModule.java b/server/container/guice/jpa-guice/src/main/java/org/apache/james/modules/mailbox/JpaQuotaModule.java
index c8a4c31..6b16549 100644
--- a/server/container/guice/jpa-guice/src/main/java/org/apache/james/modules/mailbox/JpaQuotaModule.java
+++ b/server/container/guice/jpa-guice/src/main/java/org/apache/james/modules/mailbox/JpaQuotaModule.java
@@ -55,7 +55,7 @@ public class JpaQuotaModule extends AbstractModule {
 
         bind(ListeningCurrentQuotaUpdater.class).in(Scopes.SINGLETON);
         bind(QuotaUpdater.class).to(ListeningCurrentQuotaUpdater.class);
-        Multibinder.newSetBinder(binder(), MailboxListener.GroupMailboxListener.class)
+        Multibinder.newSetBinder(binder(), MailboxListener.ReactiveGroupMailboxListener.class)
             .addBinding()
             .to(ListeningCurrentQuotaUpdater.class);
     }
diff --git a/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/mailbox/MemoryQuotaModule.java b/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/mailbox/MemoryQuotaModule.java
index f4e3b33..3e49d3e 100644
--- a/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/mailbox/MemoryQuotaModule.java
+++ b/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/mailbox/MemoryQuotaModule.java
@@ -55,7 +55,7 @@ public class MemoryQuotaModule extends AbstractModule {
 
         bind(ListeningCurrentQuotaUpdater.class).in(Scopes.SINGLETON);
         bind(QuotaUpdater.class).to(ListeningCurrentQuotaUpdater.class);
-        Multibinder.newSetBinder(binder(), MailboxListener.GroupMailboxListener.class)
+        Multibinder.newSetBinder(binder(), MailboxListener.ReactiveGroupMailboxListener.class)
             .addBinding()
             .to(ListeningCurrentQuotaUpdater.class);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org