You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/17 06:50:23 UTC

[07/27] james-project git commit: JAMES-2641 EventBus callers should block until dispatching is done

JAMES-2641 EventBus callers should block until dispatching is done


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/24fe28f0
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/24fe28f0
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/24fe28f0

Branch: refs/heads/master
Commit: 24fe28f024c2989cd7ce5584359a5b93c537872b
Parents: 4c14a75
Author: Benoit Tellier <bt...@linagora.com>
Authored: Thu Jan 10 16:29:53 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Thu Jan 17 10:23:41 2019 +0700

----------------------------------------------------------------------
 .../QuotaThresholdListenersTestSystem.java      |  2 +-
 .../mailbox/store/StoreMailboxManager.java      | 12 ++-
 .../mailbox/store/StoreMessageIdManager.java    | 36 +++++---
 .../mailbox/store/StoreMessageManager.java      | 93 +++++++++++---------
 .../james/mailbox/store/StoreRightManager.java  |  6 +-
 .../quota/ListeningCurrentQuotaUpdater.java     |  6 +-
 6 files changed, 91 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/24fe28f0/mailbox/plugin/quota-mailing/src/test/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdListenersTestSystem.java
----------------------------------------------------------------------
diff --git a/mailbox/plugin/quota-mailing/src/test/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdListenersTestSystem.java b/mailbox/plugin/quota-mailing/src/test/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdListenersTestSystem.java
index c399025..ae4e936 100644
--- a/mailbox/plugin/quota-mailing/src/test/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdListenersTestSystem.java
+++ b/mailbox/plugin/quota-mailing/src/test/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdListenersTestSystem.java
@@ -53,6 +53,6 @@ class QuotaThresholdListenersTestSystem {
     }
 
     void event(Event event) {
-        eventBus.dispatch(event, NO_KEYS);
+        eventBus.dispatch(event, NO_KEYS).block();
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/24fe28f0/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index df08dee..7e46527 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -348,7 +348,8 @@ public class StoreMailboxManager implements MailboxManager {
                                 .mailboxSession(mailboxSession)
                                 .mailbox(m)
                                 .build(),
-                                new MailboxIdRegistrationKey(m.getMailboxId()));
+                                new MailboxIdRegistrationKey(m.getMailboxId()))
+                                .block();
                         } catch (MailboxExistsException e) {
                             LOGGER.info("{} mailbox was created concurrently", m.generateAssociatedPath());
                         }
@@ -400,7 +401,8 @@ public class StoreMailboxManager implements MailboxManager {
                 .quotaCount(QuotaCount.count(messageCount))
                 .quotaSize(QuotaSize.size(totalSize))
                 .build(),
-                new MailboxIdRegistrationKey(mailbox.getMailboxId()));
+                new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+                .block();
             return m;
         });
 
@@ -446,7 +448,8 @@ public class StoreMailboxManager implements MailboxManager {
             .oldPath(from)
             .newPath(to)
             .build(),
-            new MailboxIdRegistrationKey(mailbox.getMailboxId()));
+            new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+            .block();
 
         // rename submailboxes
         MailboxPath children = new MailboxPath(from.getNamespace(), from.getUser(), from.getName() + getDelimiter() + "%");
@@ -465,7 +468,8 @@ public class StoreMailboxManager implements MailboxManager {
                     .oldPath(fromPath)
                     .newPath(sub.generateAssociatedPath())
                     .build(),
-                    new MailboxIdRegistrationKey(sub.getMailboxId()));
+                    new MailboxIdRegistrationKey(sub.getMailboxId()))
+                    .block();
 
                 LOGGER.debug("Rename mailbox sub-mailbox {} to {}", subOriginalName, subNewName);
             }

http://git-wip-us.apache.org/repos/asf/james-project/blob/24fe28f0/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 04478cd..55a82fe 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -75,6 +75,9 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 public class StoreMessageIdManager implements MessageIdManager {
 
     private static class MetadataWithMailboxId {
@@ -218,15 +221,18 @@ public class StoreMessageIdManager implements MessageIdManager {
                     MailboxMessage::getMailboxId)));
 
         MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession);
-        for (MetadataWithMailboxId metadataWithMailboxId : metadataWithMailbox) {
-            eventBus.dispatch(EventFactory.expunged()
-                .randomEventId()
-                .mailboxSession(mailboxSession)
-                .mailbox(mailboxMapper.findMailboxById(metadataWithMailboxId.mailboxId))
-                .addMetaData(metadataWithMailboxId.messageMetaData)
-                .build(),
-                new MailboxIdRegistrationKey(metadataWithMailboxId.mailboxId));
-        }
+        Flux.fromIterable(metadataWithMailbox)
+            .flatMap(Throwing.<StoreMessageIdManager.MetadataWithMailboxId, Mono<Void>>function(
+                metadataWithMailboxId -> eventBus.dispatch(EventFactory.expunged()
+                    .randomEventId()
+                    .mailboxSession(mailboxSession)
+                    .mailbox(mailboxMapper.findMailboxById(metadataWithMailboxId.mailboxId))
+                    .addMetaData(metadataWithMailboxId.messageMetaData)
+                    .build(),
+                new MailboxIdRegistrationKey(metadataWithMailboxId.mailboxId)))
+                .sneakyThrow())
+            .then()
+            .block();
     }
 
     @Override
