You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2023/04/17 06:49:27 UTC

[james-project] branch master updated (47825dcad1 -> 96fa023649)

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


    from 47825dcad1 [REFACTORING] Merge ReactorUtils::toChunks and DataChunker::chunkStream (#1518)
     new 3037a2b2c3 AnnotationMapper support more reactive api & Cassandra implement
     new bbdd764273 MailboxAnnotationListener implement reactive group event listener
     new 4e217b2bdb MailboxAnnotationManager support more reactive api
     new 96fa023649 IMAP - Reactive GetMetadata & SetMetadata processor

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../james/mailbox/MailboxAnnotationManager.java    |  11 ++
 .../apache/james/mailbox/MailboxManagerTest.java   |   6 +-
 .../cassandra/mail/CassandraAnnotationMapper.java  | 156 +++++++++++++--------
 .../store/StoreMailboxAnnotationManager.java       | 139 +++++++++++-------
 .../james/mailbox/store/StoreMailboxManager.java   |  25 ++++
 .../store/event/MailboxAnnotationListener.java     |  23 ++-
 .../james/mailbox/store/mail/AnnotationMapper.java |  36 +++++
 .../store/StoreMailboxManagerAnnotationTest.java   |  38 +++--
 .../store/event/MailboxAnnotationListenerTest.java |  29 ++--
 .../james/imap/processor/GetMetadataProcessor.java |  83 +++++------
 .../james/imap/processor/SetMetadataProcessor.java |  34 ++---
 .../imap/processor/GetAnnotationProcessorTest.java |  73 ++++++----
 .../imap/processor/SetMetadataProcessorTest.java   |  21 +--
 13 files changed, 416 insertions(+), 258 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 01/04: AnnotationMapper support more reactive api & Cassandra implement

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 3037a2b2c348dbfafdd49f47566c654970acc13d
Author: Tung Tran <vt...@linagora.com>
AuthorDate: Thu Apr 6 08:08:21 2023 +0700

    AnnotationMapper support more reactive api & Cassandra implement
---
 .../cassandra/mail/CassandraAnnotationMapper.java  | 156 +++++++++++++--------
 .../james/mailbox/store/mail/AnnotationMapper.java |  36 +++++
 2 files changed, 132 insertions(+), 60 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAnnotationMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAnnotationMapper.java
index cce1d2b520..179cc060ac 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAnnotationMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAnnotationMapper.java
@@ -26,11 +26,11 @@ import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
 import static com.datastax.oss.driver.api.querybuilder.relation.Relation.column;
 
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 
 import javax.inject.Inject;
 
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.table.CassandraAnnotationTable;
 import org.apache.james.mailbox.model.MailboxAnnotation;
@@ -46,21 +46,25 @@ import com.datastax.oss.driver.api.querybuilder.select.Select;
 import com.google.common.base.Ascii;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Ints;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class CassandraAnnotationMapper extends NonTransactionalMapper implements AnnotationMapper {
 
-    private final CqlSession session;
+    private final CassandraAsyncExecutor asyncExecutor;
     private final PreparedStatement delete;
     private final PreparedStatement insert;
+    private final PreparedStatement getStoredAnnotationsQuery;
+    private final PreparedStatement countStoredAnnotationsQuery;
     private final PreparedStatement getStoredAnnotationsQueryForKeys;
     private final PreparedStatement getStoredAnnotationsQueryLikeKey;
     private final PreparedStatement getStoredAnnotationsQueryByKey;
 
     @Inject
     public CassandraAnnotationMapper(CqlSession session) {
-        this.session = session;
+        this.asyncExecutor = new CassandraAsyncExecutor(session);
         this.delete = session.prepare(deleteFrom(CassandraAnnotationTable.TABLE_NAME)
             .where(column(CassandraAnnotationTable.MAILBOX_ID).isEqualTo(bindMarker(CassandraAnnotationTable.MAILBOX_ID)),
                 column(CassandraAnnotationTable.KEY).isEqualTo(bindMarker(CassandraAnnotationTable.KEY)))
@@ -72,63 +76,109 @@ public class CassandraAnnotationMapper extends NonTransactionalMapper implements
             .value(CassandraAnnotationTable.VALUE, bindMarker(CassandraAnnotationTable.VALUE))
             .build());
 
-        this.getStoredAnnotationsQueryForKeys = getStoredAnnotationsQueryForKeys();
-        this.getStoredAnnotationsQueryLikeKey = getStoredAnnotationsQueryLikeKey();
-        this.getStoredAnnotationsQueryByKey = getStoredAnnotationsQueryByKey();
+        this.getStoredAnnotationsQuery = session.prepare(getStoredAnnotationsQuery().build());
+
+        this.countStoredAnnotationsQuery = session.prepare(selectFrom(CassandraAnnotationTable.TABLE_NAME)
+            .countAll()
+            .where(column(CassandraAnnotationTable.MAILBOX_ID).isEqualTo(bindMarker(CassandraAnnotationTable.MAILBOX_ID)))
+            .build());
+
+        this.getStoredAnnotationsQueryForKeys =
+            session.prepare(getStoredAnnotationsQuery()
+                .where(column(CassandraAnnotationTable.KEY).in(bindMarker(CassandraAnnotationTable.KEY)))
+                .build());
+
+        this.getStoredAnnotationsQueryLikeKey =
+            session.prepare(getStoredAnnotationsQuery()
+                .where(column(CassandraAnnotationTable.KEY)
+                        .isGreaterThanOrEqualTo(bindMarker(CassandraAnnotationTable.GREATER_BIND_KEY)),
+                    column(CassandraAnnotationTable.KEY)
+                        .isLessThanOrEqualTo(bindMarker(CassandraAnnotationTable.LESSER_BIND_KEY)))
+                .build());
+
+        this.getStoredAnnotationsQueryByKey =
+            session.prepare(getStoredAnnotationsQuery()
+                .where(column(CassandraAnnotationTable.KEY)
+                    .isEqualTo(bindMarker(CassandraAnnotationTable.KEY)))
+                .build());
     }
 
     @Override
     public List<MailboxAnnotation> getAllAnnotations(MailboxId mailboxId) {
-        CassandraId cassandraId = (CassandraId) mailboxId;
-        return Flux.from(session.executeReactive(session.prepare(getStoredAnnotationsQuery().build()).bind()
-                .setUuid(CassandraAnnotationTable.MAILBOX_ID, cassandraId.asUuid())))
-            .map(this::toAnnotation)
+        return getAllAnnotationsReactive(mailboxId)
             .collectList()
             .block();
     }
 
+    @Override
+    public Flux<MailboxAnnotation> getAllAnnotationsReactive(MailboxId mailboxId) {
+        return asyncExecutor.executeRows(getStoredAnnotationsQuery.bind()
+                .setUuid(CassandraAnnotationTable.MAILBOX_ID, ((CassandraId) mailboxId).asUuid()))
+            .map(this::toAnnotation);
+    }
+
     @Override
     public List<MailboxAnnotation> getAnnotationsByKeys(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) {
-        CassandraId cassandraId = (CassandraId) mailboxId;
-        return Flux.from(session.executeReactive(getStoredAnnotationsQueryForKeys.bind()
-                .setUuid(CassandraAnnotationTable.MAILBOX_ID, cassandraId.asUuid())
-                .setList(CassandraAnnotationTable.KEY, keys.stream()
-                    .map(MailboxAnnotationKey::asString)
-                    .collect(ImmutableList.toImmutableList()), String.class)))
-            .map(this::toAnnotation)
+        return getAnnotationsByKeysReactive(mailboxId, keys)
             .collectList()
             .block();
     }
 
+    @Override
+    public Flux<MailboxAnnotation> getAnnotationsByKeysReactive(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) {
+        return asyncExecutor.executeRows(getStoredAnnotationsQueryForKeys.bind()
+                .setUuid(CassandraAnnotationTable.MAILBOX_ID, ((CassandraId) mailboxId).asUuid())
+                .setList(CassandraAnnotationTable.KEY, keys.stream()
+                    .map(MailboxAnnotationKey::asString)
+                    .collect(ImmutableList.toImmutableList()), String.class))
+            .map(this::toAnnotation);
+    }
+
     @Override
     public List<MailboxAnnotation> getAnnotationsByKeysWithOneDepth(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) {
-        CassandraId cassandraId = (CassandraId) mailboxId;
-        return Flux.fromIterable(keys)
-            .flatMap(annotation -> getAnnotationsByKeyWithOneDepth(cassandraId, annotation))
-            .collectList()
+        return getAnnotationsByKeysWithOneDepthReactive(mailboxId, keys).collectList()
             .block();
     }
 
     @Override
-    public List<MailboxAnnotation> getAnnotationsByKeysWithAllDepth(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) {
-        CassandraId cassandraId = (CassandraId) mailboxId;
+    public Flux<MailboxAnnotation> getAnnotationsByKeysWithOneDepthReactive(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) {
         return Flux.fromIterable(keys)
-            .flatMap(annotation -> getAnnotationsByKeyWithAllDepth(cassandraId, annotation))
-            .collectList()
+            .flatMap(annotation -> getAnnotationsByKeyWithOneDepth((CassandraId) mailboxId, annotation));
+    }
+
+    @Override
+    public List<MailboxAnnotation> getAnnotationsByKeysWithAllDepth(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) {
+        return getAnnotationsByKeysWithAllDepthReactive(mailboxId, keys).collectList()
             .block();
     }
 
+    @Override
+    public Flux<MailboxAnnotation> getAnnotationsByKeysWithAllDepthReactive(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) {
+        return Flux.fromIterable(keys)
+            .flatMap(annotation -> getAnnotationsByKeyWithAllDepth((CassandraId) mailboxId, annotation));
+    }
+
     @Override
     public void deleteAnnotation(MailboxId mailboxId, MailboxAnnotationKey key) {
-        session.execute(delete.bind()
+        deleteAnnotationReactive(mailboxId, key).block();
+    }
+
+    @Override
+    public Mono<Void> deleteAnnotationReactive(MailboxId mailboxId, MailboxAnnotationKey key) {
+        return asyncExecutor.executeVoid(delete.bind()
             .setUuid(CassandraAnnotationTable.MAILBOX_ID, ((CassandraId) mailboxId).asUuid())
             .setString(CassandraAnnotationTable.KEY, key.asString()));
     }
 
     @Override
     public void insertAnnotation(MailboxId mailboxId, MailboxAnnotation mailboxAnnotation) {
+        insertAnnotationReactive(mailboxId, mailboxAnnotation).block();
+    }
+
+    @Override
+    public Mono<Void> insertAnnotationReactive(MailboxId mailboxId, MailboxAnnotation mailboxAnnotation) {
         Preconditions.checkArgument(!mailboxAnnotation.isNil());
-        session.execute(insert.bind()
+        return asyncExecutor.executeVoid(insert.bind()
             .setUuid(CassandraAnnotationTable.MAILBOX_ID, ((CassandraId) mailboxId).asUuid())
             .setString(CassandraAnnotationTable.KEY, mailboxAnnotation.getKey().asString())
             .setString(CassandraAnnotationTable.VALUE, mailboxAnnotation.getValue().get()));
@@ -136,20 +186,25 @@ public class CassandraAnnotationMapper extends NonTransactionalMapper implements
 
     @Override
     public boolean exist(MailboxId mailboxId, MailboxAnnotation mailboxAnnotation) {
-        CassandraId cassandraId = (CassandraId) mailboxId;
-        Optional<Row> row = Optional.ofNullable(
-            session.execute(getStoredAnnotationsQueryByKey.bind()
-                    .setUuid(CassandraAnnotationTable.MAILBOX_ID, cassandraId.asUuid())
-                    .setString(CassandraAnnotationTable.KEY, mailboxAnnotation.getKey().asString()))
-                .one());
-        return row.isPresent();
+        return existReactive(mailboxId, mailboxAnnotation).block();
+    }
+
+    @Override
+    public Mono<Boolean> existReactive(MailboxId mailboxId, MailboxAnnotation mailboxAnnotation) {
+        return asyncExecutor.executeReturnExists(getStoredAnnotationsQueryByKey.bind()
+            .setUuid(CassandraAnnotationTable.MAILBOX_ID, ((CassandraId) mailboxId).asUuid())
+            .setString(CassandraAnnotationTable.KEY, mailboxAnnotation.getKey().asString()));
     }
 
     @Override
     public int countAnnotations(MailboxId mailboxId) {
-        CassandraId cassandraId = (CassandraId) mailboxId;
-        return session.execute(session.prepare(getStoredAnnotationsQuery().build()).bind()
-            .setUuid(CassandraAnnotationTable.MAILBOX_ID, cassandraId.asUuid())).getAvailableWithoutFetching();
+        return countAnnotationsReactive(mailboxId).block();
+    }
+
+    public Mono<Integer> countAnnotationsReactive(MailboxId mailboxId) {
+        return asyncExecutor.executeSingleRow(countStoredAnnotationsQuery.bind()
+                .setUuid(CassandraAnnotationTable.MAILBOX_ID, ((CassandraId) mailboxId).asUuid()))
+            .map(row -> Ints.checkedCast(row.getLong(0)));
     }
 
     private MailboxAnnotation toAnnotation(Row row) {
@@ -163,43 +218,24 @@ public class CassandraAnnotationMapper extends NonTransactionalMapper implements
             .where(column(CassandraAnnotationTable.MAILBOX_ID).isEqualTo(bindMarker(CassandraAnnotationTable.MAILBOX_ID)));
     }
 
-    private PreparedStatement getStoredAnnotationsQueryForKeys() {
-        return session.prepare(getStoredAnnotationsQuery()
-            .where(column(CassandraAnnotationTable.KEY).in(bindMarker(CassandraAnnotationTable.KEY)))
-            .build());
-    }
-
-    private PreparedStatement getStoredAnnotationsQueryLikeKey() {
-        return session.prepare(getStoredAnnotationsQuery()
-            .where(column(CassandraAnnotationTable.KEY).isGreaterThanOrEqualTo(bindMarker(CassandraAnnotationTable.GREATER_BIND_KEY)),
-                column(CassandraAnnotationTable.KEY).isLessThanOrEqualTo(bindMarker(CassandraAnnotationTable.LESSER_BIND_KEY)))
-            .build());
-    }
-
-    private PreparedStatement getStoredAnnotationsQueryByKey() {
-        return session.prepare(getStoredAnnotationsQuery()
-            .where(column(CassandraAnnotationTable.KEY).isEqualTo(bindMarker(CassandraAnnotationTable.KEY)))
-            .build());
-    }
-
     private String buildNextKey(String key) {
         return key + MailboxAnnotationKey.SLASH_CHARACTER + Ascii.MAX;
     }
 
     private Flux<MailboxAnnotation> getAnnotationsByKeyWithAllDepth(CassandraId mailboxId, MailboxAnnotationKey key) {
-        return Flux.from(session.executeReactive(getStoredAnnotationsQueryLikeKey.bind()
+        return asyncExecutor.executeRows(getStoredAnnotationsQueryLikeKey.bind()
                 .setUuid(CassandraAnnotationTable.MAILBOX_ID, mailboxId.asUuid())
                 .setString(CassandraAnnotationTable.GREATER_BIND_KEY, key.asString())
-                .setString(CassandraAnnotationTable.LESSER_BIND_KEY, buildNextKey(key.asString()))))
+                .setString(CassandraAnnotationTable.LESSER_BIND_KEY, buildNextKey(key.asString())))
             .map(this::toAnnotation)
             .filter(annotation -> key.isAncestorOrIsEqual(annotation.getKey()));
     }
 
     private Flux<MailboxAnnotation> getAnnotationsByKeyWithOneDepth(CassandraId mailboxId, MailboxAnnotationKey key) {
-        return Flux.from(session.executeReactive(getStoredAnnotationsQueryLikeKey.bind()
+        return asyncExecutor.executeRows(getStoredAnnotationsQueryLikeKey.bind()
                 .setUuid(CassandraAnnotationTable.MAILBOX_ID, mailboxId.asUuid())
                 .setString(CassandraAnnotationTable.GREATER_BIND_KEY, key.asString())
-                .setString(CassandraAnnotationTable.LESSER_BIND_KEY, buildNextKey(key.asString()))))
+                .setString(CassandraAnnotationTable.LESSER_BIND_KEY, buildNextKey(key.asString())))
             .map(this::toAnnotation)
             .filter(annotation -> key.isParentOrIsEqual(annotation.getKey()));
     }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AnnotationMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AnnotationMapper.java
index ec775de6c6..da3146fcc9 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AnnotationMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AnnotationMapper.java
@@ -26,6 +26,10 @@ import org.apache.james.mailbox.model.MailboxAnnotation;
 import org.apache.james.mailbox.model.MailboxAnnotationKey;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.store.transaction.Mapper;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public interface AnnotationMapper extends Mapper {
     /**
@@ -37,6 +41,10 @@ public interface AnnotationMapper extends Mapper {
      */
     List<MailboxAnnotation> getAllAnnotations(MailboxId mailboxId);
 
+    default Publisher<MailboxAnnotation> getAllAnnotationsReactive(MailboxId mailboxId) {
+        return Flux.fromIterable(getAllAnnotations(mailboxId));
+    }
+
     /**
      * Search all the <code>MailboxAnnotation</code> of selected mailbox by the set of annotation's keys. The result is not ordered and should not
      * contain duplicate by key
@@ -47,6 +55,10 @@ public interface AnnotationMapper extends Mapper {
      */
     List<MailboxAnnotation> getAnnotationsByKeys(MailboxId mailboxId, Set<MailboxAnnotationKey> keys);
 
+    default Publisher<MailboxAnnotation> getAnnotationsByKeysReactive(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) {
+        return Flux.fromIterable(getAnnotationsByKeys(mailboxId, keys));
+    }
+
     /**
      * Search all the <code>MailboxAnnotation</code> of selected mailbox by the set of annotation's keys as well as its children entries
      * The result is not ordered and should not contain duplicate by key
@@ -57,6 +69,10 @@ public interface AnnotationMapper extends Mapper {
      */
     List<MailboxAnnotation> getAnnotationsByKeysWithOneDepth(MailboxId mailboxId, Set<MailboxAnnotationKey> keys);
 
+    default Publisher<MailboxAnnotation> getAnnotationsByKeysWithOneDepthReactive(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) {
+        return Flux.fromIterable(getAnnotationsByKeysWithOneDepth(mailboxId, keys));
+    }
+
     /**
      * Search all the <code>MailboxAnnotation</code> of selected mailbox by the set of annotation's keys and entries below the keys
      * The result is not ordered and should not contain duplicate by key
@@ -67,6 +83,10 @@ public interface AnnotationMapper extends Mapper {
      */
     List<MailboxAnnotation> getAnnotationsByKeysWithAllDepth(MailboxId mailboxId, Set<MailboxAnnotationKey> keys);
 
+    default Publisher<MailboxAnnotation> getAnnotationsByKeysWithAllDepthReactive(MailboxId mailboxId, Set<MailboxAnnotationKey> keys) {
+        return Flux.fromIterable(getAnnotationsByKeysWithAllDepth(mailboxId, keys));
+    }
+
     /**
      * Delete the annotation of selected mailbox by its key.
      *
@@ -75,6 +95,10 @@ public interface AnnotationMapper extends Mapper {
      */
     void deleteAnnotation(MailboxId mailboxId, MailboxAnnotationKey key);
 
+    default Publisher<Void> deleteAnnotationReactive(MailboxId mailboxId, MailboxAnnotationKey key) {
+        return Mono.fromRunnable(() -> deleteAnnotation(mailboxId, key));
+    }
+
     /**
      * - Insert new annotation if it does not exist on store
      * - Update the new value for existed annotation
@@ -84,6 +108,10 @@ public interface AnnotationMapper extends Mapper {
      */
     void insertAnnotation(MailboxId mailboxId, MailboxAnnotation mailboxAnnotation);
 
+    default Publisher<Void> insertAnnotationReactive(MailboxId mailboxId, MailboxAnnotation mailboxAnnotation) {
+        return Mono.fromRunnable(() -> insertAnnotation(mailboxId, mailboxAnnotation));
+    }
+
     /**
      * Checking the current annotation of selected mailbox exists on store or not. It's checked by annotation key, not by its value.
      *
@@ -93,9 +121,17 @@ public interface AnnotationMapper extends Mapper {
      */
     boolean exist(MailboxId mailboxId, MailboxAnnotation mailboxAnnotation);
 
+    default Publisher<Boolean> existReactive(MailboxId mailboxId, MailboxAnnotation mailboxAnnotation) {
+        return Mono.fromCallable(() -> exist(mailboxId, mailboxAnnotation));
+    }
+
     /**
      * Getting total number of current annotation on mailbox
      *
      */
     int countAnnotations(MailboxId mailboxId);
+
+    default Publisher<Integer> countAnnotationsReactive(MailboxId mailboxId) {
+        return Mono.fromCallable(() -> countAnnotations(mailboxId));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 03/04: MailboxAnnotationManager support more reactive api

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4e217b2bdb8ccc5176e5407e43540055312f54d4
Author: Tung Tran <vt...@linagora.com>
AuthorDate: Thu Apr 6 08:10:07 2023 +0700

    MailboxAnnotationManager support more reactive api
---
 .../james/mailbox/MailboxAnnotationManager.java    |  11 ++
 .../apache/james/mailbox/MailboxManagerTest.java   |   6 +-
 .../store/StoreMailboxAnnotationManager.java       | 139 +++++++++++++--------
 .../james/mailbox/store/StoreMailboxManager.java   |  25 ++++
 .../store/StoreMailboxManagerAnnotationTest.java   |  38 +++---
 5 files changed, 148 insertions(+), 71 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxAnnotationManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxAnnotationManager.java
index 740f7ef872..ea47542b9e 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxAnnotationManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxAnnotationManager.java
@@ -27,6 +27,7 @@ import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.MailboxAnnotation;
 import org.apache.james.mailbox.model.MailboxAnnotationKey;
 import org.apache.james.mailbox.model.MailboxPath;
+import org.reactivestreams.Publisher;
 
 /**
  * <p>
@@ -49,6 +50,8 @@ public interface MailboxAnnotationManager {
      */
     List<MailboxAnnotation> getAllAnnotations(MailboxPath mailboxPath, MailboxSession session) throws MailboxException;
 
+    Publisher<MailboxAnnotation> getAllAnnotationsReactive(MailboxPath mailboxPath, MailboxSession session);
+
     /**
      * Return all mailbox's annotation filter by the list of the keys without order and
      * do not contain any two annotations with the same key
@@ -61,6 +64,8 @@ public interface MailboxAnnotationManager {
      */
     List<MailboxAnnotation> getAnnotationsByKeys(MailboxPath mailboxPath, MailboxSession session, Set<MailboxAnnotationKey> keys) throws MailboxException;
 
+    Publisher<MailboxAnnotation> getAnnotationsByKeysReactive(MailboxPath mailboxPath, MailboxSession session, Set<MailboxAnnotationKey> keys);
+
     /**
      * Return all mailbox's annotation by the list of the keys and its children entries without order and
      * do not contain any two annotations with the same key
@@ -73,6 +78,8 @@ public interface MailboxAnnotationManager {
      */
     List<MailboxAnnotation> getAnnotationsByKeysWithOneDepth(MailboxPath mailboxPath, MailboxSession session, Set<MailboxAnnotationKey> keys) throws MailboxException;
 
+    Publisher<MailboxAnnotation> getAnnotationsByKeysWithOneDepthReactive(MailboxPath mailboxPath, MailboxSession session, Set<MailboxAnnotationKey> keys);
+
     /**
      * Return all mailbox's annotation by the list of the keys and its below entries without order and
      * do not contain any two annotations with the same key
@@ -85,6 +92,8 @@ public interface MailboxAnnotationManager {
      */
     List<MailboxAnnotation> getAnnotationsByKeysWithAllDepth(MailboxPath mailboxPath, MailboxSession session, Set<MailboxAnnotationKey> keys) throws MailboxException;
 
+    Publisher<MailboxAnnotation> getAnnotationsByKeysWithAllDepthReactive(MailboxPath mailboxPath, MailboxSession session, Set<MailboxAnnotationKey> keys);
+
     /**
      * Update the mailbox's annotations. This method can:
      * - Insert new annotation if it does not exist
@@ -97,4 +106,6 @@ public interface MailboxAnnotationManager {
      * @throws MailboxException in case of selected mailbox does not exist
      */
     void updateAnnotations(MailboxPath mailboxPath, MailboxSession session, List<MailboxAnnotation> mailboxAnnotations) throws MailboxException, AnnotationException;
+
+    Publisher<Void> updateAnnotationsReactive(MailboxPath mailboxPath, MailboxSession session, List<MailboxAnnotation> mailboxAnnotations);
 }
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java
index 960a365b6c..a6532130c8 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java
@@ -612,7 +612,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
         }
 
         @Test
-        void updateAnnotationsShouldThrowExceptionIfMailboxDoesNotExist() throws Exception {
+        void updateAnnotationsShouldThrowExceptionIfMailboxDoesNotExist() {
             assumeTrue(mailboxManager.hasCapability(MailboxCapabilities.Annotation));
             session = mailboxManager.createSystemSession(USER_2);
             MailboxPath inbox = MailboxPath.inbox(session);
@@ -677,7 +677,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
         }
 
         @Test
-        void getAnnotationsByKeysWithOneDepthShouldRetriveAnnotationsWithOneDepth() throws Exception {
+        void getAnnotationsByKeysWithOneDepthShouldRetrieveAnnotationsWithOneDepth() throws Exception {
             assumeTrue(mailboxManager.hasCapability(MailboxCapabilities.Annotation));
             session = mailboxManager.createSystemSession(USER_2);
             MailboxPath inbox = MailboxPath.inbox(session);
@@ -700,7 +700,7 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
         }
 
         @Test
-        void getAnnotationsByKeysWithAllDepthShouldRetriveAnnotationsWithAllDepth() throws Exception {
+        void getAnnotationsByKeysWithAllDepthShouldRetrieveAnnotationsWithAllDepth() throws Exception {
             assumeTrue(mailboxManager.hasCapability(MailboxCapabilities.Annotation));
             session = mailboxManager.createSystemSession(USER_2);
             MailboxPath inbox = MailboxPath.inbox(session);
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxAnnotationManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxAnnotationManager.java
index b189778798..3ecc60c971 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxAnnotationManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxAnnotationManager.java
@@ -21,6 +21,7 @@ package org.apache.james.mailbox.store;
 
 import java.util.List;
 import java.util.Set;
+import java.util.function.Function;
 
 import javax.inject.Inject;
 
@@ -30,7 +31,6 @@ import org.apache.james.mailbox.exception.AnnotationException;
 import org.apache.james.mailbox.exception.InsufficientRightsException;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.exception.MailboxNotFoundException;
-import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxACL.Right;
 import org.apache.james.mailbox.model.MailboxAnnotation;
 import org.apache.james.mailbox.model.MailboxAnnotationKey;
@@ -38,13 +38,14 @@ import org.apache.james.mailbox.model.MailboxConstants;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.mailbox.store.mail.AnnotationMapper;
-import org.apache.james.mailbox.store.mail.MailboxMapper;
-import org.apache.james.mailbox.store.transaction.Mapper;
+import org.apache.james.util.FunctionalUtils;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class StoreMailboxAnnotationManager implements MailboxAnnotationManager {
 
     private final MailboxSessionMapperFactory mailboxSessionMapperFactory;
-
     private final StoreRightManager rightManager;
     private final int limitOfAnnotations;
     private final int limitAnnotationSize;
@@ -68,82 +69,114 @@ public class StoreMailboxAnnotationManager implements MailboxAnnotationManager {
         this.limitAnnotationSize = limitAnnotationSize;
     }
 
-    public MailboxId checkThenGetMailboxId(MailboxPath path, MailboxSession session) throws MailboxException {
-        MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(session);
-        Mailbox mailbox = mailboxMapper.findMailboxByPath(path)
-            .blockOptional()
-            .orElseThrow(() -> new MailboxNotFoundException(path));
-        if (!rightManager.hasRight(mailbox, Right.Read, session)) {
-            throw new InsufficientRightsException("Not enough rights on " + path);
-        }
-        return mailbox.getMailboxId();
+    @Override
+    public List<MailboxAnnotation> getAllAnnotations(MailboxPath mailboxPath, MailboxSession session) throws MailboxException {
+        return MailboxReactorUtils.block(getAllAnnotationsReactive(mailboxPath, session).collectList());
     }
 
     @Override
-    public List<MailboxAnnotation> getAllAnnotations(MailboxPath mailboxPath, MailboxSession session) throws MailboxException {
+    public Flux<MailboxAnnotation> getAllAnnotationsReactive(MailboxPath mailboxPath, MailboxSession session) {
         AnnotationMapper annotationMapper = mailboxSessionMapperFactory.getAnnotationMapper(session);
-
-        MailboxId mailboxId = checkThenGetMailboxId(mailboxPath, session);
-
-        return annotationMapper.execute(
-            () -> annotationMapper.getAllAnnotations(mailboxId));
+        return checkThenGetMailboxId(mailboxPath, session)
+            .switchIfEmpty(Mono.error(new MailboxNotFoundException(mailboxPath)))
+            .flatMapMany(mailboxId -> annotationMapper.executeReactive(Flux.from(annotationMapper.getAllAnnotationsReactive(mailboxId))
+                    .collectList())
+                .flatMapIterable(Function.identity()));
     }
 
     @Override
     public List<MailboxAnnotation> getAnnotationsByKeys(MailboxPath mailboxPath, MailboxSession session, final Set<MailboxAnnotationKey> keys)
-            throws MailboxException {
+        throws MailboxException {
+        return MailboxReactorUtils.block(getAnnotationsByKeysReactive(mailboxPath, session, keys).collectList());
+    }
+
+    @Override
+    public Flux<MailboxAnnotation> getAnnotationsByKeysReactive(MailboxPath mailboxPath, MailboxSession session, Set<MailboxAnnotationKey> keys) {
         AnnotationMapper annotationMapper = mailboxSessionMapperFactory.getAnnotationMapper(session);
-        MailboxId mailboxId = checkThenGetMailboxId(mailboxPath, session);
 
-        return annotationMapper.execute(
-            () -> annotationMapper.getAnnotationsByKeys(mailboxId, keys));
+        return checkThenGetMailboxId(mailboxPath, session)
+            .switchIfEmpty(Mono.error(new MailboxNotFoundException(mailboxPath)))
+            .flatMapMany(mailboxId -> annotationMapper.executeReactive(Flux.from(annotationMapper.getAnnotationsByKeysReactive(mailboxId, keys))
+                    .collectList())
+                .flatMapIterable(Function.identity()));
     }
 
     @Override
     public void updateAnnotations(MailboxPath mailboxPath, MailboxSession session, List<MailboxAnnotation> mailboxAnnotations)
-            throws MailboxException {
+        throws MailboxException {
+        MailboxReactorUtils.block(updateAnnotationsReactive(mailboxPath, session, mailboxAnnotations));
+    }
+
+    @Override
+    public Mono<Void> updateAnnotationsReactive(MailboxPath mailboxPath, MailboxSession session, List<MailboxAnnotation> mailboxAnnotations) {
         AnnotationMapper annotationMapper = mailboxSessionMapperFactory.getAnnotationMapper(session);
-        MailboxId mailboxId = checkThenGetMailboxId(mailboxPath, session);
-
-        annotationMapper.execute(Mapper.toTransaction(() -> {
-            for (MailboxAnnotation annotation : mailboxAnnotations) {
-                if (annotation.isNil()) {
-                    annotationMapper.deleteAnnotation(mailboxId, annotation.getKey());
-                } else if (canInsertOrUpdate(mailboxId, annotation, annotationMapper)) {
-                    annotationMapper.insertAnnotation(mailboxId, annotation);
-                }
-            }
-        }));
+        return annotationMapper.executeReactive(checkThenGetMailboxId(mailboxPath, session)
+                .switchIfEmpty(Mono.error(new MailboxNotFoundException(mailboxPath)))
+            .flatMapMany(mailboxId -> Flux.fromIterable(mailboxAnnotations)
+                .concatMap(annotation -> {
+                    if (annotation.isNil()) {
+                        return Mono.from(annotationMapper.deleteAnnotationReactive(mailboxId, annotation.getKey()));
+                    }
+                    return canInsertOrUpdate(mailboxId, annotation, annotationMapper)
+                        .filter(FunctionalUtils.identityPredicate())
+                        .flatMap(can -> Mono.from(annotationMapper.insertAnnotationReactive(mailboxId, annotation)));
+                }))
+            .then());
+    }
+
+    private Mono<Boolean> canInsertOrUpdate(MailboxId mailboxId, MailboxAnnotation annotation, AnnotationMapper annotationMapper) {
+        return Mono.just(annotation.size() > limitAnnotationSize)
+            .filter(FunctionalUtils.identityPredicate())
+            .flatMap(limited -> Mono.<Boolean>error(new AnnotationException("annotation too big.")))
+            .switchIfEmpty(annotationCountCanInsertOrUpdate(mailboxId, annotation, annotationMapper));
     }
 
-    private boolean canInsertOrUpdate(MailboxId mailboxId, MailboxAnnotation annotation, AnnotationMapper annotationMapper) throws AnnotationException {
-        if (annotation.size() > limitAnnotationSize) {
-            throw new AnnotationException("annotation too big.");
-        }
-        if (!annotationMapper.exist(mailboxId, annotation)
-            && annotationMapper.countAnnotations(mailboxId) >= limitOfAnnotations) {
-            throw new AnnotationException("too many annotations.");
-        }
-        return true;
+    private Mono<Boolean> annotationCountCanInsertOrUpdate(MailboxId mailboxId, MailboxAnnotation annotation, AnnotationMapper annotationMapper) {
+        return Mono.from(annotationMapper.existReactive(mailboxId, annotation))
+            .filter(FunctionalUtils.identityPredicate().negate())
+            .flatMap(exist -> Mono.from(annotationMapper.countAnnotationsReactive(mailboxId))
+                .filter(count -> count >= limitOfAnnotations)
+                .flatMap(limited -> Mono.<Boolean>error(new AnnotationException("too many annotations."))))
+            .switchIfEmpty(Mono.just(true));
     }
 
     @Override
     public List<MailboxAnnotation> getAnnotationsByKeysWithOneDepth(MailboxPath mailboxPath, MailboxSession session,
-            Set<MailboxAnnotationKey> keys) throws MailboxException {
-        AnnotationMapper annotationMapper = mailboxSessionMapperFactory.getAnnotationMapper(session);
-        final MailboxId mailboxId = checkThenGetMailboxId(mailboxPath, session);
+                                                                    Set<MailboxAnnotationKey> keys) throws MailboxException {
+        return MailboxReactorUtils.block(getAnnotationsByKeysWithOneDepthReactive(mailboxPath, session, keys).collectList());
+    }
 
-        return annotationMapper.execute(
-            () -> annotationMapper.getAnnotationsByKeysWithOneDepth(mailboxId, keys));
+    @Override
+    public Flux<MailboxAnnotation> getAnnotationsByKeysWithOneDepthReactive(MailboxPath mailboxPath, MailboxSession session, Set<MailboxAnnotationKey> keys) {
+        AnnotationMapper annotationMapper = mailboxSessionMapperFactory.getAnnotationMapper(session);
+        return checkThenGetMailboxId(mailboxPath, session)
+            .switchIfEmpty(Mono.error(new MailboxNotFoundException(mailboxPath)))
+            .flatMapMany(mailboxId -> annotationMapper.executeReactive(Flux.from(annotationMapper.getAnnotationsByKeysWithOneDepthReactive(mailboxId, keys))
+                    .collectList())
+                .flatMapIterable(Function.identity()));
     }
 
     @Override
     public List<MailboxAnnotation> getAnnotationsByKeysWithAllDepth(MailboxPath mailboxPath, MailboxSession session,
-            Set<MailboxAnnotationKey> keys) throws MailboxException {
+                                                                    Set<MailboxAnnotationKey> keys) throws MailboxException {
+        return MailboxReactorUtils.block(getAnnotationsByKeysWithAllDepthReactive(mailboxPath, session, keys).collectList());
+    }
+
+    @Override
+    public Flux<MailboxAnnotation> getAnnotationsByKeysWithAllDepthReactive(MailboxPath mailboxPath, MailboxSession session, Set<MailboxAnnotationKey> keys) {
         AnnotationMapper annotationMapper = mailboxSessionMapperFactory.getAnnotationMapper(session);
-        MailboxId mailboxId = checkThenGetMailboxId(mailboxPath, session);
+        return checkThenGetMailboxId(mailboxPath, session)
+            .switchIfEmpty(Mono.error(new MailboxNotFoundException(mailboxPath)))
+            .flatMapMany(mailboxId -> annotationMapper.executeReactive(Flux.from(annotationMapper.getAnnotationsByKeysWithAllDepthReactive(mailboxId, keys))
+                    .collectList())
+                .flatMapIterable(Function.identity()));
+    }
 
-        return annotationMapper.execute(
-            () -> annotationMapper.getAnnotationsByKeysWithAllDepth(mailboxId, keys));
+    private Mono<MailboxId> checkThenGetMailboxId(MailboxPath mailboxPath, MailboxSession session) {
+        return mailboxSessionMapperFactory.getMailboxMapper(session).findMailboxByPath(mailboxPath)
+            .flatMap(mailbox -> Mono.from(rightManager.hasRightReactive(mailboxPath, Right.Read, session))
+                .filter(FunctionalUtils.identityPredicate().negate())
+                .flatMap(hasRight -> Mono.<MailboxId>error(new InsufficientRightsException("Not enough rights on " + mailboxPath)))
+                .switchIfEmpty(Mono.just(mailbox.getMailboxId())));
     }
 }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index 788c53b47e..d69ff57375 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -1022,18 +1022,33 @@ public class StoreMailboxManager implements MailboxManager {
         return annotationManager.getAllAnnotations(mailboxPath, session);
     }
 
+    @Override
+    public Publisher<MailboxAnnotation> getAllAnnotationsReactive(MailboxPath mailboxPath, MailboxSession session) {
+        return annotationManager.getAllAnnotationsReactive(mailboxPath, session);
+    }
+
     @Override
     public List<MailboxAnnotation> getAnnotationsByKeys(MailboxPath mailboxPath, MailboxSession session, Set<MailboxAnnotationKey> keys)
         throws MailboxException {
         return annotationManager.getAnnotationsByKeys(mailboxPath, session, keys);
     }
 
+    @Override
+    public Publisher<MailboxAnnotation> getAnnotationsByKeysReactive(MailboxPath mailboxPath, MailboxSession session, Set<MailboxAnnotationKey> keys) {
+        return annotationManager.getAnnotationsByKeysReactive(mailboxPath, session, keys);
+    }
+
     @Override
     public void updateAnnotations(MailboxPath mailboxPath, MailboxSession session, List<MailboxAnnotation> mailboxAnnotations)
         throws MailboxException {
         annotationManager.updateAnnotations(mailboxPath, session, mailboxAnnotations);
     }
 
+    @Override
+    public Publisher<Void> updateAnnotationsReactive(MailboxPath mailboxPath, MailboxSession session, List<MailboxAnnotation> mailboxAnnotations) {
+        return annotationManager.updateAnnotationsReactive(mailboxPath, session, mailboxAnnotations);
+    }
+
 
     @Override
     public boolean hasCapability(MailboxCapabilities capability) {
@@ -1046,12 +1061,22 @@ public class StoreMailboxManager implements MailboxManager {
         return annotationManager.getAnnotationsByKeysWithOneDepth(mailboxPath, session, keys);
     }
 
+    @Override
+    public Publisher<MailboxAnnotation> getAnnotationsByKeysWithOneDepthReactive(MailboxPath mailboxPath, MailboxSession session, Set<MailboxAnnotationKey> keys) {
+        return annotationManager.getAnnotationsByKeysWithOneDepthReactive(mailboxPath, session, keys);
+    }
+
     @Override
     public List<MailboxAnnotation> getAnnotationsByKeysWithAllDepth(MailboxPath mailboxPath, MailboxSession session,
                                                                     Set<MailboxAnnotationKey> keys) throws MailboxException {
         return annotationManager.getAnnotationsByKeysWithAllDepth(mailboxPath, session, keys);
     }
 
+    @Override
+    public Publisher<MailboxAnnotation> getAnnotationsByKeysWithAllDepthReactive(MailboxPath mailboxPath, MailboxSession session, Set<MailboxAnnotationKey> keys) {
+        return annotationManager.getAnnotationsByKeysWithAllDepthReactive(mailboxPath, session, keys);
+    }
+
     @Override
     public boolean hasChildren(MailboxPath mailboxPath, MailboxSession session) throws MailboxException {
         MailboxMapper mapper = mailboxSessionMapperFactory.getMailboxMapper(session);
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/StoreMailboxManagerAnnotationTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/StoreMailboxManagerAnnotationTest.java
index 0424b4ce84..338f7a5299 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/StoreMailboxManagerAnnotationTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/StoreMailboxManagerAnnotationTest.java
@@ -28,7 +28,6 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -53,6 +52,7 @@ import org.mockito.MockitoAnnotations;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 class StoreMailboxManagerAnnotationTest {
@@ -93,8 +93,13 @@ class StoreMailboxManagerAnnotationTest {
                 Mapper.Transaction<?> transaction = (Mapper.Transaction<?>) invocationOnMock.getArguments()[0];
                 return transaction.run();
             });
+        when(annotationMapper.executeReactive(any(Mono.class)))
+            .thenAnswer(invocationOnMock -> invocationOnMock.getArguments()[0]);
+
         when(storeRightManager.hasRight(any(Mailbox.class), any(MailboxACL.Right.class), any(MailboxSession.class)))
             .thenReturn(true);
+        when(storeRightManager.hasRightReactive(any(MailboxPath.class), any(MailboxACL.Right.class), any(MailboxSession.class)))
+            .thenReturn(Mono.just(true));
 
         annotationManager = spy(new StoreMailboxAnnotationManager(mailboxSessionMapperFactory,
             storeRightManager));
@@ -105,24 +110,27 @@ class StoreMailboxManagerAnnotationTest {
         when(mailboxMapper.findMailboxByPath(eq(mailboxPath))).thenReturn(Mono.error(new MailboxException()));
 
         assertThatThrownBy(() -> annotationManager.updateAnnotations(mailboxPath, session, ImmutableList.of(PRIVATE_ANNOTATION)))
-            .hasCauseInstanceOf(MailboxException.class);
+            .isInstanceOf(MailboxException.class);
     }
 
     @Test
-    void updateAnnotationsShouldCallAnnotationMapperToInsertAnnotation() throws Exception {
-        when(mailboxMapper.findMailboxByPath(eq(mailboxPath))).thenReturn(Mono.just(mailbox));
+    void updateAnnotationsShouldCallAnnotationMapperToInsertAnnotation() throws MailboxException {
+        when(annotationMapper.existReactive(eq(mailbox.getMailboxId()), any())).thenReturn(Mono.just(true));
+        when(annotationMapper.insertAnnotationReactive(eq(mailbox.getMailboxId()), any())).thenReturn(Mono.empty());
         annotationManager.updateAnnotations(mailboxPath, session, ANNOTATIONS);
 
-        verify(annotationMapper, times(2)).insertAnnotation(eq(mailboxId), any(MailboxAnnotation.class));
+        verify(annotationMapper, times(2)).insertAnnotationReactive(eq(mailboxId), any(MailboxAnnotation.class));
     }
 
     @Test
-    void updateAnnotationsShouldCallAnnotationMapperToDeleteAnnotation() throws Exception {
-        when(mailboxMapper.findMailboxByPath(eq(mailboxPath))).thenReturn(Mono.just(mailbox));
+    void updateAnnotationsShouldCallAnnotationMapperToDeleteAnnotation() throws MailboxException {
+        when(annotationMapper.existReactive(eq(mailbox.getMailboxId()), any())).thenReturn(Mono.just(true));
+        when(annotationMapper.insertAnnotationReactive(eq(mailbox.getMailboxId()), any())).thenReturn(Mono.empty());
+        when(annotationMapper.deleteAnnotationReactive(eq(mailbox.getMailboxId()), any())).thenReturn(Mono.empty());
         annotationManager.updateAnnotations(mailboxPath, session, ANNOTATIONS_WITH_NIL_ENTRY);
 
-        verify(annotationMapper, times(1)).insertAnnotation(eq(mailboxId), eq(PRIVATE_ANNOTATION));
-        verify(annotationMapper, times(1)).deleteAnnotation(eq(mailboxId), eq(SHARED_KEY));
+        verify(annotationMapper, times(1)).insertAnnotationReactive(eq(mailboxId), eq(PRIVATE_ANNOTATION));
+        verify(annotationMapper, times(1)).deleteAnnotationReactive(eq(mailboxId), eq(SHARED_KEY));
     }
 
     @Test
@@ -130,13 +138,13 @@ class StoreMailboxManagerAnnotationTest {
         when(mailboxMapper.findMailboxByPath(eq(mailboxPath))).thenReturn(Mono.error(new MailboxException()));
 
         assertThatThrownBy(() -> annotationManager.getAllAnnotations(mailboxPath, session))
-            .hasCauseInstanceOf(MailboxException.class);
+            .isInstanceOf(MailboxException.class);
     }
 
     @Test
     void getAllAnnotationsShouldReturnEmptyForNonStoredAnnotation() throws Exception {
         when(mailboxMapper.findMailboxByPath(eq(mailboxPath))).thenReturn(Mono.just(mailbox));
-        when(annotationMapper.getAllAnnotations(eq(mailboxId))).thenReturn(Collections.<MailboxAnnotation>emptyList());
+        when(annotationMapper.getAllAnnotationsReactive(eq(mailboxId))).thenReturn(Flux.fromIterable(List.of()));
 
         assertThat(annotationManager.getAllAnnotations(mailboxPath, session)).isEmpty();
     }
@@ -144,7 +152,7 @@ class StoreMailboxManagerAnnotationTest {
     @Test
     void getAllAnnotationsShouldReturnStoredAnnotation() throws Exception {
         when(mailboxMapper.findMailboxByPath(eq(mailboxPath))).thenReturn(Mono.just(mailbox));
-        when(annotationMapper.getAllAnnotations(eq(mailboxId))).thenReturn(ANNOTATIONS);
+        when(annotationMapper.getAllAnnotationsReactive(eq(mailboxId))).thenReturn(Flux.fromIterable(ANNOTATIONS));
 
         assertThat(annotationManager.getAllAnnotations(mailboxPath, session)).isEqualTo(ANNOTATIONS);
     }
@@ -154,13 +162,13 @@ class StoreMailboxManagerAnnotationTest {
         when(mailboxMapper.findMailboxByPath(eq(mailboxPath))).thenReturn(Mono.error(new MailboxException()));
 
         assertThatThrownBy(() -> annotationManager.getAnnotationsByKeys(mailboxPath, session, KEYS))
-            .hasCauseInstanceOf(MailboxException.class);
+            .isInstanceOf(MailboxException.class);
     }
 
     @Test
-    void getAnnotationsByKeysShouldRetrieveStoreAnnotationsByKey() throws Exception {
+    void getAnnotationsByKeysShouldRetrieveStoreAnnotationsByKey() throws MailboxException {
         when(mailboxMapper.findMailboxByPath(eq(mailboxPath))).thenReturn(Mono.just(mailbox));
-        when(annotationMapper.getAnnotationsByKeys(eq(mailboxId), eq(KEYS))).thenReturn(ANNOTATIONS);
+        when(annotationMapper.getAnnotationsByKeysReactive(eq(mailboxId), eq(KEYS))).thenReturn(Flux.fromIterable(ANNOTATIONS));
 
         assertThat(annotationManager.getAnnotationsByKeys(mailboxPath, session, KEYS)).isEqualTo(ANNOTATIONS);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 04/04: IMAP - Reactive GetMetadata & SetMetadata processor

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 96fa02364905b9c92dd2716a60f6e696e11cbb4a
Author: Tung Tran <vt...@linagora.com>
AuthorDate: Thu Apr 6 08:10:52 2023 +0700

    IMAP - Reactive GetMetadata & SetMetadata processor
---
 .../james/imap/processor/GetMetadataProcessor.java | 83 +++++++++-------------
 .../james/imap/processor/SetMetadataProcessor.java | 34 ++++-----
 .../imap/processor/GetAnnotationProcessorTest.java | 73 ++++++++++++-------
 .../imap/processor/SetMetadataProcessorTest.java   | 21 +++---
 4 files changed, 109 insertions(+), 102 deletions(-)

diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/GetMetadataProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/GetMetadataProcessor.java
index b563b1142c..e01d87579c 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/GetMetadataProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/GetMetadataProcessor.java
@@ -19,7 +19,8 @@
 
 package org.apache.james.imap.processor;
 
-import java.util.Comparator;
+import static org.apache.james.util.ReactorUtils.logOnError;
+
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -51,7 +52,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSortedSet;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 /**
  * Support for RFC-5464 IMAP METADATA (GETMETADATA command)
@@ -82,27 +85,23 @@ public class GetMetadataProcessor extends AbstractMailboxProcessor<GetMetadataRe
     }
 
     @Override
-    protected void processRequest(GetMetadataRequest request, ImapSession session, Responder responder) {
-        try {
-            proceed(request, session, responder);
-        } catch (MailboxNotFoundException e) {
-            LOGGER.info("The command: {} is failed because not found mailbox {}", request.getCommand().getName(), request.getMailboxName());
-            no(request, responder, HumanReadableText.FAILURE_NO_SUCH_MAILBOX, ResponseCode.tryCreate());
-        } catch (MailboxException e) {
-            LOGGER.error("GetAnnotation on mailbox {} failed for user {}", request.getMailboxName(), session.getUserName(), e);
-            no(request, responder, HumanReadableText.GENERIC_FAILURE_DURING_PROCESSING);
-        }
-    }
-
-    private void proceed(GetMetadataRequest message, ImapSession session, Responder responder) throws MailboxException {
-        String mailboxName = message.getMailboxName();
-        Optional<Integer> maxsize = message.getMaxsize();
-        MailboxPath mailboxPath = PathConverter.forSession(session).buildFullPath(mailboxName);
-
-        List<MailboxAnnotation> mailboxAnnotations = getMailboxAnnotations(session, message.getKeys(), message.getDepth(), mailboxPath);
-        Optional<Integer> maximumOversizedSize = getMaxSizeValue(mailboxAnnotations, maxsize);
-
-        respond(message, responder, mailboxName, mailboxAnnotations, maxsize, maximumOversizedSize);
+    protected Mono<Void> processRequestReactive(GetMetadataRequest request, ImapSession session, Responder responder) {
+        String mailboxName = request.getMailboxName();
+        Optional<Integer> maxsize = request.getMaxsize();
+
+        return getMailboxAnnotations(session, request.getKeys(), request.getDepth(), PathConverter.forSession(session).buildFullPath(mailboxName))
+            .collectList()
+            .flatMap(mailboxAnnotations -> Mono.fromCallable(() -> getMaxSizeValue(mailboxAnnotations, maxsize))
+                .flatMap(maximumOversizedSize -> Mono.fromRunnable(() -> respond(request, responder, mailboxName, mailboxAnnotations, maxsize, maximumOversizedSize)))
+                .then())
+            .doOnEach(logOnError(MailboxNotFoundException.class,
+                e -> LOGGER.info("The command: {} is failed because not found mailbox {}", request.getCommand().getName(), request.getMailboxName())))
+            .onErrorResume(MailboxNotFoundException.class,
+                error -> Mono.fromRunnable(() -> no(request, responder, HumanReadableText.FAILURE_NO_SUCH_MAILBOX, ResponseCode.tryCreate())))
+            .doOnEach(logOnError(MailboxException.class,
+                e -> LOGGER.error("GetAnnotation on mailbox {} failed for user {}", request.getMailboxName(), session.getUserName(), e)))
+            .onErrorResume(MailboxException.class,
+                error -> Mono.fromRunnable(() -> no(request, responder, HumanReadableText.GENERIC_FAILURE_DURING_PROCESSING)));
     }
 
     private void respond(ImapRequest request, Responder responder, String mailboxName,
@@ -116,11 +115,11 @@ public class GetMetadataProcessor extends AbstractMailboxProcessor<GetMetadataRe
         }
     }
 
-    private Optional<Integer> getMaxSizeValue(final List<MailboxAnnotation> mailboxAnnotations, Optional<Integer> maxsize) {
-        if (maxsize.isPresent()) {
-            return maxsize.map(value -> getMaxSizeOfOversizedItems(mailboxAnnotations, value)).get();
-        }
-        return Optional.empty();
+    private Optional<Integer> getMaxSizeValue(List<MailboxAnnotation> mailboxAnnotations, Optional<Integer> maxsize) {
+        return maxsize.flatMap(value -> mailboxAnnotations.stream()
+            .map(MailboxAnnotation::size)
+            .filter(size -> size > value)
+            .reduce(Integer::max));
     }
 
     private List<MailboxAnnotation> filterItemsBySize(List<MailboxAnnotation> mailboxAnnotations, final Optional<Integer> maxsize) {
@@ -133,40 +132,26 @@ public class GetMetadataProcessor extends AbstractMailboxProcessor<GetMetadataRe
             .collect(ImmutableList.toImmutableList());
     }
 
-    private List<MailboxAnnotation> getMailboxAnnotations(ImapSession session, Set<MailboxAnnotationKey> keys, GetMetadataRequest.Depth depth, MailboxPath mailboxPath) throws MailboxException {
+    private Flux<MailboxAnnotation> getMailboxAnnotations(ImapSession session, Set<MailboxAnnotationKey> keys, GetMetadataRequest.Depth depth, MailboxPath mailboxPath) {
         MailboxSession mailboxSession = session.getMailboxSession();
         switch (depth) {
             case ZERO:
                 return getMailboxAnnotationsWithDepthZero(keys, mailboxPath, mailboxSession);
             case ONE:
-                return getMailboxManager().getAnnotationsByKeysWithOneDepth(mailboxPath, mailboxSession, keys);
+                return Flux.from(getMailboxManager().getAnnotationsByKeysWithOneDepthReactive(mailboxPath, mailboxSession, keys));
             case INFINITY:
-                return getMailboxManager().getAnnotationsByKeysWithAllDepth(mailboxPath, mailboxSession, keys);
+                return Flux.from(getMailboxManager().getAnnotationsByKeysWithAllDepthReactive(mailboxPath, mailboxSession, keys));
             default:
-                throw new NotImplementedException("Not implemented");
+                return Flux.error(new NotImplementedException("Not implemented"));
         }
     }
 
-    private List<MailboxAnnotation> getMailboxAnnotationsWithDepthZero(Set<MailboxAnnotationKey> keys, MailboxPath mailboxPath, MailboxSession mailboxSession) throws MailboxException {
+    private Flux<MailboxAnnotation> getMailboxAnnotationsWithDepthZero(Set<MailboxAnnotationKey> keys, MailboxPath mailboxPath, MailboxSession mailboxSession) {
         if (keys.isEmpty()) {
-            return getMailboxManager().getAllAnnotations(mailboxPath, mailboxSession);
+            return Flux.from(getMailboxManager().getAllAnnotationsReactive(mailboxPath, mailboxSession));
         } else {
-            return getMailboxManager().getAnnotationsByKeys(mailboxPath, mailboxSession, keys);
-        }
-    }
-
-    private Optional<Integer> getMaxSizeOfOversizedItems(List<MailboxAnnotation> mailboxAnnotations, final Integer maxsize) {
-        Predicate<MailboxAnnotation> filterOverSizedAnnotation = annotation -> annotation.size() > maxsize;
-
-        ImmutableSortedSet<Integer> overLimitSizes = mailboxAnnotations.stream()
-            .filter(filterOverSizedAnnotation)
-            .map(MailboxAnnotation::size)
-            .collect(ImmutableSortedSet.toImmutableSortedSet(Comparator.reverseOrder()));
-
-        if (overLimitSizes.isEmpty()) {
-            return Optional.empty();
+            return Flux.from(getMailboxManager().getAnnotationsByKeysReactive(mailboxPath, mailboxSession, keys));
         }
-        return Optional.of(overLimitSizes.first());
     }
 
     @Override
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/SetMetadataProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/SetMetadataProcessor.java
index 4b9bc686ab..1317bef71c 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/SetMetadataProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/SetMetadataProcessor.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.imap.processor;
 
+import static org.apache.james.util.ReactorUtils.logOnError;
+
 import java.util.List;
 import java.util.Optional;
 
@@ -37,7 +39,6 @@ import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.exception.AnnotationException;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.exception.MailboxNotFoundException;
-import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.util.MDCBuilder;
 import org.slf4j.Logger;
@@ -45,6 +46,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Mono;
+
 /**
  * Support for RFC-5464 IMAP METADATA (SETMETADATA command)
  *
@@ -74,26 +77,23 @@ public class SetMetadataProcessor extends AbstractMailboxProcessor<SetMetadataRe
     }
 
     @Override
-    protected void processRequest(SetMetadataRequest request, ImapSession session, Responder responder) {
+    protected Mono<Void> processRequestReactive(SetMetadataRequest request, ImapSession session, Responder responder) {
         final MailboxManager mailboxManager = getMailboxManager();
         final MailboxSession mailboxSession = session.getMailboxSession();
         final String mailboxName = request.getMailboxName();
-        try {
-            MailboxPath mailboxPath = PathConverter.forSession(session).buildFullPath(mailboxName);
-
-            mailboxManager.updateAnnotations(mailboxPath, mailboxSession, request.getMailboxAnnotations());
 
-            okComplete(request, responder);
-        } catch (MailboxNotFoundException e) {
-            LOGGER.info("{} failed for mailbox {}", request.getCommand().getName(), mailboxName, e);
-            no(request, responder, HumanReadableText.FAILURE_NO_SUCH_MAILBOX, StatusResponse.ResponseCode.tryCreate());
-        } catch (AnnotationException e) {
-            LOGGER.info("{} failed for mailbox {}", request.getCommand().getName(), mailboxName, e);
-            no(request, responder, new HumanReadableText(HumanReadableText.MAILBOX_ANNOTATION_KEY, e.getMessage()));
-        } catch (MailboxException e) {
-            LOGGER.error("{} failed for mailbox {}", request.getCommand().getName(), mailboxName, e);
-            no(request, responder, HumanReadableText.GENERIC_FAILURE_DURING_PROCESSING);
-        }
+        return Mono.from(mailboxManager.updateAnnotationsReactive(
+                PathConverter.forSession(session).buildFullPath(mailboxName),
+                mailboxSession, request.getMailboxAnnotations()))
+            .then(Mono.fromRunnable(() -> okComplete(request, responder)).then())
+            .doOnEach(logOnError(MailboxException.class,
+                e -> LOGGER.info("{} failed for mailbox {}", request.getCommand().getName(), mailboxName, e)))
+            .onErrorResume(MailboxNotFoundException.class,
+                error -> Mono.fromRunnable(() -> no(request, responder, HumanReadableText.FAILURE_NO_SUCH_MAILBOX, StatusResponse.ResponseCode.tryCreate())))
+            .onErrorResume(AnnotationException.class,
+                error -> Mono.fromRunnable(() -> no(request, responder, new HumanReadableText(HumanReadableText.MAILBOX_ANNOTATION_KEY, error.getMessage()))))
+            .onErrorResume(MailboxException.class,
+                error -> Mono.fromRunnable(() -> no(request, responder, HumanReadableText.GENERIC_FAILURE_DURING_PROCESSING)));
     }
 
     @Override
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/GetAnnotationProcessorTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/GetAnnotationProcessorTest.java
index 6daee8a4ed..12ac146a9e 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/GetAnnotationProcessorTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/GetAnnotationProcessorTest.java
@@ -65,6 +65,7 @@ import org.mockito.stubbing.Answer;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 class GetAnnotationProcessorTest {
@@ -136,7 +137,11 @@ class GetAnnotationProcessorTest {
 
     @Test
     void processShouldResponseNoWithFailureWhenMailboxDoesNotExist() throws Exception {
-        doThrow(MailboxNotFoundException.class).when(mockMailboxManager).getAllAnnotations(eq(inbox), eq(mailboxSession));
+
+        when(mockMailboxManager.getAllAnnotationsReactive(inbox, mailboxSession))
+            .thenReturn(Flux.error(new MailboxNotFoundException("")));
+
+
         when(mockStatusResponseFactory.taggedNo(any(Tag.class), any(ImapCommand.class), any(HumanReadableText.class), any(ResponseCode.class)))
             .thenReturn(statusResponse);
 
@@ -152,7 +157,9 @@ class GetAnnotationProcessorTest {
 
     @Test
     void processShouldResponseNoWithGenericFailureWhenManagerThrowMailboxException() throws Exception {
-        doThrow(MailboxException.class).when(mockMailboxManager).getAllAnnotations(eq(inbox), eq(mailboxSession));
+        when(mockMailboxManager.getAllAnnotationsReactive(inbox, mailboxSession))
+            .thenReturn(Flux.error(new MailboxException("")));
+
         when(mockStatusResponseFactory.taggedNo(any(Tag.class), any(ImapCommand.class), any(HumanReadableText.class)))
             .thenReturn(statusResponse);
 
@@ -166,10 +173,12 @@ class GetAnnotationProcessorTest {
     }
 
     @Test
-    void processShouldGetAllAnnotationsAndReturnCompleteResponse() throws Exception {
+    void processShouldGetAllAnnotationsAndReturnCompleteResponse() {
+        when(mockMailboxManager.getAllAnnotationsReactive(inbox, mailboxSession))
+            .thenReturn(Flux.empty());
         processor.process(annotationRequestBuilder.build(), mockResponder, imapSession);
 
-        verify(mockMailboxManager, times(1)).getAllAnnotations(inbox, mailboxSession);
+        verify(mockMailboxManager, times(1)).getAllAnnotationsReactive(inbox, mailboxSession);
         verify(mockStatusResponseFactory, times(1)).taggedOk(any(Tag.class), any(ImapCommand.class), humanTextCaptor.capture());
         verify(mockResponder, times(2)).respond(captorAnnotationResponse.capture());
 
@@ -179,10 +188,12 @@ class GetAnnotationProcessorTest {
     }
 
     @Test
-    void processShouldGetAnnotationsByKeysAndReturnCompleteResponse() throws Exception {
+    void processShouldGetAnnotationsByKeysAndReturnCompleteResponse() {
+        when(mockMailboxManager.getAnnotationsByKeysReactive(inbox, mailboxSession, keys))
+            .thenReturn(Flux.empty());
         processor.process(annotationRequestBuilder.keys(keys).build(), mockResponder, imapSession);
 
-        verify(mockMailboxManager, times(1)).getAnnotationsByKeys(eq(inbox), eq(mailboxSession), eq(keys));
+        verify(mockMailboxManager, times(1)).getAnnotationsByKeysReactive(eq(inbox), eq(mailboxSession), eq(keys));
         verify(mockStatusResponseFactory, times(1)).taggedOk(any(Tag.class), any(ImapCommand.class), humanTextCaptor.capture());
         verify(mockResponder, times(2)).respond(captorAnnotationResponse.capture());
         verifyNoMoreInteractions(mockResponder);
@@ -191,15 +202,16 @@ class GetAnnotationProcessorTest {
     }
 
     @Test
-    void processShouldGetAnnotationsAndReturnCompleteResponseWithTheLongestEntryInfoWhenLimitMaxsize() throws Exception {
-        when(mockMailboxManager.getAllAnnotations(inbox, mailboxSession)).thenReturn(ImmutableList.of(PRIVATE_ANNOTATION, SHARED_ANNOTATION, PRIVATE_GRANDCHILD_ANNOTATION));
+    void processShouldGetAnnotationsAndReturnCompleteResponseWithTheLongestEntryInfoWhenLimitMaxsize() {
+        when(mockMailboxManager.getAllAnnotationsReactive(inbox, mailboxSession))
+            .thenReturn(Flux.fromIterable(ImmutableList.of(PRIVATE_ANNOTATION, SHARED_ANNOTATION, PRIVATE_GRANDCHILD_ANNOTATION)));
 
         processor.process(annotationRequestBuilder.maxsize(Optional.of(10)).build(), mockResponder, imapSession);
 
         verify(mockStatusResponseFactory, times(1)).taggedOk(any(Tag.class),
-                any(ImapCommand.class),
-                humanTextCaptor.capture(),
-                captorResponsecode.capture());
+            any(ImapCommand.class),
+            humanTextCaptor.capture(),
+            captorResponsecode.capture());
         verify(mockResponder, times(2)).respond(captorAnnotationResponse.capture());
         verifyNoMoreInteractions(mockResponder);
 
@@ -208,14 +220,15 @@ class GetAnnotationProcessorTest {
     }
 
     @Test
-    void processShouldGetAnnotationsAndReturnCompleteResponseDoesNotTruncateDataByMaxsize() throws Exception {
-        when(mockMailboxManager.getAllAnnotations(inbox, mailboxSession)).thenReturn(ImmutableList.of(PRIVATE_ANNOTATION, SHARED_ANNOTATION));
+    void processShouldGetAnnotationsAndReturnCompleteResponseDoesNotTruncateDataByMaxsize() {
+        when(mockMailboxManager.getAllAnnotationsReactive(inbox, mailboxSession))
+            .thenReturn(Flux.fromIterable(ImmutableList.of(PRIVATE_ANNOTATION, SHARED_ANNOTATION)));
 
         processor.process(annotationRequestBuilder.maxsize(Optional.of(100)).build(), mockResponder, imapSession);
 
         verify(mockStatusResponseFactory, times(1)).taggedOk(any(Tag.class),
-                any(ImapCommand.class),
-                humanTextCaptor.capture());
+            any(ImapCommand.class),
+            humanTextCaptor.capture());
         verify(mockResponder, times(2)).respond(captorAnnotationResponse.capture());
         verifyNoMoreInteractions(mockResponder);
 
@@ -223,8 +236,9 @@ class GetAnnotationProcessorTest {
     }
 
     @Test
-    void processShouldGetAnnotationsAndReturnCompleteResponseWithTruncateDataByMaxsize() throws Exception {
-        when(mockMailboxManager.getAllAnnotations(inbox, mailboxSession)).thenReturn(ImmutableList.of(SHARED_ANNOTATION, PRIVATE_ANNOTATION, PRIVATE_CHILD_ANNOTATION, PRIVATE_GRANDCHILD_ANNOTATION));
+    void processShouldGetAnnotationsAndReturnCompleteResponseWithTruncateDataByMaxsize() {
+        when(mockMailboxManager.getAllAnnotationsReactive(inbox, mailboxSession))
+            .thenReturn(Flux.fromIterable(ImmutableList.of(SHARED_ANNOTATION, PRIVATE_ANNOTATION, PRIVATE_CHILD_ANNOTATION, PRIVATE_GRANDCHILD_ANNOTATION)));
 
         processor.process(annotationRequestBuilder.maxsize(Optional.of(15)).build(), mockResponder, imapSession);
 
@@ -240,8 +254,9 @@ class GetAnnotationProcessorTest {
     }
 
     @Test
-    void processShouldGetAnnotationsAndReturnCompleteResponseDoesnotTruncateDataByMaxsizeWhenNoMoreOverSizeItem() throws Exception {
-        when(mockMailboxManager.getAllAnnotations(inbox, mailboxSession)).thenReturn(ImmutableList.of(SHARED_ANNOTATION, PRIVATE_ANNOTATION, PRIVATE_CHILD_ANNOTATION, PRIVATE_GRANDCHILD_ANNOTATION));
+    void processShouldGetAnnotationsAndReturnCompleteResponseDoesnotTruncateDataByMaxsizeWhenNoMoreOverSizeItem() {
+        when(mockMailboxManager.getAllAnnotationsReactive(inbox, mailboxSession))
+            .thenReturn(Flux.fromIterable(ImmutableList.of(SHARED_ANNOTATION, PRIVATE_ANNOTATION, PRIVATE_CHILD_ANNOTATION, PRIVATE_GRANDCHILD_ANNOTATION)));
 
         processor.process(annotationRequestBuilder.maxsize(Optional.of(100)).build(), mockResponder, imapSession);
 
@@ -256,12 +271,13 @@ class GetAnnotationProcessorTest {
     }
 
     @Test
-    void processShouldGetAnnotationsByOneDepthAndReturnCompleteResponseWithTruncateDataByMaxsize() throws Exception {
-        when(mockMailboxManager.getAnnotationsByKeysWithOneDepth(inbox, mailboxSession, keys)).thenReturn(ImmutableList.of(PRIVATE_ANNOTATION, PRIVATE_CHILD_ANNOTATION));
+    void processShouldGetAnnotationsByOneDepthAndReturnCompleteResponseWithTruncateDataByMaxsize() {
+        when(mockMailboxManager.getAnnotationsByKeysWithOneDepthReactive(inbox, mailboxSession, keys))
+            .thenReturn(Flux.fromIterable(ImmutableList.of(PRIVATE_ANNOTATION, PRIVATE_CHILD_ANNOTATION)));
 
         processor.process(annotationRequestBuilder.maxsize(Optional.of(14)).depth(Depth.ONE).keys(keys).build(), mockResponder, imapSession);
 
-        verify(mockMailboxManager, times(1)).getAnnotationsByKeysWithOneDepth(eq(inbox), eq(mailboxSession), eq(keys));
+        verify(mockMailboxManager, times(1)).getAnnotationsByKeysWithOneDepthReactive(eq(inbox), eq(mailboxSession), eq(keys));
         verify(mockStatusResponseFactory, times(1)).taggedOk(any(Tag.class),
             any(ImapCommand.class),
             humanTextCaptor.capture(),
@@ -275,11 +291,12 @@ class GetAnnotationProcessorTest {
 
     @Test
     void processShouldGetAnnotationsAndReturnCompleteResponseWithTruncateDataByLessThenOrEqualMaxsize() throws Exception {
-        when(mockMailboxManager.getAllAnnotations(inbox, mailboxSession)).thenReturn(ImmutableList.of(PRIVATE_ANNOTATION, SHARED_ANNOTATION));
+        when(mockMailboxManager.getAllAnnotationsReactive(inbox, mailboxSession))
+            .thenReturn(Flux.fromIterable(ImmutableList.of(PRIVATE_ANNOTATION, SHARED_ANNOTATION)));
 
         processor.process(annotationRequestBuilder.maxsize(Optional.of(15)).build(), mockResponder, imapSession);
 
-        verify(mockMailboxManager, times(1)).getAllAnnotations(eq(inbox), eq(mailboxSession));
+        verify(mockMailboxManager, times(1)).getAllAnnotationsReactive(eq(inbox), eq(mailboxSession));
         verify(mockStatusResponseFactory, times(1)).taggedOk(any(Tag.class),
             any(ImapCommand.class),
             humanTextCaptor.capture());
@@ -293,11 +310,12 @@ class GetAnnotationProcessorTest {
 
     @Test
     void processShouldGetAnnotationsByInfinityDepthAndReturnCompleteResponseWithTruncateDataByMaxsize() throws Exception {
-        when(mockMailboxManager.getAnnotationsByKeysWithAllDepth(inbox, mailboxSession, keys)).thenReturn(ImmutableList.of(PRIVATE_ANNOTATION, PRIVATE_CHILD_ANNOTATION, PRIVATE_GRANDCHILD_ANNOTATION));
+        when(mockMailboxManager.getAnnotationsByKeysWithAllDepthReactive(inbox, mailboxSession, keys))
+            .thenReturn(Flux.fromIterable(ImmutableList.of(PRIVATE_ANNOTATION, PRIVATE_CHILD_ANNOTATION, PRIVATE_GRANDCHILD_ANNOTATION)));
 
         processor.process(annotationRequestBuilder.maxsize(Optional.of(14)).depth(Depth.INFINITY).keys(keys).build(), mockResponder, imapSession);
 
-        verify(mockMailboxManager, times(1)).getAnnotationsByKeysWithAllDepth(eq(inbox), eq(mailboxSession), eq(keys));
+        verify(mockMailboxManager, times(1)).getAnnotationsByKeysWithAllDepthReactive(eq(inbox), eq(mailboxSession), eq(keys));
         verify(mockStatusResponseFactory, times(1)).taggedOk(any(Tag.class),
             any(ImapCommand.class),
             humanTextCaptor.capture(),
@@ -312,7 +330,8 @@ class GetAnnotationProcessorTest {
 
     @Test
     void processShouldGetAnnotationsByInfinityDepthAndReturnCompleteResponse() throws Exception {
-        when(mockMailboxManager.getAnnotationsByKeysWithAllDepth(inbox, mailboxSession, keys)).thenReturn(ImmutableList.of(PRIVATE_ANNOTATION, PRIVATE_CHILD_ANNOTATION, PRIVATE_GRANDCHILD_ANNOTATION));
+        when(mockMailboxManager.getAnnotationsByKeysWithAllDepthReactive(inbox, mailboxSession, keys))
+            .thenReturn(Flux.fromIterable(ImmutableList.of(PRIVATE_ANNOTATION, PRIVATE_CHILD_ANNOTATION, PRIVATE_GRANDCHILD_ANNOTATION)));
 
         processor.process(annotationRequestBuilder.depth(Depth.INFINITY).keys(keys).build(), mockResponder, imapSession);
 
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/SetMetadataProcessorTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/SetMetadataProcessorTest.java
index 885dd4a6ab..a61bca7656 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/SetMetadataProcessorTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/SetMetadataProcessorTest.java
@@ -23,7 +23,6 @@ import static org.apache.james.imap.ImapFixture.TAG;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -113,9 +112,9 @@ class SetMetadataProcessorTest {
     }
 
     @Test
-    void processShouldResponseNoWithNoSuchMailboxWhenManagerThrowMailboxNotFoundException() throws Exception {
-        doThrow(MailboxNotFoundException.class).when(mockMailboxManager).updateAnnotations(eq(inbox),
-            eq(mockMailboxSession), eq(mailboxAnnotations));
+    void processShouldResponseNoWithNoSuchMailboxWhenManagerThrowMailboxNotFoundException() {
+        when(mockMailboxManager.updateAnnotationsReactive(eq(inbox), eq(mockMailboxSession), eq(mailboxAnnotations)))
+            .thenReturn(Mono.error(new MailboxNotFoundException("")));
 
         processor.process(request, mockResponder, imapSession);
 
@@ -126,8 +125,9 @@ class SetMetadataProcessorTest {
     }
 
     @Test
-    void processShouldResponseNoWithGenericFailureWhenManagerThrowMailboxException() throws Exception {
-        doThrow(MailboxException.class).when(mockMailboxManager).updateAnnotations(eq(inbox), eq(mockMailboxSession), eq(mailboxAnnotations));
+    void processShouldResponseNoWithGenericFailureWhenManagerThrowMailboxException() {
+        when(mockMailboxManager.updateAnnotationsReactive(eq(inbox), eq(mockMailboxSession), eq(mailboxAnnotations)))
+            .thenReturn(Mono.error(new MailboxException("")));
 
         processor.process(request, mockResponder, imapSession);
 
@@ -137,13 +137,15 @@ class SetMetadataProcessorTest {
     }
 
     @Test
-    void processShouldWorkWithCompleteResponse() throws Exception {
+    void processShouldWorkWithCompleteResponse() {
+        when(mockMailboxManager.updateAnnotationsReactive(inbox, mockMailboxSession, mailboxAnnotations))
+            .thenReturn(Mono.empty());
         when(mockStatusResponseFactory.taggedOk(any(Tag.class), any(ImapCommand.class), any(HumanReadableText.class)))
             .thenReturn(okResponse);
 
         processor.process(request, mockResponder, imapSession);
 
-        verify(mockMailboxManager).updateAnnotations(inbox, mockMailboxSession, mailboxAnnotations);
+        verify(mockMailboxManager).updateAnnotationsReactive(inbox, mockMailboxSession, mailboxAnnotations);
         verify(mockResponder).respond(okResponse);
         verify(mockStatusResponseFactory, times(1)).taggedOk(any(Tag.class), any(ImapCommand.class), humanTextCaptor.capture());
 
@@ -152,7 +154,8 @@ class SetMetadataProcessorTest {
 
     @Test
     void processShouldResponseNoWhenManagerThrowsAnnotationException() throws Exception {
-        doThrow(AnnotationException.class).when(mockMailboxManager).updateAnnotations(eq(inbox), eq(mockMailboxSession), eq(mailboxAnnotations));
+        when(mockMailboxManager.updateAnnotationsReactive(eq(inbox), eq(mockMailboxSession), eq(mailboxAnnotations)))
+            .thenReturn(Mono.error(new AnnotationException()));
 
         processor.process(request, mockResponder, imapSession);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 02/04: MailboxAnnotationListener implement reactive group event listener

Posted by rc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit bbdd764273f7f006aa2d76779a2b58249c6632e3
Author: Tung Tran <vt...@linagora.com>
AuthorDate: Thu Apr 6 08:09:22 2023 +0700

    MailboxAnnotationListener implement reactive group event listener
---
 .../store/event/MailboxAnnotationListener.java     | 23 ++++++++---------
 .../store/event/MailboxAnnotationListenerTest.java | 29 +++++++++++++---------
 2 files changed, 27 insertions(+), 25 deletions(-)

diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxAnnotationListener.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxAnnotationListener.java
index 87a37baf8b..6385f9caf2 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxAnnotationListener.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxAnnotationListener.java
@@ -18,8 +18,6 @@
  ****************************************************************/
 package org.apache.james.mailbox.store.event;
 
-import java.util.List;
-
 import javax.inject.Inject;
 
 import org.apache.james.events.Event;
@@ -28,12 +26,15 @@ import org.apache.james.events.Group;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.SessionProvider;
 import org.apache.james.mailbox.events.MailboxEvents.MailboxDeletion;
-import org.apache.james.mailbox.model.MailboxAnnotation;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
 import org.apache.james.mailbox.store.mail.AnnotationMapper;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
-public class MailboxAnnotationListener implements EventListener.GroupEventListener {
+public class MailboxAnnotationListener implements EventListener.ReactiveGroupEventListener {
     public static final class MailboxAnnotationListenerGroup extends Group {
 
     }
@@ -60,20 +61,16 @@ public class MailboxAnnotationListener implements EventListener.GroupEventListen
     }
 
     @Override
-    public void event(Event event) {
+    public Publisher<Void> reactiveEvent(Event event) {
         if (event instanceof MailboxDeletion) {
             MailboxSession mailboxSession = sessionProvider.createSystemSession(event.getUsername());
             AnnotationMapper annotationMapper = mailboxSessionMapperFactory.getAnnotationMapper(mailboxSession);
             MailboxId mailboxId = ((MailboxDeletion) event).getMailboxId();
 
-            deleteRelatedAnnotations(mailboxId, annotationMapper);
-        }
-    }
-
-    private void deleteRelatedAnnotations(MailboxId mailboxId, AnnotationMapper annotationMapper) {
-        List<MailboxAnnotation> annotations = annotationMapper.getAllAnnotations(mailboxId);
-        for (MailboxAnnotation annotation : annotations) {
-            annotationMapper.deleteAnnotation(mailboxId, annotation.getKey());
+            return Flux.from(annotationMapper.getAllAnnotationsReactive(mailboxId))
+                .flatMap(annotation -> Mono.from(annotationMapper.deleteAnnotationReactive(mailboxId, annotation.getKey())))
+                .then();
         }
+        return Mono.empty();
     }
 }
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/MailboxAnnotationListenerTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/MailboxAnnotationListenerTest.java
index ede6600038..1c310a5447 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/MailboxAnnotationListenerTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/event/MailboxAnnotationListenerTest.java
@@ -56,6 +56,9 @@ import org.mockito.MockitoAnnotations;
 
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 class MailboxAnnotationListenerTest {
     static final Username USER = Username.of("user");
     static final MailboxPath MAILBOX_PATH = new MailboxPath("namespace", USER, "name");
@@ -116,13 +119,13 @@ class MailboxAnnotationListenerTest {
     }
 
     @Test
-    void eventShoudlDoNothingIfMailboxDoesNotHaveAnyAnnotation() throws Exception {
-        when(annotationMapper.getAllAnnotations(any(MailboxId.class))).thenReturn(ImmutableList.<MailboxAnnotation>of());
+    void eventShouldDoNothingIfMailboxDoesNotHaveAnyAnnotation() throws Exception {
+        when(annotationMapper.getAllAnnotationsReactive(any(MailboxId.class))).thenReturn(Flux.fromIterable(List.of()));
 
         listener.event(deleteEvent);
 
         verify(mailboxSessionMapperFactory).getAnnotationMapper(eq(mailboxSession));
-        verify(annotationMapper).getAllAnnotations(eq(mailboxId));
+        verify(annotationMapper).getAllAnnotationsReactive(eq(mailboxId));
 
         verifyNoMoreInteractions(mailboxSessionMapperFactory);
         verifyNoMoreInteractions(annotationMapper);
@@ -131,14 +134,15 @@ class MailboxAnnotationListenerTest {
 
     @Test
     void eventShoudlDeleteAllMailboxAnnotation() throws Exception {
-        when(annotationMapper.getAllAnnotations(eq(mailboxId))).thenReturn(ANNOTATIONS);
+        when(annotationMapper.getAllAnnotationsReactive(eq(mailboxId))).thenReturn(Flux.fromIterable(ANNOTATIONS));
+        when(annotationMapper.deleteAnnotationReactive(eq(mailboxId), any())).thenReturn(Mono.empty());
 
         listener.event(deleteEvent);
 
         verify(mailboxSessionMapperFactory).getAnnotationMapper(eq(mailboxSession));
-        verify(annotationMapper).getAllAnnotations(eq(mailboxId));
-        verify(annotationMapper).deleteAnnotation(eq(mailboxId), eq(PRIVATE_KEY));
-        verify(annotationMapper).deleteAnnotation(eq(mailboxId), eq(SHARED_KEY));
+        verify(annotationMapper).getAllAnnotationsReactive(eq(mailboxId));
+        verify(annotationMapper).deleteAnnotationReactive(eq(mailboxId), eq(PRIVATE_KEY));
+        verify(annotationMapper).deleteAnnotationReactive(eq(mailboxId), eq(SHARED_KEY));
 
         verifyNoMoreInteractions(mailboxSessionMapperFactory);
         verifyNoMoreInteractions(annotationMapper);
@@ -146,15 +150,16 @@ class MailboxAnnotationListenerTest {
     }
 
     @Test
-    void eventShouldPropagateFailure() throws Exception {
-        when(annotationMapper.getAllAnnotations((eq(mailboxId)))).thenReturn(ANNOTATIONS);
-        doThrow(new RuntimeException()).when(annotationMapper).deleteAnnotation(eq(mailboxId), eq(PRIVATE_KEY));
+    void eventShouldPropagateFailure() {
+        when(annotationMapper.getAllAnnotationsReactive(eq(mailboxId))).thenReturn(Flux.fromIterable(ANNOTATIONS));
+        when(annotationMapper.deleteAnnotationReactive(eq(mailboxId), eq(PRIVATE_KEY)))
+            .thenReturn(Mono.error(new RuntimeException()));
 
         assertThatThrownBy(() -> listener.event(deleteEvent)).isInstanceOf(RuntimeException.class);
 
         verify(mailboxSessionMapperFactory).getAnnotationMapper(eq(mailboxSession));
-        verify(annotationMapper).getAllAnnotations(eq(mailboxId));
-        verify(annotationMapper).deleteAnnotation(eq(mailboxId), eq(PRIVATE_KEY));
+        verify(annotationMapper).getAllAnnotationsReactive(eq(mailboxId));
+        verify(annotationMapper).deleteAnnotationReactive(eq(mailboxId), eq(PRIVATE_KEY));
 
         verifyNoMoreInteractions(mailboxSessionMapperFactory);
         verifyNoMoreInteractions(annotationMapper);


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org