You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/05/16 08:48:31 UTC

[james-project] 16/23: JAMES-2719 Migrate ES backend code to ES6 syntax

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 7f756a80bdb93b5a3168755fdee9b1546a23d561
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Wed May 15 15:50:40 2019 +0700

    JAMES-2719 Migrate ES backend code to ES6 syntax
---
 .../james/backends/es/v6/ClientProvider.java       |   4 +-
 .../james/backends/es/v6/ClientProviderImpl.java   |  28 ++---
 .../backends/es/v6/DeleteByQueryPerformer.java     |  46 +++-----
 .../backends/es/v6/ElasticSearchConfiguration.java |   2 +-
 .../james/backends/es/v6/ElasticSearchIndexer.java |  48 ++++----
 .../james/backends/es/v6/IndexCreationFactory.java | 131 +++++++++++----------
 .../james/backends/es/v6/NodeMappingFactory.java   |  46 ++++----
 .../backends/es/v6/search/ScrollIterable.java      |  81 -------------
 8 files changed, 149 insertions(+), 237 deletions(-)

diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProvider.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProvider.java
index 0145d0a..9ba5d21 100644
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProvider.java
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProvider.java
@@ -18,9 +18,9 @@
  ****************************************************************/
 package org.apache.james.backends.es.v6;
 
-import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RestHighLevelClient;
 
 public interface ClientProvider {
 
-    Client get();
+    RestHighLevelClient get();
 }
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProviderImpl.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProviderImpl.java
index aac59a5..074296b 100644
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProviderImpl.java
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProviderImpl.java
@@ -18,17 +18,14 @@
  ****************************************************************/
 package org.apache.james.backends.es.v6;
 
-import java.net.InetAddress;
 import java.util.Optional;
 
+import org.apache.http.HttpHost;
 import org.apache.james.util.Host;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
 
-import com.github.fge.lambdas.Throwing;
-import com.github.fge.lambdas.consumers.ConsumerChainer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -50,6 +47,7 @@ public class ClientProviderImpl implements ClientProvider {
     }
 
     private static final String CLUSTER_NAME_SETTING = "cluster.name";
+    private static final String HTTP_HOST_SCHEME = "http";
 
     private final ImmutableList<Host> hosts;
     private final Optional<String> clusterName;
@@ -60,19 +58,15 @@ public class ClientProviderImpl implements ClientProvider {
         this.clusterName = clusterName;
     }
 
+    private HttpHost[] hostsToHttpHosts() {
+        return hosts.stream()
+            .map(host -> new HttpHost(host.getHostName(), host.getPort(), HTTP_HOST_SCHEME))
+            .toArray(HttpHost[]::new);
+    }
 
     @Override
