You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/05/05 07:51:02 UTC
[james-project] 03/07: JAMES-3155 ListeningCurrentQuotaUpdater can
be reactive
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 9183c5f83bb009d14b8ad3f1af3ca0b513bbf29d
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat Apr 25 16:58:54 2020 +0700
JAMES-3155 ListeningCurrentQuotaUpdater can be reactive
---
.../store/quota/ListeningCurrentQuotaUpdater.java | 99 ++++++++++++----------
.../modules/mailbox/CassandraQuotaModule.java | 2 +-
.../james/modules/mailbox/JpaQuotaModule.java | 2 +-
.../james/modules/mailbox/MemoryQuotaModule.java | 2 +-
4 files changed, 58 insertions(+), 47 deletions(-)
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java
index ac76eb6..f564242 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java
@@ -19,11 +19,13 @@
package org.apache.james.mailbox.store.quota;
import java.time.Instant;
-import java.util.Optional;
import javax.inject.Inject;
+import org.apache.james.core.Username;
+import org.apache.james.core.quota.QuotaCountLimit;
import org.apache.james.core.quota.QuotaCountUsage;
+import org.apache.james.core.quota.QuotaSizeLimit;
import org.apache.james.core.quota.QuotaSizeUsage;
import org.apache.james.mailbox.events.Event;
import org.apache.james.mailbox.events.EventBus;
@@ -31,19 +33,22 @@ import org.apache.james.mailbox.events.Group;
import org.apache.james.mailbox.events.MailboxListener;
import org.apache.james.mailbox.events.RegistrationKey;
import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.model.Quota;
import org.apache.james.mailbox.model.QuotaOperation;
import org.apache.james.mailbox.model.QuotaRoot;
import org.apache.james.mailbox.quota.CurrentQuotaManager;
import org.apache.james.mailbox.quota.QuotaManager;
import org.apache.james.mailbox.quota.QuotaRootResolver;
import org.apache.james.mailbox.store.event.EventFactory;
+import org.reactivestreams.Publisher;
-import com.github.fge.lambdas.Throwing;
import com.google.common.collect.ImmutableSet;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.util.function.Tuple2;
-public class ListeningCurrentQuotaUpdater implements MailboxListener.GroupMailboxListener, QuotaUpdater {
+public class ListeningCurrentQuotaUpdater implements MailboxListener.ReactiveGroupMailboxListener, QuotaUpdater {
public static class ListeningCurrentQuotaUpdaterGroup extends Group {
}
@@ -75,63 +80,69 @@ public class ListeningCurrentQuotaUpdater implements MailboxListener.GroupMailbo
}
@Override
- public void event(Event event) throws MailboxException {
+ public Publisher<Void> reactiveEvent(Event event) {
+ try {
if (event instanceof Added) {
Added addedEvent = (Added) event;
QuotaRoot quotaRoot = quotaRootResolver.getQuotaRoot(addedEvent.getMailboxId());
- handleAddedEvent(addedEvent, quotaRoot);
+ return handleAddedEvent(addedEvent, quotaRoot);
} else if (event instanceof Expunged) {
Expunged expungedEvent = (Expunged) event;
QuotaRoot quotaRoot = quotaRootResolver.getQuotaRoot(expungedEvent.getMailboxId());
- handleExpungedEvent(expungedEvent, quotaRoot);
+ return handleExpungedEvent(expungedEvent, quotaRoot);
} else if (event instanceof MailboxDeletion) {
MailboxDeletion mailboxDeletionEvent = (MailboxDeletion) event;
- handleMailboxDeletionEvent(mailboxDeletionEvent);
+ return handleMailboxDeletionEvent(mailboxDeletionEvent);
}
+ return Mono.empty();
+ } catch (MailboxException e) {
+ return Mono.error(e);
+ }
+ }
+
+ private Mono<Void> handleExpungedEvent(Expunged expunged, QuotaRoot quotaRoot) {
+ return computeQuotaOperation(expunged, quotaRoot)
+ .flatMap(quotaOperation ->
+ Mono.from(currentQuotaManager.decrease(quotaOperation))
+ .then(dispatchNewQuota(quotaRoot, expunged.getUsername())));
}
- private void handleExpungedEvent(Expunged expunged, QuotaRoot quotaRoot) {
- computeQuotaOperation(expunged, quotaRoot).ifPresent(Throwing.<QuotaOperation>consumer(quotaOperation -> {
- Mono.from(currentQuotaManager.decrease(quotaOperation))
- .then(Mono.defer(Throwing.supplier(() -> eventBus.dispatch(
- EventFactory.quotaUpdated()
- .randomEventId()
- .user(expunged.getUsername())
- .quotaRoot(quotaRoot)
- .quotaCount(quotaManager.getMessageQuota(quotaRoot))
- .quotaSize(quotaManager.getStorageQuota(quotaRoot))
- .instant(Instant.now())
- .build(),
- NO_REGISTRATION_KEYS)).sneakyThrow()))
- .block();
- }).sneakyThrow());
+ private Mono<Void> handleAddedEvent(Added added, QuotaRoot quotaRoot) {
+ return computeQuotaOperation(added, quotaRoot)
+ .flatMap(quotaOperation ->
+ Mono.from(currentQuotaManager.increase(quotaOperation))
+ .then(dispatchNewQuota(quotaRoot, added.getUsername())));
}
- private void handleAddedEvent(Added added, QuotaRoot quotaRoot) {
- computeQuotaOperation(added, quotaRoot).ifPresent(Throwing.<QuotaOperation>consumer(quotaOperation -> {
- Mono.from(currentQuotaManager.increase(quotaOperation))
- .then(Mono.defer(Throwing.supplier(() -> eventBus.dispatch(
- EventFactory.quotaUpdated()
- .randomEventId()
- .user(added.getUsername())
- .quotaRoot(quotaRoot)
- .quotaCount(quotaManager.getMessageQuota(quotaRoot))
- .quotaSize(quotaManager.getStorageQuota(quotaRoot))
- .instant(Instant.now())
- .build(),
- NO_REGISTRATION_KEYS)).sneakyThrow()))
- .block();
- }).sneakyThrow());
+ private Mono<Void> dispatchNewQuota(QuotaRoot quotaRoot, Username username) {
+ Mono<Quota<QuotaCountLimit, QuotaCountUsage>> messageQuota = Mono.fromCallable(() -> quotaManager.getMessageQuota(quotaRoot));
+ Mono<Quota<QuotaSizeLimit, QuotaSizeUsage>> storageQuota = Mono.fromCallable(() -> quotaManager.getStorageQuota(quotaRoot));
+
+ Mono<Tuple2<Quota<QuotaCountLimit, QuotaCountUsage>, Quota<QuotaSizeLimit, QuotaSizeUsage>>> quotasMono =
+ messageQuota.zipWith(storageQuota)
+ .subscribeOn(Schedulers.elastic());
+
+ return quotasMono
+ .flatMap(quotas -> eventBus.dispatch(
+ EventFactory.quotaUpdated()
+ .randomEventId()
+ .user(username)
+ .quotaRoot(quotaRoot)
+ .quotaCount(quotas.getT1())
+ .quotaSize(quotas.getT2())
+ .instant(Instant.now())
+ .build(),
+ NO_REGISTRATION_KEYS));
}
- private Optional<QuotaOperation> computeQuotaOperation(MetaDataHoldingEvent metaDataHoldingEvent, QuotaRoot quotaRoot) {
+ private Mono<QuotaOperation> computeQuotaOperation(MetaDataHoldingEvent metaDataHoldingEvent, QuotaRoot quotaRoot) {
long size = totalSize(metaDataHoldingEvent);
long count = Integer.toUnsignedLong(metaDataHoldingEvent.getUids().size());
if (count != 0 && size != 0) {
- return Optional.of(new QuotaOperation(quotaRoot, QuotaCountUsage.count(count), QuotaSizeUsage.size(size)));
+ return Mono.just(new QuotaOperation(quotaRoot, QuotaCountUsage.count(count), QuotaSizeUsage.size(size)));
}
- return Optional.empty();
+ return Mono.empty();
}
private long totalSize(MetaDataHoldingEvent metaDataHoldingEvent) {
@@ -141,14 +152,14 @@ public class ListeningCurrentQuotaUpdater implements MailboxListener.GroupMailbo
.sum();
}
- private void handleMailboxDeletionEvent(MailboxDeletion mailboxDeletionEvent) throws MailboxException {
+ private Mono<Void> handleMailboxDeletionEvent(MailboxDeletion mailboxDeletionEvent) throws MailboxException {
boolean mailboxContainedMessages = mailboxDeletionEvent.getDeletedMessageCount().asLong() > 0;
if (mailboxContainedMessages) {
- Mono.from(currentQuotaManager.decrease(new QuotaOperation(mailboxDeletionEvent.getQuotaRoot(),
+ return Mono.from(currentQuotaManager.decrease(new QuotaOperation(mailboxDeletionEvent.getQuotaRoot(),
mailboxDeletionEvent.getDeletedMessageCount(),
- mailboxDeletionEvent.getTotalDeletedSize())))
- .block();
+ mailboxDeletionEvent.getTotalDeletedSize())));
}
+ return Mono.empty();
}
}
\ No newline at end of file
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraQuotaModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraQuotaModule.java
index 61e7f8f..5d0e42b 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraQuotaModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraQuotaModule.java
@@ -65,7 +65,7 @@ public class CassandraQuotaModule extends AbstractModule {
bind(ListeningCurrentQuotaUpdater.class).in(Scopes.SINGLETON);
bind(QuotaUpdater.class).to(ListeningCurrentQuotaUpdater.class);
- Multibinder.newSetBinder(binder(), MailboxListener.GroupMailboxListener.class)
+ Multibinder.newSetBinder(binder(), MailboxListener.ReactiveGroupMailboxListener.class)
.addBinding()
.to(ListeningCurrentQuotaUpdater.class);
}
diff --git a/server/container/guice/jpa-guice/src/main/java/org/apache/james/modules/mailbox/JpaQuotaModule.java b/server/container/guice/jpa-guice/src/main/java/org/apache/james/modules/mailbox/JpaQuotaModule.java
index c8a4c31..6b16549 100644
--- a/server/container/guice/jpa-guice/src/main/java/org/apache/james/modules/mailbox/JpaQuotaModule.java
+++ b/server/container/guice/jpa-guice/src/main/java/org/apache/james/modules/mailbox/JpaQuotaModule.java
@@ -55,7 +55,7 @@ public class JpaQuotaModule extends AbstractModule {
bind(ListeningCurrentQuotaUpdater.class).in(Scopes.SINGLETON);
bind(QuotaUpdater.class).to(ListeningCurrentQuotaUpdater.class);
- Multibinder.newSetBinder(binder(), MailboxListener.GroupMailboxListener.class)
+ Multibinder.newSetBinder(binder(), MailboxListener.ReactiveGroupMailboxListener.class)
.addBinding()
.to(ListeningCurrentQuotaUpdater.class);
}
diff --git a/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/mailbox/MemoryQuotaModule.java b/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/mailbox/MemoryQuotaModule.java
index f4e3b33..3e49d3e 100644
--- a/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/mailbox/MemoryQuotaModule.java
+++ b/server/container/guice/memory-guice/src/main/java/org/apache/james/modules/mailbox/MemoryQuotaModule.java
@@ -55,7 +55,7 @@ public class MemoryQuotaModule extends AbstractModule {
bind(ListeningCurrentQuotaUpdater.class).in(Scopes.SINGLETON);
bind(QuotaUpdater.class).to(ListeningCurrentQuotaUpdater.class);
- Multibinder.newSetBinder(binder(), MailboxListener.GroupMailboxListener.class)
+ Multibinder.newSetBinder(binder(), MailboxListener.ReactiveGroupMailboxListener.class)
.addBinding()
.to(ListeningCurrentQuotaUpdater.class);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org