You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2020/11/04 02:16:52 UTC

[james-project] branch master updated (cef00d6 -> 9d16f68)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from cef00d6  Add a warmup stage in jenkins stress tests file
     new 03729af  JAMES-2037 Use Flux for MessageManager::search
     new d493089  MAILBOX-339 Propagate errors on single items upon migration
     new cd9f288  MAILBOX-339 Limit concurency for migration processes
     new bfd12be  JAMES-3409 Delete MailboxPathV2 content based on MailboxPathV2
     new e6cb2f7  JAMES-3409 Add a retry in MailboxPathV3Migration
     new 52afff5  JAMES-3433 MetricableBlobStore should propagate storage strategy upon reads
     new f813e60  JAMES-3433 CachedBlobStore should only delete data when underlying backend did delete it too
     new 9150cbe  JAMES-3433 Tests enforcing Blob Store cache usage
     new 9d16f68  JAMES-3368 Follow the specification for default Email/get properties

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/james/mailbox/MessageManager.java   |   3 +-
 .../mail/migration/MailboxPathV2Migration.java     |   5 +-
 .../mail/migration/MailboxPathV3Migration.java     |  16 +-
 .../mail/migration/MessageV3Migration.java         |   2 +-
 .../ElasticSearchListeningMessageSearchIndex.java  |   6 +-
 .../ElasticSearchIntegrationTest.java              |  24 +--
 ...asticSearchListeningMessageSearchIndexTest.java |  26 +--
 .../lucene/search/LuceneMessageSearchIndex.java    |   5 +-
 .../LuceneMailboxMessageSearchIndexTest.java       | 106 ++++++------
 .../james/vault/DeletedMessageVaultHookTest.java   |  12 +-
 .../james/mailbox/store/StoreMessageManager.java   |   7 +-
 .../store/search/LazyMessageSearchIndex.java       |   3 +-
 .../mailbox/store/search/MessageSearchIndex.java   |   3 +-
 .../store/search/SimpleMessageSearchIndex.java     |   5 +-
 .../store/AbstractCombinationManagerTest.java      |  12 +-
 .../search/AbstractMessageSearchIndexTest.java     | 162 +++++++++---------
 .../imap/processor/AbstractMailboxProcessor.java   |   2 +-
 .../james/imap/processor/SearchProcessor.java      |   6 +-
 .../imap/processor/base/SelectedMailboxImpl.java   |  10 +-
 .../james/imap/processor/SearchProcessorTest.java  |   5 +-
 .../processor/base/MailboxEventAnalyserTest.java   |   5 +-
 .../processor/base/SelectedMailboxImplTest.java    |  13 +-
 .../java/org/apache/james/blob/api/BlobStore.java  |   2 +-
 .../apache/james/blob/api/MetricableBlobStore.java |  13 +-
 .../blob/cassandra/cache/CachedBlobStore.java      |  13 +-
 .../main/java/org/apache/james/blob/api/Store.java |   3 +-
 .../deduplication/DeDuplicationBlobStore.scala     |   4 +-
 .../blob/deduplication/PassThroughBlobStore.scala  |   5 +-
 .../java/org/apache/james/SearchModuleChooser.java |   3 +-
 .../org/apache/james/CassandraCacheQueryTest.java  | 181 +++++++++++++++++++++
 .../org/apache/james/FakeMessageSearchIndex.java   |   3 +-
 .../rfc8621/contract/EmailGetMethodContract.scala  | 135 ++++++++++++++-
 .../scala/org/apache/james/jmap/mail/Email.scala   |   5 +-
 .../james/webadmin/service/ExportService.java      |   3 +-
 34 files changed, 569 insertions(+), 239 deletions(-)
 create mode 100644 server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraCacheQueryTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 05/09: JAMES-3409 Add a retry in MailboxPathV3Migration

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e6cb2f70f4d7f710dde47fbdbf8b1616b736bbd2
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Nov 3 08:40:23 2020 +0700

    JAMES-3409 Add a retry in MailboxPathV3Migration
---
 .../mailbox/cassandra/mail/migration/MailboxPathV3Migration.java | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV3Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV3Migration.java
index a2b9e41..7041427 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV3Migration.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV3Migration.java
@@ -20,6 +20,7 @@
 package org.apache.james.mailbox.cassandra.mail.migration;
 
 import java.time.Clock;
+import java.time.Duration;
 import java.time.Instant;
 import java.util.Optional;
 
