You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/06/07 10:59:11 UTC

[james-project] 03/07: JAMES-2777 James should close ElasticSearch scroll context

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5bf49a7c57210a929203a4da28b2211cb4f36faf
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed May 22 13:12:19 2019 +0700

    JAMES-2777 James should close ElasticSearch scroll context
---
 .../james/backends/es/DeleteByQueryPerformer.java  |   4 +-
 .../{ScrollIterable.java => ScrolledSearch.java}   |  59 ++++++++----
 ...llIterableTest.java => ScrolledSearchTest.java} |  24 ++---
 .../org/apache/james/mailbox/MessageManager.java   |   3 +-
 .../ElasticSearchListeningMessageSearchIndex.java  |  27 +++---
 .../search/ElasticSearchSearcher.java              |  24 ++---
 .../lucene/search/LuceneMessageSearchIndex.java    |   6 +-
 .../LuceneMailboxMessageSearchIndexTest.java       | 102 ++++++++++-----------
 .../james/vault/DeletedMessageVaultHookTest.java   |  13 +--
 .../elasticsearch/ElasticSearchQuotaSearcher.java  |  17 ++--
 .../james/mailbox/store/StoreMessageManager.java   |   7 +-
 .../store/search/LazyMessageSearchIndex.java       |   3 +-
 .../mailbox/store/search/MessageSearchIndex.java   |   4 +-
 .../store/search/SimpleMessageSearchIndex.java     |   5 +-
 .../search/AbstractMessageSearchIndexTest.java     |   3 +-
 .../imap/processor/AbstractMailboxProcessor.java   |   6 +-
 .../james/imap/processor/SearchProcessor.java      |  51 ++++++-----
 .../imap/processor/base/SelectedMailboxImpl.java   |   8 +-
 .../james/imap/processor/SearchProcessorTest.java  |   3 +-
 .../processor/base/MailboxEventAnalyserTest.java   |   5 +-
 .../processor/base/SelectedMailboxImplTest.java    |   7 +-
 .../org/apache/james/FakeMessageSearchIndex.java   |   4 +-
 22 files changed, 198 insertions(+), 187 deletions(-)

diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java
index c84c9fc..26376c6 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java
@@ -19,7 +19,7 @@
 
 package org.apache.james.backends.es;
 
-import org.apache.james.backends.es.search.ScrollIterable;
+import org.apache.james.backends.es.search.ScrolledSearch;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
@@ -51,7 +51,7 @@ public class DeleteByQueryPerformer {
     }
 
     public Mono<Void> perform(QueryBuilder queryBuilder) {
-        return Flux.fromStream(new ScrollIterable(client, prepareSearch(queryBuilder)).stream())
+        return Flux.fromStream(new ScrolledSearch(client, prepareSearch(queryBuilder)).searchResponses())
             .flatMap(searchResponse -> deleteRetrievedIds(client, searchResponse))
             .thenEmpty(Mono.empty());
     }
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrollIterable.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java
similarity index 73%
rename from backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrollIterable.java
rename to backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java
index 1a8d693..bf015f6 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrollIterable.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java
@@ -19,39 +19,27 @@
 
 package org.apache.james.backends.es.search;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Stream;
 
 import org.apache.james.backends.es.ListenerToFuture;
 import org.apache.james.util.streams.Iterators;
+import org.elasticsearch.action.search.ClearScrollRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchScrollRequest;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.search.SearchHit;
 
-public class ScrollIterable implements Iterable<SearchResponse> {
-    private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
-
-    private final RestHighLevelClient client;
-    private final SearchRequest searchRequest;
-
-    public ScrollIterable(RestHighLevelClient client, SearchRequest searchRequest) {
-        this.client = client;
-        this.searchRequest = searchRequest;
-    }
+import com.github.fge.lambdas.Throwing;
 
-    @Override
-    public Iterator<SearchResponse> iterator() {
-        return new ScrollIterator(client, searchRequest);
-    }
-
-    public Stream<SearchResponse> stream() {
-        return Iterators.toStream(iterator());
-    }
-
-    public static class ScrollIterator implements Iterator<SearchResponse> {
+public class ScrolledSearch {
+    private static class ScrollIterator implements Iterator<SearchResponse>, Closeable {
         private final RestHighLevelClient client;
         private CompletableFuture<SearchResponse> searchResponseFuture;
 
@@ -64,6 +52,13 @@ public class ScrollIterable implements Iterable<SearchResponse> {
         }
 
         @Override
+        public void close() throws IOException {
+            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
+            clearScrollRequest.addScrollId(searchResponseFuture.join().getScrollId());
+            client.clearScroll(clearScrollRequest);
+        }
+
+        @Override
         public boolean hasNext() {
             SearchResponse join = searchResponseFuture.join();
             return !allSearchResponsesConsumed(join);
@@ -82,9 +77,33 @@ public class ScrollIterable implements Iterable<SearchResponse> {
             return result;
         }
 
+        public Stream<SearchResponse> stream() {
+            return Iterators.toStream(this)
+                .onClose(Throwing.runnable(this::close));
+        }
+
         private boolean allSearchResponsesConsumed(SearchResponse searchResponse) {
             return searchResponse.getHits().getHits().length == 0;
         }
     }
 
+    private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
+
+    private final RestHighLevelClient client;
+    private final SearchRequest searchRequest;
+
+    public ScrolledSearch(RestHighLevelClient client, SearchRequest searchRequest) {
+        this.client = client;
+        this.searchRequest = searchRequest;
+    }
+
+    public Stream<SearchHit> searchHits() {
+        return searchResponses()
+            .flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits()));
+    }
+
+    public Stream<SearchResponse> searchResponses() {
+        return new ScrollIterator(client, searchRequest)
+            .stream();
+    }
 }
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrollIterableTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java
similarity index 91%
rename from backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrollIterableTest.java
rename to backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java
index c52847c..f622a63 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrollIterableTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java
@@ -23,9 +23,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.await;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
 
 import org.apache.james.backends.es.ClientProvider;
 import org.apache.james.backends.es.DockerElasticSearchRule;
@@ -47,8 +44,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
-public class ScrollIterableTest {
-
+public class ScrolledSearchTest {
     private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
     private static final int SIZE = 2;
     private static final String MESSAGE = "message";
@@ -80,7 +76,7 @@ public class ScrollIterableTest {
                     .query(QueryBuilders.matchAllQuery())
                     .size(SIZE));
 
-            assertThat(new ScrollIterable(client, searchRequest))
+            assertThat(new ScrolledSearch(client, searchRequest).searchHits())
                 .isEmpty();
         }
     }
@@ -103,7 +99,8 @@ public class ScrollIterableTest {
                     .query(QueryBuilders.matchAllQuery())
                     .size(SIZE));
 
-            assertThat(convertToIdList(new ScrollIterable(client, searchRequest)))
+            assertThat(new ScrolledSearch(client, searchRequest).searchHits())
+                .extracting(SearchHit::getId)
                 .containsOnly(id);
         }
     }
