You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/05/08 00:40:02 UTC
[james-project] 02/08: JAMES-3575 MessageIdManager::delete should
be reactive
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit da19351ec92eecb9d2eedc0b80d5539c46ecd5fa
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat May 1 22:19:23 2021 +0700
JAMES-3575 MessageIdManager::delete should be reactive
End to end reactive chain for JMAP RFC-8621
deletes...
---
.../org/apache/james/mailbox/MessageIdManager.java | 8 +++--
.../james/vault/DeletedMessageVaultHookTest.java | 6 ++--
.../james/mailbox/store/StoreMessageIdManager.java | 38 +++++++++++++---------
.../store/AbstractCombinationManagerTest.java | 6 +++-
.../AbstractMessageIdManagerSideEffectTest.java | 7 ++--
.../store/AbstractMessageIdManagerStorageTest.java | 2 +-
.../methods/SetMessagesDestructionProcessor.java | 7 +++-
.../jmap/method/EmailSetDeletePerformer.scala | 2 +-
8 files changed, 51 insertions(+), 25 deletions(-)
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
index 26a53cb..6fef5d4 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
@@ -38,6 +38,8 @@ import org.reactivestreams.Publisher;
import com.google.common.collect.ImmutableList;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
public interface MessageIdManager {
default Publisher<ComposedMessageIdWithMetaData> messageMetadata(MessageId id, MailboxSession session) {
@@ -62,7 +64,7 @@ public interface MessageIdManager {
DeleteResult delete(MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException;
- DeleteResult delete(List<MessageId> messageId, MailboxSession mailboxSession) throws MailboxException;
+ Publisher<DeleteResult> delete(List<MessageId> messageId, MailboxSession mailboxSession) throws MailboxException;
void setInMailboxes(MessageId messageId, Collection<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException;
@@ -71,7 +73,9 @@ public interface MessageIdManager {
}
default DeleteResult delete(MessageId messageId, MailboxSession mailboxSession) throws MailboxException {
- return delete(ImmutableList.of(messageId), mailboxSession);
+ return Mono.from(delete(ImmutableList.of(messageId), mailboxSession))
+ .subscribeOn(Schedulers.elastic())
+ .block();
}
}
diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
index 7251a98..79e919d 100644
--- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
+++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
@@ -65,6 +65,8 @@ import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
class DeletedMessageVaultHookTest {
@@ -159,7 +161,7 @@ class DeletedMessageVaultHookTest {
MessageId messageId = composedId.getMessageId();
long messageSize = messageSize(messageManager, composedId);
- messageIdManager.delete(ImmutableList.of(messageId), aliceSession);
+ messageIdManager.delete(messageId, aliceSession);
DeletedMessage deletedMessage = buildDeletedMessage(ImmutableList.of(aliceMailbox), messageId, ALICE, messageSize);
assertThat(Flux.from(messageVault.search(ALICE, Query.ALL)).blockFirst())
@@ -175,7 +177,7 @@ class DeletedMessageVaultHookTest {
.mapToObj(Throwing.intFunction(i -> appendMessage(messageManager).getMessageId()))
.collect(Guavate.toImmutableList());
- assertThatCode(() -> messageIdManager.delete(ids, aliceSession))
+ assertThatCode(() -> Mono.from(messageIdManager.delete(ids, aliceSession)).subscribeOn(Schedulers.elastic()).block())
.doesNotThrowAnyException();
}
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index d34b4a7..116e91b 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -187,10 +187,14 @@ public class StoreMessageIdManager implements MessageIdManager {
}
private ImmutableSet<MailboxId> getAllowedMailboxIds(MailboxSession mailboxSession, Stream<MailboxId> idList, Right... rights) throws MailboxException {
- return MailboxReactorUtils.block(Flux.fromStream(idList)
+ return MailboxReactorUtils.block(getAllowedMailboxIdsReactive(mailboxSession, idList, rights));
+ }
+
+ private Mono<ImmutableSet<MailboxId>> getAllowedMailboxIdsReactive(MailboxSession mailboxSession, Stream<MailboxId> idList, Right... rights) {
+ return Flux.fromStream(idList)
.distinct()
.filterWhen(hasRightsOnMailboxReactive(mailboxSession, rights), DEFAULT_CONCURRENCY)
- .collect(Guavate.toImmutableSet()));
+ .collect(Guavate.toImmutableSet());
}
@Override
@@ -215,14 +219,21 @@ public class StoreMessageIdManager implements MessageIdManager {
}
@Override
- public DeleteResult delete(List<MessageId> messageIds, MailboxSession mailboxSession) throws MailboxException {
+ public Mono<DeleteResult> delete(List<MessageId> messageIds, MailboxSession mailboxSession) throws MailboxException {
MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
- List<MailboxMessage> messageList = messageIdMapper.find(messageIds, MessageMapper.FetchType.Metadata);
- ImmutableSet<MailboxId> allowedMailboxIds = getAllowedMailboxIds(mailboxSession, messageList
- .stream()
- .map(MailboxMessage::getMailboxId), Right.DeleteMessages);
+ return messageIdMapper.findReactive(messageIds, MessageMapper.FetchType.Metadata)
+ .collectList()
+ .flatMap(messageList ->
+ getAllowedMailboxIdsReactive(mailboxSession,
+ messageList
+ .stream()
+ .map(MailboxMessage::getMailboxId),
+ Right.DeleteMessages)
+ .flatMap(allowedMailboxIds -> deleteInAllowedMailboxes(messageIds, mailboxSession, messageIdMapper, messageList, allowedMailboxIds)));
+ }
+ private Mono<DeleteResult> deleteInAllowedMailboxes(List<MessageId> messageIds, MailboxSession mailboxSession, MessageIdMapper messageIdMapper, List<MailboxMessage> messageList, ImmutableSet<MailboxId> allowedMailboxIds) {
List<MailboxMessage> accessibleMessages = messageList.stream()
.filter(message -> allowedMailboxIds.contains(message.getMailboxId()))
.collect(Guavate.toImmutableList());
@@ -232,14 +243,11 @@ public class StoreMessageIdManager implements MessageIdManager {
.collect(Guavate.toImmutableSet());
Sets.SetView<MessageId> nonAccessibleMessages = Sets.difference(ImmutableSet.copyOf(messageIds), accessibleMessageIds);
- deleteWithPreHooks(messageIdMapper, accessibleMessages, mailboxSession)
- .subscribeOn(Schedulers.elastic())
- .block();
-
- return DeleteResult.builder()
- .addDestroyed(accessibleMessageIds)
- .addNotFound(nonAccessibleMessages)
- .build();
+ return deleteWithPreHooks(messageIdMapper, accessibleMessages, mailboxSession)
+ .thenReturn(DeleteResult.builder()
+ .addDestroyed(accessibleMessageIds)
+ .addNotFound(nonAccessibleMessages)
+ .build());
}
private Mono<Void> deleteWithPreHooks(MessageIdMapper messageIdMapper, List<MailboxMessage> messageList, MailboxSession mailboxSession) {
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
index bdddff1..56c1ba3 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
@@ -53,6 +53,8 @@ import org.junit.jupiter.api.Test;
import com.google.common.collect.ImmutableList;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
public abstract class AbstractCombinationManagerTest {
@@ -534,7 +536,9 @@ public abstract class AbstractCombinationManagerTest {
.appendMessage(MessageManager.AppendCommand.from(mailContent), session)
.getId().getMessageId();
- messageIdManager.delete(ImmutableList.of(messageId1, messageId2), session);
+ Mono.from(messageIdManager.delete(ImmutableList.of(messageId1, messageId2), session))
+ .subscribeOn(Schedulers.elastic())
+ .block();
SearchQuery searchQuery = SearchQuery.of(SearchQuery.all());
assertThat(Flux.from(messageManager1.search(searchQuery, session)).toStream()).isEmpty();
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerSideEffectTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerSideEffectTest.java
index 7a1d087..a6011de 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerSideEffectTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerSideEffectTest.java
@@ -83,6 +83,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
public abstract class AbstractMessageIdManagerSideEffectTest {
private static final Quota<QuotaCountLimit, QuotaCountUsage> OVER_QUOTA = Quota.<QuotaCountLimit, QuotaCountUsage>builder()
@@ -169,7 +170,9 @@ public abstract class AbstractMessageIdManagerSideEffectTest {
MessageMetaData simpleMessageMetaData2 = messageResult2.messageMetaData();
eventBus.register(eventCollector);
- messageIdManager.delete(ImmutableList.of(messageId1, messageId2), session);
+ Mono.from(messageIdManager.delete(ImmutableList.of(messageId1, messageId2), session))
+ .subscribeOn(Schedulers.elastic())
+ .block();
AbstractListAssert<?, List<? extends Expunged>, Expunged, ObjectAssert<Expunged>> events =
assertThat(eventCollector.getEvents())
@@ -523,7 +526,7 @@ public abstract class AbstractMessageIdManagerSideEffectTest {
MessageId messageId = testingData.createNotUsedMessageId();
eventBus.register(eventCollector);
- messageIdManager.delete(ImmutableList.of(messageId), session);
+ messageIdManager.delete(messageId, session);
assertThat(eventCollector.getEvents()).isEmpty();
}
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerStorageTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerStorageTest.java
index d9beb5b..a154ae9 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerStorageTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerStorageTest.java
@@ -363,7 +363,7 @@ public abstract class AbstractMessageIdManagerStorageTest {
void deleteAllShouldReturnNotDeleteWhenDeletingOnOtherSession() throws Exception {
MessageId messageId = testingData.persist(bobMailbox1.getMailboxId(), messageUid1, FLAGS, bobSession);
- messageIdManager.delete(ImmutableList.of(messageId), aliceSession);
+ messageIdManager.delete(messageId, aliceSession);
assertThat(messageIdManager.getMessage(messageId, FetchGroup.MINIMAL, bobSession)).hasSize(1);
}
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java
index a961a73..5a90a6b 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java
@@ -40,6 +40,9 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
public class SetMessagesDestructionProcessor implements SetMessagesProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(SetMessagesCreationProcessor.class);
@@ -70,7 +73,9 @@ public class SetMessagesDestructionProcessor implements SetMessagesProcessor {
if (toBeDestroyed.isEmpty()) {
return Stream.empty();
}
- DeleteResult deleteResult = messageIdManager.delete(toBeDestroyed, mailboxSession);
+ DeleteResult deleteResult = Mono.from(messageIdManager.delete(toBeDestroyed, mailboxSession))
+ .subscribeOn(Schedulers.elastic())
+ .block();
Stream<SetMessagesResponse> destroyed = deleteResult.getDestroyed().stream()
.map(messageId -> SetMessagesResponse.builder().destroyed(messageId).build());
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala
index 14f106a..6908f7c 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala
@@ -85,7 +85,7 @@ class EmailSetDeletePerformer @Inject()(messageIdManager: MessageIdManager,
case _ => None
}
- SMono.fromCallable(() => messageIdManager.delete(messageIds.toList.asJava, mailboxSession))
+ SMono(messageIdManager.delete(messageIds.toList.asJava, mailboxSession))
.map(DestroyResult.from)
.subscribeOn(Schedulers.elastic())
.onErrorResume(e => SMono.just(messageIds.map(id => DestroyFailure(EmailSet.asUnparsed(id), e))))
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org