@@ -37,6 +38,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.util.retry.RetryBackoffSpec;
 
 public class MailboxPathV3Migration implements Migration {
 
@@ -91,6 +94,9 @@ public class MailboxPathV3Migration implements Migration {
     public static final Logger LOGGER = LoggerFactory.getLogger(MailboxPathV3Migration.class);
     public static final TaskType TYPE = TaskType.of("cassandra-mailbox-path-v3-migration");
     private static final int CONCURRENCY = 50;
+    private static final int MAX_ATTEMPTS = 3;
+    private static final Duration MIN_BACKOFF = Duration.ofSeconds(1);
+
     private final CassandraMailboxPathV2DAO daoV2;
     private final CassandraMailboxPathV3DAO daoV3;
     private final CassandraMailboxDAO mailboxDAO;
@@ -117,6 +123,9 @@ public class MailboxPathV3Migration implements Migration {
             .flatMap(mailbox -> daoV3.save(mailbox)
                 .then(daoV2.delete(idAndPath.getMailboxPath())))
             .onErrorResume(error -> handleErrorMigrate(idAndPath, error))
+            .retryWhen(RetryBackoffSpec.backoff(MAX_ATTEMPTS, MIN_BACKOFF)
+                .jitter(0.5)
+                .scheduler(Schedulers.elastic()))
             .then();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 03/09: MAILBOX-339 Limit concurency for migration processes

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit cd9f2885770a32eca5418160742bc8dd50e9e2fc
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Nov 2 13:51:49 2020 +0100

    MAILBOX-339 Limit concurency for migration processes
---
 .../james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java | 3 ++-
 .../james/mailbox/cassandra/mail/migration/MailboxPathV3Migration.java | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java
index 46c3fc2..d95487b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java
@@ -89,6 +89,7 @@ public class MailboxPathV2Migration implements Migration {
 
     public static final Logger LOGGER = LoggerFactory.getLogger(MailboxPathV2Migration.class);
     public static final TaskType TYPE = TaskType.of("cassandra-mailbox-path-v2-migration");
+    private static final int CONCURRENCY = 50;
     private final CassandraMailboxPathDAOImpl daoV1;
     private final CassandraMailboxPathV2DAO daoV2;
     private final long initialCount;
@@ -103,7 +104,7 @@ public class MailboxPathV2Migration implements Migration {
     @Override
     public void apply() {
         daoV1.readAll()
-            .flatMap(this::migrate)
+            .flatMap(this::migrate, CONCURRENCY)
             .doOnError(t -> LOGGER.error("Error while performing migration", t))
             .blockLast();
     }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV3Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV3Migration.java
index ccd727c..afe5e63 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV3Migration.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV3Migration.java
@@ -90,6 +90,7 @@ public class MailboxPathV3Migration implements Migration {
 
     public static final Logger LOGGER = LoggerFactory.getLogger(MailboxPathV3Migration.class);
     public static final TaskType TYPE = TaskType.of("cassandra-mailbox-path-v3-migration");
+    private static final int CONCURRENCY = 50;
     private final CassandraMailboxPathV2DAO daoV2;
     private final CassandraMailboxPathV3DAO daoV3;
     private final CassandraMailboxDAO mailboxDAO;
@@ -106,7 +107,7 @@ public class MailboxPathV3Migration implements Migration {
     @Override
     public void apply() {
         daoV2.listAll()
-            .flatMap(this::migrate)
+            .flatMap(this::migrate, CONCURRENCY)
             .doOnError(t -> LOGGER.error("Error while performing migration", t))
             .blockLast();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 01/09: JAMES-2037 Use Flux for MessageManager::search

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 03729af5c0fd33433a958b3cbbaf31737a96a6f9
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Nov 2 08:21:34 2020 +0100

    JAMES-2037 Use Flux for MessageManager::search
---
 .../org/apache/james/mailbox/MessageManager.java   |   3 +-
 .../ElasticSearchListeningMessageSearchIndex.java  |   6 +-
 .../ElasticSearchIntegrationTest.java              |  24 +--
 ...asticSearchListeningMessageSearchIndexTest.java |  26 ++--
 .../lucene/search/LuceneMessageSearchIndex.java    |   5 +-
 .../LuceneMailboxMessageSearchIndexTest.java       | 106 +++++++-------
 .../james/vault/DeletedMessageVaultHookTest.java   |  12 +-
 .../james/mailbox/store/StoreMessageManager.java   |   7 +-
 .../store/search/LazyMessageSearchIndex.java       |   3 +-
 .../mailbox/store/search/MessageSearchIndex.java   |   3 +-
 .../store/search/SimpleMessageSearchIndex.java     |   5 +-
 .../store/AbstractCombinationManagerTest.java      |  12 +-
 .../search/AbstractMessageSearchIndexTest.java     | 162 +++++++++++----------
 .../imap/processor/AbstractMailboxProcessor.java   |   2 +-
 .../james/imap/processor/SearchProcessor.java      |   6 +-
 .../imap/processor/base/SelectedMailboxImpl.java   |  10 +-
 .../james/imap/processor/SearchProcessorTest.java  |   5 +-
 .../processor/base/MailboxEventAnalyserTest.java   |   5 +-
 .../processor/base/SelectedMailboxImplTest.java    |  13 +-
 .../java/org/apache/james/SearchModuleChooser.java |   3 +-
 .../org/apache/james/FakeMessageSearchIndex.java   |   3 +-
 21 files changed, 207 insertions(+), 214 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java
index 6a8ab49..779d84e 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java
@@ -30,7 +30,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.stream.Stream;
 
 import javax.mail.Flags;
 
@@ -113,7 +112,7 @@ public interface MessageManager {
      * @throws MailboxException
      *             when search fails for other reasons
      */
-    Stream<MessageUid> search(SearchQuery searchQuery, MailboxSession mailboxSession) throws MailboxException;
+    Publisher<MessageUid> search(SearchQuery searchQuery, MailboxSession mailboxSession) throws MailboxException;
 
     /**
      * Expunges messages in the given range from this mailbox by first retrieving the messages to be deleted
diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
index 2d0df24..b6245a8 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
@@ -34,7 +34,6 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 import javax.inject.Named;
@@ -121,14 +120,13 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
     }
     
     @Override
-    public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) {
+    public Flux<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) {
         Preconditions.checkArgument(session != null, "'session' is mandatory");
         Optional<Integer> noLimit = Optional.empty();
 
         return searcher
             .search(ImmutableList.of(mailbox.getMailboxId()), searchQuery, noLimit)
-            .map(SearchResult::getMessageUid)
-            .toStream();
+            .map(SearchResult::getMessageUid);
     }
     
     @Override
diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java
index 46eb518..5ae6e4c 100644
--- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java
+++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java
@@ -57,6 +57,8 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 
 import com.google.common.base.Strings;
 
+import reactor.core.publisher.Flux;
+
 class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
     static final int BATCH_SIZE = 1;
@@ -137,7 +139,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        assertThat(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, recipient)), session))
+        assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, recipient)), session)).toStream())
             .containsExactly(composedMessageId.getUid());
     }
 
@@ -156,7 +158,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        assertThat(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, recipient)), session))
+        assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, recipient)), session)).toStream())
             .containsExactly(composedMessageId.getUid());
     }
 
@@ -175,7 +177,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        assertThat(messageManager.search(SearchQuery.of(SearchQuery.bodyContains("0123456789")), session))
+        assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.bodyContains("0123456789")), session)).toStream())
             .containsExactly(composedMessageId.getUid());
     }
 
@@ -194,7 +196,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        assertThat(messageManager.search(SearchQuery.of(SearchQuery.bodyContains("matchMe")), session))
+        assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.bodyContains("matchMe")), session)).toStream())
             .containsExactly(composedMessageId.getUid());
     }
 
@@ -214,7 +216,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        assertThat(messageManager.search(SearchQuery.of(SearchQuery.bodyContains(reasonableLongTerm)), session))
+        assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.bodyContains(reasonableLongTerm)), session)).toStream())
             .containsExactly(composedMessageId.getUid());
     }
 
@@ -236,7 +238,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        assertThat(messageManager.search(SearchQuery.of(SearchQuery.headerExists("Custom-header")), session))
+        assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.headerExists("Custom-header")), session)).toStream())
             .containsExactly(customDateHeaderMessageId.getUid(), customStringHeaderMessageId.getUid());
     }
 
@@ -258,7 +260,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        assertThat(messageManager.search(SearchQuery.of(SearchQuery.all()), session))
+        assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.all()), session)).toStream())
             .contains(customStringHeaderMessageId.getUid());
     }
 
@@ -290,7 +292,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        assertThat(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "bob@other.tld")), session))
+        assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "bob@other.tld")), session)).toStream())
             .containsOnly(messageId2.getUid());
     }
 
@@ -322,7 +324,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        assertThat(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "alice-test")), session))
+        assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "alice-test")), session)).toStream())
             .containsOnly(messageId2.getUid());
     }
 
@@ -354,7 +356,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        assertThat(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "alice-test@domain.tld")), session))
+        assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "alice-test@domain.tld")), session)).toStream())
             .containsOnly(messageId1.getUid());
     }
 
@@ -386,7 +388,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        assertThat(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "domain-test.tld")), session))
+        assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "domain-test.tld")), session)).toStream())
             .containsOnly(messageId1.getUid());
     }
 }
\ No newline at end of file
diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java
index 1dc6d7e..1c588af 100644
--- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java
+++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java
@@ -202,7 +202,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .containsExactly(MESSAGE_1.getUid());
     }
 
@@ -213,7 +213,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .containsExactly(MESSAGE_WITH_ATTACHMENT.getUid());
     }
 
@@ -225,7 +225,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .containsExactly(MESSAGE_1.getUid());
     }
 
@@ -237,7 +237,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .containsExactly(MESSAGE_1.getUid(), MESSAGE_2.getUid());
     }
 
@@ -255,7 +255,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .containsExactly(MESSAGE_WITH_ATTACHMENT.getUid());
     }
 
@@ -279,7 +279,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .isEmpty();
     }
 
@@ -294,7 +294,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .containsExactly(MESSAGE_2.getUid());
     }
 
@@ -309,7 +309,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .isEmpty();
     }
 
@@ -323,7 +323,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .isEmpty();
     }
 
@@ -361,7 +361,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.ANSWERED));
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .containsExactly(MESSAGE_1.getUid());
     }
 
@@ -382,7 +382,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.ANSWERED));
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .isEmpty();
     }
 
@@ -404,7 +404,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.ANSWERED));
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .containsExactly(MESSAGE_1.getUid());
     }
 
@@ -438,7 +438,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .isEmpty();
     }
 
diff --git a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
index c9ddec0..0304099 100644
--- a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
+++ b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
@@ -456,11 +456,10 @@ public class LuceneMessageSearchIndex extends ListeningMessageSearchIndex {
     
     
     @Override
-    public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
+    public Flux<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
         Preconditions.checkArgument(session != null, "'session' is mandatory");
 
-        return searchMultimap(ImmutableList.of(mailbox.getMailboxId()), searchQuery)
-            .stream()
+        return Flux.fromIterable(searchMultimap(ImmutableList.of(mailbox.getMailboxId()), searchQuery))
             .map(SearchResult::getMessageUid);
     }
 
diff --git a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
index 3f913f0..32813af 100644
--- a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
+++ b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
@@ -195,105 +195,105 @@ class LuceneMailboxMessageSearchIndexTest {
     @Test
     void bodySearchShouldMatchPhraseInBody() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.bodyContains(CUSTARD));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query).toStream();
         assertThat(result).containsExactly(uid5);
     }
 
     @Test
     void bodySearchShouldNotMatchAbsentPhraseInBody() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.bodyContains(CUSTARD + CUSTARD));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query).toStream();
         assertThat(result).isEmpty();
     }
     
     @Test
     void bodySearchShouldBeCaseInsensitive() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.bodyContains(RHUBARD));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query).toStream();
         assertThat(result).containsExactly(uid5);
     }
 
     @Test
     void bodySearchNotMatchPhraseOnlyInFrom() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.bodyContains(FROM_ADDRESS));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query).toStream();
         assertThat(result).isEmpty();
     }
 
     @Test
     void bodySearchShouldNotMatchPhraseOnlyInSubject() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.bodyContains(SUBJECT_PART));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query).toStream();
         assertThat(result).isEmpty();
     }
 
     @Test
     void textSearchShouldMatchPhraseInBody() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.mailContains(CUSTARD));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query).toStream();
         assertThat(result).containsExactly(uid5);
     }
 
     @Test
     void textSearchShouldNotAbsentMatchPhraseInBody() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.mailContains(CUSTARD + CUSTARD));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query).toStream();
         assertThat(result).isEmpty();
     }
 
     @Test
     void textSearchMatchShouldBeCaseInsensitive() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.mailContains(RHUBARD.toLowerCase(Locale.US)));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query).toStream();
         assertThat(result).containsExactly(uid5);
     }
 
     @Test
     void addressSearchShouldMatchToFullAddress() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.address(AddressType.To,FROM_ADDRESS));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query).toStream();
         assertThat(result).containsExactly(uid5);
     }
 
     @Test
     void addressSearchShouldMatchToDisplayName() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.address(AddressType.To,"Harry"));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query).toStream();
         assertThat(result).containsExactly(uid5);
     }
     
     @Test
     void addressSearchShouldMatchToEmail() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.address(AddressType.To,"Harry@example.org"));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query).toStream();
         assertThat(result).containsExactly(uid5);
     }
     
     @Test
     void addressSearchShouldMatchFrom() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.address(AddressType.From,"ser-from@domain.or"));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query).toStream();
         assertThat(result).containsExactly(uid5);
     }
 
     @Test
     void textSearchShouldMatchPhraseOnlyInToHeader() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.mailContains(FROM_ADDRESS));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query).toStream();
         assertThat(result).containsExactly(uid5);
     }
     
     @Test
     void textSearchShouldMatchPhraseOnlyInSubjectHeader() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.mailContains(SUBJECT_PART));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query).toStream();
         assertThat(result).containsExactly(uid5);
     }
     
     @Test
     void searchAllShouldMatchAllMailboxEmails() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        Stream<MessageUid> result = index.search(session, mailbox2, query);
+        Stream<MessageUid> result = index.search(session, mailbox2, query).toStream();
         assertThat(result).containsExactly(uid2);
     }
 
@@ -345,42 +345,42 @@ class LuceneMailboxMessageSearchIndexTest {
     @Test
     void flagSearchShouldMatch() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.flagIsSet(Flag.DELETED));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid3, uid4);
     }
     
     @Test
     void bodySearchShouldMatchSeveralEmails() throws Exception {    
         SearchQuery query = SearchQuery.of(SearchQuery.bodyContains("body"));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
     @Test
     void textSearchShouldMatchSeveralEmails() throws Exception {    
         SearchQuery query = SearchQuery.of(SearchQuery.mailContains("body"));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
     @Test
     void headerSearchShouldMatch() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.headerContains("Subject", "test"));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid1, uid4);
     }
     
     @Test
     void headerExistsShouldMatch() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.headerExists("Subject"));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid1, uid4);
     }
     
     @Test
     void flagUnsetShouldMatch() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.flagIsUnSet(Flag.DRAFT));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
@@ -390,7 +390,7 @@ class LuceneMailboxMessageSearchIndexTest {
         cal.setTime(new Date());
         SearchQuery query = SearchQuery.of(SearchQuery.internalDateBefore(cal.getTime(), DateResolution.Day));
         
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid3);
     }
     
@@ -400,7 +400,7 @@ class LuceneMailboxMessageSearchIndexTest {
         Calendar cal = Calendar.getInstance();
         cal.setTime(new Date());
         SearchQuery query = SearchQuery.of(SearchQuery.internalDateAfter(cal.getTime(), DateResolution.Day));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid4);
     }
     
@@ -411,7 +411,7 @@ class LuceneMailboxMessageSearchIndexTest {
         Calendar cal = Calendar.getInstance();
         cal.setTime(new Date());
         SearchQuery query = SearchQuery.of(SearchQuery.internalDateOn(cal.getTime(), DateResolution.Day));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid1);
     }
     
@@ -420,7 +420,7 @@ class LuceneMailboxMessageSearchIndexTest {
         Calendar cal = Calendar.getInstance();
         cal.setTime(new Date());
         SearchQuery query = SearchQuery.of(SearchQuery.uid(new SearchQuery.UidRange[] {new SearchQuery.UidRange(uid1)}));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid1);
     }
     
@@ -429,35 +429,35 @@ class LuceneMailboxMessageSearchIndexTest {
         Calendar cal = Calendar.getInstance();
         cal.setTime(new Date());
         SearchQuery query = SearchQuery.of(SearchQuery.uid(new SearchQuery.UidRange[] {new SearchQuery.UidRange(uid1), new SearchQuery.UidRange(uid3,uid4)}));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
     @Test
     void sizeEqualsShouldMatch() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.sizeEquals(200));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid1);
     }
     
     @Test
     void sizeLessThanShouldMatch() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.sizeLessThan(200));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid3, uid4);
     }
     
     @Test
     void sizeGreaterThanShouldMatch() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.sizeGreaterThan(6));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
     @Test
     void uidShouldBeSorted() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
@@ -465,7 +465,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void uidReverseSortShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.Uid, Order.REVERSE));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid4, uid3, uid1);
     }
     
@@ -473,7 +473,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void sortOnSentDateShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.SentDate, Order.NATURAL));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid3, uid4, uid1);
     }
     
@@ -481,7 +481,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void reverseSortOnSentDateShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.SentDate, Order.REVERSE));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid1, uid4, uid3);
     }
 
@@ -489,7 +489,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void sortOnSubjectShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.BaseSubject, Order.NATURAL));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid3, uid1, uid4);
     }
     
@@ -497,7 +497,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void reverseSortOnSubjectShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.BaseSubject, Order.REVERSE));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid4, uid1, uid3);
     }
     
@@ -505,7 +505,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void sortOnMailboxFromShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.MailboxFrom, Order.NATURAL));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid3, uid4, uid1);
     }
     
@@ -513,7 +513,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void reverseSortOnMailboxFromShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.MailboxFrom, Order.REVERSE));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid1, uid4, uid3);
     }
     
@@ -521,7 +521,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void sortOnMailboxCCShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.MailboxCc, Order.NATURAL));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
@@ -529,7 +529,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void reverseSortOnMailboxCCShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.MailboxCc, Order.REVERSE));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid3, uid4, uid1);
     }
     
@@ -537,7 +537,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void sortOnMailboxToShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.MailboxTo, Order.NATURAL));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid4, uid1, uid3);
     }
     
@@ -545,7 +545,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void reverseSortOnMailboxToShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.MailboxTo, Order.REVERSE));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid3, uid1, uid4);
     }
     
@@ -553,7 +553,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void sortOnDisplayToShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.DisplayTo, Order.NATURAL));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid4, uid1, uid3);
     }
     
@@ -561,7 +561,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void reverseSortOnDisplayToShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.DisplayTo, Order.REVERSE));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid3, uid1, uid4);
     }
     
@@ -569,7 +569,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void sortOnDisplayFromShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.DisplayFrom, Order.NATURAL));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid3, uid4, uid1);
     }
     
@@ -577,7 +577,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void reverseSortOnDisplayFromShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.DisplayFrom, Order.REVERSE));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid1, uid4, uid3);
     }
     
@@ -585,7 +585,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void sortOnArrivalDateShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.Arrival, Order.NATURAL));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid3, uid1, uid4);
     }
     
@@ -593,7 +593,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void reverseSortOnArrivalDateShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.Arrival, Order.REVERSE));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid4, uid1, uid3);
     }
     
@@ -601,7 +601,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void sortOnSizeShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.Size, Order.NATURAL));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid3, uid4, uid1);
     }
     
@@ -609,7 +609,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void reverseSortOnSizeShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.Size, Order.REVERSE));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
@@ -617,7 +617,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void notOperatorShouldReverseMatching() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.not(SearchQuery.uid(new SearchQuery.UidRange[] { new SearchQuery.UidRange(uid1)})));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query).toStream();
         assertThat(result).containsExactly(uid3, uid4);
     }
 
@@ -634,7 +634,7 @@ class LuceneMailboxMessageSearchIndexTest {
         index.update(session, mailbox.getMailboxId(), Lists.newArrayList(updatedFlags)).block();
 
         SearchQuery query = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.DRAFT));
-        assertThat(index.search(session, mailbox, query))
+        assertThat(index.search(session, mailbox, query).toStream())
             .containsExactly(uid2);
     }
 
@@ -651,7 +651,7 @@ class LuceneMailboxMessageSearchIndexTest {
         index.update(session, mailbox.getMailboxId(), Lists.newArrayList(updatedFlags)).block();
 
         SearchQuery query = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.DRAFT));
-        assertThat(index.search(session, mailbox, query))
+        assertThat(index.search(session, mailbox, query).toStream())
             .isEmpty();
     }
 
@@ -669,7 +669,7 @@ class LuceneMailboxMessageSearchIndexTest {
         index.update(session, mailbox.getMailboxId(), Lists.newArrayList(updatedFlags)).block();
 
         SearchQuery query = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.DRAFT));
-        assertThat(index.search(session, mailbox, query))
+        assertThat(index.search(session, mailbox, query).toStream())
             .containsExactly(uid2);
     }
 
diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
index 0a94c32..7251a98 100644
--- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
+++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
@@ -197,7 +197,7 @@ class DeletedMessageVaultHookTest {
         long messageSize = messageSize(bobMessageManager, composedMessageId);
 
         DeletedMessage deletedMessage = buildDeletedMessage(ImmutableList.of(aliceMailbox), messageId, ALICE, messageSize);
-        bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession);
+        bobMessageManager.delete(Flux.from(bobMessageManager.search(searchQuery, bobSession)).collect(Guavate.toImmutableList()).block(), bobSession);
 
         assertThat(Flux.from(messageVault.search(ALICE, Query.ALL)).blockFirst())
             .isEqualTo(deletedMessage);
@@ -218,7 +218,7 @@ class DeletedMessageVaultHookTest {
         MessageManager bobMessageManager = mailboxManager.getMailbox(aliceMailbox, bobSession);
         appendMessage(aliceMessageManager);
 
-        bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession);
+        bobMessageManager.delete(Flux.from(bobMessageManager.search(searchQuery, bobSession)).collect(Guavate.toImmutableList()).block(), bobSession);
 
         assertThat(Flux.from(messageVault.search(BOB, Query.ALL)).collectList().block())
             .isEmpty();
@@ -244,7 +244,7 @@ class DeletedMessageVaultHookTest {
 
         long messageSize = messageSize(bobMessageManager, composedMessageId);
         DeletedMessage deletedMessage = buildDeletedMessage(ImmutableList.of(bobMailbox), messageId, BOB, messageSize);
-        bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession);
+        bobMessageManager.delete(Flux.from(bobMessageManager.search(searchQuery, bobSession)).collect(Guavate.toImmutableList()).block(), bobSession);
 
         assertThat(Flux.from(messageVault.search(BOB, Query.ALL)).blockFirst())
             .isEqualTo(deletedMessage);
@@ -268,7 +268,7 @@ class DeletedMessageVaultHookTest {
 
         messageIdManager.setInMailboxes(messageId, ImmutableList.of(bobMailbox), bobSession);
 
-        bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession);
+        bobMessageManager.delete(Flux.from(bobMessageManager.search(searchQuery, bobSession)).collect(Guavate.toImmutableList()).block(), bobSession);
 
         assertThat(Flux.from(messageVault.search(ALICE, Query.ALL)).collectList().block())
             .isEmpty();
@@ -294,7 +294,7 @@ class DeletedMessageVaultHookTest {
 
         long messageSize = messageSize(bobMessageManager, composedMessageId);
         DeletedMessage deletedMessage = buildDeletedMessage(ImmutableList.of(bobMailbox), messageId, BOB, messageSize);
-        bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession);
+        bobMessageManager.delete(Flux.from(bobMessageManager.search(searchQuery, bobSession)).collect(Guavate.toImmutableList()).block(), bobSession);
 
         assertThat(Flux.from(messageVault.search(BOB, Query.ALL)).blockFirst())
             .isEqualTo(deletedMessage);
@@ -318,7 +318,7 @@ class DeletedMessageVaultHookTest {
 
         messageIdManager.setInMailboxes(messageId, ImmutableList.of(aliceMailbox, bobMailbox), bobSession);
 
-        bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession);
+        bobMessageManager.delete(Flux.from(bobMessageManager.search(searchQuery, bobSession)).collect(Guavate.toImmutableList()).block(), bobSession);
 
         assertThat(Flux.from(messageVault.search(ALICE, Query.ALL)).collectList().block())
             .isEmpty();
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index fb4f79d..0103598 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -37,7 +37,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.stream.Stream;
 
 import javax.mail.Flags;
 import javax.mail.Flags.Flag;
@@ -707,7 +706,7 @@ public class StoreMessageManager implements MessageManager {
     }
 
     @Override
-    public Stream<MessageUid> search(SearchQuery query, MailboxSession mailboxSession) throws MailboxException {
+    public Flux<MessageUid> search(SearchQuery query, MailboxSession mailboxSession) throws MailboxException {
         if (query.equals(LIST_ALL_QUERY)) {
             return listAllMessageUids(mailboxSession);
         }
@@ -859,11 +858,11 @@ public class StoreMessageManager implements MessageManager {
             .getApplicableFlag(mailbox);
     }
 
-    private Stream<MessageUid> listAllMessageUids(MailboxSession session) throws MailboxException {
+    private Flux<MessageUid> listAllMessageUids(MailboxSession session) throws MailboxException {
         final MessageMapper messageMapper = mapperFactory.getMessageMapper(session);
 
         return messageMapper.execute(
-            () -> messageMapper.listAllMessageUids(mailbox).toStream());
+            () -> messageMapper.listAllMessageUids(mailbox));
     }
 
     @Override
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
index 7e44fea..65c588b 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
@@ -23,7 +23,6 @@ import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Stream;
 
 import javax.mail.Flags;
 
@@ -111,7 +110,7 @@ public class LazyMessageSearchIndex extends ListeningMessageSearchIndex {
      * 
      */
     @Override
-    public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
+    public Flux<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
         Preconditions.checkArgument(session != null, "'session' is mandatory");
         MailboxId id = mailbox.getMailboxId();
         
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java
index 4873099..30269ad 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java
@@ -22,7 +22,6 @@ package org.apache.james.mailbox.store.search;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.Optional;
-import java.util.stream.Stream;
 
 import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxSession;
@@ -46,7 +45,7 @@ public interface MessageSearchIndex {
     /**
      * Return all uids of the previous indexed {@link Mailbox}'s which match the {@link SearchQuery}
      */
-    Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException;
+    Flux<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException;
 
     /**
      * Return all uids of all {@link Mailbox}'s the current user has access to which match the {@link SearchQuery}
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
index 90c7cfe..63dc12c 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
@@ -113,12 +113,11 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex {
     }
     
     @Override
-    public Stream<MessageUid> search(MailboxSession session, final Mailbox mailbox, SearchQuery query) throws MailboxException {
+    public Flux<MessageUid> search(MailboxSession session, final Mailbox mailbox, SearchQuery query) throws MailboxException {
         Preconditions.checkArgument(session != null, "'session' is mandatory");
         return searchResults(session, Flux.just(mailbox), query)
             .filter(searchResult -> searchResult.getMailboxId().equals(mailbox.getMailboxId()))
-            .map(SearchResult::getMessageUid)
-            .toStream();
+            .map(SearchResult::getMessageUid);
     }
 
     private Set<MailboxMessage> searchResults(MailboxSession session, Mailbox mailbox, SearchQuery query) throws MailboxException {
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
index f0a8f81..bdddff1 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
@@ -114,7 +114,7 @@ public abstract class AbstractCombinationManagerTest {
 
         messageIdManager.setInMailboxes(messageId, ImmutableList.of(mailbox1.getMailboxId(), mailbox2.getMailboxId()), session);
 
-        assertThat(messageManager2.search(query, session)).hasSize(1);
+        assertThat(Flux.from(messageManager2.search(query, session)).toStream()).hasSize(1);
     }
 
     @Test
@@ -131,7 +131,7 @@ public abstract class AbstractCombinationManagerTest {
             .get(0)
             .getUid();
 
-        assertThat(messageManager2.search(query, session)).hasSize(1)
+        assertThat(Flux.from(messageManager2.search(query, session)).toStream()).hasSize(1)
             .containsExactly(uidInMailbox2);
     }
 
@@ -144,7 +144,7 @@ public abstract class AbstractCombinationManagerTest {
         messageIdManager.setInMailboxes(composedMessageId.getMessageId(),
             ImmutableList.of(mailbox1.getMailboxId(), mailbox2.getMailboxId()), session);
 
-        assertThat(messageManager1.search(query, session)).hasSize(1)
+        assertThat(Flux.from(messageManager1.search(query, session)).toStream()).hasSize(1)
             .containsExactly(composedMessageId.getUid());
     }
 
@@ -508,7 +508,7 @@ public abstract class AbstractCombinationManagerTest {
             .getUid();
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.all());
-        assertThat(messageManager2.search(searchQuery, session))
+        assertThat(Flux.from(messageManager2.search(searchQuery, session)).toStream())
             .hasSize(1)
             .containsOnly(uid2);
     }
@@ -522,7 +522,7 @@ public abstract class AbstractCombinationManagerTest {
         messageIdManager.delete(messageId, ImmutableList.of(mailbox1.getMailboxId()), session);
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.all());
-        assertThat(messageManager1.search(searchQuery, session)).isEmpty();
+        assertThat(Flux.from(messageManager1.search(searchQuery, session)).toStream()).isEmpty();
     }
 
     @Test
@@ -537,7 +537,7 @@ public abstract class AbstractCombinationManagerTest {
         messageIdManager.delete(ImmutableList.of(messageId1, messageId2), session);
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.all());
-        assertThat(messageManager1.search(searchQuery, session)).isEmpty();
+        assertThat(Flux.from(messageManager1.search(searchQuery, session)).toStream()).isEmpty();
     }
 
     private Predicate<MessageResult> messageInMailbox2() {
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java
index ddbb186..5e3d2e9 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java
@@ -67,6 +67,8 @@ import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
+
 public abstract class AbstractMessageSearchIndexTest {
 
     private static final long LIMIT = 100L;
@@ -377,7 +379,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void emptySearchQueryShouldReturnAllUids() throws MailboxException {
         SearchQuery searchQuery = SearchQuery.matchAll();
         
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -385,7 +387,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void allShouldReturnAllUids() throws MailboxException {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.all());
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -394,7 +396,7 @@ public abstract class AbstractMessageSearchIndexTest {
         /* Only mail4.eml contains word MAILET-94 */
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.bodyContains("MAILET-94"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m5.getUid());
     }
 
@@ -404,7 +406,7 @@ public abstract class AbstractMessageSearchIndexTest {
            mail.eml contains created and thus matches the query with a low score */
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.bodyContains("created summary"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m2.getUid(), m8.getUid());
     }
 
@@ -412,7 +414,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void hasAttachmentShouldOnlyReturnMessageThatHasAttachmentWhichAreNotInline() throws MailboxException {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.hasAttachment());
 
-        assertThat(messageSearchIndex.search(session, mailbox2, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox2, searchQuery).toStream())
             .containsOnly(mailWithAttachment.getUid());
     }
 
@@ -427,7 +429,7 @@ public abstract class AbstractMessageSearchIndexTest {
         
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.all());
 
-        assertThat(messageSearchIndex.search(session, mailbox2, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox2, searchQuery).toStream())
             .contains(mailWithDotsInHeader.getUid());
     }
 
@@ -442,7 +444,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.headerExists("X-header.with.dots"));
 
-        assertThat(messageSearchIndex.search(session, mailbox2, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox2, searchQuery).toStream())
             .contains(mailWithDotsInHeader.getUid());
     }
 
@@ -468,7 +470,7 @@ public abstract class AbstractMessageSearchIndexTest {
             SearchQuery.attachmentContains(emailToSearch),
             SearchQuery.bodyContains(emailToSearch))));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m11.getUid());
     }
 
@@ -476,7 +478,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void hasNoAttachmenShouldOnlyReturnMessageThatHasNoAttachmentWhichAreNotInline() throws MailboxException {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.hasNoAttachment());
 
-        assertThat(messageSearchIndex.search(session, mailbox2, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox2, searchQuery).toStream())
             .containsOnly(mOther.getUid(), mailWithInlinedAttachment.getUid());
     }
 