-    public Client get() {
-        TransportClient transportClient = TransportClient.builder()
-                .settings(settings())
-                .build();
-        ConsumerChainer<Host> consumer = Throwing.consumer(host -> transportClient
-            .addTransportAddress(
-                new InetSocketTransportAddress(
-                    InetAddress.getByName(host.getHostName()),
-                    host.getPort())));
-        hosts.forEach(consumer.sneakyThrow());
-        return transportClient;
+    public RestHighLevelClient get() {
+        return new RestHighLevelClient(RestClient.builder(hostsToHttpHosts()));
     }
 
     @VisibleForTesting Settings settings() {
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryPerformer.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryPerformer.java
index 05fd04e..a912c0f 100644
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryPerformer.java
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryPerformer.java
@@ -19,32 +19,29 @@
 
 package org.apache.james.backends.es.v6;
 
+import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
-import org.apache.james.backends.es.v6.search.ScrollIterable;
-import org.elasticsearch.action.ListenableActionFuture;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
 
 import com.google.common.annotations.VisibleForTesting;
 
 public class DeleteByQueryPerformer {
     public static final TimeValue TIMEOUT = new TimeValue(60000);
 
-    private final Client client;
+    private final RestHighLevelClient client;
     private final ExecutorService executor;
     private final int batchSize;
     private final WriteAliasName aliasName;
     private final TypeName typeName;
 
     @VisibleForTesting
-    public DeleteByQueryPerformer(Client client, ExecutorService executor, int batchSize, WriteAliasName aliasName, TypeName typeName) {
+    public DeleteByQueryPerformer(RestHighLevelClient client, ExecutorService executor, int batchSize, WriteAliasName aliasName, TypeName typeName) {
         this.client = client;
         this.executor = executor;
         this.batchSize = batchSize;
@@ -56,29 +53,14 @@ public class DeleteByQueryPerformer {
         return executor.submit(() -> doDeleteByQuery(queryBuilder));
     }
 
-    protected Void doDeleteByQuery(QueryBuilder queryBuilder) {
-        new ScrollIterable(client,
-            client.prepareSearch(aliasName.getValue())
-                .setTypes(typeName.getValue())
-                .setScroll(TIMEOUT)
-                .setNoFields()
-                .setQuery(queryBuilder)
-                .setSize(batchSize))
-            .stream()
-            .map(searchResponse -> deleteRetrievedIds(client, searchResponse))
-            .forEach(ListenableActionFuture::actionGet);
-        return null;
-    }
+    protected Void doDeleteByQuery(QueryBuilder queryBuilder) throws IOException {
+        DeleteByQueryRequest request = new DeleteByQueryRequest(aliasName.getValue())
+            .setDocTypes(typeName.getValue())
+            .setScroll(TIMEOUT)
+            .setQuery(queryBuilder)
+            .setBatchSize(batchSize);
 
-    private ListenableActionFuture<BulkResponse> deleteRetrievedIds(Client client, SearchResponse searchResponse) {
-        BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
-        for (SearchHit hit : searchResponse.getHits()) {
-            bulkRequestBuilder.add(client.prepareDelete()
-                .setIndex(aliasName.getValue())
-                .setType(typeName.getValue())
-                .setId(hit.getId()));
-        }
-        return bulkRequestBuilder.execute();
+        client.deleteByQuery(request, RequestOptions.DEFAULT);
+        return null;
     }
-
 }
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchConfiguration.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchConfiguration.java
index f490941..032203e 100644
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchConfiguration.java
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchConfiguration.java
@@ -123,7 +123,7 @@ public class ElasticSearchConfiguration {
     public static final int DEFAULT_CONNECTION_MIN_DELAY = 3000;
     public static final int DEFAULT_NB_SHARDS = 5;
     public static final int DEFAULT_NB_REPLICA = 1;
-    public static final int DEFAULT_PORT = 9300;
+    public static final int DEFAULT_PORT = 9200;
     private static final String LOCALHOST = "127.0.0.1";
     public static final Optional<Integer> DEFAULT_PORT_AS_OPTIONAL = Optional.of(DEFAULT_PORT);
 
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchIndexer.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchIndexer.java
index 8572e4d..492ec37 100644
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchIndexer.java
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchIndexer.java
@@ -18,17 +18,23 @@
  ****************************************************************/
 package org.apache.james.backends.es.v6;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import org.apache.commons.lang3.StringUtils;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.ValidationException;
+import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,19 +48,19 @@ public class ElasticSearchIndexer {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchIndexer.class);
 
-    private final Client client;
+    private final RestHighLevelClient client;
     private final DeleteByQueryPerformer deleteByQueryPerformer;
     private final AliasName aliasName;
     private final TypeName typeName;
 
-    public ElasticSearchIndexer(Client client, ExecutorService executor,
+    public ElasticSearchIndexer(RestHighLevelClient client, ExecutorService executor,
                                 WriteAliasName aliasName,
                                 TypeName typeName) {
         this(client, executor, aliasName, typeName, DEFAULT_BATCH_SIZE);
     }
 
     @VisibleForTesting
-    public ElasticSearchIndexer(Client client, ExecutorService executor,
+    public ElasticSearchIndexer(RestHighLevelClient client, ExecutorService executor,
                                 WriteAliasName aliasName,
                                 TypeName typeName,
                                 int batchSize) {
@@ -64,42 +70,42 @@ public class ElasticSearchIndexer {
         this.typeName = typeName;
     }
 
-    public IndexResponse index(String id, String content) {
+    public IndexResponse index(String id, String content) throws IOException {
         checkArgument(content);
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("Indexing {}: {}", id, StringUtils.left(content, DEBUG_MAX_LENGTH_CONTENT));
         }
-        return client.prepareIndex(aliasName.getValue(), typeName.getValue(), id)
-            .setSource(content)
-            .get();
+        return client.index(
+            new IndexRequest(aliasName.getValue(), typeName.getValue(), id)
+                .source(content, XContentType.JSON),
+            RequestOptions.DEFAULT);
     }
 
-    public Optional<BulkResponse> update(List<UpdatedRepresentation> updatedDocumentParts) {
+    public Optional<BulkResponse> update(List<UpdatedRepresentation> updatedDocumentParts) throws IOException {
         try {
             Preconditions.checkNotNull(updatedDocumentParts);
-            BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
-            updatedDocumentParts.forEach(updatedDocumentPart -> bulkRequestBuilder.add(
-                client.prepareUpdate(
-                    aliasName.getValue(),
+            BulkRequest request = new BulkRequest();
+            updatedDocumentParts.forEach(updatedDocumentPart -> request.add(
+                new UpdateRequest(aliasName.getValue(),
                     typeName.getValue(),
                     updatedDocumentPart.getId())
-                    .setDoc(updatedDocumentPart.getUpdatedDocumentPart())));
-            return Optional.of(bulkRequestBuilder.get());
+                .doc(updatedDocumentPart.getUpdatedDocumentPart(), XContentType.JSON)));
+            return Optional.of(client.bulk(request, RequestOptions.DEFAULT));
         } catch (ValidationException e) {
             LOGGER.warn("Error while updating index", e);
             return Optional.empty();
         }
     }
 
-    public Optional<BulkResponse> delete(List<String> ids) {
+    public Optional<BulkResponse> delete(List<String> ids) throws IOException {
         try {
-            BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
-            ids.forEach(id -> bulkRequestBuilder.add(
-                client.prepareDelete(
+            BulkRequest request = new BulkRequest();
+            ids.forEach(id -> request.add(
+                new DeleteRequest(
                     aliasName.getValue(),
                     typeName.getValue(),
                     id)));
-            return Optional.of(bulkRequestBuilder.get());
+            return Optional.of(client.bulk(request, RequestOptions.DEFAULT));
         } catch (ValidationException e) {
             LOGGER.warn("Error while deleting index", e);
             return Optional.empty();
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java
index bdce72c..1a520fa 100644
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.backends.es.v6;
 
+import static org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 
 import java.io.IOException;
@@ -26,23 +27,27 @@ import java.util.ArrayList;
 
 import javax.inject.Inject;
 
+import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
 import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.indices.IndexAlreadyExistsException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Preconditions;
 
 public class IndexCreationFactory {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(IndexCreationFactory.class);
-    public static final String CASE_INSENSITIVE = "case_insensitive";
-    public static final String KEEP_MAIL_AND_URL = "keep_mail_and_url";
-    public static final String SNOWBALL_KEEP_MAIL_AND_URL = "snowball_keep_mail_and_token";
-    public static final String ENGLISH_SNOWBALL = "english_snowball";
+    private static final String INDEX_ALREADY_EXISTS_EXCEPTION_MESSAGE = "type=resource_already_exists_exception";
+    private static final String CASE_INSENSITIVE = "case_insensitive";
+    private static final String KEEP_MAIL_AND_URL = "keep_mail_and_url";
+    private static final String SNOWBALL_KEEP_MAIL_AND_URL = "snowball_keep_mail_and_token";
+    private static final String ENGLISH_SNOWBALL = "english_snowball";
 
     private IndexName indexName;
     private ArrayList<AliasName> aliases;
@@ -69,86 +74,86 @@ public class IndexCreationFactory {
         return this;
     }
 
-    public Client createIndexAndAliases(Client client) {
+    public RestHighLevelClient createIndexAndAliases(RestHighLevelClient client) {
         Preconditions.checkNotNull(indexName);
         try {
             createIndexIfNeeded(client, indexName, generateSetting(nbShards, nbReplica));
-            aliases.forEach(alias -> createAliasIfNeeded(client, indexName, alias));
+            aliases.forEach(Throwing.consumer(alias -> createAliasIfNeeded(client, indexName, alias)));
         } catch (IOException e) {
             LOGGER.error("Error while creating index : ", e);
         }
         return client;
     }
 
-    private void createAliasIfNeeded(Client client, IndexName indexName, AliasName aliasName) {
+    private void createAliasIfNeeded(RestHighLevelClient client, IndexName indexName, AliasName aliasName) throws IOException {
         if (!aliasExist(client, aliasName)) {
-            client.admin()
-                .indices()
-                .aliases(new IndicesAliasesRequest()
-                    .addAlias(aliasName.getValue(), indexName.getValue()))
-                .actionGet();
+            client.indices()
+                .updateAliases(
+                    new IndicesAliasesRequest().addAliasAction(
+                        new AliasActions(AliasActions.Type.ADD)
+                            .index(indexName.getValue())
+                            .alias(aliasName.getValue())),
+                    RequestOptions.DEFAULT);
         }
     }
 
-    private boolean aliasExist(Client client, AliasName aliasName) {
-        return client.admin()
-            .indices()
-            .aliasesExist(new GetAliasesRequest()
-                .aliases(aliasName.getValue()))
-            .actionGet()
-            .exists();
+    private boolean aliasExist(RestHighLevelClient client, AliasName aliasName) throws IOException {
+        return client.indices()
+            .existsAlias(new GetAliasesRequest().aliases(aliasName.getValue()),
+                RequestOptions.DEFAULT);
     }
 
-    private void createIndexIfNeeded(Client client, IndexName indexName, XContentBuilder settings) {
+    private void createIndexIfNeeded(RestHighLevelClient client, IndexName indexName, XContentBuilder settings) throws IOException {
         try {
-            client.admin()
-                .indices()
-                .prepareCreate(indexName.getValue())
-                .setSettings(settings)
-                .execute()
-                .actionGet();
-        } catch (IndexAlreadyExistsException exception) {
-            LOGGER.info("Index [{}] already exist", indexName);
+            client.indices()
+                .create(
+                    new CreateIndexRequest(indexName.getValue())
+                        .source(settings),
+                    RequestOptions.DEFAULT);
+        } catch (ElasticsearchStatusException exception) {
+            if (exception.getMessage().contains(INDEX_ALREADY_EXISTS_EXCEPTION_MESSAGE)) {
+                LOGGER.info("Index [{}] already exist", indexName);
+            } else {
+                throw exception;
+            }
         }
     }
 
     private XContentBuilder generateSetting(int nbShards, int nbReplica) throws IOException {
         return jsonBuilder()
             .startObject()
-                .field("number_of_shards", nbShards)
-                .field("number_of_replicas", nbReplica)
-                .startObject("analysis")
-                    .startObject("analyzer")
-                        .startObject(CASE_INSENSITIVE)
-                            .field("tokenizer", "keyword")
-                            .startArray("filter")
-                                .value("lowercase")
-                            .endArray()
+                .startObject("settings")
+                    .field("number_of_shards", nbShards)
+                    .field("number_of_replicas", nbReplica)
+                    .startObject("analysis")
+                        .startObject("analyzer")
+                            .startObject(CASE_INSENSITIVE)
+                                .field("tokenizer", "keyword")
+                                .startArray("filter")
+                                    .value("lowercase")
+                                .endArray()
+                            .endObject()
+                            .startObject(KEEP_MAIL_AND_URL)
+                                .field("tokenizer", "uax_url_email")
+                                .startArray("filter")
+                                    .value("lowercase")
+                                    .value("stop")
+                                .endArray()
+                            .endObject()
+                            .startObject(SNOWBALL_KEEP_MAIL_AND_URL)
+                                .field("tokenizer", "uax_url_email")
+                                .startArray("filter")
+                                    .value("lowercase")
+                                    .value("stop")
+                                    .value(ENGLISH_SNOWBALL)
+                                .endArray()
+                            .endObject()
                         .endObject()
-                    .endObject()
-                    .startObject("analyzer")
-                        .startObject(KEEP_MAIL_AND_URL)
-                            .field("tokenizer", "uax_url_email")
-                            .startArray("filter")
-                                .value("lowercase")
-                                .value("stop")
-                            .endArray()
-                        .endObject()
-                    .endObject()
-                    .startObject("filter")
-                        .startObject(ENGLISH_SNOWBALL)
-                            .field("type", "snowball")
-                            .field("language", "English")
-                        .endObject()
-                    .endObject()
-                    .startObject("analyzer")
-                        .startObject(SNOWBALL_KEEP_MAIL_AND_URL)
-                        .field("tokenizer", "uax_url_email")
-                            .startArray("filter")
-                                .value("lowercase")
-                                .value("stop")
-                                .value(ENGLISH_SNOWBALL)
-                            .endArray()
+                        .startObject("filter")
+                            .startObject(ENGLISH_SNOWBALL)
+                                .field("type", "snowball")
+                                .field("language", "English")
+                            .endObject()
                         .endObject()
                     .endObject()
                 .endObject()
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/NodeMappingFactory.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/NodeMappingFactory.java
index 01b0bf0..eda3c50 100644
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/NodeMappingFactory.java
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/NodeMappingFactory.java
@@ -19,8 +19,13 @@
 
 package org.apache.james.backends.es.v6;
 
+import java.io.IOException;
+
 import org.apache.james.util.streams.Iterators;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.GetMappingsRequest;
+import org.elasticsearch.client.indices.PutMappingRequest;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 
 public class NodeMappingFactory {
@@ -32,6 +37,7 @@ public class NodeMappingFactory {
     public static final String INDEX = "index";
     public static final String NOT_ANALYZED = "not_analyzed";
     public static final String STRING = "string";
+    public static final String TEXT = "text";
     public static final String PROPERTIES = "properties";
     public static final String DATE = "date";
     public static final String FORMAT = "format";
@@ -44,32 +50,32 @@ public class NodeMappingFactory {
     public static final String SNOWBALL = "snowball";
     public static final String IGNORE_ABOVE = "ignore_above";
 
-    public static Client applyMapping(Client client, IndexName indexName, TypeName typeName, XContentBuilder mappingsSources) {
+    public static RestHighLevelClient applyMapping(RestHighLevelClient client, IndexName indexName, TypeName typeName, XContentBuilder mappingsSources) throws IOException {
         if (!mappingAlreadyExist(client, indexName, typeName)) {
-            createMapping(client, indexName, typeName, mappingsSources);
+            createMapping(client, indexName, mappingsSources);
         }
         return client;
     }
 
-    public static boolean mappingAlreadyExist(Client client, IndexName indexName, TypeName typeName) {
-        return Iterators.toStream(client.admin()
-            .indices()
-            .prepareGetMappings(indexName.getValue())
-            .execute()
-            .actionGet()
-            .getMappings()
-            .valuesIt())
-            .anyMatch(mapping -> mapping.keys().contains(typeName.getValue()));
+    public static boolean mappingAlreadyExist(RestHighLevelClient client, IndexName indexName, TypeName typeName) throws IOException {
+        return Iterators.toStream(client.indices()
+            .getMapping(
+                new GetMappingsRequest()
+                    .indices(indexName.getValue()),
+                RequestOptions.DEFAULT)
+            .mappings()
+            .values()
+            .iterator())
+            .anyMatch(mapping -> mapping.type().contains(typeName.getValue()));
     }
 
-    public static void createMapping(Client client, IndexName indexName, TypeName typeName, XContentBuilder mappingsSources) {
-        client.admin()
-            .indices()
-            .preparePutMapping(indexName.getValue())
-            .setType(typeName.getValue())
-            .setSource(mappingsSources)
-            .execute()
-            .actionGet();
+    public static void createMapping(RestHighLevelClient client, IndexName indexName, XContentBuilder mappingsSources) throws IOException {
+        PutMappingRequest request = new PutMappingRequest(indexName.getValue())
+            .source(mappingsSources);
+        client.indices().putMapping(
+            new PutMappingRequest(indexName.getValue())
+                .source(mappingsSources),
+            RequestOptions.DEFAULT);
     }
 
 }
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ScrollIterable.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ScrollIterable.java
deleted file mode 100644
index eca5bae..0000000
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ScrollIterable.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.backends.es.v6.search;
-
-import java.util.Iterator;
-import java.util.stream.Stream;
-
-import org.apache.james.util.streams.Iterators;
-import org.elasticsearch.action.ListenableActionFuture;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.unit.TimeValue;
-
-public class ScrollIterable implements Iterable<SearchResponse> {
-
-    private static final TimeValue TIMEOUT = new TimeValue(60000);
-    private final Client client;
-    private final SearchRequestBuilder searchRequestBuilder;
-
-    public ScrollIterable(Client client, SearchRequestBuilder searchRequestBuilder) {
-        this.client = client;
-        this.searchRequestBuilder = searchRequestBuilder;
-    }
-
-    @Override
-    public Iterator<SearchResponse> iterator() {
-        return new ScrollIterator(client, searchRequestBuilder);
-    }
-
-    public Stream<SearchResponse> stream() {
-        return Iterators.toStream(iterator());
-    }
-
-    public static class ScrollIterator implements Iterator<SearchResponse> {
-
-        private final Client client;
-        private ListenableActionFuture<SearchResponse> searchResponseFuture;
-
-        public ScrollIterator(Client client, SearchRequestBuilder searchRequestBuilder) {
-            this.client = client;
-            this.searchResponseFuture = searchRequestBuilder.execute();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return !allSearchResponsesConsumed(searchResponseFuture.actionGet());
-        }
-
-        @Override
-        public SearchResponse next() {
-            SearchResponse result = searchResponseFuture.actionGet();
-            searchResponseFuture =  client.prepareSearchScroll(result.getScrollId())
-                .setScroll(TIMEOUT)
-                .execute();
-            return result;
-        }
-
-        private boolean allSearchResponsesConsumed(SearchResponse searchResponse) {
-            return searchResponse.getHits().getHits().length == 0;
-        }
-    }
-
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org