You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/06/07 10:59:11 UTC
[james-project] 03/07: JAMES-2777 James should close ElasticSearch scroll context
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5bf49a7c57210a929203a4da28b2211cb4f36faf
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed May 22 13:12:19 2019 +0700
JAMES-2777 James should close ElasticSearch scroll context
---
.../james/backends/es/DeleteByQueryPerformer.java | 4 +-
.../{ScrollIterable.java => ScrolledSearch.java} | 59 ++++++++----
...llIterableTest.java => ScrolledSearchTest.java} | 24 ++---
.../org/apache/james/mailbox/MessageManager.java | 3 +-
.../ElasticSearchListeningMessageSearchIndex.java | 27 +++---
.../search/ElasticSearchSearcher.java | 24 ++---
.../lucene/search/LuceneMessageSearchIndex.java | 6 +-
.../LuceneMailboxMessageSearchIndexTest.java | 102 ++++++++++-----------
.../james/vault/DeletedMessageVaultHookTest.java | 13 +--
.../elasticsearch/ElasticSearchQuotaSearcher.java | 17 ++--
.../james/mailbox/store/StoreMessageManager.java | 7 +-
.../store/search/LazyMessageSearchIndex.java | 3 +-
.../mailbox/store/search/MessageSearchIndex.java | 4 +-
.../store/search/SimpleMessageSearchIndex.java | 5 +-
.../search/AbstractMessageSearchIndexTest.java | 3 +-
.../imap/processor/AbstractMailboxProcessor.java | 6 +-
.../james/imap/processor/SearchProcessor.java | 51 ++++++-----
.../imap/processor/base/SelectedMailboxImpl.java | 8 +-
.../james/imap/processor/SearchProcessorTest.java | 3 +-
.../processor/base/MailboxEventAnalyserTest.java | 5 +-
.../processor/base/SelectedMailboxImplTest.java | 7 +-
.../org/apache/james/FakeMessageSearchIndex.java | 4 +-
22 files changed, 198 insertions(+), 187 deletions(-)
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java
index c84c9fc..26376c6 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java
@@ -19,7 +19,7 @@
package org.apache.james.backends.es;
-import org.apache.james.backends.es.search.ScrollIterable;
+import org.apache.james.backends.es.search.ScrolledSearch;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
@@ -51,7 +51,7 @@ public class DeleteByQueryPerformer {
}
public Mono<Void> perform(QueryBuilder queryBuilder) {
- return Flux.fromStream(new ScrollIterable(client, prepareSearch(queryBuilder)).stream())
+ return Flux.fromStream(new ScrolledSearch(client, prepareSearch(queryBuilder)).searchResponses())
.flatMap(searchResponse -> deleteRetrievedIds(client, searchResponse))
.thenEmpty(Mono.empty());
}
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrollIterable.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java
similarity index 73%
rename from backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrollIterable.java
rename to backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java
index 1a8d693..bf015f6 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrollIterable.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java
@@ -19,39 +19,27 @@
package org.apache.james.backends.es.search;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.apache.james.backends.es.ListenerToFuture;
import org.apache.james.util.streams.Iterators;
+import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.search.SearchHit;
-public class ScrollIterable implements Iterable<SearchResponse> {
- private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
-
- private final RestHighLevelClient client;
- private final SearchRequest searchRequest;
-
- public ScrollIterable(RestHighLevelClient client, SearchRequest searchRequest) {
- this.client = client;
- this.searchRequest = searchRequest;
- }
+import com.github.fge.lambdas.Throwing;
- @Override
- public Iterator<SearchResponse> iterator() {
- return new ScrollIterator(client, searchRequest);
- }
-
- public Stream<SearchResponse> stream() {
- return Iterators.toStream(iterator());
- }
-
- public static class ScrollIterator implements Iterator<SearchResponse> {
+public class ScrolledSearch {
+ private static class ScrollIterator implements Iterator<SearchResponse>, Closeable {
private final RestHighLevelClient client;
private CompletableFuture<SearchResponse> searchResponseFuture;
@@ -64,6 +52,13 @@ public class ScrollIterable implements Iterable<SearchResponse> {
}
@Override
+ public void close() throws IOException {
+ ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
+ clearScrollRequest.addScrollId(searchResponseFuture.join().getScrollId());
+ client.clearScroll(clearScrollRequest);
+ }
+
+ @Override
public boolean hasNext() {
SearchResponse join = searchResponseFuture.join();
return !allSearchResponsesConsumed(join);
@@ -82,9 +77,33 @@ public class ScrollIterable implements Iterable<SearchResponse> {
return result;
}
+ public Stream<SearchResponse> stream() {
+ return Iterators.toStream(this)
+ .onClose(Throwing.runnable(this::close));
+ }
+
private boolean allSearchResponsesConsumed(SearchResponse searchResponse) {
return searchResponse.getHits().getHits().length == 0;
}
}
+ private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
+
+ private final RestHighLevelClient client;
+ private final SearchRequest searchRequest;
+
+ public ScrolledSearch(RestHighLevelClient client, SearchRequest searchRequest) {
+ this.client = client;
+ this.searchRequest = searchRequest;
+ }
+
+ public Stream<SearchHit> searchHits() {
+ return searchResponses()
+ .flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits()));
+ }
+
+ public Stream<SearchResponse> searchResponses() {
+ return new ScrollIterator(client, searchRequest)
+ .stream();
+ }
}
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrollIterableTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java
similarity index 91%
rename from backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrollIterableTest.java
rename to backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java
index c52847c..f622a63 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrollIterableTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java
@@ -23,9 +23,6 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
import org.apache.james.backends.es.ClientProvider;
import org.apache.james.backends.es.DockerElasticSearchRule;
@@ -47,8 +44,7 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
-public class ScrollIterableTest {
-
+public class ScrolledSearchTest {
private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
private static final int SIZE = 2;
private static final String MESSAGE = "message";
@@ -80,7 +76,7 @@ public class ScrollIterableTest {
.query(QueryBuilders.matchAllQuery())
.size(SIZE));
- assertThat(new ScrollIterable(client, searchRequest))
+ assertThat(new ScrolledSearch(client, searchRequest).searchHits())
.isEmpty();
}
}
@@ -103,7 +99,8 @@ public class ScrollIterableTest {
.query(QueryBuilders.matchAllQuery())
.size(SIZE));
- assertThat(convertToIdList(new ScrollIterable(client, searchRequest)))
+ assertThat(new ScrolledSearch(client, searchRequest).searchHits())
+ .extracting(SearchHit::getId)
.containsOnly(id);
}
}
@@ -132,7 +129,8 @@ public class ScrollIterableTest {
.query(QueryBuilders.matchAllQuery())
.size(SIZE));
- assertThat(convertToIdList(new ScrollIterable(client, searchRequest)))
+ assertThat(new ScrolledSearch(client, searchRequest).searchHits())
+ .extracting(SearchHit::getId)
.containsOnly(id1, id2);
}
}
@@ -167,18 +165,12 @@ public class ScrollIterableTest {
.query(QueryBuilders.matchAllQuery())
.size(SIZE));
- assertThat(convertToIdList(new ScrollIterable(client, searchRequest)))
+ assertThat(new ScrolledSearch(client, searchRequest).searchHits())
+ .extracting(SearchHit::getId)
.containsOnly(id1, id2, id3);
}
}
- private List<String> convertToIdList(ScrollIterable scrollIterable) {
- return scrollIterable.stream()
- .flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits()))
- .map(SearchHit::getId)
- .collect(Collectors.toList());
- }
-
private void hasIdsInIndex(RestHighLevelClient client, String... ids) throws IOException {
SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
.scroll(TIMEOUT)
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java
index f2d13e0..29d17b0 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Stream;
import javax.mail.Flags;
@@ -103,7 +104,7 @@ public interface MessageManager {
* @throws MailboxException
* when search fails for other reasons
*/
- Iterator<MessageUid> search(SearchQuery searchQuery, MailboxSession mailboxSession) throws MailboxException;
+ Stream<MessageUid> search(SearchQuery searchQuery, MailboxSession mailboxSession) throws MailboxException;
/**
* Expunges messages in the given range from this mailbox by first retrieving the messages to be deleted
diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
index 1dd808c..23e2845 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
@@ -23,9 +23,9 @@ import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Optional;
+import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Named;
@@ -50,6 +50,7 @@ import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
import org.apache.james.mailbox.store.SessionProvider;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
+import org.apache.james.util.OptionalUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,13 +99,13 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
}
@Override
- public Iterator<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) {
+ public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) {
Preconditions.checkArgument(session != null, "'session' is mandatory");
Optional<Integer> noLimit = Optional.empty();
+
return searcher
- .search(ImmutableList.of(mailbox.getMailboxId()), searchQuery, noLimit)
- .map(SearchResult::getMessageUid)
- .iterator();
+ .search(ImmutableList.of(mailbox.getMailboxId()), searchQuery, noLimit)
+ .map(SearchResult::getMessageUid);
}
@Override
@@ -115,13 +116,15 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
return ImmutableList.of();
}
- return searcher.search(mailboxIds, searchQuery, Optional.empty())
- .peek(this::logIfNoMessageId)
- .map(SearchResult::getMessageId)
- .map(Optional::get)
- .distinct()
- .limit(limit)
- .collect(Guavate.toImmutableList());
+ try (Stream<SearchResult> searchResults = searcher.search(mailboxIds, searchQuery, Optional.empty())) {
+ return searchResults
+ .peek(this::logIfNoMessageId)
+ .map(SearchResult::getMessageId)
+ .flatMap(OptionalUtils::toStream)
+ .distinct()
+ .limit(limit)
+ .collect(Guavate.toImmutableList());
+ }
}
@Override
diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
index 3863302..9406b68 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
@@ -19,7 +19,6 @@
package org.apache.james.mailbox.elasticsearch.search;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.stream.Stream;
@@ -27,7 +26,7 @@ import java.util.stream.Stream;
import org.apache.james.backends.es.AliasName;
import org.apache.james.backends.es.NodeMappingFactory;
import org.apache.james.backends.es.ReadAliasName;
-import org.apache.james.backends.es.search.ScrollIterable;
+import org.apache.james.backends.es.search.ScrolledSearch;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.elasticsearch.json.JsonMessageConstants;
import org.apache.james.mailbox.elasticsearch.query.QueryConverter;
@@ -37,7 +36,6 @@ import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.SearchQuery;
import org.apache.james.mailbox.store.search.MessageSearchIndex;
import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.unit.TimeValue;
@@ -49,9 +47,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableList;
public class ElasticSearchSearcher {
-
public static final int DEFAULT_SEARCH_SIZE = 100;
-
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchSearcher.class);
private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
private static final ImmutableList<String> STORED_FIELDS = ImmutableList.of(JsonMessageConstants.MAILBOX_ID,
@@ -78,8 +74,9 @@ public class ElasticSearchSearcher {
public Stream<MessageSearchIndex.SearchResult> search(Collection<MailboxId> mailboxIds, SearchQuery query,
Optional<Integer> limit) {
SearchRequest searchRequest = prepareSearch(mailboxIds, query, limit);
- Stream<MessageSearchIndex.SearchResult> pairStream = new ScrollIterable(client, searchRequest).stream()
- .flatMap(this::transformResponseToUidStream);
+ Stream<MessageSearchIndex.SearchResult> pairStream = new ScrolledSearch(client, searchRequest)
+ .searchHits()
+ .flatMap(this::extractContentFromHit);
return limit.map(pairStream::limit)
.orElse(pairStream);
@@ -107,27 +104,20 @@ public class ElasticSearchSearcher {
.orElse(size);
}
- private Stream<MessageSearchIndex.SearchResult> transformResponseToUidStream(SearchResponse searchResponse) {
- return Arrays.stream(searchResponse.getHits().getHits())
- .map(this::extractContentFromHit)
- .filter(Optional::isPresent)
- .map(Optional::get);
- }
-
- private Optional<MessageSearchIndex.SearchResult> extractContentFromHit(SearchHit hit) {
+ private Stream<MessageSearchIndex.SearchResult> extractContentFromHit(SearchHit hit) {
DocumentField mailboxId = hit.field(JsonMessageConstants.MAILBOX_ID);
DocumentField uid = hit.field(JsonMessageConstants.UID);
Optional<DocumentField> id = retrieveMessageIdField(hit);
if (mailboxId != null && uid != null) {
Number uidAsNumber = uid.getValue();
- return Optional.of(
+ return Stream.of(
new MessageSearchIndex.SearchResult(
id.map(field -> messageIdFactory.fromString(field.getValue())),
mailboxIdFactory.fromString(mailboxId.getValue()),
MessageUid.of(uidAsNumber.longValue())));
} else {
LOGGER.warn("Can not extract UID, MessageID and/or MailboxId for search result {}", hit.getId());
- return Optional.empty();
+ return Stream.empty();
}
}
diff --git a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
index 93d0893..8300b5c 100644
--- a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
+++ b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
@@ -36,6 +36,7 @@ import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
+import java.util.stream.Stream;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
@@ -463,13 +464,12 @@ public class LuceneMessageSearchIndex extends ListeningMessageSearchIndex {
@Override
- public Iterator<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
+ public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
Preconditions.checkArgument(session != null, "'session' is mandatory");
return searchMultimap(ImmutableList.of(mailbox.getMailboxId()), searchQuery)
.stream()
- .map(SearchResult::getMessageUid)
- .iterator();
+ .map(SearchResult::getMessageUid);
}
@Override
diff --git a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
index cf57832..d407a8e 100644
--- a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
+++ b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
@@ -24,10 +24,10 @@ import java.nio.charset.Charset;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.stream.Stream;
import javax.mail.Flags;
import javax.mail.Flags.Flag;
@@ -160,7 +160,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void bodySearchShouldMatchPhraseInBody() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.bodyContains(CUSTARD));
- Iterator<MessageUid> result = index.search(session, mailbox3, query);
+ Stream<MessageUid> result = index.search(session, mailbox3, query);
assertThat(result).containsExactly(uid5);
}
@@ -168,7 +168,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void bodySearchShouldNotMatchAbsentPhraseInBody() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.bodyContains(CUSTARD + CUSTARD));
- Iterator<MessageUid> result = index.search(session, mailbox3, query);
+ Stream<MessageUid> result = index.search(session, mailbox3, query);
assertThat(result).isEmpty();
}
@@ -176,7 +176,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void bodySearchShouldBeCaseInsensitive() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.bodyContains(RHUBARD));
- Iterator<MessageUid> result = index.search(session, mailbox3, query);
+ Stream<MessageUid> result = index.search(session, mailbox3, query);
assertThat(result).containsExactly(uid5);
}
@@ -184,7 +184,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void bodySearchNotMatchPhraseOnlyInFrom() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.bodyContains(FROM_ADDRESS));
- Iterator<MessageUid> result = index.search(session, mailbox3, query);
+ Stream<MessageUid> result = index.search(session, mailbox3, query);
assertThat(result).isEmpty();
}
@@ -192,7 +192,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void bodySearchShouldNotMatchPhraseOnlyInSubject() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.bodyContains(SUBJECT_PART));
- Iterator<MessageUid> result = index.search(session, mailbox3, query);
+ Stream<MessageUid> result = index.search(session, mailbox3, query);
assertThat(result).isEmpty();
}
@@ -200,7 +200,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void textSearchShouldMatchPhraseInBody() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.mailContains(CUSTARD));
- Iterator<MessageUid> result = index.search(session, mailbox3, query);
+ Stream<MessageUid> result = index.search(session, mailbox3, query);
assertThat(result).containsExactly(uid5);
}
@@ -208,7 +208,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void textSearchShouldNotAbsentMatchPhraseInBody() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.mailContains(CUSTARD + CUSTARD));
- Iterator<MessageUid> result = index.search(session, mailbox3, query);
+ Stream<MessageUid> result = index.search(session, mailbox3, query);
assertThat(result).isEmpty();
}
@@ -216,7 +216,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void textSearchMatchShouldBeCaseInsensitive() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.mailContains(RHUBARD.toLowerCase(Locale.US)));
- Iterator<MessageUid> result = index.search(session, mailbox3, query);
+ Stream<MessageUid> result = index.search(session, mailbox3, query);
assertThat(result).containsExactly(uid5);
}
@@ -224,7 +224,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void addressSearchShouldMatchToFullAddress() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.address(AddressType.To,FROM_ADDRESS));
- Iterator<MessageUid> result = index.search(session, mailbox3, query);
+ Stream<MessageUid> result = index.search(session, mailbox3, query);
assertThat(result).containsExactly(uid5);
}
@@ -232,7 +232,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void addressSearchShouldMatchToDisplayName() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.address(AddressType.To,"Harry"));
- Iterator<MessageUid> result = index.search(session, mailbox3, query);
+ Stream<MessageUid> result = index.search(session, mailbox3, query);
assertThat(result).containsExactly(uid5);
}
@@ -240,7 +240,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void addressSearchShouldMatchToEmail() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.address(AddressType.To,"Harry@example.org"));
- Iterator<MessageUid> result = index.search(session, mailbox3, query);
+ Stream<MessageUid> result = index.search(session, mailbox3, query);
assertThat(result).containsExactly(uid5);
}
@@ -248,7 +248,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void addressSearchShouldMatchFrom() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.address(AddressType.From,"ser-from@domain.or"));
- Iterator<MessageUid> result = index.search(session, mailbox3, query);
+ Stream<MessageUid> result = index.search(session, mailbox3, query);
assertThat(result).containsExactly(uid5);
}
@@ -256,7 +256,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void textSearchShouldMatchPhraseOnlyInToHeader() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.mailContains(FROM_ADDRESS));
- Iterator<MessageUid> result = index.search(session, mailbox3, query);
+ Stream<MessageUid> result = index.search(session, mailbox3, query);
assertThat(result).containsExactly(uid5);
}
@@ -264,7 +264,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void textSearchShouldMatchPhraseOnlyInSubjectHeader() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.mailContains(SUBJECT_PART));
- Iterator<MessageUid> result = index.search(session, mailbox3, query);
+ Stream<MessageUid> result = index.search(session, mailbox3, query);
assertThat(result).containsExactly(uid5);
}
@@ -272,7 +272,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void searchAllShouldMatchAllMailboxEmails() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.all());
- Iterator<MessageUid> result = index.search(session, mailbox2, query);
+ Stream<MessageUid> result = index.search(session, mailbox2, query);
assertThat(result).containsExactly(uid2);
}
@@ -325,7 +325,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void flagSearchShouldMatch() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.flagIsSet(Flag.DELETED));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid3, uid4);
}
@@ -333,7 +333,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void bodySearchShouldMatchSeveralEmails() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.bodyContains("body"));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid1, uid3, uid4);
}
@@ -341,7 +341,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void textSearchShouldMatchSeveralEmails() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.mailContains("body"));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid1, uid3, uid4);
}
@@ -349,7 +349,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void headerSearchShouldMatch() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.headerContains("Subject", "test"));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid1, uid4);
}
@@ -357,7 +357,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void headerExistsShouldMatch() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.headerExists("Subject"));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid1, uid4);
}
@@ -365,7 +365,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void flagUnsetShouldMatch() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.flagIsUnSet(Flag.DRAFT));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid1, uid3, uid4);
}
@@ -376,7 +376,7 @@ public class LuceneMailboxMessageSearchIndexTest {
cal.setTime(new Date());
query.andCriteria(SearchQuery.internalDateBefore(cal.getTime(), DateResolution.Day));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid3);
}
@@ -387,7 +387,7 @@ public class LuceneMailboxMessageSearchIndexTest {
Calendar cal = Calendar.getInstance();
cal.setTime(new Date());
query.andCriteria(SearchQuery.internalDateAfter(cal.getTime(), DateResolution.Day));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid4);
}
@@ -399,7 +399,7 @@ public class LuceneMailboxMessageSearchIndexTest {
Calendar cal = Calendar.getInstance();
cal.setTime(new Date());
query.andCriteria(SearchQuery.internalDateOn(cal.getTime(), DateResolution.Day));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid1);
}
@@ -409,7 +409,7 @@ public class LuceneMailboxMessageSearchIndexTest {
Calendar cal = Calendar.getInstance();
cal.setTime(new Date());
query.andCriteria(SearchQuery.uid(new SearchQuery.UidRange[] {new SearchQuery.UidRange(uid1)}));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid1);
}
@@ -419,7 +419,7 @@ public class LuceneMailboxMessageSearchIndexTest {
Calendar cal = Calendar.getInstance();
cal.setTime(new Date());
query.andCriteria(SearchQuery.uid(new SearchQuery.UidRange[] {new SearchQuery.UidRange(uid1), new SearchQuery.UidRange(uid3,uid4)}));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid1, uid3, uid4);
}
@@ -427,7 +427,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void sizeEqualsShouldMatch() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.sizeEquals(200));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid1);
}
@@ -435,7 +435,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void sizeLessThanShouldMatch() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.sizeLessThan(200));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid3, uid4);
}
@@ -443,7 +443,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void sizeGreaterThanShouldMatch() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.sizeGreaterThan(6));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid1, uid3, uid4);
}
@@ -451,7 +451,7 @@ public class LuceneMailboxMessageSearchIndexTest {
public void uidShouldBeSorted() throws Exception {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.all());
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid1, uid3, uid4);
}
@@ -460,7 +460,7 @@ public class LuceneMailboxMessageSearchIndexTest {
SearchQuery query = new SearchQuery(SearchQuery.all());
query.setSorts(ImmutableList.of(new Sort(SortClause.Uid, Order.REVERSE)));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid4, uid3, uid1);
}
@@ -469,7 +469,7 @@ public class LuceneMailboxMessageSearchIndexTest {
SearchQuery query = new SearchQuery(SearchQuery.all());
query.setSorts(ImmutableList.of(new Sort(SortClause.SentDate, Order.NATURAL)));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid3, uid4, uid1);
}
@@ -478,7 +478,7 @@ public class LuceneMailboxMessageSearchIndexTest {
SearchQuery query = new SearchQuery(SearchQuery.all());
query.setSorts(ImmutableList.of(new Sort(SortClause.SentDate, Order.REVERSE)));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid1, uid4, uid3);
}
@@ -487,7 +487,7 @@ public class LuceneMailboxMessageSearchIndexTest {
SearchQuery query = new SearchQuery(SearchQuery.all());
query.setSorts(ImmutableList.of(new Sort(SortClause.BaseSubject, Order.NATURAL)));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid3, uid1, uid4);
}
@@ -496,7 +496,7 @@ public class LuceneMailboxMessageSearchIndexTest {
SearchQuery query = new SearchQuery(SearchQuery.all());
query.setSorts(ImmutableList.of(new Sort(SortClause.BaseSubject, Order.REVERSE)));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid4, uid1, uid3);
}
@@ -505,7 +505,7 @@ public class LuceneMailboxMessageSearchIndexTest {
SearchQuery query = new SearchQuery(SearchQuery.all());
query.setSorts(ImmutableList.of(new Sort(SortClause.MailboxFrom, Order.NATURAL)));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid3, uid4, uid1);
}
@@ -514,7 +514,7 @@ public class LuceneMailboxMessageSearchIndexTest {
SearchQuery query = new SearchQuery(SearchQuery.all());
query.setSorts(ImmutableList.of(new Sort(SortClause.MailboxFrom, Order.REVERSE)));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid1, uid4, uid3);
}
@@ -523,7 +523,7 @@ public class LuceneMailboxMessageSearchIndexTest {
SearchQuery query = new SearchQuery(SearchQuery.all());
query.setSorts(ImmutableList.of(new Sort(SortClause.MailboxCc, Order.NATURAL)));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid1, uid3, uid4);
}
@@ -532,7 +532,7 @@ public class LuceneMailboxMessageSearchIndexTest {
SearchQuery query = new SearchQuery(SearchQuery.all());
query.setSorts(ImmutableList.of(new Sort(SortClause.MailboxCc, Order.REVERSE)));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid3, uid4, uid1);
}
@@ -541,7 +541,7 @@ public class LuceneMailboxMessageSearchIndexTest {
SearchQuery query = new SearchQuery(SearchQuery.all());
query.setSorts(ImmutableList.of(new Sort(SortClause.MailboxTo, Order.NATURAL)));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid4, uid1, uid3);
}
@@ -550,7 +550,7 @@ public class LuceneMailboxMessageSearchIndexTest {
SearchQuery query = new SearchQuery(SearchQuery.all());
query.setSorts(ImmutableList.of(new Sort(SortClause.MailboxTo, Order.REVERSE)));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid3, uid1, uid4);
}
@@ -559,7 +559,7 @@ public class LuceneMailboxMessageSearchIndexTest {
SearchQuery query = new SearchQuery(SearchQuery.all());
query.setSorts(ImmutableList.of(new Sort(SortClause.DisplayTo, Order.NATURAL)));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid4, uid1, uid3);
}
@@ -568,7 +568,7 @@ public class LuceneMailboxMessageSearchIndexTest {
SearchQuery query = new SearchQuery(SearchQuery.all());
query.setSorts(ImmutableList.of(new Sort(SortClause.DisplayTo, Order.REVERSE)));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid3, uid1, uid4);
}
@@ -577,7 +577,7 @@ public class LuceneMailboxMessageSearchIndexTest {
SearchQuery query = new SearchQuery(SearchQuery.all());
query.setSorts(ImmutableList.of(new Sort(SortClause.DisplayFrom, Order.NATURAL)));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid3, uid4, uid1);
}
@@ -586,7 +586,7 @@ public class LuceneMailboxMessageSearchIndexTest {
SearchQuery query = new SearchQuery(SearchQuery.all());
query.setSorts(ImmutableList.of(new Sort(SortClause.DisplayFrom, Order.REVERSE)));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid1, uid4, uid3);
}
@@ -595,7 +595,7 @@ public class LuceneMailboxMessageSearchIndexTest {
SearchQuery query = new SearchQuery(SearchQuery.all());
query.setSorts(ImmutableList.of(new Sort(SortClause.Arrival, Order.NATURAL)));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid3, uid1, uid4);
}
@@ -604,7 +604,7 @@ public class LuceneMailboxMessageSearchIndexTest {
SearchQuery query = new SearchQuery(SearchQuery.all());
query.setSorts(ImmutableList.of(new Sort(SortClause.Arrival, Order.REVERSE)));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid4, uid1, uid3);
}
@@ -613,7 +613,7 @@ public class LuceneMailboxMessageSearchIndexTest {
SearchQuery query = new SearchQuery(SearchQuery.all());
query.setSorts(ImmutableList.of(new Sort(SortClause.Size, Order.NATURAL)));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid3, uid4, uid1);
}
@@ -622,7 +622,7 @@ public class LuceneMailboxMessageSearchIndexTest {
SearchQuery query = new SearchQuery(SearchQuery.all());
query.setSorts(ImmutableList.of(new Sort(SortClause.Size, Order.REVERSE)));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid1, uid3, uid4);
}
@@ -631,7 +631,7 @@ public class LuceneMailboxMessageSearchIndexTest {
SearchQuery query = new SearchQuery();
query.andCriteria(SearchQuery.not(SearchQuery.uid(new SearchQuery.UidRange[] { new SearchQuery.UidRange(uid1)})));
- Iterator<MessageUid> result = index.search(session, mailbox, query);
+ Stream<MessageUid> result = index.search(session, mailbox, query);
assertThat(result).containsExactly(uid3, uid4);
}
}
diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
index 324f20c..7b22354 100644
--- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
+++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
@@ -52,6 +52,7 @@ import org.apache.james.vault.search.Query;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
class DeletedMessageVaultHookTest {
@@ -165,7 +166,7 @@ class DeletedMessageVaultHookTest {
long messageSize = messageSize(bobMessageManager, composedMessageId);
DeletedMessage deletedMessage = buildDeletedMessage(ImmutableList.of(aliceMailbox), messageId, ALICE, messageSize);
- bobMessageManager.delete(ImmutableList.copyOf(bobMessageManager.search(searchQuery, bobSession)), bobSession);
+ bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession);
assertThat(messageVault.search(ALICE, Query.ALL).blockFirst())
.isEqualTo(deletedMessage);
@@ -186,7 +187,7 @@ class DeletedMessageVaultHookTest {
MessageManager bobMessageManager = mailboxManager.getMailbox(aliceMailbox, bobSession);
appendMessage(aliceMessageManager);
- bobMessageManager.delete(ImmutableList.copyOf(bobMessageManager.search(searchQuery, bobSession)), bobSession);
+ bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession);
assertThat(messageVault.search(BOB, Query.ALL).collectList().block())
.isEmpty();
@@ -212,7 +213,7 @@ class DeletedMessageVaultHookTest {
long messageSize = messageSize(bobMessageManager, composedMessageId);
DeletedMessage deletedMessage = buildDeletedMessage(ImmutableList.of(bobMailbox), messageId, BOB, messageSize);
- bobMessageManager.delete(ImmutableList.copyOf(bobMessageManager.search(searchQuery, bobSession)), bobSession);
+ bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession);
assertThat(messageVault.search(BOB, Query.ALL).blockFirst())
.isEqualTo(deletedMessage);
@@ -236,7 +237,7 @@ class DeletedMessageVaultHookTest {
messageIdManager.setInMailboxes(messageId, ImmutableList.of(bobMailbox), bobSession);
- bobMessageManager.delete(ImmutableList.copyOf(bobMessageManager.search(searchQuery, bobSession)), bobSession);
+ bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession);
assertThat(messageVault.search(ALICE, Query.ALL).collectList().block())
.isEmpty();
@@ -262,7 +263,7 @@ class DeletedMessageVaultHookTest {
long messageSize = messageSize(bobMessageManager, composedMessageId);
DeletedMessage deletedMessage = buildDeletedMessage(ImmutableList.of(bobMailbox), messageId, BOB, messageSize);
- bobMessageManager.delete(ImmutableList.copyOf(bobMessageManager.search(searchQuery, bobSession)), bobSession);
+ bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession);
assertThat(messageVault.search(BOB, Query.ALL).blockFirst())
.isEqualTo(deletedMessage);
@@ -286,7 +287,7 @@ class DeletedMessageVaultHookTest {
messageIdManager.setInMailboxes(messageId, ImmutableList.of(aliceMailbox, bobMailbox), bobSession);
- bobMessageManager.delete(ImmutableList.copyOf(bobMessageManager.search(searchQuery, bobSession)), bobSession);
+ bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession);
assertThat(messageVault.search(ALICE, Query.ALL).collectList().block())
.isEmpty();
diff --git a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java
index fd3cff7..7c02032 100644
--- a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java
+++ b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java
@@ -29,7 +29,7 @@ import java.util.stream.Stream;
import org.apache.james.backends.es.AliasName;
import org.apache.james.backends.es.NodeMappingFactory;
import org.apache.james.backends.es.ReadAliasName;
-import org.apache.james.backends.es.search.ScrollIterable;
+import org.apache.james.backends.es.search.ScrolledSearch;
import org.apache.james.core.User;
import org.apache.james.quota.search.QuotaQuery;
import org.apache.james.quota.search.QuotaSearcher;
@@ -59,10 +59,12 @@ public class ElasticSearchQuotaSearcher implements QuotaSearcher {
@Override
public List<User> search(QuotaQuery query) {
try {
- return searchHits(query)
- .map(SearchHit::getId)
- .map(User::fromUsername)
- .collect(Guavate.toImmutableList());
+ try (Stream<SearchHit> searchHits = searchHits(query)) {
+ return searchHits
+ .map(SearchHit::getId)
+ .map(User::fromUsername)
+ .collect(Guavate.toImmutableList());
+ }
} catch (IOException e) {
throw new RuntimeException("Unexpected exception while executing " + query, e);
}
@@ -92,13 +94,12 @@ public class ElasticSearchQuotaSearcher implements QuotaSearcher {
}
private Stream<SearchHit> executeScrolledSearch(QuotaQuery query) {
- return new ScrollIterable(client,
+ return new ScrolledSearch(client,
new SearchRequest(readAlias.getValue())
.types(NodeMappingFactory.DEFAULT_MAPPING_NAME)
.source(searchSourceBuilder(query))
.scroll(TIMEOUT))
- .stream()
- .flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits()))
+ .searchHits()
.skip(query.getOffset().getValue());
}
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index 40318dc..5d63d3b 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -34,6 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.stream.Stream;
import javax.mail.Flags;
import javax.mail.Flags.Flag;
@@ -745,7 +746,7 @@ public class StoreMessageManager implements MessageManager {
}
@Override
- public Iterator<MessageUid> search(SearchQuery query, MailboxSession mailboxSession) throws MailboxException {
+ public Stream<MessageUid> search(SearchQuery query, MailboxSession mailboxSession) throws MailboxException {
if (query.equals(new SearchQuery(SearchQuery.all()))) {
return listAllMessageUids(mailboxSession);
}
@@ -900,11 +901,11 @@ public class StoreMessageManager implements MessageManager {
.getApplicableFlag(mailbox);
}
- private Iterator<MessageUid> listAllMessageUids(MailboxSession session) throws MailboxException {
+ private Stream<MessageUid> listAllMessageUids(MailboxSession session) throws MailboxException {
final MessageMapper messageMapper = mapperFactory.getMessageMapper(session);
return messageMapper.execute(
- () -> messageMapper.listAllMessageUids(mailbox));
+ () -> Iterators.toStream(messageMapper.listAllMessageUids(mailbox)));
}
@Override
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
index c3faf9a..61a2f9b 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
@@ -23,6 +23,7 @@ import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
import org.apache.james.mailbox.MailboxManager;
import org.apache.james.mailbox.MailboxManager.SearchCapabilities;
@@ -104,7 +105,7 @@ public class LazyMessageSearchIndex extends ListeningMessageSearchIndex {
*
*/
@Override
- public Iterator<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
+ public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
Preconditions.checkArgument(session != null, "'session' is mandatory");
MailboxId id = mailbox.getMailboxId();
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java
index 9343ee0..eb1dcee 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java
@@ -21,9 +21,9 @@ package org.apache.james.mailbox.store.search;
import java.util.Collection;
import java.util.EnumSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Optional;
+import java.util.stream.Stream;
import org.apache.james.mailbox.MailboxManager;
import org.apache.james.mailbox.MailboxSession;
@@ -45,7 +45,7 @@ public interface MessageSearchIndex {
/**
* Return all uids of the previous indexed {@link Mailbox}'s which match the {@link SearchQuery}
*/
- Iterator<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException;
+ Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException;
/**
* Return all uids of all {@link Mailbox}'s the current user has access to which match the {@link SearchQuery}
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
index 2198506..26b602d 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
@@ -106,13 +106,12 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex {
}
@Override
- public Iterator<MessageUid> search(MailboxSession session, final Mailbox mailbox, SearchQuery query) throws MailboxException {
+ public Stream<MessageUid> search(MailboxSession session, final Mailbox mailbox, SearchQuery query) throws MailboxException {
Preconditions.checkArgument(session != null, "'session' is mandatory");
return searchResults(session, ImmutableList.of(mailbox).stream(), query)
.stream()
.filter(searchResult -> searchResult.getMailboxId().equals(mailbox.getMailboxId()))
- .map(SearchResult::getMessageUid)
- .iterator();
+ .map(SearchResult::getMessageUid);
}
private List<SearchResult> searchResults(MailboxSession session, Mailbox mailbox, SearchQuery query) throws MailboxException {
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java
index 6df7e78..0f189f4 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java
@@ -62,7 +62,6 @@ import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
import com.jayway.awaitility.Awaitility;
import com.jayway.awaitility.Duration;
@@ -1488,7 +1487,7 @@ public abstract class AbstractMessageSearchIndexTest {
.await()
.atMost(30, TimeUnit.SECONDS)
.until(
- () -> Iterators.size(messageSearchIndex.search(session, newBox.getMailboxEntity(), searchQuery)) == 9);
+ () -> messageSearchIndex.search(session, newBox.getMailboxEntity(), searchQuery).count() == 9);
}
@Test
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java
index fa43bb7..971bc44 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.Stream;
import javax.mail.Flags;
@@ -584,9 +585,8 @@ public abstract class AbstractMailboxProcessor<M extends ImapRequest> extends Ab
}
searchQuery.andCriteria(SearchQuery.uid(nRanges));
searchQuery.andCriteria(SearchQuery.modSeqGreaterThan(changedSince));
- Iterator<MessageUid> uids = mailbox.search(searchQuery, session);
- while (uids.hasNext()) {
- vanishedUids.remove(uids.next());
+ try (Stream<MessageUid> uids = mailbox.search(searchQuery, session)) {
+ uids.forEach(vanishedUids::remove);
}
UidRange[] vanishedIdRanges = uidRanges(MessageRange.toRanges(vanishedUids));
responder.respond(new VanishedResponse(vanishedIdRanges, true));
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java
index f074eec..c01bd31 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java
@@ -23,10 +23,9 @@ import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
-import java.util.Iterator;
import java.util.List;
import java.util.Optional;
-import java.util.TreeSet;
+import java.util.stream.Stream;
import javax.mail.Flags.Flag;
@@ -68,6 +67,7 @@ import org.apache.james.util.MDCBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
public class SearchProcessor extends AbstractMailboxProcessor<SearchRequest> implements CapabilityImplementingProcessor {
@@ -94,27 +94,10 @@ public class SearchProcessor extends AbstractMailboxProcessor<SearchRequest> imp
final SearchQuery query = toQuery(searchKey, session);
MailboxSession msession = ImapSessionUtils.getMailboxSession(session);
- final Iterator<MessageUid> it = mailbox.search(query, msession);
-
- final Collection<Long> results = new TreeSet<>();
- final Collection<MessageUid> uids = new TreeSet<>();
-
- while (it.hasNext()) {
- final MessageUid uid = it.next();
- final Long number;
- if (useUids) {
- uids.add(uid);
- results.add(uid.asLong());
- } else {
- final int msn = session.getSelected().msn(uid);
- number = (long) msn;
- if (number == SelectedMailbox.NO_SUCH_MESSAGE == false) {
- results.add(number);
- }
- }
-
- }
-
+
+ final Collection<MessageUid> uids = performUidSearch(mailbox, query, msession);
+ final Collection<Long> results = asResults(session, useUids, uids);
+
// Check if the search did contain the MODSEQ searchkey. If so we need to include the highest mod in the response.
//
// See RFC4551: 3.4. MODSEQ Search Criterion in SEARCH
@@ -220,7 +203,27 @@ public class SearchProcessor extends AbstractMailboxProcessor<SearchRequest> imp
session.setAttribute(SEARCH_MODSEQ, null);
}
}
-
+
+ private Collection<Long> asResults(ImapSession session, boolean useUids, Collection<MessageUid> uids) {
+ if (useUids) {
+ return uids.stream()
+ .map(MessageUid::asLong)
+ .collect(Guavate.toImmutableList());
+ } else {
+ return uids.stream()
+ .map(uid -> session.getSelected().msn(uid))
+ .map(Integer::longValue)
+ .filter(msn -> msn != SelectedMailbox.NO_SUCH_MESSAGE)
+ .collect(Guavate.toImmutableList());
+ }
+ }
+
+ private Collection<MessageUid> performUidSearch(MessageManager mailbox, SearchQuery query, MailboxSession msession) throws MailboxException {
+ try (Stream<MessageUid> stream = mailbox.search(query, msession)) {
+ return stream.collect(Guavate.toImmutableList());
+ }
+ }
+
private long[] toArray(Collection<Long> results) {
return results.stream().mapToLong(x -> x).toArray();
}
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
index c1ec72d..63a4c77 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
+import java.util.stream.Stream;
import javax.mail.Flags;
import javax.mail.Flags.Flag;
@@ -49,7 +50,7 @@ import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.mailbox.model.SearchQuery;
import org.apache.james.mailbox.model.UpdatedFlags;
-import com.google.common.collect.ImmutableList;
+import com.github.steveash.guavate.Guavate;
/**
* Default implementation of {@link SelectedMailbox}
@@ -92,8 +93,9 @@ public class SelectedMailboxImpl implements SelectedMailbox, MailboxListener {
registration = eventBus.register(this, new MailboxIdRegistrationKey(mailboxId));
applicableFlags = messageManager.getApplicableFlags(mailboxSession);
- uidMsnConverter.addAll(ImmutableList.copyOf(
- messageManager.search(new SearchQuery(SearchQuery.all()), mailboxSession)));
+ try (Stream<MessageUid> stream = messageManager.search(new SearchQuery(SearchQuery.all()), mailboxSession)) {
+ uidMsnConverter.addAll(stream.collect(Guavate.toImmutableList()));
+ }
}
@Override
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java
index b3d4911..3e5c0a2 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java
@@ -33,6 +33,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.TimeZone;
+import java.util.stream.Stream;
import javax.mail.Flags;
import javax.mail.Flags.Flag;
@@ -469,7 +470,7 @@ public class SearchProcessorTest {
private void check(SearchKey key, final SearchQuery query) throws Exception {
when(session.getAttribute(SearchProcessor.SEARCH_MODSEQ)).thenReturn(null);
when(session.getAttribute(ImapSessionUtils.MAILBOX_SESSION_ATTRIBUTE_SESSION_KEY)).thenReturn(mailboxSession);
- when(mailbox.search(query, mailboxSession)).thenReturn(new ArrayList<MessageUid>().iterator());
+ when(mailbox.search(query, mailboxSession)).thenReturn(Stream.empty());
when(selectedMailbox.getApplicableFlags()).thenReturn(new Flags());
when(selectedMailbox.hasNewApplicableFlags()).thenReturn(false);
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java
index a22744a..7bbeaef 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Date;
+import java.util.stream.Stream;
import javax.mail.Flags;
@@ -56,8 +57,6 @@ import org.apache.james.metrics.api.NoopMetricFactory;
import org.junit.Before;
import org.junit.Test;
-import com.google.common.collect.ImmutableList;
-
public class MailboxEventAnalyserTest {
private static final MessageUid UID = MessageUid.of(900);
private static final UpdatedFlags ADD_RECENT_UPDATED_FLAGS = UpdatedFlags.builder()
@@ -152,7 +151,7 @@ public class MailboxEventAnalyserTest {
when(messageManager.getApplicableFlags(any())).thenReturn(new Flags());
when(messageManager.getId()).thenReturn(MAILBOX_ID);
when(messageManager.search(any(), any()))
- .thenReturn(ImmutableList.of(MESSAGE_UID).iterator());
+ .thenReturn(Stream.of(MESSAGE_UID));
when(messageManager.getMessages(any(), any(), any()))
.thenReturn(new SingleMessageResultIterator(messageResult));
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
index 64fbb7d..7d7df56 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
import javax.mail.Flags;
@@ -61,8 +62,6 @@ import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.ImmutableList;
-
public class SelectedMailboxImplTest {
@@ -146,10 +145,10 @@ public class SelectedMailboxImplTest {
.isEqualTo(1);
}
- private Answer<Iterator<MessageUid>> delayedSearchAnswer() {
+ private Answer<Stream<MessageUid>> delayedSearchAnswer() {
return invocation -> {
Thread.sleep(1000);
- return ImmutableList.of(MessageUid.of(1), MessageUid.of(3)).iterator();
+ return Stream.of(MessageUid.of(1), MessageUid.of(3));
};
}
diff --git a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
index fdd7cea..1ab3a1d 100644
--- a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
+++ b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
@@ -21,8 +21,8 @@ package org.apache.james;
import java.util.Collection;
import java.util.EnumSet;
-import java.util.Iterator;
import java.util.List;
+import java.util.stream.Stream;
import org.apache.commons.lang.NotImplementedException;
import org.apache.james.mailbox.MailboxManager;
@@ -73,7 +73,7 @@ public class FakeMessageSearchIndex extends ListeningMessageSearchIndex {
}
@Override
- public Iterator<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
+ public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
throw new NotImplementedException();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org