@@ -484,7 +486,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void flagIsSetShouldReturnUidOfMessageMarkedAsDeletedWhenUsedWithFlagDeleted() throws MailboxException {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.DELETED));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m1.getUid());
     }
 
@@ -492,7 +494,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void flagIsSetShouldReturnUidOfMessageMarkedAsAnsweredWhenUsedWithFlagAnswered() throws MailboxException {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.ANSWERED));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m2.getUid());
     }
 
@@ -500,7 +502,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void flagIsSetShouldReturnUidOfMessageMarkedAsDraftWhenUsedWithFlagDraft() throws MailboxException {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.DRAFT));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m3.getUid());
     }
 
@@ -509,7 +511,7 @@ public abstract class AbstractMessageSearchIndexTest {
         // Only message 7 is not marked as RECENT
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.RECENT));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), m6.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -517,7 +519,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void flagIsSetShouldReturnUidOfMessageMarkedAsFlaggedWhenUsedWithFlagFlagged() throws MailboxException {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.FLAGGED));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m5.getUid());
     }
 
@@ -526,7 +528,7 @@ public abstract class AbstractMessageSearchIndexTest {
         // Only message 6 is marked as read.
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.SEEN));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m6.getUid());
     }
 
@@ -542,7 +544,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.SEEN));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .contains(m5.getUid());
     }
     
