You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/05/27 02:47:44 UTC
[james-project] branch master updated: JAMES-3737 Reactive IMAP RENAME (#1016)
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 5ff216e8d0 JAMES-3737 Reactive IMAP RENAME (#1016)
5ff216e8d0 is described below
commit 5ff216e8d0ba0dca572310e5f04f9d5e379bddca
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Fri May 27 09:47:40 2022 +0700
JAMES-3737 Reactive IMAP RENAME (#1016)
---
.../org/apache/james/mailbox/MailboxManager.java | 8 ++
.../james/mailbox/store/StoreMailboxManager.java | 133 ++++++++++++---------
.../james/imap/processor/RenameProcessor.java | 69 +++++++----
3 files changed, 128 insertions(+), 82 deletions(-)
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
index 3b6acbe0d2..376b92be6d 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
@@ -249,6 +249,10 @@ public interface MailboxManager extends RequestAware, RightManager, MailboxAnnot
return renameMailbox(from, to, RenameOption.NONE, session);
}
+ default Publisher<List<MailboxRenamedResult>> renameMailboxReactive(MailboxPath from, MailboxPath to, RenameOption option, MailboxSession session) {
+ return Mono.fromCallable(() -> renameMailbox(from, to, option, session));
+ }
+
/**
* Renames a mailbox.
*
@@ -271,6 +275,10 @@ public interface MailboxManager extends RequestAware, RightManager, MailboxAnnot
return renameMailbox(mailboxId, newMailboxPath, RenameOption.NONE, session);
}
+ default Publisher<List<MailboxRenamedResult>> renameMailboxReactive(MailboxId mailboxId, MailboxPath to, RenameOption option, MailboxSession session) {
+ return Mono.fromCallable(() -> renameMailbox(mailboxId, to, option, session));
+ }
+
/**
* Copy the given {@link MessageRange} from one Mailbox to the other.
*
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index 3ef21f12e3..35ab6492c5 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -20,7 +20,6 @@
package org.apache.james.mailbox.store;
import static org.apache.james.mailbox.store.MailboxReactorUtils.block;
-import static org.apache.james.mailbox.store.MailboxReactorUtils.blockOptional;
import static org.apache.james.mailbox.store.mail.AbstractMessageMapper.UNLIMITED;
import java.time.Duration;
@@ -40,7 +39,6 @@ import org.apache.james.events.EventBus;
import org.apache.james.mailbox.MailboxAnnotationManager;
import org.apache.james.mailbox.MailboxManager;
import org.apache.james.mailbox.MailboxPathLocker;
-import org.apache.james.mailbox.MailboxPathLocker.LockAwareExecution;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MessageManager;
import org.apache.james.mailbox.MetadataWithMailboxId;
@@ -52,7 +50,6 @@ import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.exception.MailboxExistsException;
import org.apache.james.mailbox.exception.MailboxNameException;
import org.apache.james.mailbox.exception.MailboxNotFoundException;
-import org.apache.james.mailbox.exception.SubscriptionException;
import org.apache.james.mailbox.exception.UnsupportedRightException;
import org.apache.james.mailbox.extension.PreDeletionHook;
import org.apache.james.mailbox.model.Mailbox;
@@ -503,53 +500,80 @@ public class StoreMailboxManager implements MailboxManager {
@Override
public List<MailboxRenamedResult> renameMailbox(MailboxPath from, MailboxPath to, RenameOption option,
MailboxSession session) throws MailboxException {
- LOGGER.debug("renameMailbox {} to {}", from, to);
- MailboxPath sanitizedMailboxPath = to.sanitize(session.getPathDelimiter());
- validateDestinationPath(sanitizedMailboxPath, session);
+ return MailboxReactorUtils.block(renameMailboxReactive(from, to, option, session));
+ }
- assertIsOwner(session, from);
+ @Override
+ public Mono<List<MailboxRenamedResult>> renameMailboxReactive(MailboxPath from, MailboxPath to, RenameOption option, MailboxSession session) {
+ LOGGER.debug("renameMailbox {} to {}", from, to);
MailboxMapper mapper = mailboxSessionMapperFactory.getMailboxMapper(session);
- return mapper.execute(() -> {
- Mailbox mailbox = blockOptional(mapper.findMailboxByPath(from))
- .orElseThrow(() -> new MailboxNotFoundException(from));
- return renameSubscriptionsIfNeeded(
- doRenameMailbox(mailbox, sanitizedMailboxPath, session, mapper), option, session);
+ return sanitizedPath(from, to, session)
+ .flatMap(sanitizedPath -> mapper.executeReactive(
+ mapper.findMailboxByPath(from)
+ .switchIfEmpty(Mono.error(() -> new MailboxNotFoundException(from)))
+ .flatMap(mailbox -> doRenameMailbox(mailbox, sanitizedPath, session, mapper)
+ .flatMap(renamedResults -> renameSubscriptionsIfNeeded(renamedResults, option, session)))));
+ }
+
+ private Mono<MailboxPath> sanitizedPath(MailboxPath from, MailboxPath to, MailboxSession session) {
+ return Mono.fromCallable(() -> {
+ MailboxPath sanitizedMailboxPath = to.sanitize(session.getPathDelimiter());
+ validateDestinationPath(sanitizedMailboxPath, session);
+ assertIsOwner(session, from);
+ return sanitizedMailboxPath;
});
}
- private List<MailboxRenamedResult> renameSubscriptionsIfNeeded(List<MailboxRenamedResult> renamedResults,
- RenameOption option, MailboxSession session) throws SubscriptionException {
+ private Mono<MailboxPath> sanitizedPath(MailboxPath to, MailboxSession session) {
+ return Mono.fromCallable(() -> {
+ MailboxPath sanitizedMailboxPath = to.sanitize(session.getPathDelimiter());
+ validateDestinationPath(sanitizedMailboxPath, session);
+ return sanitizedMailboxPath;
+ });
+ }
+
+ private Mono<List<MailboxRenamedResult>> renameSubscriptionsIfNeeded(List<MailboxRenamedResult> renamedResults,
+ RenameOption option, MailboxSession session) {
if (option == RenameOption.RENAME_SUBSCRIPTIONS) {
SubscriptionMapper subscriptionMapper = mailboxSessionMapperFactory.getSubscriptionMapper(session);
- List<Subscription> subscriptionsForUser = subscriptionMapper.findSubscriptionsForUser(session.getUser());
- renamedResults.forEach(Throwing.<MailboxRenamedResult>consumer(renamedResult -> {
- Subscription subscription = new Subscription(session.getUser(), renamedResult.getOriginPath().getName());
- if (subscriptionsForUser.contains(subscription)) {
- subscriptionMapper.delete(subscription);
- subscriptionMapper.save(new Subscription(session.getUser(), renamedResult.getDestinationPath().getName()));
- }
- }).sneakyThrow());
+
+ return subscriptionMapper.findSubscriptionsForUserReactive(session.getUser())
+ .collectList()
+ .flatMap(subscriptions -> Flux.fromIterable(renamedResults)
+ .flatMap(renamedResult -> {
+ Subscription subscription = new Subscription(session.getUser(), renamedResult.getOriginPath().getName());
+ if (subscriptions.contains(subscription)) {
+ return subscriptionMapper.deleteReactive(subscription)
+ .then(subscriptionMapper.saveReactive(new Subscription(session.getUser(), renamedResult.getDestinationPath().getName())));
+ }
+ return Mono.empty();
+ })
+ .then()
+ .thenReturn(renamedResults));
}
- return renamedResults;
+ return Mono.just(renamedResults);
}
@Override
public List<MailboxRenamedResult> renameMailbox(MailboxId mailboxId, MailboxPath newMailboxPath, RenameOption option,
MailboxSession session) throws MailboxException {
- LOGGER.debug("renameMailbox {} to {}", mailboxId, newMailboxPath);
- MailboxPath sanitizedMailboxPath = newMailboxPath.sanitize(session.getPathDelimiter());
- validateDestinationPath(sanitizedMailboxPath, session);
+ return MailboxReactorUtils.block(renameMailboxReactive(mailboxId, newMailboxPath, option, session));
+ }
+ @Override
+ public Mono<List<MailboxRenamedResult>> renameMailboxReactive(MailboxId mailboxId, MailboxPath newMailboxPath, RenameOption option,
+ MailboxSession session) {
+ LOGGER.debug("renameMailbox {} to {}", mailboxId, newMailboxPath);
MailboxMapper mapper = mailboxSessionMapperFactory.getMailboxMapper(session);
- return mapper.execute(() -> {
- Mailbox mailbox = mapper.findMailboxById(mailboxId).blockOptional()
- .orElseThrow(() -> new MailboxNotFoundException(mailboxId));
- assertIsOwner(session, mailbox.generateAssociatedPath());
- return renameSubscriptionsIfNeeded(
- doRenameMailbox(mailbox, sanitizedMailboxPath, session, mapper), option, session);
- });
+ return sanitizedPath(newMailboxPath, session)
+ .flatMap(sanitizedPath -> mapper.executeReactive(
+ mapper.findMailboxById(mailboxId)
+ .doOnNext(Throwing.<Mailbox>consumer(mailbox -> assertIsOwner(session, mailbox.generateAssociatedPath())).sneakyThrow())
+ .switchIfEmpty(Mono.error(() -> new MailboxNotFoundException(mailboxId)))
+ .flatMap(mailbox -> doRenameMailbox(mailbox, sanitizedPath, session, mapper)
+ .flatMap(renamedResults -> renameSubscriptionsIfNeeded(renamedResults, option, session)))));
}
private void validateDestinationPath(MailboxPath newMailboxPath, MailboxSession session) throws MailboxException {
@@ -567,7 +591,7 @@ public class StoreMailboxManager implements MailboxManager {
}
}
- private List<MailboxRenamedResult> doRenameMailbox(Mailbox mailbox, MailboxPath newMailboxPath, MailboxSession session, MailboxMapper mapper) throws MailboxException {
+ private Mono<List<MailboxRenamedResult>> doRenameMailbox(Mailbox mailbox, MailboxPath newMailboxPath, MailboxSession session, MailboxMapper mapper) {
// TODO put this into a serilizable transaction
ImmutableList.Builder<MailboxRenamedResult> resultBuilder = ImmutableList.builder();
@@ -576,22 +600,19 @@ public class StoreMailboxManager implements MailboxManager {
mailbox.setNamespace(newMailboxPath.getNamespace());
mailbox.setUser(newMailboxPath.getUser());
mailbox.setName(newMailboxPath.getName());
+ // Find submailboxes
+ MailboxQuery.UserBound query = MailboxQuery.builder()
+ .userAndNamespaceFrom(from)
+ .expression(new PrefixedWildcard(from.getName() + getDelimiter()))
+ .build()
+ .asUserBound();
- try {
- block(mapper.rename(mailbox)
- .map(mailboxId -> {
- resultBuilder.add(new MailboxRenamedResult(mailboxId, from, newMailboxPath));
- return mailboxId;
- }));
-
- // rename submailboxes
- MailboxQuery.UserBound query = MailboxQuery.builder()
- .userAndNamespaceFrom(from)
- .expression(new PrefixedWildcard(from.getName() + getDelimiter()))
- .build()
- .asUserBound();
- locker.executeWithLock(from, (LockAwareExecution<Void>) () -> {
- block(mapper.findMailboxWithPathLike(query)
+ return mapper.rename(mailbox)
+ .map(mailboxId -> {
+ resultBuilder.add(new MailboxRenamedResult(mailboxId, from, newMailboxPath));
+ return mailboxId;
+ })
+ .then(Mono.from(locker.executeReactiveWithLockReactive(from, mapper.findMailboxWithPathLike(query)
.flatMap(sub -> {
String subOriginalName = sub.getName();
String subNewName = newMailboxPath.getName() + subOriginalName.substring(from.getName().length());
@@ -605,14 +626,8 @@ public class StoreMailboxManager implements MailboxManager {
.retryWhen(Retry.backoff(5, Duration.ofMillis(10)))
.then(Mono.fromRunnable(() -> LOGGER.debug("Rename mailbox sub-mailbox {} to {}", subOriginalName, subNewName)));
}, LOW_CONCURRENCY)
- .then());
-
- return null;
-
- }, MailboxPathLocker.LockType.Write);
- return resultBuilder.build();
- } finally {
- Flux.fromIterable(resultBuilder.build())
+ .then(), MailboxPathLocker.LockType.Write)))
+ .then(Mono.defer(() -> Flux.fromIterable(resultBuilder.build())
.concatMap(result -> eventBus.dispatch(EventFactory.mailboxRenamed()
.randomEventId()
.mailboxSession(session)
@@ -621,8 +636,8 @@ public class StoreMailboxManager implements MailboxManager {
.newPath(result.getDestinationPath())
.build(),
new MailboxIdRegistrationKey(result.getMailboxId())))
- .blockLast();
- }
+ .then()))
+ .then(Mono.fromCallable(resultBuilder::build));
}
@Override
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/RenameProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/RenameProcessor.java
index 6ef6d63964..5990b4cdc5 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/RenameProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/RenameProcessor.java
@@ -19,6 +19,8 @@
package org.apache.james.imap.processor;
+import static org.apache.james.util.ReactorUtils.logOnError;
+
import org.apache.james.imap.api.ImapConstants;
import org.apache.james.imap.api.display.HumanReadableText;
import org.apache.james.imap.api.message.response.StatusResponseFactory;
@@ -48,33 +50,54 @@ public class RenameProcessor extends AbstractMailboxProcessor<RenameRequest> {
}
@Override
- protected void processRequest(RenameRequest request, ImapSession session, Responder responder) {
- PathConverter pathConverter = PathConverter.forSession(session);
- MailboxPath existingPath = pathConverter.buildFullPath(request.getExistingName());
- MailboxPath newPath = pathConverter.buildFullPath(request.getNewName());
+ protected Mono<Void> processRequestReactive(RenameRequest request, ImapSession session, Responder responder) {
try {
- final MailboxManager mailboxManager = getMailboxManager();
+ PathConverter pathConverter = PathConverter.forSession(session);
+ MailboxPath existingPath = pathConverter.buildFullPath(request.getExistingName());
+ MailboxPath newPath = pathConverter.buildFullPath(request.getNewName());
+ MailboxManager mailboxManager = getMailboxManager();
MailboxSession mailboxsession = session.getMailboxSession();
- mailboxManager.renameMailbox(existingPath, newPath, mailboxsession);
- if (existingPath.getName().equalsIgnoreCase(ImapConstants.INBOX_NAME) && !Mono.from(mailboxManager.mailboxExists(existingPath, mailboxsession)).block()) {
- mailboxManager.createMailbox(existingPath, mailboxsession);
- }
- okComplete(request, responder);
- unsolicitedResponses(session, responder, false).block();
- } catch (MailboxExistsException e) {
- LOGGER.debug("Rename from {} to {} failed because the target mailbox exists", existingPath, newPath, e);
- no(request, responder, HumanReadableText.FAILURE_MAILBOX_EXISTS);
- } catch (MailboxNotFoundException e) {
- LOGGER.debug("Rename from {} to {} failed because the source mailbox doesn't exist", existingPath, newPath, e);
- no(request, responder, HumanReadableText.MAILBOX_NOT_FOUND);
- } catch (TooLongMailboxNameException e) {
- LOGGER.debug("The mailbox name length is over limit: {}", newPath.getName(), e);
- taggedBad(request, responder, HumanReadableText.FAILURE_MAILBOX_NAME);
- } catch (MailboxException e) {
- LOGGER.error("Rename from {} to {} failed", existingPath, newPath, e);
- no(request, responder, HumanReadableText.GENERIC_FAILURE_DURING_PROCESSING);
+ return Mono.from(mailboxManager.renameMailboxReactive(existingPath, newPath, MailboxManager.RenameOption.NONE, mailboxsession))
+ .then(createInboxIfNeeded(existingPath, mailboxsession))
+ .then(Mono.fromRunnable(() -> okComplete(request, responder)))
+ .doOnEach(logOnError(MailboxExistsException.class, e -> LOGGER.debug("Rename from {} to {} failed because the target mailbox exists", existingPath, newPath, e)))
+ .onErrorResume(MailboxExistsException.class, e -> {
+ no(request, responder, HumanReadableText.MAILBOX_EXISTS);
+ return Mono.empty();
+ })
+ .doOnEach(logOnError(MailboxNotFoundException.class, e -> LOGGER.debug("Rename from {} to {} failed because the source mailbox doesn't exist", existingPath, newPath, e)))
+ .onErrorResume(MailboxNotFoundException.class, e -> {
+ no(request, responder, HumanReadableText.MAILBOX_NOT_FOUND);
+ return Mono.empty();
+ })
+ .doOnEach(logOnError(TooLongMailboxNameException.class, e -> LOGGER.debug("The mailbox name length is over limit: {}", newPath.getName(), e)))
+ .onErrorResume(TooLongMailboxNameException.class, e -> {
+ taggedBad(request, responder, HumanReadableText.FAILURE_MAILBOX_NAME);
+ return Mono.empty();
+ })
+ .doOnEach(logOnError(MailboxException.class, e -> LOGGER.error("Rename from {} to {} failed", existingPath, newPath, e)))
+ .onErrorResume(TooLongMailboxNameException.class, e -> {
+ no(request, responder, HumanReadableText.GENERIC_FAILURE_DURING_PROCESSING);
+ return Mono.empty();
+ })
+ .then(unsolicitedResponses(session, responder, false));
+ } catch (Exception e) {
+ return Mono.error(e);
+ }
+ }
+
+ private Mono<Void> createInboxIfNeeded(MailboxPath existingPath, MailboxSession session) {
+ if (!existingPath.getName().equalsIgnoreCase(ImapConstants.INBOX_NAME)) {
+ return Mono.empty();
}
+ return Mono.from(getMailboxManager().mailboxExists(existingPath, session))
+ .flatMap(exisits -> {
+ if (exisits) {
+ return Mono.empty();
+ }
+ return Mono.from(getMailboxManager().createMailboxReactive(existingPath, session));
+ }).then();
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org