You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/05/20 09:40:00 UTC

[james-project] 03/09: JAMES-2765 Backend-ES v6 should still include ScrollIterable

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e77a6b87c3e7ac469ddbb081863d6b110c555dcb
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu May 16 17:27:33 2019 +0700

    JAMES-2765 Backend-ES v6 should still include ScrollIterable
---
 .../james/backends/es/v6/IndexCreationFactory.java |   2 +-
 .../backends/es/v6/search/ListenerToFuture.java    |  51 ++++++
 .../backends/es/v6/search/ScrollIterable.java      |  91 ++++++++++
 .../backends/es/v6/search/ScrollIterableTest.java  | 197 +++++++++++++++++++++
 4 files changed, 340 insertions(+), 1 deletion(-)

diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java
index 2cba4ad..111bbcf 100644
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java
@@ -42,7 +42,7 @@ import com.google.common.collect.ImmutableList;
 
 public class IndexCreationFactory {
 
-    static class AliasSpecificationStep {
+    public static class AliasSpecificationStep {
         private final int nbShards;
         private final int nbReplica;
         private final IndexName indexName;
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ListenerToFuture.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ListenerToFuture.java
new file mode 100644
index 0000000..1ae43b5
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ListenerToFuture.java
@@ -0,0 +1,51 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6.search;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.elasticsearch.action.ActionListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Adapts an ElasticSearch {@link ActionListener} callback to a {@link CompletableFuture}. */
+public class ListenerToFuture<T> implements ActionListener<T> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ListenerToFuture.class);
+    // Created once per listener and never reassigned, hence final.
+    private final CompletableFuture<T> future = new CompletableFuture<>();
+
+    /** Completes the future normally with the received response. */
+    @Override
+    public void onResponse(T t) {
+        future.complete(t);
+    }
+
+    /** Logs the failure, then completes the future exceptionally with it. */
+    @Override
+    public void onFailure(Exception e) {
+        LOGGER.warn("Error while waiting ElasticSearch query execution: ", e);
+        future.completeExceptionally(e);
+    }
+
+    /** Returns the future that is completed by whichever callback fires. */
+    public CompletableFuture<T> getFuture() {
+        return future;
+    }
+}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ScrollIterable.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ScrollIterable.java
new file mode 100644
index 0000000..a5e4a70
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ScrollIterable.java
@@ -0,0 +1,91 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6.search;
+
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+
+import org.apache.james.util.streams.Iterators;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+
+/** Iterates over the pages of a scrolled search, prefetching the next page asynchronously. */
+public class ScrollIterable implements Iterable<SearchResponse> {
+    private static final TimeValue TIMEOUT = new TimeValue(60000); // milliseconds
+
+    private final RestHighLevelClient client;
+    private final SearchRequest searchRequest;
+
+    public ScrollIterable(RestHighLevelClient client, SearchRequest searchRequest) {
+        this.client = client;
+        this.searchRequest = searchRequest;
+    }
+
+    /** Each call submits the search request again and iterates over its pages. */
+    @Override
+    public Iterator<SearchResponse> iterator() {
+        return new ScrollIterator(client, searchRequest);
+    }
+
+    public Stream<SearchResponse> stream() {
+        return Iterators.toStream(iterator());
+    }
+
+    public static class ScrollIterator implements Iterator<SearchResponse> {
+        private final RestHighLevelClient client;
+        private CompletableFuture<SearchResponse> searchResponseFuture;
+
+        ScrollIterator(RestHighLevelClient client, SearchRequest searchRequest) {
+            this.client = client;
+            ListenerToFuture<SearchResponse> listener = new ListenerToFuture<>();
+            client.searchAsync(searchRequest, RequestOptions.DEFAULT, listener);
+            this.searchResponseFuture = listener.getFuture();
+        }
+
+        /** Blocks until the pending page arrives; a page without hits ends the iteration. */
+        @Override
+        public boolean hasNext() {
+            return !allSearchResponsesConsumed(searchResponseFuture.join());
+        }
+
+        /** Returns the pending page and requests the following one in the background. */
+        @Override
+        public SearchResponse next() {
+            SearchResponse result = searchResponseFuture.join();
+            if (allSearchResponsesConsumed(result)) {
+                throw new java.util.NoSuchElementException("No more pages in scroll");
+            }
+            ListenerToFuture<SearchResponse> listener = new ListenerToFuture<>();
+            SearchScrollRequest nextPage = new SearchScrollRequest().scrollId(result.getScrollId()).scroll(TIMEOUT);
+            client.scrollAsync(nextPage, RequestOptions.DEFAULT, listener);
+            searchResponseFuture = listener.getFuture();
+            return result;
+        }
+
+        private boolean allSearchResponsesConsumed(SearchResponse searchResponse) {
+            return searchResponse.getHits().getHits().length == 0;
+        }
+    }
+}
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/search/ScrollIterableTest.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/search/ScrollIterableTest.java
new file mode 100644
index 0000000..c9709b1
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/search/ScrollIterableTest.java
@@ -0,0 +1,197 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6.search;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.james.backends.es.v6.ClientProvider;
+import org.apache.james.backends.es.v6.DockerElasticSearchRule;
+import org.apache.james.backends.es.v6.ElasticSearchConfiguration;
+import org.apache.james.backends.es.v6.IndexCreationFactory;
+import org.apache.james.backends.es.v6.IndexName;
+import org.apache.james.backends.es.v6.ReadAliasName;
+import org.apache.james.backends.es.v6.TypeName;
+import org.awaitility.Duration;
+import org.awaitility.core.ConditionFactory;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class ScrollIterableTest {
+    // TIMEOUT is handed to SearchRequest.scroll(...) by every request built in this class.
+    private static final TimeValue TIMEOUT = new TimeValue(6000);
+    private static final int SIZE = 2;
+    private static final String MESSAGE = "message";
+    private static final IndexName INDEX_NAME = new IndexName("index");
+    private static final ReadAliasName ALIAS_NAME = new ReadAliasName("alias");
+    private static final TypeName TYPE_NAME = new TypeName("messages");
+    // Polling condition with a five-second timeout, used to wait until indexed ids are searchable.
+    private static final ConditionFactory WAIT_CONDITION = await().timeout(Duration.FIVE_SECONDS);
+
+    @Rule
+    public DockerElasticSearchRule elasticSearch = new DockerElasticSearchRule();
+    private ClientProvider clientProvider;
+    // Creates the index and its read alias before each test.
+    @Before
+    public void setUp() {
+        clientProvider = elasticSearch.clientProvider();
+        new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION)
+            .useIndex(INDEX_NAME)
+            .addAlias(ALIAS_NAME)
+            .createIndexAndAliases(clientProvider.get());
+        elasticSearch.awaitForElasticSearch();
+    }
+    // No document is indexed here, so the iterable is expected to yield no page at all.
+    @Test
+    public void scrollIterableShouldWorkWhenEmpty() throws Exception {
+        try (RestHighLevelClient client = clientProvider.get()) {
+            SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+                .types(TYPE_NAME.getValue())
+                .scroll(TIMEOUT)
+                .source(new SearchSourceBuilder()
+                    .query(QueryBuilders.matchAllQuery())
+                    .size(SIZE));
+
+            assertThat(new ScrollIterable(client, searchRequest))
+                .isEmpty();
+        }
+    }
+    // One document with a page size of SIZE: the single id must be the only one returned.
+    @Test
+    public void scrollIterableShouldWorkWhenOneElement() throws Exception {
+        try (RestHighLevelClient client = clientProvider.get()) {
+            String id = "1";
+            client.index(new IndexRequest(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id)
+                    .source(MESSAGE, "Sample message"),
+                RequestOptions.DEFAULT);
+
+            elasticSearch.awaitForElasticSearch();
+            WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id));
+
+            SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+                .types(TYPE_NAME.getValue())
+                .scroll(TIMEOUT)
+                .source(new SearchSourceBuilder()
+                    .query(QueryBuilders.matchAllQuery())
+                    .size(SIZE));
+
+            assertThat(convertToIdList(new ScrollIterable(client, searchRequest)))
+                .containsOnly(id);
+        }
+    }
+    // Exactly SIZE documents: both ids must come back, and nothing else.
+    @Test
+    public void scrollIterableShouldWorkWhenSizeElement() throws Exception {
+        try (RestHighLevelClient client = clientProvider.get()) {
+            String id1 = "1";
+            client.index(new IndexRequest(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id1)
+                    .source(MESSAGE, "Sample message"),
+                RequestOptions.DEFAULT);
+
+            String id2 = "2";
+            client.index(new IndexRequest(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id2)
+                    .source(MESSAGE, "Sample message"),
+                RequestOptions.DEFAULT);
+
+            elasticSearch.awaitForElasticSearch();
+            WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2));
+
+            SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+                .types(TYPE_NAME.getValue())
+                .scroll(TIMEOUT)
+                .source(new SearchSourceBuilder()
+                    .query(QueryBuilders.matchAllQuery())
+                    .size(SIZE));
+
+            assertThat(convertToIdList(new ScrollIterable(client, searchRequest)))
+                .containsOnly(id1, id2);
+        }
+    }
+    // SIZE + 1 documents: one page of SIZE hits cannot hold them all, so the scroll must be followed.
+    @Test
+    public void scrollIterableShouldWorkWhenMoreThanSizeElement() throws Exception {
+        try (RestHighLevelClient client = clientProvider.get()) {
+            String id1 = "1";
+            client.index(new IndexRequest(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id1)
+                    .source(MESSAGE, "Sample message"),
+                RequestOptions.DEFAULT);
+
+            String id2 = "2";
+            client.index(new IndexRequest(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id2)
+                    .source(MESSAGE, "Sample message"),
+                RequestOptions.DEFAULT);
+
+            String id3 = "3";
+            client.index(new IndexRequest(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id3)
+                    .source(MESSAGE, "Sample message"),
+                RequestOptions.DEFAULT);
+
+            elasticSearch.awaitForElasticSearch();
+            WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2, id3));
+
+            SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+                .types(TYPE_NAME.getValue())
+                .scroll(TIMEOUT)
+                .source(new SearchSourceBuilder()
+                    .query(QueryBuilders.matchAllQuery())
+                    .size(SIZE));
+
+            assertThat(convertToIdList(new ScrollIterable(client, searchRequest)))
+                .containsOnly(id1, id2, id3);
+        }
+    }
+    // Flattens the hits of every page returned by the scroll into a list of hit ids.
+    private List<String> convertToIdList(ScrollIterable scrollIterable) {
+        return scrollIterable.stream()
+            .flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits()))
+            .map(SearchHit::getId)
+            .collect(Collectors.toList());
+    }
+    // Asserts that a match-all search on the index returns at least the given ids.
+    private void hasIdsInIndex(RestHighLevelClient client, String... ids) throws IOException {
+        SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+            .types(TYPE_NAME.getValue())
+            .scroll(TIMEOUT)
+            .source(new SearchSourceBuilder()
+                .query(QueryBuilders.matchAllQuery()));
+
+        SearchHit[] hits = client.search(searchRequest, RequestOptions.DEFAULT)
+            .getHits()
+            .getHits();
+
+        assertThat(hits)
+            .extracting(SearchHit::getId)
+            .contains(ids);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org