@@ -635,7 +637,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void flagIsSetShouldReturnUidsOfMessageContainingAGivenUserFlag() throws MailboxException {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsSet("Hello"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m8.getUid());
     }
 
@@ -643,7 +645,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void userFlagsShouldBeMatchedExactly() throws MailboxException {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsSet("Hello bonjour"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .isEmpty();
     }
 
@@ -651,7 +653,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void flagIsUnSetShouldReturnUidOfMessageNotMarkedAsDeletedWhenUsedWithFlagDeleted() throws MailboxException {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsUnSet(Flags.Flag.DELETED));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m2.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -659,7 +661,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void flagIsUnSetShouldReturnUidOfMessageNotMarkedAsAnsweredWhenUsedWithFlagAnswered() throws MailboxException {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsUnSet(Flags.Flag.ANSWERED));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m1.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -667,7 +669,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void flagIsUnSetShouldReturnUidOfMessageNotMarkedAsDraftWhenUsedWithFlagDraft() throws MailboxException {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsUnSet(Flags.Flag.DRAFT));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m1.getUid(), m2.getUid(), m4.getUid(), m5.getUid(), m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -676,7 +678,7 @@ public abstract class AbstractMessageSearchIndexTest {
         // Only message 7 is not marked as RECENT
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsUnSet(Flags.Flag.RECENT));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m7.getUid());
     }
 
@@ -684,7 +686,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void flagIsUnSetShouldReturnUidOfMessageNotMarkedAsFlaggedWhenUsedWithFlagFlagged() throws MailboxException {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsUnSet(Flags.Flag.FLAGGED));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -693,7 +695,7 @@ public abstract class AbstractMessageSearchIndexTest {
         // Only message 6 is marked as read.
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsUnSet(Flags.Flag.SEEN));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), m7.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -701,7 +703,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void flagIsUnSetShouldReturnUidsOfMessageNotContainingAGivenUserFlag() throws MailboxException {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsUnSet("Hello"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), m6.getUid(), m7.getUid(),  m9.getUid());
     }
 
@@ -712,7 +714,7 @@ public abstract class AbstractMessageSearchIndexTest {
             DateResolution.Day));
         // Date : 2014/07/02 00:00:00.000 ( Paris time zone )
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m7.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -723,7 +725,7 @@ public abstract class AbstractMessageSearchIndexTest {
             DateResolution.Day));
         // Date : 2014/02/02 00:00:00.000 ( Paris time zone )
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m1.getUid(), m2.getUid());
     }
 
@@ -734,7 +736,7 @@ public abstract class AbstractMessageSearchIndexTest {
             DateResolution.Day));
         // Date : 2014/03/02 00:00:00.000 ( Paris time zone )
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m3.getUid());
     }
 
@@ -746,7 +748,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.Arrival, Order.REVERSE))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m3.getUid(), m2.getUid());
     }
 
@@ -758,7 +760,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.Arrival, Order.REVERSE))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m5.getUid());
     }
 
@@ -770,7 +772,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.Arrival, Order.REVERSE))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m4.getUid(), m9.getUid());
     }
 
@@ -778,7 +780,7 @@ public abstract class AbstractMessageSearchIndexTest {
     protected void modSeqEqualsShouldReturnUidsOfMessageHavingAGivenModSeq() throws Exception {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.modSeqEquals(2L));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m2.getUid());
     }
 
@@ -786,7 +788,7 @@ public abstract class AbstractMessageSearchIndexTest {
     protected void modSeqGreaterThanShouldReturnUidsOfMessageHavingAGreaterModSeq() throws Exception {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.modSeqGreaterThan(7L));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m8.getUid(), m9.getUid());
     }
 