@@ -295,7 +301,8 @@ public class StoreMessageIdManager implements MessageIdManager {
             .build(),
             messageMoves.impactedMailboxIds()
                 .map(MailboxIdRegistrationKey::new)
-                .collect(Guavate.toImmutableSet()));
+                .collect(Guavate.toImmutableSet()))
+            .block();
     }
 
     private void removeMessageFromMailboxes(MailboxMessage message, Set<MailboxId> mailboxesToRemove, MailboxSession mailboxSession) throws MailboxException {
@@ -311,7 +318,8 @@ public class StoreMessageIdManager implements MessageIdManager {
                 .mailbox(mailboxMapper.findMailboxById(mailboxId))
                 .addMetaData(eventPayload)
                 .build(),
-                new MailboxIdRegistrationKey(mailboxId));
+                new MailboxIdRegistrationKey(mailboxId))
+            .block();
         }
     }
 
@@ -329,7 +337,8 @@ public class StoreMessageIdManager implements MessageIdManager {
                 .mailbox(mailbox)
                 .updatedFlag(updatedFlags)
                 .build(),
-                new MailboxIdRegistrationKey(mailboxId));
+                new MailboxIdRegistrationKey(mailboxId))
+                .block();
         }
     }
 
@@ -397,7 +406,8 @@ public class StoreMessageIdManager implements MessageIdManager {
                 .mailbox(mailboxMapper.findMailboxById(mailboxId))
                 .addMessage(copy)
                 .build(),
-                new MailboxIdRegistrationKey(mailboxId));
+                new MailboxIdRegistrationKey(mailboxId))
+                .block();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/24fe28f0/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index e38a291..1b4b20e 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -96,6 +96,8 @@ import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSortedMap;
 
