You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/03/02 03:16:19 UTC

[james-project] 22/29: [REACTOR] CassandraUserMailboxRightsDAO should merge flux on updates

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4a681409bfa7ca403d10c4deb1db7b5b5d63fac6
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Feb 27 14:39:15 2020 +0700

    [REACTOR] CassandraUserMailboxRightsDAO should merge flux on updates
    
    Before needless synchronisation was performed, waiting for all insert to
    complete, all delete to complete, and all insert
    
    Now merge can eagerly subscribe each streams insides.
---
 .../cassandra/mail/CassandraUserMailboxRightsDAO.java    | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java
index 127b5be..7586c96 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUserMailboxRightsDAO.java
@@ -101,29 +101,27 @@ public class CassandraUserMailboxRightsDAO {
     public Mono<Void> update(CassandraId cassandraId, ACLDiff aclDiff) {
         PositiveUserACLDiff userACLDiff = new PositiveUserACLDiff(aclDiff);
         return Flux.merge(
-            addAll(cassandraId, userACLDiff.addedEntries()),
-            removeAll(cassandraId, userACLDiff.removedEntries()),
-            addAll(cassandraId, userACLDiff.changedEntries()))
+                addAll(cassandraId, userACLDiff.addedEntries()),
+                removeAll(cassandraId, userACLDiff.removedEntries()),
+                addAll(cassandraId, userACLDiff.changedEntries()))
             .then();
     }
 
-    private Mono<Void> removeAll(CassandraId cassandraId, Stream<MailboxACL.Entry> removedEntries) {
+    private Flux<Void> removeAll(CassandraId cassandraId, Stream<MailboxACL.Entry> removedEntries) {
         return Flux.fromStream(removedEntries)
             .flatMap(entry -> cassandraAsyncExecutor.executeVoid(
                 delete.bind()
                     .setString(USER_NAME, entry.getKey().getName())
-                    .setUUID(MAILBOX_ID, cassandraId.asUuid())))
-            .then();
+                    .setUUID(MAILBOX_ID, cassandraId.asUuid())));
     }
 
-    private Mono<Void> addAll(CassandraId cassandraId, Stream<MailboxACL.Entry> addedEntries) {
+    private Flux<Void> addAll(CassandraId cassandraId, Stream<MailboxACL.Entry> addedEntries) {
         return Flux.fromStream(addedEntries)
             .flatMap(entry -> cassandraAsyncExecutor.executeVoid(
                 insert.bind()
                     .setString(USER_NAME, entry.getKey().getName())
                     .setUUID(MAILBOX_ID, cassandraId.asUuid())
-                    .setString(RIGHTS, entry.getValue().serialize())))
-            .then();
+                    .setString(RIGHTS, entry.getValue().serialize())));
     }
 
     public Mono<Optional<Rfc4314Rights>> retrieve(Username userName, CassandraId mailboxId) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org