@@ -794,7 +796,7 @@ public abstract class AbstractMessageSearchIndexTest {
     protected void modSeqLessThanShouldReturnUidsOfMessageHavingAGreaterModSeq() throws Exception {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.modSeqLessThan(3L));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m1.getUid(), m2.getUid());
     }
 
@@ -803,7 +805,7 @@ public abstract class AbstractMessageSearchIndexTest {
         // Only message 6 is over 6.8 KB
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.sizeGreaterThan(6800L));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m6.getUid());
     }
 
@@ -812,7 +814,7 @@ public abstract class AbstractMessageSearchIndexTest {
         // Only message 2 3 4 5 7 9 are under 5 KB
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.sizeLessThan(5000L));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m2.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), m7.getUid(), m9.getUid());
     }
 
@@ -820,7 +822,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void headerContainsShouldReturnUidsOfMessageHavingThisHeaderWithTheSpecifiedValue() throws Exception {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.headerContains("Precedence", "list"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m1.getUid(), m6.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -828,7 +830,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void headerExistsShouldReturnUidsOfMessageHavingThisHeader() throws Exception {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.headerExists("Precedence"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), m6.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -836,7 +838,7 @@ public abstract class AbstractMessageSearchIndexTest {
     protected void addressShouldReturnUidHavingRightExpeditorWhenFromIsSpecified() throws Exception {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.From, "murari.ksr@gmail.com"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m8.getUid());
     }
 
@@ -848,7 +850,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.From, "murari"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m8.getUid());
     }
 
@@ -860,7 +862,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.From, "gmail.com"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m8.getUid());
     }
 
@@ -872,7 +874,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.To, "Üsteliğhan"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m8.getUid());
     }
 
@@ -880,7 +882,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void addressShouldReturnUidHavingRightRecipientWhenToIsSpecified() throws Exception {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.To, "root@listes.minet.net"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m1.getUid());
     }
 
@@ -892,7 +894,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.To, "root"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m1.getUid());
     }
 
@@ -904,7 +906,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.To, "listes.minet.net"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m1.getUid());
     }
 
@@ -915,7 +917,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .contains(MailboxManager.SearchCapabilities.PartialEmailMatch));
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.Cc, "monkey@any.com"));
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m5.getUid());
     }
 
@@ -926,7 +928,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .contains(MailboxManager.SearchCapabilities.PartialEmailMatch));
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.Cc, "monkey"));
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m5.getUid());
     }
 
@@ -937,7 +939,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .contains(MailboxManager.SearchCapabilities.PartialEmailMatch));
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.Cc, "any.com"));
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m5.getUid());
     }
 
@@ -949,7 +951,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.Bcc, "monkey"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m5.getUid());
     }
 
@@ -960,7 +962,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .contains(MailboxManager.SearchCapabilities.PartialEmailMatch));
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.Bcc, "any.com"));
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m5.getUid());
     }
 
@@ -972,7 +974,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.Bcc, "no@no.com"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m9.getUid());
     }
 
@@ -981,7 +983,7 @@ public abstract class AbstractMessageSearchIndexTest {
         SearchQuery.UidRange[] numericRanges = {new SearchQuery.UidRange(m2.getUid(), m4.getUid()), new SearchQuery.UidRange(m6.getUid(), m7.getUid())};
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.uid(numericRanges));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m2.getUid(), m3.getUid(), m4.getUid(), m6.getUid(), m7.getUid());
     }
 
@@ -990,7 +992,7 @@ public abstract class AbstractMessageSearchIndexTest {
         SearchQuery.UidRange[] numericRanges = {};
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.uid(numericRanges));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -998,7 +1000,7 @@ public abstract class AbstractMessageSearchIndexTest {
     protected void youShouldBeAbleToSpecifySeveralCriterionOnASingleQuery() throws Exception {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.headerExists("Precedence"), SearchQuery.modSeqGreaterThan(6L));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m8.getUid(), m9.getUid());
     }
 
@@ -1008,7 +1010,7 @@ public abstract class AbstractMessageSearchIndexTest {
             SearchQuery.headerExists("Precedence"),
             SearchQuery.modSeqGreaterThan(6L)));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m8.getUid(), m9.getUid());
     }
 
@@ -1019,7 +1021,7 @@ public abstract class AbstractMessageSearchIndexTest {
             SearchQuery.uid(numericRanges),
             SearchQuery.modSeqGreaterThan(6L)));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m2.getUid(), m3.getUid(), m4.getUid(), m7.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -1028,7 +1030,7 @@ public abstract class AbstractMessageSearchIndexTest {
         SearchQuery searchQuery = SearchQuery.of(
             SearchQuery.not(SearchQuery.headerExists("Precedence")));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m7.getUid());
     }
 
@@ -1036,7 +1038,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void sortShouldOrderMessages() throws Exception {
         SearchQuery searchQuery = SearchQuery.allSortedWith(new Sort(SortClause.Arrival));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsExactly(m1.getUid(), m2.getUid(), m3.getUid(), m5.getUid(), m4.getUid(), m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -1044,7 +1046,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void revertSortingShouldReturnElementsInAReversedOrder() throws Exception {
         SearchQuery searchQuery = SearchQuery.allSortedWith(new Sort(SortClause.Arrival, Order.REVERSE));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsExactly(m9.getUid(), m8.getUid(), m7.getUid(), m6.getUid(), m4.getUid(), m5.getUid(), m3.getUid(), m2.getUid(), m1.getUid());
     }
 
@@ -1056,7 +1058,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.Arrival, Order.REVERSE))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m3.getUid(), m2.getUid());
     }
 
@@ -1068,7 +1070,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.Arrival, Order.REVERSE))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m5.getUid());
     }
 
@@ -1080,7 +1082,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.Arrival, Order.REVERSE))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m4.getUid(), m9.getUid());
     }
 
@@ -1088,7 +1090,7 @@ public abstract class AbstractMessageSearchIndexTest {
     protected void mailsContainsShouldIncludeMailHavingAttachmentsMatchingTheRequest() throws Exception {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.mailContains("root mailing list"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m1.getUid(), m6.getUid());
     }
 
@@ -1100,7 +1102,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.MailboxCc))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsExactly(m3.getUid(), m5.getUid(), m4.getUid(), m2.getUid());
         // 2 : No cc
         // 3 : Cc : abc@abc.org
@@ -1116,7 +1118,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.MailboxFrom))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsExactly(m3.getUid(), m2.getUid(), m4.getUid(), m5.getUid());
         // m3 : jira1@apache.org
         // m2 : jira2@apache.org
@@ -1132,7 +1134,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.MailboxTo))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsExactly(m5.getUid(), m3.getUid(), m2.getUid(), m4.getUid());
         // 5 : "zzz" <ma...@james.apache.org>
         // 3 : "aaa" <a-...@james.apache.org>
@@ -1148,7 +1150,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.BaseSubject))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsExactly(m4.getUid(), m3.getUid(), m2.getUid(), m5.getUid());
         // 2 : [jira] [Created] (MAILBOX-234) Convert Message into JSON
         // 3 : [jira] [Closed] (MAILBOX-217) We should index attachment in elastic search
@@ -1164,7 +1166,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.Size))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsExactly(m2.getUid(), m3.getUid(), m5.getUid(), m4.getUid());
         // 2 : 3210 o
         // 3 : 3647 o
@@ -1180,7 +1182,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.DisplayFrom))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsExactly(m4.getUid(), m3.getUid(), m5.getUid(), m2.getUid());
         // 2 : Tellier Benoit (JIRA)
         // 3 : efij
@@ -1196,7 +1198,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.DisplayTo))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsExactly(m3.getUid(), m2.getUid(), m4.getUid(), m5.getUid());
         // 2 : abc
         // 3 : aaa
@@ -1212,7 +1214,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.SentDate))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsExactly(m5.getUid(), m4.getUid(), m2.getUid(), m3.getUid());
         // 2 : 4 Jun 2015 09:23:37
         // 3 : 4 Jun 2015 09:27:37
@@ -1228,7 +1230,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.Uid))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsExactly(m2.getUid(), m3.getUid(), m4.getUid(), m5.getUid());
     }
 
@@ -1243,7 +1245,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.mailContains("User message banana"));
 
-        assertThat(messageSearchIndex.search(session, mailbox2, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox2, searchQuery).toStream())
             .containsExactly(messageWithBeautifulBananaAsTextAttachment.getUid());
     }
 
@@ -1258,7 +1260,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.attachmentContains("beautiful banana"));
 
-        assertThat(messageSearchIndex.search(session, mailbox2, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox2, searchQuery).toStream())
             .containsExactly(messageWithBeautifulBananaAsTextAttachment.getUid());
     }
 
@@ -1282,7 +1284,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.attachmentContains("beautiful banana"));
 
-        assertThat(messageSearchIndex.search(session, mailbox2, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox2, searchQuery).toStream())
             .containsExactly(messageWithBeautifulBananaAsPDFAttachment.getUid());
     }
 
@@ -1334,7 +1336,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = SearchQuery.allSortedWith(new Sort(SortClause.SentDate));
 
-        assertThat(messageManager.search(searchQuery, session))
+        assertThat(Flux.from(messageManager.search(searchQuery, session)).toStream())
             .containsExactly(message2.getUid(),
                 message1.getUid(),
                 message3.getUid());
@@ -1373,7 +1375,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = SearchQuery.allSortedWith(new Sort(SortClause.SentDate));
 
-        assertThat(messageManager.search(searchQuery, session))
+        assertThat(Flux.from(messageManager.search(searchQuery, session)).toStream())
             .containsExactly(message2.getUid(),
                 message1.getUid(),
                 message3.getUid());
@@ -1384,7 +1386,7 @@ public abstract class AbstractMessageSearchIndexTest {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.mimeMessageID("<JI...@Atlassian.JIRA>"));
         // Correspond to mail.eml
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(m3.getUid());
     }
 
@@ -1406,7 +1408,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .await()
             .atMost(30, TimeUnit.SECONDS)
             .until(
-                () -> messageSearchIndex.search(session, newBox.getMailboxEntity(), searchQuery).count() == 9);
+                () -> messageSearchIndex.search(session, newBox.getMailboxEntity(), searchQuery).toStream().count() == 9);
     }
 
     @Test
@@ -1433,7 +1435,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.attachmentFileName(fileName));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream())
             .containsOnly(mWithFileName.getUid());
     }
 }
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java
index 4f23ebc..df5bfde 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java
@@ -571,7 +571,7 @@ public abstract class AbstractMailboxProcessor<R extends ImapRequest> extends Ab
             }
             searchQuery.andCriteria(SearchQuery.uid(nRanges));
             searchQuery.andCriteria(SearchQuery.modSeqGreaterThan(changedSince));