+import reactor.core.publisher.Flux;
+
 /**
  * Base class for {@link org.apache.james.mailbox.MessageManager}
  * implementations.
@@ -264,7 +266,8 @@ public class StoreMessageManager implements org.apache.james.mailbox.MessageMana
             .mailbox(getMailboxEntity())
             .metaData(ImmutableSortedMap.copyOf(uids))
             .build(),
-            new MailboxIdRegistrationKey(mailbox.getMailboxId()));
+            new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+            .block();
         return uids.keySet().iterator();
     }
 
@@ -412,7 +415,8 @@ public class StoreMessageManager implements org.apache.james.mailbox.MessageMana
                             .mailbox(mailbox)
                             .addMessage(copy)
                             .build(),
-                            new MailboxIdRegistrationKey(mailbox.getMailboxId()));
+                            new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+                            .block();
                         return new ComposedMessageId(mailbox.getMailboxId(), data.getMessageId(), data.getUid());
                     }, true);
                 }
@@ -575,7 +579,8 @@ public class StoreMessageManager implements org.apache.james.mailbox.MessageMana
             .mailbox(getMailboxEntity())
             .updatedFlags(updatedFlags)
             .build(),
-            new MailboxIdRegistrationKey(mailbox.getMailboxId()));
+            new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+            .block();
 
         return updatedFlags.stream().collect(Guavate.toImmutableMap(
             UpdatedFlags::getUid,
@@ -734,22 +739,24 @@ public class StoreMessageManager implements org.apache.james.mailbox.MessageMana
             messageIds.add(message.getMessageId());
         }
 
-        eventBus.dispatch(EventFactory.added()
-            .randomEventId()
-            .mailboxSession(session)
-            .mailbox(to.getMailboxEntity())
-            .metaData(copiedUids)
-            .build(),
-            new MailboxIdRegistrationKey(mailbox.getMailboxId()));
-        eventBus.dispatch(EventFactory.moved()
-            .session(session)
-            .messageMoves(MessageMoves.builder()
-                .previousMailboxIds(getMailboxEntity().getMailboxId())
-                .targetMailboxIds(to.getMailboxEntity().getMailboxId(), getMailboxEntity().getMailboxId())
-                .build())
-            .messageId(messageIds.build())
-            .build(),
-            new MailboxIdRegistrationKey(mailbox.getMailboxId()));
+        Flux.merge(
+            eventBus.dispatch(EventFactory.added()
+                    .randomEventId()
+                    .mailboxSession(session)
+                    .mailbox(to.getMailboxEntity())
+                    .metaData(copiedUids)
+                    .build(),
+                new MailboxIdRegistrationKey(to.getMailboxEntity().getMailboxId())),
+            eventBus.dispatch(EventFactory.moved()
+                    .session(session)
+                    .messageMoves(MessageMoves.builder()
+                        .previousMailboxIds(getMailboxEntity().getMailboxId())
+                        .targetMailboxIds(to.getMailboxEntity().getMailboxId(), getMailboxEntity().getMailboxId())
+                        .build())
+                    .messageId(messageIds.build())
+                    .build(),
+                new MailboxIdRegistrationKey(mailbox.getMailboxId())))
+            .then().block();
 
         return copiedUids;
     }
@@ -765,29 +772,31 @@ public class StoreMessageManager implements org.apache.james.mailbox.MessageMana
             messageIds.add(message.getMessageId());
         }
 
-        eventBus.dispatch(EventFactory.added()
-            .randomEventId()
-            .mailboxSession(session)
-            .mailbox(to.getMailboxEntity())
-            .metaData(moveUids)
-            .build(),
-            new MailboxIdRegistrationKey(mailbox.getMailboxId()));
-        eventBus.dispatch(EventFactory.expunged()
-            .randomEventId()
-            .mailboxSession(session)
-            .mailbox(getMailboxEntity())
-            .addMetaData(moveResult.getOriginalMessages())
-            .build(),
-            new MailboxIdRegistrationKey(mailbox.getMailboxId()));
-        eventBus.dispatch(EventFactory.moved()
-            .messageMoves(MessageMoves.builder()
-                .previousMailboxIds(getMailboxEntity().getMailboxId())
-                .targetMailboxIds(to.getMailboxEntity().getMailboxId())
-                .build())
-            .messageId(messageIds.build())
-            .session(session)
-            .build(),
-            new MailboxIdRegistrationKey(mailbox.getMailboxId()));
+        Flux.merge(
+            eventBus.dispatch(EventFactory.added()
+                    .randomEventId()
+                    .mailboxSession(session)
+                    .mailbox(to.getMailboxEntity())
+                    .metaData(moveUids)
+                    .build(),
+                new MailboxIdRegistrationKey(to.getMailboxEntity().getMailboxId())),
+            eventBus.dispatch(EventFactory.expunged()
+                    .randomEventId()
+                    .mailboxSession(session)
+                    .mailbox(getMailboxEntity())
+                    .addMetaData(moveResult.getOriginalMessages())
+                    .build(),
+                new MailboxIdRegistrationKey(mailbox.getMailboxId())),
+            eventBus.dispatch(EventFactory.moved()
+                    .messageMoves(MessageMoves.builder()
+                        .previousMailboxIds(getMailboxEntity().getMailboxId())
+                        .targetMailboxIds(to.getMailboxEntity().getMailboxId())
+                        .build())
+                    .messageId(messageIds.build())
+                    .session(session)
+                    .build(),
+                new MailboxIdRegistrationKey(mailbox.getMailboxId())))
+            .then().block();
 
         return moveUids;
     }

http://git-wip-us.apache.org/repos/asf/james-project/blob/24fe28f0/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
index 25a8bdc..6cb00f4 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
@@ -146,7 +146,8 @@ public class StoreRightManager implements RightManager {
             .mailbox(mailbox)
             .aclDiff(aclDiff)
             .build(),
-            new MailboxIdRegistrationKey(mailbox.getMailboxId()));
+            new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+            .block();
     }
 
     private void assertSharesBelongsToUserDomain(String user, ACLCommand mailboxACLCommand) throws DifferentDomainException {
@@ -230,7 +231,8 @@ public class StoreRightManager implements RightManager {
             .mailbox(mailbox)
             .aclDiff(aclDiff)
             .build(),
-            new MailboxIdRegistrationKey(mailbox.getMailboxId()));
+            new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+            .block();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/james-project/blob/24fe28f0/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java
index 272ced8..26fea63 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java
@@ -105,7 +105,8 @@ public class ListeningCurrentQuotaUpdater implements MailboxListener.GroupMailbo
                 .quotaSize(quotaManager.getStorageQuota(quotaRoot))
                 .instant(Instant.now())
                 .build(),
-            NO_REGISTRATION_KEYS);
+            NO_REGISTRATION_KEYS)
+            .block();
     }
 
     private void handleAddedEvent(Added added, QuotaRoot quotaRoot) throws MailboxException {
@@ -128,7 +129,8 @@ public class ListeningCurrentQuotaUpdater implements MailboxListener.GroupMailbo
                 .quotaSize(quotaManager.getStorageQuota(quotaRoot))
                 .instant(Instant.now())
                 .build(),
-            NO_REGISTRATION_KEYS);
+            NO_REGISTRATION_KEYS)
+            .block();
     }
 
     private void handleMailboxDeletionEvent(MailboxDeletion mailboxDeletionEvent) throws MailboxException {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org