You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/05/16 08:48:31 UTC
[james-project] 16/23: JAMES-2719 Migrate ES backend code to ES6
syntax
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7f756a80bdb93b5a3168755fdee9b1546a23d561
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Wed May 15 15:50:40 2019 +0700
JAMES-2719 Migrate ES backend code to ES6 syntax
---
.../james/backends/es/v6/ClientProvider.java | 4 +-
.../james/backends/es/v6/ClientProviderImpl.java | 28 ++---
.../backends/es/v6/DeleteByQueryPerformer.java | 46 +++-----
.../backends/es/v6/ElasticSearchConfiguration.java | 2 +-
.../james/backends/es/v6/ElasticSearchIndexer.java | 48 ++++----
.../james/backends/es/v6/IndexCreationFactory.java | 131 +++++++++++----------
.../james/backends/es/v6/NodeMappingFactory.java | 46 ++++----
.../backends/es/v6/search/ScrollIterable.java | 81 -------------
8 files changed, 149 insertions(+), 237 deletions(-)
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProvider.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProvider.java
index 0145d0a..9ba5d21 100644
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProvider.java
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProvider.java
@@ -18,9 +18,9 @@
****************************************************************/
package org.apache.james.backends.es.v6;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RestHighLevelClient;
public interface ClientProvider {
- Client get();
+ RestHighLevelClient get();
}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProviderImpl.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProviderImpl.java
index aac59a5..074296b 100644
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProviderImpl.java
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProviderImpl.java
@@ -18,17 +18,14 @@
****************************************************************/
package org.apache.james.backends.es.v6;
-import java.net.InetAddress;
import java.util.Optional;
+import org.apache.http.HttpHost;
import org.apache.james.util.Host;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import com.github.fge.lambdas.Throwing;
-import com.github.fge.lambdas.consumers.ConsumerChainer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -50,6 +47,7 @@ public class ClientProviderImpl implements ClientProvider {
}
private static final String CLUSTER_NAME_SETTING = "cluster.name";
+ private static final String HTTP_HOST_SCHEME = "http";
private final ImmutableList<Host> hosts;
private final Optional<String> clusterName;
@@ -60,19 +58,15 @@ public class ClientProviderImpl implements ClientProvider {
this.clusterName = clusterName;
}
+ private HttpHost[] hostsToHttpHosts() {
+ return hosts.stream()
+ .map(host -> new HttpHost(host.getHostName(), host.getPort(), HTTP_HOST_SCHEME))
+ .toArray(HttpHost[]::new);
+ }
@Override
- public Client get() {
- TransportClient transportClient = TransportClient.builder()
- .settings(settings())
- .build();
- ConsumerChainer<Host> consumer = Throwing.consumer(host -> transportClient
- .addTransportAddress(
- new InetSocketTransportAddress(
- InetAddress.getByName(host.getHostName()),
- host.getPort())));
- hosts.forEach(consumer.sneakyThrow());
- return transportClient;
+ public RestHighLevelClient get() {
+ return new RestHighLevelClient(RestClient.builder(hostsToHttpHosts()));
}
@VisibleForTesting Settings settings() {
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryPerformer.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryPerformer.java
index 05fd04e..a912c0f 100644
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryPerformer.java
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryPerformer.java
@@ -19,32 +19,29 @@
package org.apache.james.backends.es.v6;
+import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import org.apache.james.backends.es.v6.search.ScrollIterable;
-import org.elasticsearch.action.ListenableActionFuture;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import com.google.common.annotations.VisibleForTesting;
public class DeleteByQueryPerformer {
public static final TimeValue TIMEOUT = new TimeValue(60000);
- private final Client client;
+ private final RestHighLevelClient client;
private final ExecutorService executor;
private final int batchSize;
private final WriteAliasName aliasName;
private final TypeName typeName;
@VisibleForTesting
- public DeleteByQueryPerformer(Client client, ExecutorService executor, int batchSize, WriteAliasName aliasName, TypeName typeName) {
+ public DeleteByQueryPerformer(RestHighLevelClient client, ExecutorService executor, int batchSize, WriteAliasName aliasName, TypeName typeName) {
this.client = client;
this.executor = executor;
this.batchSize = batchSize;
@@ -56,29 +53,14 @@ public class DeleteByQueryPerformer {
return executor.submit(() -> doDeleteByQuery(queryBuilder));
}
- protected Void doDeleteByQuery(QueryBuilder queryBuilder) {
- new ScrollIterable(client,
- client.prepareSearch(aliasName.getValue())
- .setTypes(typeName.getValue())
- .setScroll(TIMEOUT)
- .setNoFields()
- .setQuery(queryBuilder)
- .setSize(batchSize))
- .stream()
- .map(searchResponse -> deleteRetrievedIds(client, searchResponse))
- .forEach(ListenableActionFuture::actionGet);
- return null;
- }
+ protected Void doDeleteByQuery(QueryBuilder queryBuilder) throws IOException {
+ DeleteByQueryRequest request = new DeleteByQueryRequest(aliasName.getValue())
+ .setDocTypes(typeName.getValue())
+ .setScroll(TIMEOUT)
+ .setQuery(queryBuilder)
+ .setBatchSize(batchSize);
- private ListenableActionFuture<BulkResponse> deleteRetrievedIds(Client client, SearchResponse searchResponse) {
- BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
- for (SearchHit hit : searchResponse.getHits()) {
- bulkRequestBuilder.add(client.prepareDelete()
- .setIndex(aliasName.getValue())
- .setType(typeName.getValue())
- .setId(hit.getId()));
- }
- return bulkRequestBuilder.execute();
+ client.deleteByQuery(request, RequestOptions.DEFAULT);
+ return null;
}
-
}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchConfiguration.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchConfiguration.java
index f490941..032203e 100644
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchConfiguration.java
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchConfiguration.java
@@ -123,7 +123,7 @@ public class ElasticSearchConfiguration {
public static final int DEFAULT_CONNECTION_MIN_DELAY = 3000;
public static final int DEFAULT_NB_SHARDS = 5;
public static final int DEFAULT_NB_REPLICA = 1;
- public static final int DEFAULT_PORT = 9300;
+ public static final int DEFAULT_PORT = 9200;
private static final String LOCALHOST = "127.0.0.1";
public static final Optional<Integer> DEFAULT_PORT_AS_OPTIONAL = Optional.of(DEFAULT_PORT);
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchIndexer.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchIndexer.java
index 8572e4d..492ec37 100644
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchIndexer.java
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchIndexer.java
@@ -18,17 +18,23 @@
****************************************************************/
package org.apache.james.backends.es.v6;
+import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.ValidationException;
+import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,19 +48,19 @@ public class ElasticSearchIndexer {
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchIndexer.class);
- private final Client client;
+ private final RestHighLevelClient client;
private final DeleteByQueryPerformer deleteByQueryPerformer;
private final AliasName aliasName;
private final TypeName typeName;
- public ElasticSearchIndexer(Client client, ExecutorService executor,
+ public ElasticSearchIndexer(RestHighLevelClient client, ExecutorService executor,
WriteAliasName aliasName,
TypeName typeName) {
this(client, executor, aliasName, typeName, DEFAULT_BATCH_SIZE);
}
@VisibleForTesting
- public ElasticSearchIndexer(Client client, ExecutorService executor,
+ public ElasticSearchIndexer(RestHighLevelClient client, ExecutorService executor,
WriteAliasName aliasName,
TypeName typeName,
int batchSize) {
@@ -64,42 +70,42 @@ public class ElasticSearchIndexer {
this.typeName = typeName;
}
- public IndexResponse index(String id, String content) {
+ public IndexResponse index(String id, String content) throws IOException {
checkArgument(content);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Indexing {}: {}", id, StringUtils.left(content, DEBUG_MAX_LENGTH_CONTENT));
}
- return client.prepareIndex(aliasName.getValue(), typeName.getValue(), id)
- .setSource(content)
- .get();
+ return client.index(
+ new IndexRequest(aliasName.getValue(), typeName.getValue(), id)
+ .source(content, XContentType.JSON),
+ RequestOptions.DEFAULT);
}
- public Optional<BulkResponse> update(List<UpdatedRepresentation> updatedDocumentParts) {
+ public Optional<BulkResponse> update(List<UpdatedRepresentation> updatedDocumentParts) throws IOException {
try {
Preconditions.checkNotNull(updatedDocumentParts);
- BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
- updatedDocumentParts.forEach(updatedDocumentPart -> bulkRequestBuilder.add(
- client.prepareUpdate(
- aliasName.getValue(),
+ BulkRequest request = new BulkRequest();
+ updatedDocumentParts.forEach(updatedDocumentPart -> request.add(
+ new UpdateRequest(aliasName.getValue(),
typeName.getValue(),
updatedDocumentPart.getId())
- .setDoc(updatedDocumentPart.getUpdatedDocumentPart())));
- return Optional.of(bulkRequestBuilder.get());
+ .doc(updatedDocumentPart.getUpdatedDocumentPart(), XContentType.JSON)));
+ return Optional.of(client.bulk(request, RequestOptions.DEFAULT));
} catch (ValidationException e) {
LOGGER.warn("Error while updating index", e);
return Optional.empty();
}
}
- public Optional<BulkResponse> delete(List<String> ids) {
+ public Optional<BulkResponse> delete(List<String> ids) throws IOException {
try {
- BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
- ids.forEach(id -> bulkRequestBuilder.add(
- client.prepareDelete(
+ BulkRequest request = new BulkRequest();
+ ids.forEach(id -> request.add(
+ new DeleteRequest(
aliasName.getValue(),
typeName.getValue(),
id)));
- return Optional.of(bulkRequestBuilder.get());
+ return Optional.of(client.bulk(request, RequestOptions.DEFAULT));
} catch (ValidationException e) {
LOGGER.warn("Error while deleting index", e);
return Optional.empty();
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java
index bdce72c..1a520fa 100644
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java
@@ -19,6 +19,7 @@
package org.apache.james.backends.es.v6;
+import static org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import java.io.IOException;
@@ -26,23 +27,27 @@ import java.util.ArrayList;
import javax.inject.Inject;
+import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.github.fge.lambdas.Throwing;
import com.google.common.base.Preconditions;
public class IndexCreationFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(IndexCreationFactory.class);
- public static final String CASE_INSENSITIVE = "case_insensitive";
- public static final String KEEP_MAIL_AND_URL = "keep_mail_and_url";
- public static final String SNOWBALL_KEEP_MAIL_AND_URL = "snowball_keep_mail_and_token";
- public static final String ENGLISH_SNOWBALL = "english_snowball";
+ private static final String INDEX_ALREADY_EXISTS_EXCEPTION_MESSAGE = "type=resource_already_exists_exception";
+ private static final String CASE_INSENSITIVE = "case_insensitive";
+ private static final String KEEP_MAIL_AND_URL = "keep_mail_and_url";
+ private static final String SNOWBALL_KEEP_MAIL_AND_URL = "snowball_keep_mail_and_token";
+ private static final String ENGLISH_SNOWBALL = "english_snowball";
private IndexName indexName;
private ArrayList<AliasName> aliases;
@@ -69,86 +74,86 @@ public class IndexCreationFactory {
return this;
}
- public Client createIndexAndAliases(Client client) {
+ public RestHighLevelClient createIndexAndAliases(RestHighLevelClient client) {
Preconditions.checkNotNull(indexName);
try {
createIndexIfNeeded(client, indexName, generateSetting(nbShards, nbReplica));
- aliases.forEach(alias -> createAliasIfNeeded(client, indexName, alias));
+ aliases.forEach(Throwing.consumer(alias -> createAliasIfNeeded(client, indexName, alias)));
} catch (IOException e) {
LOGGER.error("Error while creating index : ", e);
}
return client;
}
- private void createAliasIfNeeded(Client client, IndexName indexName, AliasName aliasName) {
+ private void createAliasIfNeeded(RestHighLevelClient client, IndexName indexName, AliasName aliasName) throws IOException {
if (!aliasExist(client, aliasName)) {
- client.admin()
- .indices()
- .aliases(new IndicesAliasesRequest()
- .addAlias(aliasName.getValue(), indexName.getValue()))
- .actionGet();
+ client.indices()
+ .updateAliases(
+ new IndicesAliasesRequest().addAliasAction(
+ new AliasActions(AliasActions.Type.ADD)
+ .index(indexName.getValue())
+ .alias(aliasName.getValue())),
+ RequestOptions.DEFAULT);
}
}
- private boolean aliasExist(Client client, AliasName aliasName) {
- return client.admin()
- .indices()
- .aliasesExist(new GetAliasesRequest()
- .aliases(aliasName.getValue()))
- .actionGet()
- .exists();
+ private boolean aliasExist(RestHighLevelClient client, AliasName aliasName) throws IOException {
+ return client.indices()
+ .existsAlias(new GetAliasesRequest().aliases(aliasName.getValue()),
+ RequestOptions.DEFAULT);
}
- private void createIndexIfNeeded(Client client, IndexName indexName, XContentBuilder settings) {
+ private void createIndexIfNeeded(RestHighLevelClient client, IndexName indexName, XContentBuilder settings) throws IOException {
try {
- client.admin()
- .indices()
- .prepareCreate(indexName.getValue())
- .setSettings(settings)
- .execute()
- .actionGet();
- } catch (IndexAlreadyExistsException exception) {
- LOGGER.info("Index [{}] already exist", indexName);
+ client.indices()
+ .create(
+ new CreateIndexRequest(indexName.getValue())
+ .source(settings),
+ RequestOptions.DEFAULT);
+ } catch (ElasticsearchStatusException exception) {
+ if (exception.getMessage().contains(INDEX_ALREADY_EXISTS_EXCEPTION_MESSAGE)) {
+ LOGGER.info("Index [{}] already exist", indexName);
+ } else {
+ throw exception;
+ }
}
}
private XContentBuilder generateSetting(int nbShards, int nbReplica) throws IOException {
return jsonBuilder()
.startObject()
- .field("number_of_shards", nbShards)
- .field("number_of_replicas", nbReplica)
- .startObject("analysis")
- .startObject("analyzer")
- .startObject(CASE_INSENSITIVE)
- .field("tokenizer", "keyword")
- .startArray("filter")
- .value("lowercase")
- .endArray()
+ .startObject("settings")
+ .field("number_of_shards", nbShards)
+ .field("number_of_replicas", nbReplica)
+ .startObject("analysis")
+ .startObject("analyzer")
+ .startObject(CASE_INSENSITIVE)
+ .field("tokenizer", "keyword")
+ .startArray("filter")
+ .value("lowercase")
+ .endArray()
+ .endObject()
+ .startObject(KEEP_MAIL_AND_URL)
+ .field("tokenizer", "uax_url_email")
+ .startArray("filter")
+ .value("lowercase")
+ .value("stop")
+ .endArray()
+ .endObject()
+ .startObject(SNOWBALL_KEEP_MAIL_AND_URL)
+ .field("tokenizer", "uax_url_email")
+ .startArray("filter")
+ .value("lowercase")
+ .value("stop")
+ .value(ENGLISH_SNOWBALL)
+ .endArray()
+ .endObject()
.endObject()
- .endObject()
- .startObject("analyzer")
- .startObject(KEEP_MAIL_AND_URL)
- .field("tokenizer", "uax_url_email")
- .startArray("filter")
- .value("lowercase")
- .value("stop")
- .endArray()
- .endObject()
- .endObject()
- .startObject("filter")
- .startObject(ENGLISH_SNOWBALL)
- .field("type", "snowball")
- .field("language", "English")
- .endObject()
- .endObject()
- .startObject("analyzer")
- .startObject(SNOWBALL_KEEP_MAIL_AND_URL)
- .field("tokenizer", "uax_url_email")
- .startArray("filter")
- .value("lowercase")
- .value("stop")
- .value(ENGLISH_SNOWBALL)
- .endArray()
+ .startObject("filter")
+ .startObject(ENGLISH_SNOWBALL)
+ .field("type", "snowball")
+ .field("language", "English")
+ .endObject()
.endObject()
.endObject()
.endObject()
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/NodeMappingFactory.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/NodeMappingFactory.java
index 01b0bf0..eda3c50 100644
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/NodeMappingFactory.java
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/NodeMappingFactory.java
@@ -19,8 +19,13 @@
package org.apache.james.backends.es.v6;
+import java.io.IOException;
+
import org.apache.james.util.streams.Iterators;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.GetMappingsRequest;
+import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
public class NodeMappingFactory {
@@ -32,6 +37,7 @@ public class NodeMappingFactory {
public static final String INDEX = "index";
public static final String NOT_ANALYZED = "not_analyzed";
public static final String STRING = "string";
+ public static final String TEXT = "text";
public static final String PROPERTIES = "properties";
public static final String DATE = "date";
public static final String FORMAT = "format";
@@ -44,32 +50,32 @@ public class NodeMappingFactory {
public static final String SNOWBALL = "snowball";
public static final String IGNORE_ABOVE = "ignore_above";
- public static Client applyMapping(Client client, IndexName indexName, TypeName typeName, XContentBuilder mappingsSources) {
+ public static RestHighLevelClient applyMapping(RestHighLevelClient client, IndexName indexName, TypeName typeName, XContentBuilder mappingsSources) throws IOException {
if (!mappingAlreadyExist(client, indexName, typeName)) {
- createMapping(client, indexName, typeName, mappingsSources);
+ createMapping(client, indexName, mappingsSources);
}
return client;
}
- public static boolean mappingAlreadyExist(Client client, IndexName indexName, TypeName typeName) {
- return Iterators.toStream(client.admin()
- .indices()
- .prepareGetMappings(indexName.getValue())
- .execute()
- .actionGet()
- .getMappings()
- .valuesIt())
- .anyMatch(mapping -> mapping.keys().contains(typeName.getValue()));
+ public static boolean mappingAlreadyExist(RestHighLevelClient client, IndexName indexName, TypeName typeName) throws IOException {
+ return Iterators.toStream(client.indices()
+ .getMapping(
+ new GetMappingsRequest()
+ .indices(indexName.getValue()),
+ RequestOptions.DEFAULT)
+ .mappings()
+ .values()
+ .iterator())
+ .anyMatch(mapping -> mapping.type().contains(typeName.getValue()));
}
- public static void createMapping(Client client, IndexName indexName, TypeName typeName, XContentBuilder mappingsSources) {
- client.admin()
- .indices()
- .preparePutMapping(indexName.getValue())
- .setType(typeName.getValue())
- .setSource(mappingsSources)
- .execute()
- .actionGet();
+ public static void createMapping(RestHighLevelClient client, IndexName indexName, XContentBuilder mappingsSources) throws IOException {
+ PutMappingRequest request = new PutMappingRequest(indexName.getValue())
+ .source(mappingsSources);
+ client.indices().putMapping(
+ new PutMappingRequest(indexName.getValue())
+ .source(mappingsSources),
+ RequestOptions.DEFAULT);
}
}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ScrollIterable.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ScrollIterable.java
deleted file mode 100644
index eca5bae..0000000
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ScrollIterable.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es.v6.search;
-
-import java.util.Iterator;
-import java.util.stream.Stream;
-
-import org.apache.james.util.streams.Iterators;
-import org.elasticsearch.action.ListenableActionFuture;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.unit.TimeValue;
-
-public class ScrollIterable implements Iterable<SearchResponse> {
-
- private static final TimeValue TIMEOUT = new TimeValue(60000);
- private final Client client;
- private final SearchRequestBuilder searchRequestBuilder;
-
- public ScrollIterable(Client client, SearchRequestBuilder searchRequestBuilder) {
- this.client = client;
- this.searchRequestBuilder = searchRequestBuilder;
- }
-
- @Override
- public Iterator<SearchResponse> iterator() {
- return new ScrollIterator(client, searchRequestBuilder);
- }
-
- public Stream<SearchResponse> stream() {
- return Iterators.toStream(iterator());
- }
-
- public static class ScrollIterator implements Iterator<SearchResponse> {
-
- private final Client client;
- private ListenableActionFuture<SearchResponse> searchResponseFuture;
-
- public ScrollIterator(Client client, SearchRequestBuilder searchRequestBuilder) {
- this.client = client;
- this.searchResponseFuture = searchRequestBuilder.execute();
- }
-
- @Override
- public boolean hasNext() {
- return !allSearchResponsesConsumed(searchResponseFuture.actionGet());
- }
-
- @Override
- public SearchResponse next() {
- SearchResponse result = searchResponseFuture.actionGet();
- searchResponseFuture = client.prepareSearchScroll(result.getScrollId())
- .setScroll(TIMEOUT)
- .execute();
- return result;
- }
-
- private boolean allSearchResponsesConsumed(SearchResponse searchResponse) {
- return searchResponse.getHits().getHits().length == 0;
- }
- }
-
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org