You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/01/17 06:50:23 UTC
[07/27] james-project git commit: JAMES-2641 EventBus caller should
block to await dispatching to be done
JAMES-2641 EventBus caller should block to await dispatching to be done
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/24fe28f0
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/24fe28f0
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/24fe28f0
Branch: refs/heads/master
Commit: 24fe28f024c2989cd7ce5584359a5b93c537872b
Parents: 4c14a75
Author: Benoit Tellier <bt...@linagora.com>
Authored: Thu Jan 10 16:29:53 2019 +0700
Committer: Benoit Tellier <bt...@linagora.com>
Committed: Thu Jan 17 10:23:41 2019 +0700
----------------------------------------------------------------------
.../QuotaThresholdListenersTestSystem.java | 2 +-
.../mailbox/store/StoreMailboxManager.java | 12 ++-
.../mailbox/store/StoreMessageIdManager.java | 36 +++++---
.../mailbox/store/StoreMessageManager.java | 93 +++++++++++---------
.../james/mailbox/store/StoreRightManager.java | 6 +-
.../quota/ListeningCurrentQuotaUpdater.java | 6 +-
6 files changed, 91 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/24fe28f0/mailbox/plugin/quota-mailing/src/test/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdListenersTestSystem.java
----------------------------------------------------------------------
diff --git a/mailbox/plugin/quota-mailing/src/test/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdListenersTestSystem.java b/mailbox/plugin/quota-mailing/src/test/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdListenersTestSystem.java
index c399025..ae4e936 100644
--- a/mailbox/plugin/quota-mailing/src/test/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdListenersTestSystem.java
+++ b/mailbox/plugin/quota-mailing/src/test/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdListenersTestSystem.java
@@ -53,6 +53,6 @@ class QuotaThresholdListenersTestSystem {
}
void event(Event event) {
- eventBus.dispatch(event, NO_KEYS);
+ eventBus.dispatch(event, NO_KEYS).block();
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/24fe28f0/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index df08dee..7e46527 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -348,7 +348,8 @@ public class StoreMailboxManager implements MailboxManager {
.mailboxSession(mailboxSession)
.mailbox(m)
.build(),
- new MailboxIdRegistrationKey(m.getMailboxId()));
+ new MailboxIdRegistrationKey(m.getMailboxId()))
+ .block();
} catch (MailboxExistsException e) {
LOGGER.info("{} mailbox was created concurrently", m.generateAssociatedPath());
}
@@ -400,7 +401,8 @@ public class StoreMailboxManager implements MailboxManager {
.quotaCount(QuotaCount.count(messageCount))
.quotaSize(QuotaSize.size(totalSize))
.build(),
- new MailboxIdRegistrationKey(mailbox.getMailboxId()));
+ new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+ .block();
return m;
});
@@ -446,7 +448,8 @@ public class StoreMailboxManager implements MailboxManager {
.oldPath(from)
.newPath(to)
.build(),
- new MailboxIdRegistrationKey(mailbox.getMailboxId()));
+ new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+ .block();
// rename submailboxes
MailboxPath children = new MailboxPath(from.getNamespace(), from.getUser(), from.getName() + getDelimiter() + "%");
@@ -465,7 +468,8 @@ public class StoreMailboxManager implements MailboxManager {
.oldPath(fromPath)
.newPath(sub.generateAssociatedPath())
.build(),
- new MailboxIdRegistrationKey(sub.getMailboxId()));
+ new MailboxIdRegistrationKey(sub.getMailboxId()))
+ .block();
LOGGER.debug("Rename mailbox sub-mailbox {} to {}", subOriginalName, subNewName);
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/24fe28f0/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 04478cd..55a82fe 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -75,6 +75,9 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
public class StoreMessageIdManager implements MessageIdManager {
private static class MetadataWithMailboxId {
@@ -218,15 +221,18 @@ public class StoreMessageIdManager implements MessageIdManager {
MailboxMessage::getMailboxId)));
MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession);
- for (MetadataWithMailboxId metadataWithMailboxId : metadataWithMailbox) {
- eventBus.dispatch(EventFactory.expunged()
- .randomEventId()
- .mailboxSession(mailboxSession)
- .mailbox(mailboxMapper.findMailboxById(metadataWithMailboxId.mailboxId))
- .addMetaData(metadataWithMailboxId.messageMetaData)
- .build(),
- new MailboxIdRegistrationKey(metadataWithMailboxId.mailboxId));
- }
+ Flux.fromIterable(metadataWithMailbox)
+ .flatMap(Throwing.<StoreMessageIdManager.MetadataWithMailboxId, Mono<Void>>function(
+ metadataWithMailboxId -> eventBus.dispatch(EventFactory.expunged()
+ .randomEventId()
+ .mailboxSession(mailboxSession)
+ .mailbox(mailboxMapper.findMailboxById(metadataWithMailboxId.mailboxId))
+ .addMetaData(metadataWithMailboxId.messageMetaData)
+ .build(),
+ new MailboxIdRegistrationKey(metadataWithMailboxId.mailboxId)))
+ .sneakyThrow())
+ .then()
+ .block();
}
@Override
@@ -295,7 +301,8 @@ public class StoreMessageIdManager implements MessageIdManager {
.build(),
messageMoves.impactedMailboxIds()
.map(MailboxIdRegistrationKey::new)
- .collect(Guavate.toImmutableSet()));
+ .collect(Guavate.toImmutableSet()))
+ .block();
}
private void removeMessageFromMailboxes(MailboxMessage message, Set<MailboxId> mailboxesToRemove, MailboxSession mailboxSession) throws MailboxException {
@@ -311,7 +318,8 @@ public class StoreMessageIdManager implements MessageIdManager {
.mailbox(mailboxMapper.findMailboxById(mailboxId))
.addMetaData(eventPayload)
.build(),
- new MailboxIdRegistrationKey(mailboxId));
+ new MailboxIdRegistrationKey(mailboxId))
+ .block();
}
}
@@ -329,7 +337,8 @@ public class StoreMessageIdManager implements MessageIdManager {
.mailbox(mailbox)
.updatedFlag(updatedFlags)
.build(),
- new MailboxIdRegistrationKey(mailboxId));
+ new MailboxIdRegistrationKey(mailboxId))
+ .block();
}
}
@@ -397,7 +406,8 @@ public class StoreMessageIdManager implements MessageIdManager {
.mailbox(mailboxMapper.findMailboxById(mailboxId))
.addMessage(copy)
.build(),
- new MailboxIdRegistrationKey(mailboxId));
+ new MailboxIdRegistrationKey(mailboxId))
+ .block();
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/24fe28f0/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index e38a291..1b4b20e 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -96,6 +96,8 @@ import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSortedMap;
+import reactor.core.publisher.Flux;
+
/**
* Base class for {@link org.apache.james.mailbox.MessageManager}
* implementations.
@@ -264,7 +266,8 @@ public class StoreMessageManager implements org.apache.james.mailbox.MessageMana
.mailbox(getMailboxEntity())
.metaData(ImmutableSortedMap.copyOf(uids))
.build(),
- new MailboxIdRegistrationKey(mailbox.getMailboxId()));
+ new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+ .block();
return uids.keySet().iterator();
}
@@ -412,7 +415,8 @@ public class StoreMessageManager implements org.apache.james.mailbox.MessageMana
.mailbox(mailbox)
.addMessage(copy)
.build(),
- new MailboxIdRegistrationKey(mailbox.getMailboxId()));
+ new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+ .block();
return new ComposedMessageId(mailbox.getMailboxId(), data.getMessageId(), data.getUid());
}, true);
}
@@ -575,7 +579,8 @@ public class StoreMessageManager implements org.apache.james.mailbox.MessageMana
.mailbox(getMailboxEntity())
.updatedFlags(updatedFlags)
.build(),
- new MailboxIdRegistrationKey(mailbox.getMailboxId()));
+ new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+ .block();
return updatedFlags.stream().collect(Guavate.toImmutableMap(
UpdatedFlags::getUid,
@@ -734,22 +739,24 @@ public class StoreMessageManager implements org.apache.james.mailbox.MessageMana
messageIds.add(message.getMessageId());
}
- eventBus.dispatch(EventFactory.added()
- .randomEventId()
- .mailboxSession(session)
- .mailbox(to.getMailboxEntity())
- .metaData(copiedUids)
- .build(),
- new MailboxIdRegistrationKey(mailbox.getMailboxId()));
- eventBus.dispatch(EventFactory.moved()
- .session(session)
- .messageMoves(MessageMoves.builder()
- .previousMailboxIds(getMailboxEntity().getMailboxId())
- .targetMailboxIds(to.getMailboxEntity().getMailboxId(), getMailboxEntity().getMailboxId())
- .build())
- .messageId(messageIds.build())
- .build(),
- new MailboxIdRegistrationKey(mailbox.getMailboxId()));
+ Flux.merge(
+ eventBus.dispatch(EventFactory.added()
+ .randomEventId()
+ .mailboxSession(session)
+ .mailbox(to.getMailboxEntity())
+ .metaData(copiedUids)
+ .build(),
+ new MailboxIdRegistrationKey(to.getMailboxEntity().getMailboxId())),
+ eventBus.dispatch(EventFactory.moved()
+ .session(session)
+ .messageMoves(MessageMoves.builder()
+ .previousMailboxIds(getMailboxEntity().getMailboxId())
+ .targetMailboxIds(to.getMailboxEntity().getMailboxId(), getMailboxEntity().getMailboxId())
+ .build())
+ .messageId(messageIds.build())
+ .build(),
+ new MailboxIdRegistrationKey(mailbox.getMailboxId())))
+ .then().block();
return copiedUids;
}
@@ -765,29 +772,31 @@ public class StoreMessageManager implements org.apache.james.mailbox.MessageMana
messageIds.add(message.getMessageId());
}
- eventBus.dispatch(EventFactory.added()
- .randomEventId()
- .mailboxSession(session)
- .mailbox(to.getMailboxEntity())
- .metaData(moveUids)
- .build(),
- new MailboxIdRegistrationKey(mailbox.getMailboxId()));
- eventBus.dispatch(EventFactory.expunged()
- .randomEventId()
- .mailboxSession(session)
- .mailbox(getMailboxEntity())
- .addMetaData(moveResult.getOriginalMessages())
- .build(),
- new MailboxIdRegistrationKey(mailbox.getMailboxId()));
- eventBus.dispatch(EventFactory.moved()
- .messageMoves(MessageMoves.builder()
- .previousMailboxIds(getMailboxEntity().getMailboxId())
- .targetMailboxIds(to.getMailboxEntity().getMailboxId())
- .build())
- .messageId(messageIds.build())
- .session(session)
- .build(),
- new MailboxIdRegistrationKey(mailbox.getMailboxId()));
+ Flux.merge(
+ eventBus.dispatch(EventFactory.added()
+ .randomEventId()
+ .mailboxSession(session)
+ .mailbox(to.getMailboxEntity())
+ .metaData(moveUids)
+ .build(),
+ new MailboxIdRegistrationKey(to.getMailboxEntity().getMailboxId())),
+ eventBus.dispatch(EventFactory.expunged()
+ .randomEventId()
+ .mailboxSession(session)
+ .mailbox(getMailboxEntity())
+ .addMetaData(moveResult.getOriginalMessages())
+ .build(),
+ new MailboxIdRegistrationKey(mailbox.getMailboxId())),
+ eventBus.dispatch(EventFactory.moved()
+ .messageMoves(MessageMoves.builder()
+ .previousMailboxIds(getMailboxEntity().getMailboxId())
+ .targetMailboxIds(to.getMailboxEntity().getMailboxId())
+ .build())
+ .messageId(messageIds.build())
+ .session(session)
+ .build(),
+ new MailboxIdRegistrationKey(mailbox.getMailboxId())))
+ .then().block();
return moveUids;
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/24fe28f0/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
index 25a8bdc..6cb00f4 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
@@ -146,7 +146,8 @@ public class StoreRightManager implements RightManager {
.mailbox(mailbox)
.aclDiff(aclDiff)
.build(),
- new MailboxIdRegistrationKey(mailbox.getMailboxId()));
+ new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+ .block();
}
private void assertSharesBelongsToUserDomain(String user, ACLCommand mailboxACLCommand) throws DifferentDomainException {
@@ -230,7 +231,8 @@ public class StoreRightManager implements RightManager {
.mailbox(mailbox)
.aclDiff(aclDiff)
.build(),
- new MailboxIdRegistrationKey(mailbox.getMailboxId()));
+ new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+ .block();
}
/**
http://git-wip-us.apache.org/repos/asf/james-project/blob/24fe28f0/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java
----------------------------------------------------------------------
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java
index 272ced8..26fea63 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/ListeningCurrentQuotaUpdater.java
@@ -105,7 +105,8 @@ public class ListeningCurrentQuotaUpdater implements MailboxListener.GroupMailbo
.quotaSize(quotaManager.getStorageQuota(quotaRoot))
.instant(Instant.now())
.build(),
- NO_REGISTRATION_KEYS);
+ NO_REGISTRATION_KEYS)
+ .block();
}
private void handleAddedEvent(Added added, QuotaRoot quotaRoot) throws MailboxException {
@@ -128,7 +129,8 @@ public class ListeningCurrentQuotaUpdater implements MailboxListener.GroupMailbo
.quotaSize(quotaManager.getStorageQuota(quotaRoot))
.instant(Instant.now())
.build(),
- NO_REGISTRATION_KEYS);
+ NO_REGISTRATION_KEYS)
+ .block();
}
private void handleMailboxDeletionEvent(MailboxDeletion mailboxDeletionEvent) throws MailboxException {
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org