You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/05/27 02:47:44 UTC

[james-project] branch master updated: JAMES-3737 Reactive IMAP RENAME (#1016)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ff216e8d0 JAMES-3737 Reactive IMAP RENAME (#1016)
5ff216e8d0 is described below

commit 5ff216e8d0ba0dca572310e5f04f9d5e379bddca
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Fri May 27 09:47:40 2022 +0700

    JAMES-3737 Reactive IMAP RENAME (#1016)
---
 .../org/apache/james/mailbox/MailboxManager.java   |   8 ++
 .../james/mailbox/store/StoreMailboxManager.java   | 133 ++++++++++++---------
 .../james/imap/processor/RenameProcessor.java      |  69 +++++++----
 3 files changed, 128 insertions(+), 82 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
index 3b6acbe0d2..376b92be6d 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
@@ -249,6 +249,10 @@ public interface MailboxManager extends RequestAware, RightManager, MailboxAnnot
         return renameMailbox(from, to, RenameOption.NONE, session);
     }
 
+    default Publisher<List<MailboxRenamedResult>> renameMailboxReactive(MailboxPath from, MailboxPath to, RenameOption option, MailboxSession session) {
+        return Mono.fromCallable(() -> renameMailbox(from, to, option, session));
+    }
+
     /**
      * Renames a mailbox.
      *
@@ -271,6 +275,10 @@ public interface MailboxManager extends RequestAware, RightManager, MailboxAnnot
         return renameMailbox(mailboxId, newMailboxPath, RenameOption.NONE, session);
     }
 
+    default Publisher<List<MailboxRenamedResult>> renameMailboxReactive(MailboxId mailboxId, MailboxPath to, RenameOption option, MailboxSession session) {
+        return Mono.fromCallable(() -> renameMailbox(mailboxId, to, option, session));
+    }
+
     /**
      * Copy the given {@link MessageRange} from one Mailbox to the other. 
      * 
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index 3ef21f12e3..35ab6492c5 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -20,7 +20,6 @@
 package org.apache.james.mailbox.store;
 
 import static org.apache.james.mailbox.store.MailboxReactorUtils.block;
-import static org.apache.james.mailbox.store.MailboxReactorUtils.blockOptional;
 import static org.apache.james.mailbox.store.mail.AbstractMessageMapper.UNLIMITED;
 
 import java.time.Duration;
@@ -40,7 +39,6 @@ import org.apache.james.events.EventBus;
 import org.apache.james.mailbox.MailboxAnnotationManager;
 import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxPathLocker;
-import org.apache.james.mailbox.MailboxPathLocker.LockAwareExecution;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageManager;
 import org.apache.james.mailbox.MetadataWithMailboxId;
@@ -52,7 +50,6 @@ import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.exception.MailboxExistsException;
 import org.apache.james.mailbox.exception.MailboxNameException;
 import org.apache.james.mailbox.exception.MailboxNotFoundException;
-import org.apache.james.mailbox.exception.SubscriptionException;
 import org.apache.james.mailbox.exception.UnsupportedRightException;
 import org.apache.james.mailbox.extension.PreDeletionHook;
 import org.apache.james.mailbox.model.Mailbox;
@@ -503,53 +500,80 @@ public class StoreMailboxManager implements MailboxManager {
     @Override
     public List<MailboxRenamedResult> renameMailbox(MailboxPath from, MailboxPath to, RenameOption option,
                                                     MailboxSession session) throws MailboxException {
-        LOGGER.debug("renameMailbox {} to {}", from, to);
-        MailboxPath sanitizedMailboxPath = to.sanitize(session.getPathDelimiter());
-        validateDestinationPath(sanitizedMailboxPath, session);
+        return MailboxReactorUtils.block(renameMailboxReactive(from, to, option, session));
+    }
 
-        assertIsOwner(session, from);
+    @Override
+    public Mono<List<MailboxRenamedResult>> renameMailboxReactive(MailboxPath from, MailboxPath to, RenameOption option, MailboxSession session) {
+        LOGGER.debug("renameMailbox {} to {}", from, to);
         MailboxMapper mapper = mailboxSessionMapperFactory.getMailboxMapper(session);
 
-        return mapper.execute(() -> {
-            Mailbox mailbox = blockOptional(mapper.findMailboxByPath(from))
-                .orElseThrow(() -> new MailboxNotFoundException(from));
-            return renameSubscriptionsIfNeeded(
-                doRenameMailbox(mailbox, sanitizedMailboxPath, session, mapper), option, session);
+        return sanitizedPath(from, to, session)
+        .flatMap(sanitizedPath -> mapper.executeReactive(
+            mapper.findMailboxByPath(from)
+                .switchIfEmpty(Mono.error(() -> new MailboxNotFoundException(from)))
+                .flatMap(mailbox -> doRenameMailbox(mailbox, sanitizedPath, session, mapper)
+                    .flatMap(renamedResults -> renameSubscriptionsIfNeeded(renamedResults, option, session)))));
+    }
+
+    private Mono<MailboxPath> sanitizedPath(MailboxPath from, MailboxPath to, MailboxSession session) {
+        return Mono.fromCallable(() -> {
+            MailboxPath sanitizedMailboxPath = to.sanitize(session.getPathDelimiter());
+            validateDestinationPath(sanitizedMailboxPath, session);
+            assertIsOwner(session, from);
+            return sanitizedMailboxPath;
         });
     }
 
-    private List<MailboxRenamedResult> renameSubscriptionsIfNeeded(List<MailboxRenamedResult> renamedResults,
-                                                                   RenameOption option, MailboxSession session) throws SubscriptionException {
+    private Mono<MailboxPath> sanitizedPath(MailboxPath to, MailboxSession session) {
+        return Mono.fromCallable(() -> {
+            MailboxPath sanitizedMailboxPath = to.sanitize(session.getPathDelimiter());
+            validateDestinationPath(sanitizedMailboxPath, session);
+            return sanitizedMailboxPath;
+        });
+    }
+
+    private Mono<List<MailboxRenamedResult>> renameSubscriptionsIfNeeded(List<MailboxRenamedResult> renamedResults,
+                                                                   RenameOption option, MailboxSession session) {
         if (option == RenameOption.RENAME_SUBSCRIPTIONS) {
             SubscriptionMapper subscriptionMapper = mailboxSessionMapperFactory.getSubscriptionMapper(session);
-            List<Subscription> subscriptionsForUser = subscriptionMapper.findSubscriptionsForUser(session.getUser());
-            renamedResults.forEach(Throwing.<MailboxRenamedResult>consumer(renamedResult -> {
-                Subscription subscription = new Subscription(session.getUser(), renamedResult.getOriginPath().getName());
-                if (subscriptionsForUser.contains(subscription)) {
-                    subscriptionMapper.delete(subscription);
-                    subscriptionMapper.save(new Subscription(session.getUser(), renamedResult.getDestinationPath().getName()));
-                }
-            }).sneakyThrow());
+
+            return subscriptionMapper.findSubscriptionsForUserReactive(session.getUser())
+                .collectList()
+                .flatMap(subscriptions -> Flux.fromIterable(renamedResults)
+                    .flatMap(renamedResult -> {
+                        Subscription subscription = new Subscription(session.getUser(), renamedResult.getOriginPath().getName());
+                        if (subscriptions.contains(subscription)) {
+                            return subscriptionMapper.deleteReactive(subscription)
+                                .then(subscriptionMapper.saveReactive(new Subscription(session.getUser(), renamedResult.getDestinationPath().getName())));
+                        }
+                        return Mono.empty();
+                    })
+                    .then()
+                    .thenReturn(renamedResults));
         }
-        return renamedResults;
+        return Mono.just(renamedResults);
     }
 
     @Override
     public List<MailboxRenamedResult> renameMailbox(MailboxId mailboxId, MailboxPath newMailboxPath, RenameOption option,
                                                     MailboxSession session) throws MailboxException {
-        LOGGER.debug("renameMailbox {} to {}", mailboxId, newMailboxPath);
-        MailboxPath sanitizedMailboxPath = newMailboxPath.sanitize(session.getPathDelimiter());
-        validateDestinationPath(sanitizedMailboxPath, session);
+        return MailboxReactorUtils.block(renameMailboxReactive(mailboxId, newMailboxPath, option, session));
+    }
 
+    @Override
+    public Mono<List<MailboxRenamedResult>> renameMailboxReactive(MailboxId mailboxId, MailboxPath newMailboxPath, RenameOption option,
+                                                    MailboxSession session) {
+        LOGGER.debug("renameMailbox {} to {}", mailboxId, newMailboxPath);
         MailboxMapper mapper = mailboxSessionMapperFactory.getMailboxMapper(session);
 
-        return mapper.execute(() -> {
-            Mailbox mailbox = mapper.findMailboxById(mailboxId).blockOptional()
-                .orElseThrow(() -> new MailboxNotFoundException(mailboxId));
-            assertIsOwner(session, mailbox.generateAssociatedPath());
-            return renameSubscriptionsIfNeeded(
-                doRenameMailbox(mailbox, sanitizedMailboxPath, session, mapper), option, session);
-        });
+        return sanitizedPath(newMailboxPath, session)
+            .flatMap(sanitizedPath -> mapper.executeReactive(
+                mapper.findMailboxById(mailboxId)
+                    .doOnNext(Throwing.<Mailbox>consumer(mailbox -> assertIsOwner(session, mailbox.generateAssociatedPath())).sneakyThrow())
+                    .switchIfEmpty(Mono.error(() -> new MailboxNotFoundException(mailboxId)))
+                    .flatMap(mailbox -> doRenameMailbox(mailbox, sanitizedPath, session, mapper)
+                        .flatMap(renamedResults -> renameSubscriptionsIfNeeded(renamedResults, option, session)))));
     }
 
     private void validateDestinationPath(MailboxPath newMailboxPath, MailboxSession session) throws MailboxException {
@@ -567,7 +591,7 @@ public class StoreMailboxManager implements MailboxManager {
         }
     }
 
-    private List<MailboxRenamedResult> doRenameMailbox(Mailbox mailbox, MailboxPath newMailboxPath, MailboxSession session, MailboxMapper mapper) throws MailboxException {
+    private Mono<List<MailboxRenamedResult>> doRenameMailbox(Mailbox mailbox, MailboxPath newMailboxPath, MailboxSession session, MailboxMapper mapper) {
         // TODO put this into a serilizable transaction
 
         ImmutableList.Builder<MailboxRenamedResult> resultBuilder = ImmutableList.builder();
@@ -576,22 +600,19 @@ public class StoreMailboxManager implements MailboxManager {
         mailbox.setNamespace(newMailboxPath.getNamespace());
         mailbox.setUser(newMailboxPath.getUser());
         mailbox.setName(newMailboxPath.getName());
+        // Find submailboxes
+        MailboxQuery.UserBound query = MailboxQuery.builder()
+            .userAndNamespaceFrom(from)
+            .expression(new PrefixedWildcard(from.getName() + getDelimiter()))
+            .build()
+            .asUserBound();
 
-        try {
-            block(mapper.rename(mailbox)
-                .map(mailboxId -> {
-                    resultBuilder.add(new MailboxRenamedResult(mailboxId, from, newMailboxPath));
-                    return mailboxId;
-                }));
-
-            // rename submailboxes
-            MailboxQuery.UserBound query = MailboxQuery.builder()
-                .userAndNamespaceFrom(from)
-                .expression(new PrefixedWildcard(from.getName() + getDelimiter()))
-                .build()
-                .asUserBound();
-            locker.executeWithLock(from, (LockAwareExecution<Void>) () -> {
-                block(mapper.findMailboxWithPathLike(query)
+        return mapper.rename(mailbox)
+            .map(mailboxId -> {
+                resultBuilder.add(new MailboxRenamedResult(mailboxId, from, newMailboxPath));
+                return mailboxId;
+            })
+            .then(Mono.from(locker.executeReactiveWithLockReactive(from, mapper.findMailboxWithPathLike(query)
                     .flatMap(sub -> {
                         String subOriginalName = sub.getName();
                         String subNewName = newMailboxPath.getName() + subOriginalName.substring(from.getName().length());
@@ -605,14 +626,8 @@ public class StoreMailboxManager implements MailboxManager {
                             .retryWhen(Retry.backoff(5, Duration.ofMillis(10)))
                             .then(Mono.fromRunnable(() -> LOGGER.debug("Rename mailbox sub-mailbox {} to {}", subOriginalName, subNewName)));
                     }, LOW_CONCURRENCY)
-                    .then());
-
-                return null;
-
-            }, MailboxPathLocker.LockType.Write);
-            return resultBuilder.build();
-        } finally {
-            Flux.fromIterable(resultBuilder.build())
+                    .then(), MailboxPathLocker.LockType.Write)))
+            .then(Mono.defer(() -> Flux.fromIterable(resultBuilder.build())
                 .concatMap(result -> eventBus.dispatch(EventFactory.mailboxRenamed()
                         .randomEventId()
                         .mailboxSession(session)
@@ -621,8 +636,8 @@ public class StoreMailboxManager implements MailboxManager {
                         .newPath(result.getDestinationPath())
                         .build(),
                     new MailboxIdRegistrationKey(result.getMailboxId())))
-                .blockLast();
-        }
+                .then()))
+            .then(Mono.fromCallable(resultBuilder::build));
     }
 
     @Override
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/RenameProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/RenameProcessor.java
index 6ef6d63964..5990b4cdc5 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/RenameProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/RenameProcessor.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.imap.processor;
 
+import static org.apache.james.util.ReactorUtils.logOnError;
+
 import org.apache.james.imap.api.ImapConstants;
 import org.apache.james.imap.api.display.HumanReadableText;
 import org.apache.james.imap.api.message.response.StatusResponseFactory;
@@ -48,33 +50,54 @@ public class RenameProcessor extends AbstractMailboxProcessor<RenameRequest> {
     }
 
     @Override
-    protected void processRequest(RenameRequest request, ImapSession session, Responder responder) {
-        PathConverter pathConverter = PathConverter.forSession(session);
-        MailboxPath existingPath = pathConverter.buildFullPath(request.getExistingName());
-        MailboxPath newPath = pathConverter.buildFullPath(request.getNewName());
+    protected Mono<Void> processRequestReactive(RenameRequest request, ImapSession session, Responder responder) {
         try {
-            final MailboxManager mailboxManager = getMailboxManager();
+            PathConverter pathConverter = PathConverter.forSession(session);
+            MailboxPath existingPath = pathConverter.buildFullPath(request.getExistingName());
+            MailboxPath newPath = pathConverter.buildFullPath(request.getNewName());
+            MailboxManager mailboxManager = getMailboxManager();
             MailboxSession mailboxsession = session.getMailboxSession();
-            mailboxManager.renameMailbox(existingPath, newPath, mailboxsession);
 
-            if (existingPath.getName().equalsIgnoreCase(ImapConstants.INBOX_NAME) && !Mono.from(mailboxManager.mailboxExists(existingPath, mailboxsession)).block()) {
-                mailboxManager.createMailbox(existingPath, mailboxsession);
-            }
-            okComplete(request, responder);
-            unsolicitedResponses(session, responder, false).block();
-        } catch (MailboxExistsException e) {
-            LOGGER.debug("Rename from {} to {} failed because the target mailbox exists", existingPath, newPath, e);
-            no(request, responder, HumanReadableText.FAILURE_MAILBOX_EXISTS);
-        } catch (MailboxNotFoundException e) {
-            LOGGER.debug("Rename from {} to {} failed because the source mailbox doesn't exist", existingPath, newPath, e);
-            no(request, responder, HumanReadableText.MAILBOX_NOT_FOUND);
-        } catch (TooLongMailboxNameException e) {
-            LOGGER.debug("The mailbox name length is over limit: {}", newPath.getName(), e);
-            taggedBad(request, responder, HumanReadableText.FAILURE_MAILBOX_NAME);
-        } catch (MailboxException e) {
-            LOGGER.error("Rename from {} to {} failed", existingPath, newPath, e);
-            no(request, responder, HumanReadableText.GENERIC_FAILURE_DURING_PROCESSING);
+            return Mono.from(mailboxManager.renameMailboxReactive(existingPath, newPath, MailboxManager.RenameOption.NONE, mailboxsession))
+                .then(createInboxIfNeeded(existingPath, mailboxsession))
+                .then(Mono.fromRunnable(() -> okComplete(request, responder)))
+                .doOnEach(logOnError(MailboxExistsException.class, e -> LOGGER.debug("Rename from {} to {} failed because the target mailbox exists", existingPath, newPath, e)))
+                .onErrorResume(MailboxExistsException.class, e -> {
+                    no(request, responder, HumanReadableText.MAILBOX_EXISTS);
+                    return Mono.empty();
+                })
+                .doOnEach(logOnError(MailboxNotFoundException.class, e ->  LOGGER.debug("Rename from {} to {} failed because the source mailbox doesn't exist", existingPath, newPath, e)))
+                .onErrorResume(MailboxNotFoundException.class, e -> {
+                    no(request, responder, HumanReadableText.MAILBOX_NOT_FOUND);
+                    return Mono.empty();
+                })
+                .doOnEach(logOnError(TooLongMailboxNameException.class, e -> LOGGER.debug("The mailbox name length is over limit: {}", newPath.getName(), e)))
+                .onErrorResume(TooLongMailboxNameException.class, e -> {
+                    taggedBad(request, responder, HumanReadableText.FAILURE_MAILBOX_NAME);
+                    return Mono.empty();
+                })
+                .doOnEach(logOnError(MailboxException.class, e -> LOGGER.error("Rename from {} to {} failed", existingPath, newPath, e)))
+                .onErrorResume(TooLongMailboxNameException.class, e -> {
+                    no(request, responder, HumanReadableText.GENERIC_FAILURE_DURING_PROCESSING);
+                    return Mono.empty();
+                })
+                .then(unsolicitedResponses(session, responder, false));
+        } catch (Exception e) {
+            return Mono.error(e);
+        }
+    }
+
+    private Mono<Void> createInboxIfNeeded(MailboxPath existingPath, MailboxSession session) {
+        if (!existingPath.getName().equalsIgnoreCase(ImapConstants.INBOX_NAME)) {
+            return Mono.empty();
         }
+        return Mono.from(getMailboxManager().mailboxExists(existingPath, session))
+            .flatMap(exisits -> {
+                if (exisits) {
+                    return Mono.empty();
+                }
+                return Mono.from(getMailboxManager().createMailboxReactive(existingPath, session));
+            }).then();
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org