-            try (Stream<MessageUid> uids = mailbox.search(searchQuery.build(), session)) {
+            try (Stream<MessageUid> uids = Flux.from(mailbox.search(searchQuery.build(), session)).toStream()) {
                 uids.forEach(vanishedUids::remove);
             }
             UidRange[] vanishedIdRanges = uidRanges(MessageRange.toRanges(vanishedUids));
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java
index e86e850..d93742f 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java
@@ -225,9 +225,9 @@ public class SearchProcessor extends AbstractMailboxProcessor<SearchRequest> imp
     }
 
     private Collection<MessageUid> performUidSearch(MessageManager mailbox, SearchQuery query, MailboxSession msession) throws MailboxException {
-        try (Stream<MessageUid> stream = mailbox.search(query, msession)) {
-            return stream.collect(Guavate.toImmutableList());
-        }
+        return Flux.from(mailbox.search(query, msession))
+            .collect(Guavate.toImmutableList())
+            .block();
     }
 
     private long[] toArray(Collection<Long> results) {
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
index 1c8d8db..087e4ad 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
@@ -32,7 +32,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.stream.Stream;
 
 import javax.mail.Flags;
 import javax.mail.Flags.Flag;
@@ -58,7 +57,9 @@ import org.apache.james.mailbox.model.UpdatedFlags;
 
 import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
@@ -151,9 +152,10 @@ public class SelectedMailboxImpl implements SelectedMailbox, MailboxListener {
         synchronized (applicableFlagsLock) {
             applicableFlags = applicableFlags.updateWithNewFlags(messageManager.getApplicableFlags(mailboxSession));
         }
-        try (Stream<MessageUid> stream = messageManager.search(SearchQuery.of(SearchQuery.all()), mailboxSession)) {
-            uidMsnConverter.addAll(stream.collect(Guavate.toImmutableList()));
-        }
+        ImmutableList<MessageUid> uids = Flux.from(messageManager.search(SearchQuery.of(SearchQuery.all()), mailboxSession))
+            .collect(Guavate.toImmutableList())
+            .block();
+        uidMsnConverter.addAll(uids);
     }
 
     @Override
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java
index 1d48407..97bdcbc 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java
@@ -34,7 +34,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Optional;
 import java.util.TimeZone;
-import java.util.stream.Stream;
 
 import javax.mail.Flags;
 import javax.mail.Flags.Flag;
@@ -71,6 +70,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import reactor.core.publisher.Flux;
+
 public class SearchProcessorTest {
     private static final int DAY = 6;
 
@@ -463,7 +464,7 @@ public class SearchProcessorTest {
 
     private void check(SearchKey key, final SearchQuery query) throws Exception {
         session.setMailboxSession(mailboxSession);
-        when(mailbox.search(query, mailboxSession)).thenReturn(Stream.empty());
+        when(mailbox.search(query, mailboxSession)).thenReturn(Flux.empty());
         when(selectedMailbox.getApplicableFlags()).thenReturn(new Flags());
         when(selectedMailbox.hasNewApplicableFlags()).thenReturn(false);
 
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java
index f15d929..57b95b3 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java
@@ -25,7 +25,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.Date;
-import java.util.stream.Stream;
 
 import javax.mail.Flags;
 
@@ -60,6 +59,8 @@ import org.apache.james.metrics.tests.RecordingMetricFactory;
 import org.junit.Before;
 import org.junit.Test;
 
+import reactor.core.publisher.Flux;
+
 public class MailboxEventAnalyserTest {
     private static final MessageUid UID = MessageUid.of(900);
     private static final UpdatedFlags ADD_RECENT_UPDATED_FLAGS = UpdatedFlags.builder()
@@ -154,7 +155,7 @@ public class MailboxEventAnalyserTest {
         when(messageManager.getApplicableFlags(any())).thenReturn(new Flags());
         when(messageManager.getId()).thenReturn(MAILBOX_ID);
         when(messageManager.search(any(), any()))
-            .thenReturn(Stream.of(MESSAGE_UID));
+            .thenReturn(Flux.just(MESSAGE_UID));
         when(messageManager.getMessages(any(), any(), any()))
             .thenReturn(new SingleMessageResultIterator(messageResult));
 
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
index 02fd802..02bd5a4 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
@@ -30,13 +30,13 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.time.Duration;
 import java.util.Date;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
-import java.util.stream.Stream;
 
 import javax.mail.Flags;
 
@@ -74,6 +74,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 
@@ -115,7 +116,8 @@ class SelectedMailboxImplTest {
         when(messageManager.getApplicableFlags(any(MailboxSession.class)))
             .thenReturn(new Flags());
         when(messageManager.search(any(SearchQuery.class), any(MailboxSession.class)))
-            .then(delayedSearchAnswer());
+            .thenReturn(Flux.just(MessageUid.of(1), MessageUid.of(3))
+                .delayElements(Duration.ofSeconds(1)));
         when(messageManager.getId()).thenReturn(mailboxId);
 
         imapSession.setMailboxSession(mock(MailboxSession.class));
@@ -186,13 +188,6 @@ class SelectedMailboxImplTest {
             .isEqualTo(1);
     }
 
-    Answer<Stream<MessageUid>> delayedSearchAnswer() {
-        return invocation -> {
-            Thread.sleep(1000);
-            return Stream.of(MessageUid.of(1), MessageUid.of(3));
-        };
-    }
-
     Answer<Mono<Registration>> generateEmitEventAnswer(AtomicInteger success) {
         return generateEmitEventAnswer(event(), success);
     }
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/SearchModuleChooser.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/SearchModuleChooser.java
index a35b5d9..1377ac7 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/SearchModuleChooser.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/SearchModuleChooser.java
@@ -22,7 +22,6 @@ package org.apache.james;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.stream.Stream;
 
 import javax.mail.Flags;
 
@@ -102,7 +101,7 @@ public class SearchModuleChooser {
         }
 
         @Override
-        public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
+        public Flux<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
             throw new NotImplementedException("not implemented");
         }
 
diff --git a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
index 7dbe612..9b3fb21 100644
--- a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
+++ b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
@@ -22,7 +22,6 @@ package org.apache.james;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.stream.Stream;
 
 import javax.mail.Flags;
 
@@ -80,7 +79,7 @@ public class FakeMessageSearchIndex extends ListeningMessageSearchIndex {
     }
 
     @Override
-    public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
+    public Flux<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
         throw new NotImplementedException("not implemented");
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 06/09: JAMES-3433 MetricableBlobStore should propagate storage strategy upon reads

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 52afff56094dda654929ac59c5a5e5e8f0a99a5d
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Nov 3 10:58:32 2020 +0700

    JAMES-3433 MetricableBlobStore should propagate storage strategy upon reads
---
 .../java/org/apache/james/blob/api/MetricableBlobStore.java   | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
index 309a20e..faea69d 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
@@ -70,6 +70,17 @@ public class MetricableBlobStore implements BlobStore {
     }
 
     @Override
+    public Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId, StoragePolicy storagePolicy) {
+        return metricFactory.decoratePublisherWithTimerMetric(READ_BYTES_TIMER_NAME, blobStoreImpl.readBytes(bucketName, blobId, storagePolicy));
+    }
+
+    @Override
+    public InputStream read(BucketName bucketName, BlobId blobId, StoragePolicy storagePolicy) {
+        return metricFactory
+            .decorateSupplierWithTimerMetric(READ_TIMER_NAME, () -> blobStoreImpl.read(bucketName, blobId, storagePolicy));
+    }
+
+    @Override
     public Publisher<Void> deleteBucket(BucketName bucketName) {
         return metricFactory.decoratePublisherWithTimerMetric(DELETE_BUCKET_TIMER_NAME, blobStoreImpl.deleteBucket(bucketName));
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 07/09: JAMES-3433 CachedBlobStore should only delete data when underlying backend did delete it too

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f813e60d10d90b417656eafac4ef98783ac4dc72
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Nov 3 11:02:25 2020 +0700

    JAMES-3433 CachedBlobStore should only delete data when underlying backend did delete it too
---
 .../src/main/java/org/apache/james/blob/api/BlobStore.java  |  2 +-
 .../java/org/apache/james/blob/api/MetricableBlobStore.java |  2 +-
 .../apache/james/blob/cassandra/cache/CachedBlobStore.java  | 13 ++++++++-----
 .../src/main/java/org/apache/james/blob/api/Store.java      |  3 ++-
 .../server/blob/deduplication/DeDuplicationBlobStore.scala  |  4 ++--
 .../server/blob/deduplication/PassThroughBlobStore.scala    |  5 +++--
 .../org/apache/james/webadmin/service/ExportService.java    |  3 ++-
 7 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
index dc4abef..faf45ad 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java
@@ -55,5 +55,5 @@ public interface BlobStore {
 
     Publisher<Void> deleteBucket(BucketName bucketName);
 
-    Publisher<Void> delete(BucketName bucketName, BlobId blobId);
+    Publisher<Boolean> delete(BucketName bucketName, BlobId blobId);
 }
diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
index faea69d..bc13d6e 100644
--- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
+++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/MetricableBlobStore.java
@@ -91,7 +91,7 @@ public class MetricableBlobStore implements BlobStore {
     }
 
     @Override
-    public Publisher<Void> delete(BucketName bucketName, BlobId blobId) {
+    public Publisher<Boolean> delete(BucketName bucketName, BlobId blobId) {
         return metricFactory.decoratePublisherWithTimerMetric(DELETE_TIMER_NAME, blobStoreImpl.delete(bucketName, blobId));
     }
 
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
index 87fdfa5..a5611d5 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java
@@ -216,12 +216,15 @@ public class CachedBlobStore implements BlobStore {
     }
 
     @Override
-    public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
+    public Mono<Boolean> delete(BucketName bucketName, BlobId blobId) {
         return Mono.from(backend.delete(bucketName, blobId))
-            .then(Mono.just(bucketName)
-                .filter(backend.getDefaultBucketName()::equals)
-                .flatMap(ignored -> Mono.from(cache.remove(blobId)))
-                .then());
+            .flatMap(deleted -> {
+                if (backend.getDefaultBucketName().equals(bucketName)
+                      && deleted) {
+                    return Mono.from(cache.remove(blobId)).thenReturn(deleted);
+                }
+                return Mono.just(deleted);
+            });
     }
 
     @Override
diff --git a/server/blob/blob-common/src/main/java/org/apache/james/blob/api/Store.java b/server/blob/blob-common/src/main/java/org/apache/james/blob/api/Store.java
index 9c20cba..89295d9 100644
--- a/server/blob/blob-common/src/main/java/org/apache/james/blob/api/Store.java
+++ b/server/blob/blob-common/src/main/java/org/apache/james/blob/api/Store.java
@@ -108,7 +108,8 @@ public interface Store<T, I> {
         @Override
         public Publisher<Void> delete(I blobIds) {
             return Flux.fromIterable(blobIds.asMap().values())
-                .flatMap(id -> blobStore.delete(blobStore.getDefaultBucketName(), id));
+                .flatMap(id -> blobStore.delete(blobStore.getDefaultBucketName(), id))
+                .then();
         }
     }
 }
diff --git a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
index 15c8c90..6cab727 100644
--- a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
+++ b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/DeDuplicationBlobStore.scala
@@ -91,10 +91,10 @@ class DeDuplicationBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
     blobStoreDAO.deleteBucket(bucketName)
   }
 
-  override def delete(bucketName: BucketName, blobId: BlobId): Publisher[Void] = {
+  override def delete(bucketName: BucketName, blobId: BlobId): Publisher[java.lang.Boolean] = {
     Preconditions.checkNotNull(bucketName)
     Preconditions.checkNotNull(blobId)
 
-    SMono.empty
+    SMono.just(Boolean.box(false))
   }
 }
diff --git a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/PassThroughBlobStore.scala b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/PassThroughBlobStore.scala
index 406df50..e7e5020 100644
--- a/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/PassThroughBlobStore.scala
+++ b/server/blob/blob-storage-strategy/src/main/scala/org/apache/james/server/blob/deduplication/PassThroughBlobStore.scala
@@ -69,10 +69,11 @@ class PassThroughBlobStore @Inject()(blobStoreDAO: BlobStoreDAO,
     blobStoreDAO.deleteBucket(bucketName)
   }
 
-  override def delete(bucketName: BucketName, blobId: BlobId): Publisher[Void] = {
+  override def delete(bucketName: BucketName, blobId: BlobId): Publisher[java.lang.Boolean] = {
     Preconditions.checkNotNull(bucketName)
     Preconditions.checkNotNull(blobId)
 
-    blobStoreDAO.delete(bucketName, blobId)
+    SMono.fromPublisher(blobStoreDAO.delete(bucketName, blobId))
+      .`then`(SMono.just(Boolean.box(true)))
   }
 }
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExportService.java b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExportService.java
index a982fd9..2e8e2ed 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExportService.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExportService.java
@@ -139,7 +139,8 @@ public class ExportService {
             .onErrorResume(e -> {
                 LOGGER.error("Error deleting Blob with blobId: {}", blobId.asString(), e);
                 return Mono.empty();
-            });
+            })
+            .then();
     }
 
     private Mono<Void> writeUserMailboxesContent(Username username, PipedOutputStream out) {


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 04/09: JAMES-3409 Delete MailboxPathV2 content based on MailboxPathV2

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit bfd12be794d7ed7b5839816601c0d13889203b4f
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Nov 3 08:37:39 2020 +0700

    JAMES-3409 Delete MailboxPathV2 content based on MailboxPathV2
---
 .../james/mailbox/cassandra/mail/migration/MailboxPathV3Migration.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV3Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV3Migration.java
index afe5e63..a2b9e41 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV3Migration.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV3Migration.java
@@ -115,7 +115,7 @@ public class MailboxPathV3Migration implements Migration {
     private Mono<Void> migrate(CassandraIdAndPath idAndPath) {
         return mailboxDAO.retrieveMailbox(idAndPath.getCassandraId())
             .flatMap(mailbox -> daoV3.save(mailbox)
-                .then(daoV2.delete(mailbox.generateAssociatedPath())))
+                .then(daoV2.delete(idAndPath.getMailboxPath())))
             .onErrorResume(error -> handleErrorMigrate(idAndPath, error))
             .then();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 02/09: MAILBOX-339 Propagate errors on single items upon migration

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d4930890e1d83e8cbc014f9dd71fa39d746feea3
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Nov 2 13:50:30 2020 +0100

    MAILBOX-339 Propagate errors on single items upon migration
    
    This prevents harmful version switches
---
 .../james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java  | 2 +-
 .../james/mailbox/cassandra/mail/migration/MailboxPathV3Migration.java  | 2 +-
 .../james/mailbox/cassandra/mail/migration/MessageV3Migration.java      | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java
index 28024cc..46c3fc2 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2Migration.java
@@ -117,7 +117,7 @@ public class MailboxPathV2Migration implements Migration {
 
     private Mono<Void> handleErrorMigrate(CassandraIdAndPath idAndPath, Throwable throwable) {
         LOGGER.error("Error while performing migration for path {}", idAndPath.getMailboxPath(), throwable);
-        return Mono.empty();
+        return Mono.error(throwable);
     }
 
     @Override
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV3Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV3Migration.java
index 910c61f..ccd727c 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV3Migration.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV3Migration.java
@@ -121,7 +121,7 @@ public class MailboxPathV3Migration implements Migration {
 
     private Mono<Void> handleErrorMigrate(CassandraIdAndPath idAndPath, Throwable throwable) {
         LOGGER.error("Error while performing migration for path {}", idAndPath.getMailboxPath(), throwable);
-        return Mono.empty();
+        return Mono.error(throwable);
     }
 
     @Override
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3Migration.java
index 2c37c4c..72cbd85 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3Migration.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/MessageV3Migration.java
@@ -105,7 +105,7 @@ public class MessageV3Migration implements Migration {
 
     private Mono<Void> handleErrorMigrate(MessageRepresentation messageRepresentation, Throwable throwable) {
         LOGGER.error("Error while performing migration for {}", messageRepresentation.getMessageId(), throwable);
-        return Mono.empty();
+        return Mono.error(throwable);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 08/09: JAMES-3433 Tests enforcing Blob Store cache usage

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9150cbe81ec932c0d364d71ac45cce8718ebd2d4
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Nov 3 11:42:09 2020 +0700

    JAMES-3433 Tests enforcing Blob Store cache usage
    
    (When configured...)
---
 .../org/apache/james/CassandraCacheQueryTest.java  | 181 +++++++++++++++++++++
 1 file changed, 181 insertions(+)

diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraCacheQueryTest.java b/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraCacheQueryTest.java
new file mode 100644
index 0000000..fcd326a
--- /dev/null
+++ b/server/container/guice/cassandra-rabbitmq-guice/src/test/java/org/apache/james/CassandraCacheQueryTest.java
@@ -0,0 +1,181 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+
+import javax.mail.MessagingException;
+import javax.mail.internet.MimeMessage;
+
+import org.apache.james.backends.cassandra.StatementRecorder;
+import org.apache.james.backends.cassandra.TestingSession;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.init.SessionWithInitializedTablesFactory;
+import org.apache.james.blob.cassandra.cache.CassandraBlobCacheModule;
+import org.apache.james.core.Domain;
+import org.apache.james.mailbox.DefaultMailboxes;
+import org.apache.james.modules.MailboxProbeImpl;
+import org.apache.james.modules.protocols.ImapGuiceProbe;
+import org.apache.james.modules.protocols.SmtpGuiceProbe;
+import org.apache.james.util.MimeMessageUtil;
+import org.apache.james.util.Port;
+import org.apache.james.utils.DataProbeImpl;
+import org.apache.james.utils.GuiceProbe;
+import org.apache.james.utils.SMTPMessageSender;
+import org.apache.james.utils.SpoolerProbe;
+import org.apache.james.utils.TestIMAPClient;
+import org.apache.mailet.base.test.FakeMail;
+import org.awaitility.Awaitility;
+import org.awaitility.Duration;
+import org.awaitility.core.ConditionFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.datastax.driver.core.Session;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.multibindings.Multibinder;
+import com.google.inject.name.Names;
+
+class CassandraCacheQueryTest {
+    private static class TestingSessionProbe implements GuiceProbe {
+        private final TestingSession testingSession;
+
+        @Inject
+        private TestingSessionProbe(TestingSession testingSession) {
+            this.testingSession = testingSession;
+        }
+
+        public TestingSession getTestingSession() {
+            return testingSession;
+        }
+    }
+
+    private static class TestingSessionModule extends AbstractModule {
+        @Override
+        protected void configure() {
+            Multibinder.newSetBinder(binder(), GuiceProbe.class)
+                .addBinding()
+                .to(TestingSessionProbe.class);
+
+            bind(Session.class)
+                .annotatedWith(Names.named("cache"))
+                .to(TestingSession.class);
+            bind(Session.class)
+                .to(TestingSession.class);
+            Multibinder.newSetBinder(binder(), CassandraModule.class).addBinding().toInstance(CassandraBlobCacheModule.MODULE);
+        }
+
+        @Provides
+        @Singleton
+        TestingSession provideSession(SessionWithInitializedTablesFactory factory) {
+            return new TestingSession(factory.get());
+        }
+    }
+
+    @RegisterExtension
+    static JamesServerExtension jamesServerExtension = WithCacheImmutableTest.baseExtensionBuilder()
+        .lifeCycle(JamesServerExtension.Lifecycle.PER_TEST)
+        .overrideServerModule(new TestingSessionModule())
+        .build();
+
+    private static final int MESSAGE_COUNT = 1;
+    private static final String JAMES_SERVER_HOST = "127.0.0.1";
+    private static final String DOMAIN = "apache.org";
+    private static final String JAMES_USER = "james-user@" + DOMAIN;
+    private static final String PASSWORD = "secret";
+    private static final String SENDER = "bob@apache.org";
+    private static final String UNICODE_BODY = "Unicode €uro symbol.";
+    private static final ConditionFactory CALMLY_AWAIT = Awaitility
+        .with().pollInterval(Duration.ONE_HUNDRED_MILLISECONDS)
+        .and().pollDelay(Duration.ONE_HUNDRED_MILLISECONDS)
+        .await();
+
+    private StatementRecorder statementRecorder;
+
+    @BeforeEach
+    void beforeEach(GuiceJamesServer server) throws Exception {
+        this.statementRecorder = new StatementRecorder();
+        server.getProbe(DataProbeImpl.class).fluent()
+            .addDomain(DOMAIN)
+            .addUser(JAMES_USER, PASSWORD);
+
+        MailboxProbeImpl mailboxProbe = server.getProbe(MailboxProbeImpl.class);
+        mailboxProbe.createMailbox("#private", JAMES_USER, DefaultMailboxes.INBOX);
+
+        server.getProbe(TestingSessionProbe.class).getTestingSession().printStatements();
+        server.getProbe(TestingSessionProbe.class).getTestingSession().recordStatements(statementRecorder);
+
+        sendAMail(server);
+        readAMail(server);
+    }
+
+    @Test
+    void deDuplicatingBlobStoreShouldNotClearCache() {
+        assertThat(statementRecorder.listExecutedStatements(
+              StatementRecorder.Selector.preparedStatementStartingWith("DELETE FROM blob_cache")))
+            .isEmpty();
+    }
+
+    @Test
+    void cacheShouldBeRead() {
+        assertThat(statementRecorder.listExecutedStatements(
+                StatementRecorder.Selector.preparedStatementStartingWith("SELECT * FROM blob_cache")))
+            .isNotEmpty();
+    }
+
+    private void readAMail(GuiceJamesServer server) throws IOException {
+        try (TestIMAPClient reader = new TestIMAPClient()) {
+            int imapPort = server.getProbe(ImapGuiceProbe.class).getImapPort();
+            reader.connect(JAMES_SERVER_HOST, imapPort)
+                .login(JAMES_USER, PASSWORD)
+                .select(TestIMAPClient.INBOX)
+                .awaitMessageCount(CALMLY_AWAIT, MESSAGE_COUNT);
+
+            assertThat(reader.readFirstMessage())
+                .contains(UNICODE_BODY);
+        }
+    }
+
+    private void sendAMail(GuiceJamesServer server) throws IOException, MessagingException {
+        Port smtpPort = server.getProbe(SmtpGuiceProbe.class).getSmtpPort();
+        try (SMTPMessageSender sender = new SMTPMessageSender(Domain.LOCALHOST.asString())) {
+            sender.connect(JAMES_SERVER_HOST, smtpPort);
+            MimeMessage mimeMessage = MimeMessageUtil.mimeMessageFromStream(
+                ClassLoader.getSystemResourceAsStream("eml/mail-containing-unicode-characters.eml"));
+
+            FakeMail.Builder mail = FakeMail.builder()
+                .name("test-unicode-body")
+                .sender(SENDER)
+                .recipient(JAMES_USER)
+                .mimeMessage(mimeMessage);
+
+            sender.sendMessage(mail);
+        }
+
+        CALMLY_AWAIT.until(() -> server.getProbe(SpoolerProbe.class).processingFinished());
+    }
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 09/09: JAMES-3368 Follow the specification for default Email/get properties

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9d16f687cea0924d1cc3e6b96e695031060dc966
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Nov 3 17:11:48 2020 +0700

    JAMES-3368 Follow the specification for default Email/get properties
---
 .../rfc8621/contract/EmailGetMethodContract.scala  | 135 ++++++++++++++++++++-
 .../scala/org/apache/james/jmap/mail/Email.scala   |   5 +-
 2 files changed, 134 insertions(+), 6 deletions(-)

diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailGetMethodContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailGetMethodContract.scala
index 01ae19e..20a1800 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailGetMethodContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailGetMethodContract.scala
@@ -297,7 +297,8 @@ trait EmailGetMethodContract {
          |    "Email/get",
          |    {
          |      "accountId": "29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6",
-         |      "ids": ["${messageId.serialize}"]
+         |      "ids": ["${messageId.serialize}"],
+         |      "properties": ["id", "size"]
          |    },
          |    "c1"]]
          |}""".stripMargin
@@ -2263,7 +2264,8 @@ trait EmailGetMethodContract {
          |    "Email/get",
          |    {
          |      "accountId": "29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6",
-         |      "ids": ["${messageId.serialize}", "invalid", "${nonExistingMessageId.serialize}"]
+         |      "ids": ["${messageId.serialize}", "invalid", "${nonExistingMessageId.serialize}"],
+         |      "properties": ["id", "size"]
          |    },
          |    "c1"]]
          |}""".stripMargin
@@ -2324,7 +2326,8 @@ trait EmailGetMethodContract {
          |    "Email/get",
          |    {
          |      "accountId": "29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6",
-         |      "ids": ["${messageId1.serialize()}", "${messageId2.serialize()}"]
+         |      "ids": ["${messageId1.serialize()}", "${messageId2.serialize()}"],
+         |      "properties": ["id", "size"]
          |    },
          |    "c1"]]
          |}""".stripMargin
@@ -2366,6 +2369,126 @@ trait EmailGetMethodContract {
   }
 
   @Test
+  def useDefaultPropertiesWhenNone(server: GuiceJamesServer): Unit = {
+    val path = MailboxPath.inbox(BOB)
+    val mailboxId = server.getProbe(classOf[MailboxProbeImpl]).createMailbox(path)
+    val messageId: MessageId = server.getProbe(classOf[MailboxProbeImpl])
+      .appendMessage(BOB.asString, path, AppendCommand.builder()
+        .withInternalDate(Date.from(ZonedDateTime.parse("2014-10-30T14:12:00Z").toInstant))
+        .build(ClassLoader.getSystemResourceAsStream("eml/multipart_simple.eml")))
+      .getMessageId
+
+    val request =
+      s"""{
+         |  "using": [
+         |    "urn:ietf:params:jmap:core",
+         |    "urn:ietf:params:jmap:mail"],
+         |  "methodCalls": [[
+         |    "Email/get",
+         |    {
+         |      "accountId": "29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6",
+         |      "ids": ["${messageId.serialize}"]
+         |    },
+         |    "c1"]]
+         |}""".stripMargin
+    val response = `given`
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .body(request)
+    .when
+      .post
+    .`then`
+      .statusCode(SC_OK)
+      .contentType(JSON)
+      .extract
+      .body
+      .asString
+
+    assertThatJson(response).isEqualTo(
+      s"""{
+         |    "sessionState": "75128aab4b1b",
+         |    "methodResponses": [
+         |        [
+         |            "Email/get",
+         |            {
+         |                "accountId": "29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6",
+         |                "state": "000001",
+         |                "list": [
+         |                    {
+         |                        "threadId": "${messageId.serialize}",
+         |                        "size": 2695,
+         |                        "keywords": {},
+         |                        "blobId": "1",
+         |                        "mailboxIds": {"${mailboxId.serialize}": true},
+         |                        "id": "${messageId.serialize}",
+         |                        "receivedAt": "2014-10-30T14:12:00Z",
+         |                        "references": null,
+         |                        "subject": "MultiAttachment",
+         |                        "inReplyTo": null,
+         |                        "messageId": ["13d4375e-a4a9-f613-06a1-7e8cb1e0ea93@linagora.com"],
+         |                        "from": [{"name": "Lina","email": "from@linagora.com"}],
+         |                        "sentAt": "2017-02-27T04:24:48Z",
+         |                        "to": [{"email": "to@linagora.com"}],
+         |                        "textBody": [
+         |                            {
+         |                                "partId": "2",
+         |                                "blobId": "${messageId.serialize}_2",
+         |                                "size": 8,
+         |                                "type": "text/plain",
+         |                                "charset": "utf-8"
+         |                            }
+         |                        ],
+         |                        "attachments": [
+         |                            {
+         |                                "partId": "3",
+         |                                "blobId": "${messageId.serialize}_3",
+         |                                "size": 271,
+         |                                "name": "text1",
+         |                                "type": "text/plain",
+         |                                "charset": "UTF-8",
+         |                                "disposition": "attachment"
+         |                            },
+         |                            {
+         |                                "partId": "4",
+         |                                "blobId": "${messageId.serialize}_4",
+         |                                "size": 398,
+         |                                "name": "text2",
+         |                                "type": "application/vnd.ms-publisher",
+         |                                "charset": "us-ascii",
+         |                                "disposition": "attachment"
+         |                            },
+         |                            {
+         |                                "partId": "5",
+         |                                "blobId": "${messageId.serialize}_5",
+         |                                "size": 412,
+         |                                "name": "text3",
+         |                                "type": "text/plain",
+         |                                "charset": "UTF-8",
+         |                                "disposition": "attachment"
+         |                            }
+         |                        ],
+         |                        "htmlBody": [
+         |                            {
+         |                                "partId": "2",
+         |                                "blobId": "${messageId.serialize}_2",
+         |                                "size": 8,
+         |                                "type": "text/plain",
+         |                                "charset": "utf-8"
+         |                            }
+         |                        ],
+         |                        "bodyValues": {},
+         |                        "preview": "Send",
+         |                        "hasAttachment": true
+         |                    }
+         |                ],
+         |                "notFound": []
+         |            },
+         |            "c1"
+         |        ]
+         |    ]
+         |}""".stripMargin)
+  }
+
+  @Test
   def requestingTheSameIdTwiceReturnsItOnce(server: GuiceJamesServer): Unit = {
     val message: Message = Message.Builder
       .of
@@ -2386,7 +2509,8 @@ trait EmailGetMethodContract {
          |    "Email/get",
          |    {
          |      "accountId": "29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6",
-         |      "ids": ["${messageId.serialize}", "${messageId.serialize}"]
+         |      "ids": ["${messageId.serialize}", "${messageId.serialize}"],
+         |      "properties": ["id", "size"]
          |    },
          |    "c1"]]
          |}""".stripMargin
@@ -2766,7 +2890,8 @@ trait EmailGetMethodContract {
          |    "Email/get",
          |    {
          |      "accountId": "29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6",
-         |      "ids": ["${messageId.serialize}"]
+         |      "ids": ["${messageId.serialize}"],
+         |      "properties": ["id", "size"]
          |    },
          |    "c1"]]
          |}""".stripMargin
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/Email.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/Email.scala
index e978537..534b1ec 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/Email.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/Email.scala
@@ -63,7 +63,10 @@ object Email {
   type UnparsedEmailIdConstraint = NonEmpty
   type UnparsedEmailId = String Refined UnparsedEmailIdConstraint
 
-  val defaultProperties: Properties = Properties("id", "size")
+  val defaultProperties: Properties = Properties("id", "blobId", "threadId", "mailboxIds", "keywords", "size",
+    "receivedAt", "messageId", "inReplyTo", "references", "sender", "from",
+    "to", "cc", "bcc", "replyTo", "subject", "sentAt", "hasAttachment",
+    "preview", "bodyValues", "textBody", "htmlBody", "attachments")
   val allowedProperties: Properties = Properties("id", "size", "bodyStructure", "textBody", "htmlBody",
     "attachments", "headers", "bodyValues", "messageId", "inReplyTo", "references", "to", "cc", "bcc",
     "from", "sender", "replyTo", "subject", "sentAt", "mailboxIds", "blobId", "threadId", "receivedAt",


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org