You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/05/20 09:40:00 UTC
[james-project] 03/09: JAMES-2765 Backend-ES v6 should still include ScrollIterable
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e77a6b87c3e7ac469ddbb081863d6b110c555dcb
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu May 16 17:27:33 2019 +0700
JAMES-2765 Backend-ES v6 should still include ScrollIterable
---
.../james/backends/es/v6/IndexCreationFactory.java | 2 +-
.../backends/es/v6/search/ListenerToFuture.java | 51 ++++++
.../backends/es/v6/search/ScrollIterable.java | 91 ++++++++++
.../backends/es/v6/search/ScrollIterableTest.java | 197 +++++++++++++++++++++
4 files changed, 340 insertions(+), 1 deletion(-)
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java
index 2cba4ad..111bbcf 100644
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java
@@ -42,7 +42,7 @@ import com.google.common.collect.ImmutableList;
public class IndexCreationFactory {
- static class AliasSpecificationStep {
+ public static class AliasSpecificationStep {
private final int nbShards;
private final int nbReplica;
private final IndexName indexName;
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ListenerToFuture.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ListenerToFuture.java
new file mode 100644
index 0000000..1ae43b5
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ListenerToFuture.java
@@ -0,0 +1,51 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6.search;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.elasticsearch.action.ActionListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Adapts ElasticSearch's {@link ActionListener} callbacks to a {@link CompletableFuture}. */
+public class ListenerToFuture<T> implements ActionListener<T> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ListenerToFuture.class);
+
+    private final CompletableFuture<T> future = new CompletableFuture<>();
+
+    /** Completes the future with the response handed to the listener. */
+    @Override
+    public void onResponse(T t) {
+        future.complete(t);
+    }
+
+    /** Logs the failure as a warning, then completes the future exceptionally with it. */
+    @Override
+    public void onFailure(Exception e) {
+        LOGGER.warn("Error while waiting ElasticSearch query execution: ", e);
+        future.completeExceptionally(e);
+    }
+
+    /** @return the future completed by {@link #onResponse} or {@link #onFailure} */
+    public CompletableFuture<T> getFuture() {
+        return future;
+    }
+}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ScrollIterable.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ScrollIterable.java
new file mode 100644
index 0000000..a5e4a70
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ScrollIterable.java
@@ -0,0 +1,91 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6.search;
+
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+
+import org.apache.james.util.streams.Iterators;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+
+/** Exposes the successive pages of an ElasticSearch scroll search as an {@link Iterable}. */
+public class ScrollIterable implements Iterable<SearchResponse> {
+    private static final TimeValue TIMEOUT = new TimeValue(60000); // keep-alive sent with each scroll request
+    private final RestHighLevelClient client;
+    private final SearchRequest searchRequest;
+
+    public ScrollIterable(RestHighLevelClient client, SearchRequest searchRequest) {
+        this.client = client;
+        this.searchRequest = searchRequest;
+    }
+
+    /** Sends the search request again on every call: each iterator drives a scroll of its own. */
+    @Override
+    public Iterator<SearchResponse> iterator() {
+        return new ScrollIterator(client, searchRequest);
+    }
+
+    public Stream<SearchResponse> stream() {
+        return Iterators.toStream(iterator());
+    }
+
+    /** Iterator that requests the following page as soon as it hands one out. */
+    public static class ScrollIterator implements Iterator<SearchResponse> {
+        private final RestHighLevelClient client;
+        private CompletableFuture<SearchResponse> searchResponseFuture;
+
+        ScrollIterator(RestHighLevelClient client, SearchRequest searchRequest) {
+            this.client = client;
+            ListenerToFuture<SearchResponse> listener = new ListenerToFuture<>();
+            client.searchAsync(searchRequest, RequestOptions.DEFAULT, listener);
+            this.searchResponseFuture = listener.getFuture();
+        }
+
+        /** Blocks until the pending page has arrived; a page without hits marks the end. */
+        @Override
+        public boolean hasNext() {
+            return !allSearchResponsesConsumed(searchResponseFuture.join());
+        }
+
+        /** Returns the pending page and requests the following one; throws once the scroll is exhausted. */
+        @Override
+        public SearchResponse next() {
+            SearchResponse result = searchResponseFuture.join();
+            if (allSearchResponsesConsumed(result)) {
+                throw new java.util.NoSuchElementException("No more search responses to scroll through");
+            }
+            ListenerToFuture<SearchResponse> listener = new ListenerToFuture<>();
+            SearchScrollRequest nextPage = new SearchScrollRequest().scrollId(result.getScrollId()).scroll(TIMEOUT);
+            client.scrollAsync(nextPage, RequestOptions.DEFAULT, listener);
+            searchResponseFuture = listener.getFuture();
+            return result;
+        }
+
+        private boolean allSearchResponsesConsumed(SearchResponse searchResponse) {
+            return searchResponse.getHits().getHits().length == 0;
+        }
+    }
+}
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/search/ScrollIterableTest.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/search/ScrollIterableTest.java
new file mode 100644
index 0000000..c9709b1
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/search/ScrollIterableTest.java
@@ -0,0 +1,197 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6.search;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.james.backends.es.v6.ClientProvider;
+import org.apache.james.backends.es.v6.DockerElasticSearchRule;
+import org.apache.james.backends.es.v6.ElasticSearchConfiguration;
+import org.apache.james.backends.es.v6.IndexCreationFactory;
+import org.apache.james.backends.es.v6.IndexName;
+import org.apache.james.backends.es.v6.ReadAliasName;
+import org.apache.james.backends.es.v6.TypeName;
+import org.awaitility.Duration;
+import org.awaitility.core.ConditionFactory;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class ScrollIterableTest {
+
+ private static final TimeValue TIMEOUT = new TimeValue(6000);
+ private static final int SIZE = 2;
+ private static final String MESSAGE = "message";
+ private static final IndexName INDEX_NAME = new IndexName("index");
+ private static final ReadAliasName ALIAS_NAME = new ReadAliasName("alias");
+ private static final TypeName TYPE_NAME = new TypeName("messages");
+
+ private static final ConditionFactory WAIT_CONDITION = await().timeout(Duration.FIVE_SECONDS);
+
+ @Rule
+ public DockerElasticSearchRule elasticSearch = new DockerElasticSearchRule();
+ private ClientProvider clientProvider;
+
+ @Before
+ public void setUp() {
+ clientProvider = elasticSearch.clientProvider();
+ new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION)
+ .useIndex(INDEX_NAME)
+ .addAlias(ALIAS_NAME)
+ .createIndexAndAliases(clientProvider.get());
+ elasticSearch.awaitForElasticSearch();
+ }
+
+ @Test
+ public void scrollIterableShouldWorkWhenEmpty() throws Exception {
+ try (RestHighLevelClient client = clientProvider.get()) {
+ SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+ .types(TYPE_NAME.getValue())
+ .scroll(TIMEOUT)
+ .source(new SearchSourceBuilder()
+ .query(QueryBuilders.matchAllQuery())
+ .size(SIZE));
+
+ assertThat(new ScrollIterable(client, searchRequest))
+ .isEmpty();
+ }
+ }
+
+ @Test
+ public void scrollIterableShouldWorkWhenOneElement() throws Exception {
+ try (RestHighLevelClient client = clientProvider.get()) {
+ String id = "1";
+ client.index(new IndexRequest(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id)
+ .source(MESSAGE, "Sample message"),
+ RequestOptions.DEFAULT);
+
+ elasticSearch.awaitForElasticSearch();
+ WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id));
+
+ SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+ .types(TYPE_NAME.getValue())
+ .scroll(TIMEOUT)
+ .source(new SearchSourceBuilder()
+ .query(QueryBuilders.matchAllQuery())
+ .size(SIZE));
+
+ assertThat(convertToIdList(new ScrollIterable(client, searchRequest)))
+ .containsOnly(id);
+ }
+ }
+
+ @Test
+ public void scrollIterableShouldWorkWhenSizeElement() throws Exception {
+ try (RestHighLevelClient client = clientProvider.get()) {
+ String id1 = "1";
+ client.index(new IndexRequest(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id1)
+ .source(MESSAGE, "Sample message"),
+ RequestOptions.DEFAULT);
+
+ String id2 = "2";
+ client.index(new IndexRequest(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id2)
+ .source(MESSAGE, "Sample message"),
+ RequestOptions.DEFAULT);
+
+ elasticSearch.awaitForElasticSearch();
+ WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2));
+
+ SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+ .types(TYPE_NAME.getValue())
+ .scroll(TIMEOUT)
+ .source(new SearchSourceBuilder()
+ .query(QueryBuilders.matchAllQuery())
+ .size(SIZE));
+
+ assertThat(convertToIdList(new ScrollIterable(client, searchRequest)))
+ .containsOnly(id1, id2);
+ }
+ }
+
+ @Test
+ public void scrollIterableShouldWorkWhenMoreThanSizeElement() throws Exception {
+ try (RestHighLevelClient client = clientProvider.get()) {
+ String id1 = "1";
+ client.index(new IndexRequest(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id1)
+ .source(MESSAGE, "Sample message"),
+ RequestOptions.DEFAULT);
+
+ String id2 = "2";
+ client.index(new IndexRequest(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id2)
+ .source(MESSAGE, "Sample message"),
+ RequestOptions.DEFAULT);
+
+ String id3 = "3";
+ client.index(new IndexRequest(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id3)
+ .source(MESSAGE, "Sample message"),
+ RequestOptions.DEFAULT);
+
+ elasticSearch.awaitForElasticSearch();
+ WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2, id3));
+
+ SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+ .types(TYPE_NAME.getValue())
+ .scroll(TIMEOUT)
+ .source(new SearchSourceBuilder()
+ .query(QueryBuilders.matchAllQuery())
+ .size(SIZE));
+
+ assertThat(convertToIdList(new ScrollIterable(client, searchRequest)))
+ .containsOnly(id1, id2, id3);
+ }
+ }
+
+ private List<String> convertToIdList(ScrollIterable scrollIterable) {
+ return scrollIterable.stream()
+ .flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits()))
+ .map(SearchHit::getId)
+ .collect(Collectors.toList());
+ }
+
+ private void hasIdsInIndex(RestHighLevelClient client, String... ids) throws IOException {
+ SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+ .types(TYPE_NAME.getValue())
+ .scroll(TIMEOUT)
+ .source(new SearchSourceBuilder()
+ .query(QueryBuilders.matchAllQuery()));
+
+ SearchHit[] hits = client.search(searchRequest, RequestOptions.DEFAULT)
+ .getHits()
+ .getHits();
+
+ assertThat(hits)
+ .extracting(SearchHit::getId)
+ .contains(ids);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org