You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/05/08 00:40:02 UTC

[james-project] 02/08: JAMES-3575 MessageIdManager::delete should be reactive

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit da19351ec92eecb9d2eedc0b80d5539c46ecd5fa
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat May 1 22:19:23 2021 +0700

    JAMES-3575 MessageIdManager::delete should be reactive
    
    End-to-end reactive chain for JMAP RFC-8621
    deletes.
---
 .../org/apache/james/mailbox/MessageIdManager.java |  8 +++--
 .../james/vault/DeletedMessageVaultHookTest.java   |  6 ++--
 .../james/mailbox/store/StoreMessageIdManager.java | 38 +++++++++++++---------
 .../store/AbstractCombinationManagerTest.java      |  6 +++-
 .../AbstractMessageIdManagerSideEffectTest.java    |  7 ++--
 .../store/AbstractMessageIdManagerStorageTest.java |  2 +-
 .../methods/SetMessagesDestructionProcessor.java   |  7 +++-
 .../jmap/method/EmailSetDeletePerformer.scala      |  2 +-
 8 files changed, 51 insertions(+), 25 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
index 26a53cb..6fef5d4 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
@@ -38,6 +38,8 @@ import org.reactivestreams.Publisher;
 import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public interface MessageIdManager {
     default Publisher<ComposedMessageIdWithMetaData> messageMetadata(MessageId id, MailboxSession session) {
@@ -62,7 +64,7 @@ public interface MessageIdManager {
 
     DeleteResult delete(MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException;
 
-    DeleteResult delete(List<MessageId> messageId, MailboxSession mailboxSession) throws MailboxException;
+    Publisher<DeleteResult> delete(List<MessageId> messageId, MailboxSession mailboxSession) throws MailboxException;
 
     void setInMailboxes(MessageId messageId, Collection<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException;
 
@@ -71,7 +73,9 @@ public interface MessageIdManager {
     }
 
     default DeleteResult delete(MessageId messageId, MailboxSession mailboxSession) throws MailboxException {
-        return delete(ImmutableList.of(messageId), mailboxSession);
+        return Mono.from(delete(ImmutableList.of(messageId), mailboxSession))
+            .subscribeOn(Schedulers.elastic())
+            .block();
     }
 
 }
diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
index 7251a98..79e919d 100644
--- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
+++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
@@ -65,6 +65,8 @@ import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 class DeletedMessageVaultHookTest {
 
@@ -159,7 +161,7 @@ class DeletedMessageVaultHookTest {
         MessageId messageId = composedId.getMessageId();
         long messageSize = messageSize(messageManager, composedId);
 
-        messageIdManager.delete(ImmutableList.of(messageId), aliceSession);
+        messageIdManager.delete(messageId, aliceSession);
 
         DeletedMessage deletedMessage = buildDeletedMessage(ImmutableList.of(aliceMailbox), messageId, ALICE, messageSize);
         assertThat(Flux.from(messageVault.search(ALICE, Query.ALL)).blockFirst())
@@ -175,7 +177,7 @@ class DeletedMessageVaultHookTest {
             .mapToObj(Throwing.intFunction(i -> appendMessage(messageManager).getMessageId()))
             .collect(Guavate.toImmutableList());
 
-        assertThatCode(() -> messageIdManager.delete(ids, aliceSession))
+        assertThatCode(() -> Mono.from(messageIdManager.delete(ids, aliceSession)).subscribeOn(Schedulers.elastic()).block())
             .doesNotThrowAnyException();
     }
 
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index d34b4a7..116e91b 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -187,10 +187,14 @@ public class StoreMessageIdManager implements MessageIdManager {
     }
 
     private ImmutableSet<MailboxId> getAllowedMailboxIds(MailboxSession mailboxSession, Stream<MailboxId> idList, Right... rights) throws MailboxException {
-        return MailboxReactorUtils.block(Flux.fromStream(idList)
+        return MailboxReactorUtils.block(getAllowedMailboxIdsReactive(mailboxSession, idList, rights));
+    }
+
+    private Mono<ImmutableSet<MailboxId>> getAllowedMailboxIdsReactive(MailboxSession mailboxSession, Stream<MailboxId> idList, Right... rights) {
+        return Flux.fromStream(idList)
             .distinct()
             .filterWhen(hasRightsOnMailboxReactive(mailboxSession, rights), DEFAULT_CONCURRENCY)
-            .collect(Guavate.toImmutableSet()));
+            .collect(Guavate.toImmutableSet());
     }
 
     @Override
@@ -215,14 +219,21 @@ public class StoreMessageIdManager implements MessageIdManager {
     }
 
     @Override
-    public DeleteResult delete(List<MessageId> messageIds, MailboxSession mailboxSession) throws MailboxException {
+    public Mono<DeleteResult> delete(List<MessageId> messageIds, MailboxSession mailboxSession) throws MailboxException {
         MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
 
-        List<MailboxMessage> messageList = messageIdMapper.find(messageIds, MessageMapper.FetchType.Metadata);
-        ImmutableSet<MailboxId> allowedMailboxIds = getAllowedMailboxIds(mailboxSession, messageList
-            .stream()
-            .map(MailboxMessage::getMailboxId), Right.DeleteMessages);
+        return messageIdMapper.findReactive(messageIds, MessageMapper.FetchType.Metadata)
+            .collectList()
+            .flatMap(messageList ->
+                getAllowedMailboxIdsReactive(mailboxSession,
+                    messageList
+                        .stream()
+                        .map(MailboxMessage::getMailboxId),
+                    Right.DeleteMessages)
+                .flatMap(allowedMailboxIds -> deleteInAllowedMailboxes(messageIds, mailboxSession, messageIdMapper, messageList, allowedMailboxIds)));
+    }
 
+    private Mono<DeleteResult> deleteInAllowedMailboxes(List<MessageId> messageIds, MailboxSession mailboxSession, MessageIdMapper messageIdMapper, List<MailboxMessage> messageList, ImmutableSet<MailboxId> allowedMailboxIds) {
         List<MailboxMessage> accessibleMessages = messageList.stream()
             .filter(message -> allowedMailboxIds.contains(message.getMailboxId()))
             .collect(Guavate.toImmutableList());
@@ -232,14 +243,11 @@ public class StoreMessageIdManager implements MessageIdManager {
             .collect(Guavate.toImmutableSet());
         Sets.SetView<MessageId> nonAccessibleMessages = Sets.difference(ImmutableSet.copyOf(messageIds), accessibleMessageIds);
 
-        deleteWithPreHooks(messageIdMapper, accessibleMessages, mailboxSession)
-            .subscribeOn(Schedulers.elastic())
-            .block();
-
-        return DeleteResult.builder()
-            .addDestroyed(accessibleMessageIds)
-            .addNotFound(nonAccessibleMessages)
-            .build();
+        return deleteWithPreHooks(messageIdMapper, accessibleMessages, mailboxSession)
+            .thenReturn(DeleteResult.builder()
+                .addDestroyed(accessibleMessageIds)
+                .addNotFound(nonAccessibleMessages)
+                .build());
     }
 
     private Mono<Void> deleteWithPreHooks(MessageIdMapper messageIdMapper, List<MailboxMessage> messageList, MailboxSession mailboxSession) {
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
index bdddff1..56c1ba3 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
@@ -53,6 +53,8 @@ import org.junit.jupiter.api.Test;
 import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public abstract class AbstractCombinationManagerTest {
 
@@ -534,7 +536,9 @@ public abstract class AbstractCombinationManagerTest {
             .appendMessage(MessageManager.AppendCommand.from(mailContent), session)
             .getId().getMessageId();
 
-        messageIdManager.delete(ImmutableList.of(messageId1, messageId2), session);
+        Mono.from(messageIdManager.delete(ImmutableList.of(messageId1, messageId2), session))
+            .subscribeOn(Schedulers.elastic())
+            .block();
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.all());
         assertThat(Flux.from(messageManager1.search(searchQuery, session)).toStream()).isEmpty();
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerSideEffectTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerSideEffectTest.java
index 7a1d087..a6011de 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerSideEffectTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerSideEffectTest.java
@@ -83,6 +83,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedMap;
 
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public abstract class AbstractMessageIdManagerSideEffectTest {
     private static final Quota<QuotaCountLimit, QuotaCountUsage> OVER_QUOTA = Quota.<QuotaCountLimit, QuotaCountUsage>builder()
@@ -169,7 +170,9 @@ public abstract class AbstractMessageIdManagerSideEffectTest {
         MessageMetaData simpleMessageMetaData2 = messageResult2.messageMetaData();
 
         eventBus.register(eventCollector);
-        messageIdManager.delete(ImmutableList.of(messageId1, messageId2), session);
+        Mono.from(messageIdManager.delete(ImmutableList.of(messageId1, messageId2), session))
+            .subscribeOn(Schedulers.elastic())
+            .block();
 
         AbstractListAssert<?, List<? extends Expunged>, Expunged, ObjectAssert<Expunged>> events =
             assertThat(eventCollector.getEvents())
@@ -523,7 +526,7 @@ public abstract class AbstractMessageIdManagerSideEffectTest {
         MessageId messageId = testingData.createNotUsedMessageId();
 
         eventBus.register(eventCollector);
-        messageIdManager.delete(ImmutableList.of(messageId), session);
+        messageIdManager.delete(messageId, session);
 
         assertThat(eventCollector.getEvents()).isEmpty();
     }
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerStorageTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerStorageTest.java
index d9beb5b..a154ae9 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerStorageTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerStorageTest.java
@@ -363,7 +363,7 @@ public abstract class AbstractMessageIdManagerStorageTest {
     void deleteAllShouldReturnNotDeleteWhenDeletingOnOtherSession() throws Exception {
         MessageId messageId = testingData.persist(bobMailbox1.getMailboxId(), messageUid1, FLAGS, bobSession);
 
-        messageIdManager.delete(ImmutableList.of(messageId), aliceSession);
+        messageIdManager.delete(messageId, aliceSession);
 
         assertThat(messageIdManager.getMessage(messageId, FetchGroup.MINIMAL, bobSession)).hasSize(1);
     }
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java
index a961a73..5a90a6b 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java
@@ -40,6 +40,9 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
 public class SetMessagesDestructionProcessor implements SetMessagesProcessor {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(SetMessagesCreationProcessor.class);
@@ -70,7 +73,9 @@ public class SetMessagesDestructionProcessor implements SetMessagesProcessor {
             if (toBeDestroyed.isEmpty()) {
                 return Stream.empty();
             }
-            DeleteResult deleteResult = messageIdManager.delete(toBeDestroyed, mailboxSession);
+            DeleteResult deleteResult = Mono.from(messageIdManager.delete(toBeDestroyed, mailboxSession))
+                .subscribeOn(Schedulers.elastic())
+                .block();
 
             Stream<SetMessagesResponse> destroyed = deleteResult.getDestroyed().stream()
                 .map(messageId -> SetMessagesResponse.builder().destroyed(messageId).build());
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala
index 14f106a..6908f7c 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala
@@ -85,7 +85,7 @@ class EmailSetDeletePerformer @Inject()(messageIdManager: MessageIdManager,
         case _ => None
       }
 
-      SMono.fromCallable(() => messageIdManager.delete(messageIds.toList.asJava, mailboxSession))
+      SMono(messageIdManager.delete(messageIds.toList.asJava, mailboxSession))
         .map(DestroyResult.from)
         .subscribeOn(Schedulers.elastic())
         .onErrorResume(e => SMono.just(messageIds.map(id => DestroyFailure(EmailSet.asUnparsed(id), e))))

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org