@@ -132,7 +129,8 @@ public class ScrollIterableTest {
                     .query(QueryBuilders.matchAllQuery())
                     .size(SIZE));
 
-            assertThat(convertToIdList(new ScrollIterable(client, searchRequest)))
+            assertThat(new ScrolledSearch(client, searchRequest).searchHits())
+                .extracting(SearchHit::getId)
                 .containsOnly(id1, id2);
         }
     }
@@ -167,18 +165,12 @@ public class ScrollIterableTest {
                     .query(QueryBuilders.matchAllQuery())
                     .size(SIZE));
 
-            assertThat(convertToIdList(new ScrollIterable(client, searchRequest)))
+            assertThat(new ScrolledSearch(client, searchRequest).searchHits())
+                .extracting(SearchHit::getId)
                 .containsOnly(id1, id2, id3);
         }
     }
 
-    private List<String> convertToIdList(ScrollIterable scrollIterable) {
-        return scrollIterable.stream()
-            .flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits()))
-            .map(SearchHit::getId)
-            .collect(Collectors.toList());
-    }
-
     private void hasIdsInIndex(RestHighLevelClient client, String... ids) throws IOException {
         SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
             .scroll(TIMEOUT)
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java
index f2d13e0..29d17b0 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Stream;
 
 import javax.mail.Flags;
 
@@ -103,7 +104,7 @@ public interface MessageManager {
      * @throws MailboxException
      *             when search fails for other reasons
      */
-    Iterator<MessageUid> search(SearchQuery searchQuery, MailboxSession mailboxSession) throws MailboxException;
+    Stream<MessageUid> search(SearchQuery searchQuery, MailboxSession mailboxSession) throws MailboxException;
 
     /**
      * Expunges messages in the given range from this mailbox by first retrieving the messages to be deleted
diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
index 1dd808c..23e2845 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
@@ -23,9 +23,9 @@ import static org.elasticsearch.index.query.QueryBuilders.termQuery;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.EnumSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Stream;
 
 import javax.inject.Inject;
 import javax.inject.Named;
@@ -50,6 +50,7 @@ import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
 import org.apache.james.mailbox.store.SessionProvider;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
+import org.apache.james.util.OptionalUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,13 +99,13 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
     }
     
     @Override
-    public Iterator<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) {
+    public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) {
         Preconditions.checkArgument(session != null, "'session' is mandatory");
         Optional<Integer> noLimit = Optional.empty();
+
         return searcher
-                .search(ImmutableList.of(mailbox.getMailboxId()), searchQuery, noLimit)
-                .map(SearchResult::getMessageUid)
-                .iterator();
+            .search(ImmutableList.of(mailbox.getMailboxId()), searchQuery, noLimit)
+            .map(SearchResult::getMessageUid);
     }
     
     @Override
@@ -115,13 +116,15 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
             return ImmutableList.of();
         }
 
-        return searcher.search(mailboxIds, searchQuery, Optional.empty())
-            .peek(this::logIfNoMessageId)
-            .map(SearchResult::getMessageId)
-            .map(Optional::get)
-            .distinct()
-            .limit(limit)
-            .collect(Guavate.toImmutableList());
+        try (Stream<SearchResult> searchResults = searcher.search(mailboxIds, searchQuery, Optional.empty())) {
+            return searchResults
+                .peek(this::logIfNoMessageId)
+                .map(SearchResult::getMessageId)
+                .flatMap(OptionalUtils::toStream)
+                .distinct()
+                .limit(limit)
+                .collect(Guavate.toImmutableList());
+        }
     }
 
     @Override
diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
index 3863302..9406b68 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
@@ -19,7 +19,6 @@
 
 package org.apache.james.mailbox.elasticsearch.search;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Optional;
 import java.util.stream.Stream;
@@ -27,7 +26,7 @@ import java.util.stream.Stream;
 import org.apache.james.backends.es.AliasName;
 import org.apache.james.backends.es.NodeMappingFactory;
 import org.apache.james.backends.es.ReadAliasName;
-import org.apache.james.backends.es.search.ScrollIterable;
+import org.apache.james.backends.es.search.ScrolledSearch;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.elasticsearch.json.JsonMessageConstants;
 import org.apache.james.mailbox.elasticsearch.query.QueryConverter;
@@ -37,7 +36,6 @@ import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.SearchQuery;
 import org.apache.james.mailbox.store.search.MessageSearchIndex;
 import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.document.DocumentField;
 import org.elasticsearch.common.unit.TimeValue;
@@ -49,9 +47,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.ImmutableList;
 
 public class ElasticSearchSearcher {
-
     public static final int DEFAULT_SEARCH_SIZE = 100;
-
     private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchSearcher.class);
     private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
     private static final ImmutableList<String> STORED_FIELDS = ImmutableList.of(JsonMessageConstants.MAILBOX_ID,
@@ -78,8 +74,9 @@ public class ElasticSearchSearcher {
     public Stream<MessageSearchIndex.SearchResult> search(Collection<MailboxId> mailboxIds, SearchQuery query,
                                                           Optional<Integer> limit) {
         SearchRequest searchRequest = prepareSearch(mailboxIds, query, limit);
-        Stream<MessageSearchIndex.SearchResult> pairStream = new ScrollIterable(client, searchRequest).stream()
-            .flatMap(this::transformResponseToUidStream);
+        Stream<MessageSearchIndex.SearchResult> pairStream = new ScrolledSearch(client, searchRequest)
+            .searchHits()
+            .flatMap(this::extractContentFromHit);
 
         return limit.map(pairStream::limit)
             .orElse(pairStream);
@@ -107,27 +104,20 @@ public class ElasticSearchSearcher {
             .orElse(size);
     }
 
-    private Stream<MessageSearchIndex.SearchResult> transformResponseToUidStream(SearchResponse searchResponse) {
-        return Arrays.stream(searchResponse.getHits().getHits())
-            .map(this::extractContentFromHit)
-            .filter(Optional::isPresent)
-            .map(Optional::get);
-    }
-
-    private Optional<MessageSearchIndex.SearchResult> extractContentFromHit(SearchHit hit) {
+    private Stream<MessageSearchIndex.SearchResult> extractContentFromHit(SearchHit hit) {
         DocumentField mailboxId = hit.field(JsonMessageConstants.MAILBOX_ID);
         DocumentField uid = hit.field(JsonMessageConstants.UID);
         Optional<DocumentField> id = retrieveMessageIdField(hit);
         if (mailboxId != null && uid != null) {
             Number uidAsNumber = uid.getValue();
-            return Optional.of(
+            return Stream.of(
                 new MessageSearchIndex.SearchResult(
                     id.map(field -> messageIdFactory.fromString(field.getValue())),
                     mailboxIdFactory.fromString(mailboxId.getValue()),
                     MessageUid.of(uidAsNumber.longValue())));
         } else {
             LOGGER.warn("Can not extract UID, MessageID and/or MailboxId for search result {}", hit.getId());
-            return Optional.empty();
+            return Stream.empty();
         }
     }
 
diff --git a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
index 93d0893..8300b5c 100644
--- a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
+++ b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
@@ -36,6 +36,7 @@ import java.util.Locale;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TimeZone;
+import java.util.stream.Stream;
 
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
@@ -463,13 +464,12 @@ public class LuceneMessageSearchIndex extends ListeningMessageSearchIndex {
     
     
     @Override
-    public Iterator<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
+    public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
         Preconditions.checkArgument(session != null, "'session' is mandatory");
 
         return searchMultimap(ImmutableList.of(mailbox.getMailboxId()), searchQuery)
             .stream()
-            .map(SearchResult::getMessageUid)
-            .iterator();
+            .map(SearchResult::getMessageUid);
     }
 
     @Override
diff --git a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
index cf57832..d407a8e 100644
--- a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
+++ b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
@@ -24,10 +24,10 @@ import java.nio.charset.Charset;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import javax.mail.Flags;
 import javax.mail.Flags.Flag;
@@ -160,7 +160,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void bodySearchShouldMatchPhraseInBody() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.bodyContains(CUSTARD));
-        Iterator<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query);
         assertThat(result).containsExactly(uid5);
     }
 
@@ -168,7 +168,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void bodySearchShouldNotMatchAbsentPhraseInBody() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.bodyContains(CUSTARD + CUSTARD));
-        Iterator<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query);
         assertThat(result).isEmpty();
     }
     
@@ -176,7 +176,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void bodySearchShouldBeCaseInsensitive() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.bodyContains(RHUBARD));
-        Iterator<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query);
         assertThat(result).containsExactly(uid5);
     }
 
@@ -184,7 +184,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void bodySearchNotMatchPhraseOnlyInFrom() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.bodyContains(FROM_ADDRESS));
-        Iterator<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query);
         assertThat(result).isEmpty();
     }
 
@@ -192,7 +192,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void bodySearchShouldNotMatchPhraseOnlyInSubject() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.bodyContains(SUBJECT_PART));
-        Iterator<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query);
         assertThat(result).isEmpty();
     }
 
@@ -200,7 +200,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void textSearchShouldMatchPhraseInBody() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.mailContains(CUSTARD));
-        Iterator<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query);
         assertThat(result).containsExactly(uid5);
     }
 
@@ -208,7 +208,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void textSearchShouldNotAbsentMatchPhraseInBody() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.mailContains(CUSTARD + CUSTARD));
-        Iterator<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query);
         assertThat(result).isEmpty();
     }
 
@@ -216,7 +216,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void textSearchMatchShouldBeCaseInsensitive() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.mailContains(RHUBARD.toLowerCase(Locale.US)));
-        Iterator<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query);
         assertThat(result).containsExactly(uid5);
     }
 
@@ -224,7 +224,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void addressSearchShouldMatchToFullAddress() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.address(AddressType.To,FROM_ADDRESS));
-        Iterator<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query);
         assertThat(result).containsExactly(uid5);
     }
 
@@ -232,7 +232,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void addressSearchShouldMatchToDisplayName() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.address(AddressType.To,"Harry"));
-        Iterator<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query);
         assertThat(result).containsExactly(uid5);
     }
     
@@ -240,7 +240,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void addressSearchShouldMatchToEmail() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.address(AddressType.To,"Harry@example.org"));
-        Iterator<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query);
         assertThat(result).containsExactly(uid5);
     }
     
@@ -248,7 +248,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void addressSearchShouldMatchFrom() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.address(AddressType.From,"ser-from@domain.or"));
-        Iterator<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query);
         assertThat(result).containsExactly(uid5);
     }
 
@@ -256,7 +256,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void textSearchShouldMatchPhraseOnlyInToHeader() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.mailContains(FROM_ADDRESS));
-        Iterator<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query);
         assertThat(result).containsExactly(uid5);
     }
     
@@ -264,7 +264,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void textSearchShouldMatchPhraseOnlyInSubjectHeader() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.mailContains(SUBJECT_PART));
-        Iterator<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, query);
         assertThat(result).containsExactly(uid5);
     }
     
@@ -272,7 +272,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void searchAllShouldMatchAllMailboxEmails() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.all());
-        Iterator<MessageUid> result = index.search(session, mailbox2, query);
+        Stream<MessageUid> result = index.search(session, mailbox2, query);
         assertThat(result).containsExactly(uid2);
     }
 
@@ -325,7 +325,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void flagSearchShouldMatch() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.flagIsSet(Flag.DELETED));
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid3, uid4);
     }
     
@@ -333,7 +333,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void bodySearchShouldMatchSeveralEmails() throws Exception {    
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.bodyContains("body"));
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
@@ -341,7 +341,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void textSearchShouldMatchSeveralEmails() throws Exception {    
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.mailContains("body"));
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
@@ -349,7 +349,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void headerSearchShouldMatch() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.headerContains("Subject", "test"));
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid1, uid4);
     }
     
@@ -357,7 +357,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void headerExistsShouldMatch() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.headerExists("Subject"));
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid1, uid4);
     }
     
@@ -365,7 +365,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void flagUnsetShouldMatch() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.flagIsUnSet(Flag.DRAFT));
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
@@ -376,7 +376,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         cal.setTime(new Date());
         query.andCriteria(SearchQuery.internalDateBefore(cal.getTime(), DateResolution.Day));
         
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid3);
     }
     
@@ -387,7 +387,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         Calendar cal = Calendar.getInstance();
         cal.setTime(new Date());
         query.andCriteria(SearchQuery.internalDateAfter(cal.getTime(), DateResolution.Day));
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid4);
     }
     
@@ -399,7 +399,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         Calendar cal = Calendar.getInstance();
         cal.setTime(new Date());
         query.andCriteria(SearchQuery.internalDateOn(cal.getTime(), DateResolution.Day));
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid1);
     }
     
@@ -409,7 +409,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         Calendar cal = Calendar.getInstance();
         cal.setTime(new Date());
         query.andCriteria(SearchQuery.uid(new SearchQuery.UidRange[] {new SearchQuery.UidRange(uid1)}));
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid1);
     }
     
@@ -419,7 +419,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         Calendar cal = Calendar.getInstance();
         cal.setTime(new Date());
         query.andCriteria(SearchQuery.uid(new SearchQuery.UidRange[] {new SearchQuery.UidRange(uid1), new SearchQuery.UidRange(uid3,uid4)}));
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
@@ -427,7 +427,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void sizeEqualsShouldMatch() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.sizeEquals(200));
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid1);
     }
     
@@ -435,7 +435,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void sizeLessThanShouldMatch() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.sizeLessThan(200));
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid3, uid4);
     }
     
@@ -443,7 +443,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void sizeGreaterThanShouldMatch() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.sizeGreaterThan(6));
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
@@ -451,7 +451,7 @@ public class LuceneMailboxMessageSearchIndexTest {
     public void uidShouldBeSorted() throws Exception {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.all());
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
@@ -460,7 +460,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery(SearchQuery.all());
         query.setSorts(ImmutableList.of(new Sort(SortClause.Uid, Order.REVERSE)));
 
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid4, uid3, uid1);
     }
     
@@ -469,7 +469,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery(SearchQuery.all());
         query.setSorts(ImmutableList.of(new Sort(SortClause.SentDate, Order.NATURAL)));
 
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid3, uid4, uid1);
     }
     
@@ -478,7 +478,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery(SearchQuery.all());
         query.setSorts(ImmutableList.of(new Sort(SortClause.SentDate, Order.REVERSE)));
 
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid1, uid4, uid3);
     }
 
@@ -487,7 +487,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery(SearchQuery.all());
         query.setSorts(ImmutableList.of(new Sort(SortClause.BaseSubject, Order.NATURAL)));
 
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid3, uid1, uid4);
     }
     
@@ -496,7 +496,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery(SearchQuery.all());
         query.setSorts(ImmutableList.of(new Sort(SortClause.BaseSubject, Order.REVERSE)));
 
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid4, uid1, uid3);
     }
     
@@ -505,7 +505,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery(SearchQuery.all());
         query.setSorts(ImmutableList.of(new Sort(SortClause.MailboxFrom, Order.NATURAL)));
 
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid3, uid4, uid1);
     }
     
@@ -514,7 +514,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery(SearchQuery.all());
         query.setSorts(ImmutableList.of(new Sort(SortClause.MailboxFrom, Order.REVERSE)));
 
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid1, uid4, uid3);
     }
     
@@ -523,7 +523,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery(SearchQuery.all());
         query.setSorts(ImmutableList.of(new Sort(SortClause.MailboxCc, Order.NATURAL)));
 
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
@@ -532,7 +532,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery(SearchQuery.all());
         query.setSorts(ImmutableList.of(new Sort(SortClause.MailboxCc, Order.REVERSE)));
 
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid3, uid4, uid1);
     }
     
@@ -541,7 +541,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery(SearchQuery.all());
         query.setSorts(ImmutableList.of(new Sort(SortClause.MailboxTo, Order.NATURAL)));
 
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid4, uid1, uid3);
     }
     
@@ -550,7 +550,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery(SearchQuery.all());
         query.setSorts(ImmutableList.of(new Sort(SortClause.MailboxTo, Order.REVERSE)));
 
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid3, uid1, uid4);
     }
     
@@ -559,7 +559,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery(SearchQuery.all());
         query.setSorts(ImmutableList.of(new Sort(SortClause.DisplayTo, Order.NATURAL)));
 
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid4, uid1, uid3);
     }
     
@@ -568,7 +568,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery(SearchQuery.all());
         query.setSorts(ImmutableList.of(new Sort(SortClause.DisplayTo, Order.REVERSE)));
 
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid3, uid1, uid4);
     }
     
@@ -577,7 +577,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery(SearchQuery.all());
         query.setSorts(ImmutableList.of(new Sort(SortClause.DisplayFrom, Order.NATURAL)));
 
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid3, uid4, uid1);
     }
     
@@ -586,7 +586,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery(SearchQuery.all());
         query.setSorts(ImmutableList.of(new Sort(SortClause.DisplayFrom, Order.REVERSE)));
 
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid1, uid4, uid3);
     }
     
@@ -595,7 +595,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery(SearchQuery.all());
         query.setSorts(ImmutableList.of(new Sort(SortClause.Arrival, Order.NATURAL)));
 
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid3, uid1, uid4);
     }
     
@@ -604,7 +604,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery(SearchQuery.all());
         query.setSorts(ImmutableList.of(new Sort(SortClause.Arrival, Order.REVERSE)));
 
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid4, uid1, uid3);
     }
     
@@ -613,7 +613,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery(SearchQuery.all());
         query.setSorts(ImmutableList.of(new Sort(SortClause.Size, Order.NATURAL)));
 
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid3, uid4, uid1);
     }
     
@@ -622,7 +622,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery(SearchQuery.all());
         query.setSorts(ImmutableList.of(new Sort(SortClause.Size, Order.REVERSE)));
 
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
@@ -631,7 +631,7 @@ public class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.not(SearchQuery.uid(new SearchQuery.UidRange[] { new SearchQuery.UidRange(uid1)})));
 
-        Iterator<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, query);
         assertThat(result).containsExactly(uid3, uid4);
     }
 }
diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
index 324f20c..7b22354 100644
--- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
+++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
@@ -52,6 +52,7 @@ import org.apache.james.vault.search.Query;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 
 class DeletedMessageVaultHookTest {
@@ -165,7 +166,7 @@ class DeletedMessageVaultHookTest {
         long messageSize = messageSize(bobMessageManager, composedMessageId);
 
         DeletedMessage deletedMessage = buildDeletedMessage(ImmutableList.of(aliceMailbox), messageId, ALICE, messageSize);
-        bobMessageManager.delete(ImmutableList.copyOf(bobMessageManager.search(searchQuery, bobSession)), bobSession);
+        bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession);
 
         assertThat(messageVault.search(ALICE, Query.ALL).blockFirst())
             .isEqualTo(deletedMessage);
@@ -186,7 +187,7 @@ class DeletedMessageVaultHookTest {
         MessageManager bobMessageManager = mailboxManager.getMailbox(aliceMailbox, bobSession);
         appendMessage(aliceMessageManager);
 
-        bobMessageManager.delete(ImmutableList.copyOf(bobMessageManager.search(searchQuery, bobSession)), bobSession);
+        bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession);
 
         assertThat(messageVault.search(BOB, Query.ALL).collectList().block())
             .isEmpty();
@@ -212,7 +213,7 @@ class DeletedMessageVaultHookTest {
 
         long messageSize = messageSize(bobMessageManager, composedMessageId);
         DeletedMessage deletedMessage = buildDeletedMessage(ImmutableList.of(bobMailbox), messageId, BOB, messageSize);
-        bobMessageManager.delete(ImmutableList.copyOf(bobMessageManager.search(searchQuery, bobSession)), bobSession);
+        bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession);
 
         assertThat(messageVault.search(BOB, Query.ALL).blockFirst())
             .isEqualTo(deletedMessage);
@@ -236,7 +237,7 @@ class DeletedMessageVaultHookTest {
 
         messageIdManager.setInMailboxes(messageId, ImmutableList.of(bobMailbox), bobSession);
 
-        bobMessageManager.delete(ImmutableList.copyOf(bobMessageManager.search(searchQuery, bobSession)), bobSession);
+        bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession);
 
         assertThat(messageVault.search(ALICE, Query.ALL).collectList().block())
             .isEmpty();
@@ -262,7 +263,7 @@ class DeletedMessageVaultHookTest {
 
         long messageSize = messageSize(bobMessageManager, composedMessageId);
         DeletedMessage deletedMessage = buildDeletedMessage(ImmutableList.of(bobMailbox), messageId, BOB, messageSize);
-        bobMessageManager.delete(ImmutableList.copyOf(bobMessageManager.search(searchQuery, bobSession)), bobSession);
+        bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession);
 
         assertThat(messageVault.search(BOB, Query.ALL).blockFirst())
             .isEqualTo(deletedMessage);
@@ -286,7 +287,7 @@ class DeletedMessageVaultHookTest {
 
         messageIdManager.setInMailboxes(messageId, ImmutableList.of(aliceMailbox, bobMailbox), bobSession);
 
-        bobMessageManager.delete(ImmutableList.copyOf(bobMessageManager.search(searchQuery, bobSession)), bobSession);
+        bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession);
 
         assertThat(messageVault.search(ALICE, Query.ALL).collectList().block())
             .isEmpty();
diff --git a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java
index fd3cff7..7c02032 100644
--- a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java
+++ b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java
@@ -29,7 +29,7 @@ import java.util.stream.Stream;
 import org.apache.james.backends.es.AliasName;
 import org.apache.james.backends.es.NodeMappingFactory;
 import org.apache.james.backends.es.ReadAliasName;
-import org.apache.james.backends.es.search.ScrollIterable;
+import org.apache.james.backends.es.search.ScrolledSearch;
 import org.apache.james.core.User;
 import org.apache.james.quota.search.QuotaQuery;
 import org.apache.james.quota.search.QuotaSearcher;
@@ -59,10 +59,12 @@ public class ElasticSearchQuotaSearcher implements QuotaSearcher {
     @Override
     public List<User> search(QuotaQuery query) {
         try {
-            return searchHits(query)
-                .map(SearchHit::getId)
-                .map(User::fromUsername)
-                .collect(Guavate.toImmutableList());
+            try (Stream<SearchHit> searchHits = searchHits(query)) {
+                return searchHits
+                    .map(SearchHit::getId)
+                    .map(User::fromUsername)
+                    .collect(Guavate.toImmutableList());
+            }
         } catch (IOException e) {
             throw new RuntimeException("Unexpected exception while executing " + query, e);
         }
@@ -92,13 +94,12 @@ public class ElasticSearchQuotaSearcher implements QuotaSearcher {
     }
 
     private Stream<SearchHit> executeScrolledSearch(QuotaQuery query) {
-        return new ScrollIterable(client,
+        return new ScrolledSearch(client,
             new SearchRequest(readAlias.getValue())
                 .types(NodeMappingFactory.DEFAULT_MAPPING_NAME)
                 .source(searchSourceBuilder(query))
                 .scroll(TIMEOUT))
-            .stream()
-            .flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits()))
+            .searchHits()
             .skip(query.getOffset().getValue());
     }
 
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index 40318dc..5d63d3b 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.stream.Stream;
 
 import javax.mail.Flags;
 import javax.mail.Flags.Flag;
@@ -745,7 +746,7 @@ public class StoreMessageManager implements MessageManager {
     }
 
     @Override
-    public Iterator<MessageUid> search(SearchQuery query, MailboxSession mailboxSession) throws MailboxException {
+    public Stream<MessageUid> search(SearchQuery query, MailboxSession mailboxSession) throws MailboxException {
         if (query.equals(new SearchQuery(SearchQuery.all()))) {
             return listAllMessageUids(mailboxSession);
         }
@@ -900,11 +901,11 @@ public class StoreMessageManager implements MessageManager {
             .getApplicableFlag(mailbox);
     }
 
-    private Iterator<MessageUid> listAllMessageUids(MailboxSession session) throws MailboxException {
+    private Stream<MessageUid> listAllMessageUids(MailboxSession session) throws MailboxException {
         final MessageMapper messageMapper = mapperFactory.getMessageMapper(session);
 
         return messageMapper.execute(
-            () -> messageMapper.listAllMessageUids(mailbox));
+            () -> Iterators.toStream(messageMapper.listAllMessageUids(mailbox)));
     }
 
     @Override
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
index c3faf9a..61a2f9b 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
@@ -23,6 +23,7 @@ import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
 
 import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxManager.SearchCapabilities;
@@ -104,7 +105,7 @@ public class LazyMessageSearchIndex extends ListeningMessageSearchIndex {
      * 
      */
     @Override
-    public Iterator<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
+    public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
         Preconditions.checkArgument(session != null, "'session' is mandatory");
         MailboxId id = mailbox.getMailboxId();
         
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java
index 9343ee0..eb1dcee 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java
@@ -21,9 +21,9 @@ package org.apache.james.mailbox.store.search;
 
 import java.util.Collection;
 import java.util.EnumSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Stream;
 
 import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxSession;
@@ -45,7 +45,7 @@ public interface MessageSearchIndex {
     /**
      * Return all uids of the previous indexed {@link Mailbox}'s which match the {@link SearchQuery}
      */
-    Iterator<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException;
+    Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException;
 
     /**
      * Return all uids of all {@link Mailbox}'s the current user has access to which match the {@link SearchQuery}
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
index 2198506..26b602d 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
@@ -106,13 +106,12 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex {
     }
     
     @Override
-    public Iterator<MessageUid> search(MailboxSession session, final Mailbox mailbox, SearchQuery query) throws MailboxException {
+    public Stream<MessageUid> search(MailboxSession session, final Mailbox mailbox, SearchQuery query) throws MailboxException {
         Preconditions.checkArgument(session != null, "'session' is mandatory");
         return searchResults(session, ImmutableList.of(mailbox).stream(), query)
             .stream()
             .filter(searchResult -> searchResult.getMailboxId().equals(mailbox.getMailboxId()))
-            .map(SearchResult::getMessageUid)
-            .iterator();
+            .map(SearchResult::getMessageUid);
     }
 
     private List<SearchResult> searchResults(MailboxSession session, Mailbox mailbox, SearchQuery query) throws MailboxException {
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java
index 6df7e78..0f189f4 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java
@@ -62,7 +62,6 @@ import org.junit.Before;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
 import com.jayway.awaitility.Awaitility;
 import com.jayway.awaitility.Duration;
 
@@ -1488,7 +1487,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .await()
             .atMost(30, TimeUnit.SECONDS)
             .until(
-                () -> Iterators.size(messageSearchIndex.search(session, newBox.getMailboxEntity(), searchQuery)) == 9);
+                () -> messageSearchIndex.search(session, newBox.getMailboxEntity(), searchQuery).count() == 9);
     }
 
     @Test
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java
index fa43bb7..971bc44 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Stream;
 
 import javax.mail.Flags;
 
@@ -584,9 +585,8 @@ public abstract class AbstractMailboxProcessor<M extends ImapRequest> extends Ab
             }
             searchQuery.andCriteria(SearchQuery.uid(nRanges));
             searchQuery.andCriteria(SearchQuery.modSeqGreaterThan(changedSince));
-            Iterator<MessageUid> uids = mailbox.search(searchQuery, session);
-            while (uids.hasNext()) {
-                vanishedUids.remove(uids.next());
+            try (Stream<MessageUid> uids = mailbox.search(searchQuery, session)) {
+                uids.forEach(vanishedUids::remove);
             }
             UidRange[] vanishedIdRanges = uidRanges(MessageRange.toRanges(vanishedUids));
             responder.respond(new VanishedResponse(vanishedIdRanges, true));
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java
index f074eec..c01bd31 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java
@@ -23,10 +23,9 @@ import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
-import java.util.TreeSet;
+import java.util.stream.Stream;
 
 import javax.mail.Flags.Flag;
 
@@ -68,6 +67,7 @@ import org.apache.james.util.MDCBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 
 public class SearchProcessor extends AbstractMailboxProcessor<SearchRequest> implements CapabilityImplementingProcessor {
@@ -94,27 +94,10 @@ public class SearchProcessor extends AbstractMailboxProcessor<SearchRequest> imp
 
             final SearchQuery query = toQuery(searchKey, session);
             MailboxSession msession = ImapSessionUtils.getMailboxSession(session);
-            final Iterator<MessageUid> it = mailbox.search(query, msession);
-            
-            final Collection<Long> results = new TreeSet<>();
-            final Collection<MessageUid> uids = new TreeSet<>();
-            
-            while (it.hasNext()) {
-                final MessageUid uid = it.next();
-                final Long number;
-                if (useUids) {
-                    uids.add(uid);
-                    results.add(uid.asLong());
-                } else {
-                    final int msn = session.getSelected().msn(uid);
-                    number = (long) msn;
-                    if (number == SelectedMailbox.NO_SUCH_MESSAGE == false) {
-                        results.add(number);
-                    }
-                }
-                
-            }
-            
+
+            final Collection<MessageUid> uids = performUidSearch(mailbox, query, msession);
+            final Collection<Long> results = asResults(session, useUids, uids);
+
             // Check if the search did contain the MODSEQ searchkey. If so we need to include the highest mod in the response.
             //
             // See RFC4551: 3.4. MODSEQ Search Criterion in SEARCH
@@ -220,7 +203,27 @@ public class SearchProcessor extends AbstractMailboxProcessor<SearchRequest> imp
             session.setAttribute(SEARCH_MODSEQ, null);
         }
     }
-    
+
+    private Collection<Long> asResults(ImapSession session, boolean useUids, Collection<MessageUid> uids) {
+        if (useUids) {
+            return uids.stream()
+                .map(MessageUid::asLong)
+                .collect(Guavate.toImmutableList());
+        } else {
+            return uids.stream()
+                .map(uid -> session.getSelected().msn(uid))
+                .map(Integer::longValue)
+                .filter(msn -> msn != SelectedMailbox.NO_SUCH_MESSAGE)
+                .collect(Guavate.toImmutableList());
+        }
+    }
+
+    private Collection<MessageUid> performUidSearch(MessageManager mailbox, SearchQuery query, MailboxSession msession) throws MailboxException {
+        try (Stream<MessageUid> stream = mailbox.search(query, msession)) {
+            return stream.collect(Guavate.toImmutableList());
+        }
+    }
+
     private long[] toArray(Collection<Long> results) {
         return results.stream().mapToLong(x -> x).toArray();
     }
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
index c1ec72d..63a4c77 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.stream.Stream;
 
 import javax.mail.Flags;
 import javax.mail.Flags.Flag;
@@ -49,7 +50,7 @@ import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.mailbox.model.SearchQuery;
 import org.apache.james.mailbox.model.UpdatedFlags;
 
-import com.google.common.collect.ImmutableList;
+import com.github.steveash.guavate.Guavate;
 
 /**
  * Default implementation of {@link SelectedMailbox}
@@ -92,8 +93,9 @@ public class SelectedMailboxImpl implements SelectedMailbox, MailboxListener {
         registration = eventBus.register(this, new MailboxIdRegistrationKey(mailboxId));
 
         applicableFlags = messageManager.getApplicableFlags(mailboxSession);
-        uidMsnConverter.addAll(ImmutableList.copyOf(
-            messageManager.search(new SearchQuery(SearchQuery.all()), mailboxSession)));
+        try (Stream<MessageUid> stream = messageManager.search(new SearchQuery(SearchQuery.all()), mailboxSession)) {
+            uidMsnConverter.addAll(stream.collect(Guavate.toImmutableList()));
+        }
     }
 
     @Override
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java
index b3d4911..3e5c0a2 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Optional;
 import java.util.TimeZone;
+import java.util.stream.Stream;
 
 import javax.mail.Flags;
 import javax.mail.Flags.Flag;
@@ -469,7 +470,7 @@ public class SearchProcessorTest {
     private void check(SearchKey key, final SearchQuery query) throws Exception {
         when(session.getAttribute(SearchProcessor.SEARCH_MODSEQ)).thenReturn(null);
         when(session.getAttribute(ImapSessionUtils.MAILBOX_SESSION_ATTRIBUTE_SESSION_KEY)).thenReturn(mailboxSession);
-        when(mailbox.search(query, mailboxSession)).thenReturn(new ArrayList<MessageUid>().iterator());
+        when(mailbox.search(query, mailboxSession)).thenReturn(Stream.empty());
         when(selectedMailbox.getApplicableFlags()).thenReturn(new Flags());
         when(selectedMailbox.hasNewApplicableFlags()).thenReturn(false);
 
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java
index a22744a..7bbeaef 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.Date;
+import java.util.stream.Stream;
 
 import javax.mail.Flags;
 
@@ -56,8 +57,6 @@ import org.apache.james.metrics.api.NoopMetricFactory;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.collect.ImmutableList;
-
 public class MailboxEventAnalyserTest {
     private static final MessageUid UID = MessageUid.of(900);
     private static final UpdatedFlags ADD_RECENT_UPDATED_FLAGS = UpdatedFlags.builder()
@@ -152,7 +151,7 @@ public class MailboxEventAnalyserTest {
         when(messageManager.getApplicableFlags(any())).thenReturn(new Flags());
         when(messageManager.getId()).thenReturn(MAILBOX_ID);
         when(messageManager.search(any(), any()))
-            .thenReturn(ImmutableList.of(MESSAGE_UID).iterator());
+            .thenReturn(Stream.of(MESSAGE_UID));
         when(messageManager.getMessages(any(), any(), any()))
             .thenReturn(new SingleMessageResultIterator(messageResult));
 
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
index 64fbb7d..7d7df56 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
 
 import javax.mail.Flags;
 
@@ -61,8 +62,6 @@ import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableList;
-
 
 public class SelectedMailboxImplTest {
 
@@ -146,10 +145,10 @@ public class SelectedMailboxImplTest {
             .isEqualTo(1);
     }
 
-    private Answer<Iterator<MessageUid>> delayedSearchAnswer() {
+    private Answer<Stream<MessageUid>> delayedSearchAnswer() {
         return invocation -> {
             Thread.sleep(1000);
-            return ImmutableList.of(MessageUid.of(1), MessageUid.of(3)).iterator();
+            return Stream.of(MessageUid.of(1), MessageUid.of(3));
         };
     }
 
diff --git a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
index fdd7cea..1ab3a1d 100644
--- a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
+++ b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
@@ -21,8 +21,8 @@ package org.apache.james;
 
 import java.util.Collection;
 import java.util.EnumSet;
-import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Stream;
 
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.james.mailbox.MailboxManager;
@@ -73,7 +73,7 @@ public class FakeMessageSearchIndex extends ListeningMessageSearchIndex {
     }
 
     @Override
-    public Iterator<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
+    public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
         throw new NotImplementedException();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org