You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/05/28 04:00:28 UTC
[james-project] 01/11: JAMES-2767 replace old backend-es module by the new one
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 82f9e48e498186d7c03a97772321069dc3710b9a
Author: Tran Tien Duc <dt...@linagora.com>
AuthorDate: Thu May 23 10:58:36 2019 +0700
JAMES-2767 replace old backend-es module by the new one
---
backends-common/elasticsearch-v6/pom.xml | 102 -------
.../org/apache/james/backends/es/v6/AliasName.java | 49 ---
.../james/backends/es/v6/ClientProvider.java | 26 --
.../james/backends/es/v6/ClientProviderImpl.java | 80 -----
.../backends/es/v6/ElasticSearchConfiguration.java | 240 ---------------
.../james/backends/es/v6/ElasticSearchIndexer.java | 126 --------
.../james/backends/es/v6/IndexCreationFactory.java | 195 ------------
.../org/apache/james/backends/es/v6/IndexName.java | 49 ---
.../james/backends/es/v6/NodeMappingFactory.java | 82 ------
.../apache/james/backends/es/v6/ReadAliasName.java | 26 --
.../backends/es/v6/UpdatedRepresentation.java | 69 -----
.../james/backends/es/v6/WriteAliasName.java | 26 --
.../backends/es/v6/search/ScrollIterable.java | 92 ------
.../es/v6/ClientProviderImplConnectionTest.java | 103 -------
.../backends/es/v6/ClientProviderImplTest.java | 142 ---------
.../james/backends/es/v6/DockerElasticSearch.java | 119 --------
.../backends/es/v6/DockerElasticSearchRule.java | 49 ---
.../es/v6/DockerElasticSearchSingleton.java | 28 --
.../es/v6/ElasticSearchConfigurationTest.java | 327 ---------------------
.../backends/es/v6/ElasticSearchIndexerTest.java | 249 ----------------
.../backends/es/v6/IndexCreationFactoryTest.java | 69 -----
.../backends/es/v6/NodeMappingFactoryTest.java | 94 ------
.../backends/es/v6/search/ScrollIterableTest.java | 203 -------------
.../src/test/resources/logback-test.xml | 12 -
backends-common/elasticsearch/pom.xml | 24 +-
.../apache/james/backends/es/ClientProvider.java | 4 +-
.../james/backends/es/ClientProviderImpl.java | 28 +-
.../james/backends/es/DeleteByQueryPerformer.java | 84 ------
.../backends/es/ElasticSearchConfiguration.java | 38 +--
.../james/backends/es/ElasticSearchIndexer.java | 84 +++---
.../james/backends/es/IndexCreationFactory.java | 245 ++++++++-------
.../james/backends/es}/ListenerToFuture.java | 2 +-
.../james/backends/es/NodeMappingFactory.java | 49 +--
.../org/apache/james/backends/es/TypeName.java | 32 --
.../james/backends/es/search/ScrollIterable.java | 49 +--
.../apache/james/backends/es}/AliasNameTest.java | 2 +-
.../es/ClientProviderImplConnectionTest.java | 43 +--
.../james/backends/es/DockerElasticSearch.java | 27 +-
.../james/backends/es/DockerElasticSearchRule.java | 5 +-
.../es/ElasticSearchConfigurationTest.java | 44 +--
.../backends/es/ElasticSearchIndexerTest.java | 109 +++----
.../backends/es/IndexCreationFactoryTest.java | 5 +-
.../james/backends/es/NodeMappingFactoryTest.java | 37 +--
.../backends/es/search/ScrollIterableTest.java | 158 +++++-----
.../backends/es/utils/TestingClientProvider.java | 37 ---
backends-common/pom.xml | 1 -
pom.xml | 11 -
47 files changed, 501 insertions(+), 3174 deletions(-)
diff --git a/backends-common/elasticsearch-v6/pom.xml b/backends-common/elasticsearch-v6/pom.xml
deleted file mode 100644
index d062168..0000000
--- a/backends-common/elasticsearch-v6/pom.xml
+++ /dev/null
@@ -1,102 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.james</groupId>
- <artifactId>james-backends-common</artifactId>
- <version>3.4.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>apache-james-backends-es-v6</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>${james.groupId}</groupId>
- <artifactId>james-server-util</artifactId>
- </dependency>
- <dependency>
- <groupId>${james.groupId}</groupId>
- <artifactId>james-server-testing</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.github.fge</groupId>
- <artifactId>throwing-lambdas</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-configuration</groupId>
- <artifactId>commons-configuration</artifactId>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>io.github.openfeign</groupId>
- <artifactId>feign-core</artifactId>
- </dependency>
- <dependency>
- <groupId>io.github.openfeign</groupId>
- <artifactId>feign-slf4j</artifactId>
- </dependency>
- <dependency>
- <groupId>nl.jqno.equalsverifier</groupId>
- <artifactId>equalsverifier</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.assertj</groupId>
- <artifactId>assertj-core</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.awaitility</groupId>
- <artifactId>awaitility</artifactId>
- </dependency>
- <dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-high-level-client</artifactId>
- <version>6.7.2</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>jcl-over-slf4j</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>testcontainers</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
\ No newline at end of file
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/AliasName.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/AliasName.java
deleted file mode 100644
index 754ac6e..0000000
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/AliasName.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es.v6;
-
-import java.util.Objects;
-
-public class AliasName {
- private final String value;
-
- protected AliasName(String value) {
- this.value = value;
- }
-
- public String getValue() {
- return value;
- }
-
- @Override
- public final boolean equals(Object o) {
- if (o instanceof AliasName) {
- AliasName aliasName = (AliasName) o;
-
- return Objects.equals(this.value, aliasName.value);
- }
- return false;
- }
-
- @Override
- public final int hashCode() {
- return Objects.hash(value);
- }
-}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProvider.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProvider.java
deleted file mode 100644
index 9ba5d21..0000000
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProvider.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-package org.apache.james.backends.es.v6;
-
-import org.elasticsearch.client.RestHighLevelClient;
-
-public interface ClientProvider {
-
- RestHighLevelClient get();
-}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProviderImpl.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProviderImpl.java
deleted file mode 100644
index 074296b..0000000
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProviderImpl.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-package org.apache.james.backends.es.v6;
-
-import java.util.Optional;
-
-import org.apache.http.HttpHost;
-import org.apache.james.util.Host;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.common.settings.Settings;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-public class ClientProviderImpl implements ClientProvider {
-
- public static ClientProviderImpl forHost(String address, Integer port, Optional<String> clusterName) {
- return new ClientProviderImpl(ImmutableList.of(Host.from(address, port)), clusterName);
- }
-
- public static ClientProviderImpl fromHostsString(String hostsString, Optional<String> clusterName) {
- Preconditions.checkNotNull(hostsString, "HostString should not be null");
- return new ClientProviderImpl(Host.parseHosts(hostsString), clusterName);
- }
-
- public static ClientProviderImpl fromHosts(ImmutableList<Host> hosts, Optional<String> clusterName) {
- Preconditions.checkNotNull(hosts, "Hosts should not be null");
- return new ClientProviderImpl(hosts, clusterName);
- }
-
- private static final String CLUSTER_NAME_SETTING = "cluster.name";
- private static final String HTTP_HOST_SCHEME = "http";
-
- private final ImmutableList<Host> hosts;
- private final Optional<String> clusterName;
-
- private ClientProviderImpl(ImmutableList<Host> hosts, Optional<String> clusterName) {
- Preconditions.checkArgument(!hosts.isEmpty(), "You should provide at least one host");
- this.hosts = hosts;
- this.clusterName = clusterName;
- }
-
- private HttpHost[] hostsToHttpHosts() {
- return hosts.stream()
- .map(host -> new HttpHost(host.getHostName(), host.getPort(), HTTP_HOST_SCHEME))
- .toArray(HttpHost[]::new);
- }
-
- @Override
- public RestHighLevelClient get() {
- return new RestHighLevelClient(RestClient.builder(hostsToHttpHosts()));
- }
-
- @VisibleForTesting Settings settings() {
- if (clusterName.isPresent()) {
- return Settings.builder()
- .put(CLUSTER_NAME_SETTING, clusterName.get())
- .build();
- }
- return Settings.EMPTY;
- }
-}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchConfiguration.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchConfiguration.java
deleted file mode 100644
index 032203e..0000000
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchConfiguration.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es.v6;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-
-import org.apache.commons.configuration.AbstractConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.james.util.Host;
-
-import com.github.steveash.guavate.Guavate;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-public class ElasticSearchConfiguration {
-
-
- public static class Builder {
-
- private final ImmutableList.Builder<Host> hosts;
- private Optional<String> clusterName;
- private Optional<Integer> nbShards;
- private Optional<Integer> nbReplica;
- private Optional<Integer> minDelay;
- private Optional<Integer> maxRetries;
-
- public Builder() {
- hosts = ImmutableList.builder();
- clusterName = Optional.empty();
- nbShards = Optional.empty();
- nbReplica = Optional.empty();
- minDelay = Optional.empty();
- maxRetries = Optional.empty();
- }
-
- public Builder addHost(Host host) {
- this.hosts.add(host);
- return this;
- }
-
- public Builder clusterName(String clusterName) {
- this.clusterName = Optional.ofNullable(clusterName);
- return this;
- }
-
- public Builder addHosts(Collection<Host> hosts) {
- this.hosts.addAll(hosts);
- return this;
- }
-
- public Builder nbShards(int nbShards) {
- Preconditions.checkArgument(nbShards > 0, "You need the number of shards to be strictly positive");
- this.nbShards = Optional.of(nbShards);
- return this;
- }
-
- public Builder nbReplica(int nbReplica) {
- Preconditions.checkArgument(nbReplica >= 0, "You need the number of replica to be positive");
- this.nbReplica = Optional.of(nbReplica);
- return this;
- }
-
- public Builder minDelay(Optional<Integer> minDelay) {
- this.minDelay = minDelay;
- return this;
- }
-
- public Builder maxRetries(Optional<Integer> maxRetries) {
- this.maxRetries = maxRetries;
- return this;
- }
-
- public ElasticSearchConfiguration build() {
- ImmutableList<Host> hosts = this.hosts.build();
- Preconditions.checkState(!hosts.isEmpty(), "You need to specify ElasticSearch host");
- return new ElasticSearchConfiguration(
- hosts,
- clusterName,
- nbShards.orElse(DEFAULT_NB_SHARDS),
- nbReplica.orElse(DEFAULT_NB_REPLICA),
- minDelay.orElse(DEFAULT_CONNECTION_MIN_DELAY),
- maxRetries.orElse(DEFAULT_CONNECTION_MAX_RETRIES));
- }
- }
-
- public static Builder builder() {
- return new Builder();
- }
-
- public static final String ELASTICSEARCH_HOSTS = "elasticsearch.hosts";
- public static final String ELASTICSEARCH_CLUSTER_NAME = "elasticsearch.clusterName";
- public static final String ELASTICSEARCH_MASTER_HOST = "elasticsearch.masterHost";
- public static final String ELASTICSEARCH_PORT = "elasticsearch.port";
- public static final String ELASTICSEARCH_NB_REPLICA = "elasticsearch.nb.replica";
- public static final String ELASTICSEARCH_NB_SHARDS = "elasticsearch.nb.shards";
- public static final String ELASTICSEARCH_RETRY_CONNECTION_MIN_DELAY = "elasticsearch.retryConnection.minDelay";
- public static final String ELASTICSEARCH_RETRY_CONNECTION_MAX_RETRIES = "elasticsearch.retryConnection.maxRetries";
-
- public static final int DEFAULT_CONNECTION_MAX_RETRIES = 7;
- public static final int DEFAULT_CONNECTION_MIN_DELAY = 3000;
- public static final int DEFAULT_NB_SHARDS = 5;
- public static final int DEFAULT_NB_REPLICA = 1;
- public static final int DEFAULT_PORT = 9200;
- private static final String LOCALHOST = "127.0.0.1";
- public static final Optional<Integer> DEFAULT_PORT_AS_OPTIONAL = Optional.of(DEFAULT_PORT);
-
- public static final ElasticSearchConfiguration DEFAULT_CONFIGURATION = builder()
- .addHost(Host.from(LOCALHOST, DEFAULT_PORT))
- .build();
-
- public static ElasticSearchConfiguration fromProperties(Configuration configuration) throws ConfigurationException {
- return builder()
- .addHosts(getHosts(configuration))
- .clusterName(configuration.getString(ELASTICSEARCH_CLUSTER_NAME))
- .nbShards(configuration.getInteger(ELASTICSEARCH_NB_SHARDS, DEFAULT_NB_SHARDS))
- .nbReplica(configuration.getInteger(ELASTICSEARCH_NB_REPLICA, DEFAULT_NB_REPLICA))
- .minDelay(Optional.ofNullable(configuration.getInteger(ELASTICSEARCH_RETRY_CONNECTION_MIN_DELAY, null)))
- .maxRetries(Optional.ofNullable(configuration.getInteger(ELASTICSEARCH_RETRY_CONNECTION_MAX_RETRIES, null)))
- .build();
- }
-
- private static ImmutableList<Host> getHosts(Configuration propertiesReader) throws ConfigurationException {
- AbstractConfiguration.setDefaultListDelimiter(',');
- Optional<String> masterHost = Optional.ofNullable(
- propertiesReader.getString(ELASTICSEARCH_MASTER_HOST, null));
- Optional<Integer> masterPort = Optional.ofNullable(
- propertiesReader.getInteger(ELASTICSEARCH_PORT, null));
- List<String> multiHosts = Arrays.asList(propertiesReader.getStringArray(ELASTICSEARCH_HOSTS));
-
- validateHostsConfigurationOptions(masterHost, masterPort, multiHosts);
-
- if (masterHost.isPresent()) {
- return ImmutableList.of(
- Host.from(masterHost.get(),
- masterPort.get()));
- } else {
- return multiHosts.stream()
- .map(ipAndPort -> Host.parse(ipAndPort, DEFAULT_PORT_AS_OPTIONAL))
- .collect(Guavate.toImmutableList());
- }
- }
-
- @VisibleForTesting
- static void validateHostsConfigurationOptions(Optional<String> masterHost,
- Optional<Integer> masterPort,
- List<String> multiHosts) throws ConfigurationException {
- if (masterHost.isPresent() != masterPort.isPresent()) {
- throw new ConfigurationException(ELASTICSEARCH_MASTER_HOST + " and " + ELASTICSEARCH_PORT + " should be specified together");
- }
- if (!multiHosts.isEmpty() && masterHost.isPresent()) {
- throw new ConfigurationException("You should choose between mono host set up and " + ELASTICSEARCH_HOSTS);
- }
- if (multiHosts.isEmpty() && !masterHost.isPresent()) {
- throw new ConfigurationException("You should specify either (" + ELASTICSEARCH_MASTER_HOST + " and " + ELASTICSEARCH_PORT + ") or " + ELASTICSEARCH_HOSTS);
- }
- }
-
- private final ImmutableList<Host> hosts;
- private final Optional<String> clusterName;
- private final int nbShards;
- private final int nbReplica;
- private final int minDelay;
- private final int maxRetries;
-
- private ElasticSearchConfiguration(ImmutableList<Host> hosts, Optional<String> clusterName, int nbShards, int nbReplica, int minDelay, int maxRetries) {
- this.hosts = hosts;
- this.clusterName = clusterName;
- this.nbShards = nbShards;
- this.nbReplica = nbReplica;
- this.minDelay = minDelay;
- this.maxRetries = maxRetries;
- }
-
- public ImmutableList<Host> getHosts() {
- return hosts;
- }
-
- public Optional<String> getClusterName() {
- return clusterName;
- }
-
- public int getNbShards() {
- return nbShards;
- }
-
- public int getNbReplica() {
- return nbReplica;
- }
-
- public int getMinDelay() {
- return minDelay;
- }
-
- public int getMaxRetries() {
- return maxRetries;
- }
-
- @Override
- public final boolean equals(Object o) {
- if (o instanceof ElasticSearchConfiguration) {
- ElasticSearchConfiguration that = (ElasticSearchConfiguration) o;
-
- return Objects.equals(this.nbShards, that.nbShards)
- && Objects.equals(this.clusterName, that.clusterName)
- && Objects.equals(this.nbReplica, that.nbReplica)
- && Objects.equals(this.minDelay, that.minDelay)
- && Objects.equals(this.maxRetries, that.maxRetries)
- && Objects.equals(this.hosts, that.hosts);
- }
- return false;
- }
-
- @Override
- public final int hashCode() {
- return Objects.hash(hosts, clusterName, nbShards, nbReplica, minDelay, maxRetries);
- }
-}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchIndexer.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchIndexer.java
deleted file mode 100644
index c2e5a7d..0000000
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchIndexer.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-package org.apache.james.backends.es.v6;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Optional;
-
-import org.apache.commons.lang3.StringUtils;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.common.ValidationException;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.reindex.DeleteByQueryRequest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-public class ElasticSearchIndexer {
- private static final int DEBUG_MAX_LENGTH_CONTENT = 1000;
- private static final int DEFAULT_BATCH_SIZE = 100;
- private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
-
- private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchIndexer.class);
-
- private final RestHighLevelClient client;
- private final AliasName aliasName;
- private final int batchSize;
-
- public ElasticSearchIndexer(RestHighLevelClient client,
- WriteAliasName aliasName) {
- this(client, aliasName, DEFAULT_BATCH_SIZE);
- }
-
- @VisibleForTesting
- public ElasticSearchIndexer(RestHighLevelClient client,
- WriteAliasName aliasName,
- int batchSize) {
- this.client = client;
- this.aliasName = aliasName;
- this.batchSize = batchSize;
- }
-
- public IndexResponse index(String id, String content) throws IOException {
- checkArgument(content);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Indexing {}: {}", id, StringUtils.left(content, DEBUG_MAX_LENGTH_CONTENT));
- }
- return client.index(
- new IndexRequest(aliasName.getValue())
- .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
- .id(id)
- .source(content, XContentType.JSON),
- RequestOptions.DEFAULT);
- }
-
- public Optional<BulkResponse> update(List<UpdatedRepresentation> updatedDocumentParts) throws IOException {
- try {
- Preconditions.checkNotNull(updatedDocumentParts);
- BulkRequest request = new BulkRequest();
- updatedDocumentParts.forEach(updatedDocumentPart -> request.add(
- new UpdateRequest(aliasName.getValue(),
- NodeMappingFactory.DEFAULT_MAPPING_NAME,
- updatedDocumentPart.getId())
- .doc(updatedDocumentPart.getUpdatedDocumentPart(), XContentType.JSON)));
- return Optional.of(client.bulk(request, RequestOptions.DEFAULT));
- } catch (ValidationException e) {
- LOGGER.warn("Error while updating index", e);
- return Optional.empty();
- }
- }
-
- public Optional<BulkResponse> delete(List<String> ids) throws IOException {
- try {
- BulkRequest request = new BulkRequest();
- ids.forEach(id -> request.add(
- new DeleteRequest(aliasName.getValue())
- .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
- .id(id)));
- return Optional.of(client.bulk(request, RequestOptions.DEFAULT));
- } catch (ValidationException e) {
- LOGGER.warn("Error while deleting index", e);
- return Optional.empty();
- }
- }
-
- public void deleteAllMatchingQuery(QueryBuilder queryBuilder) {
- DeleteByQueryRequest request = new DeleteByQueryRequest(aliasName.getValue())
- .setDocTypes(NodeMappingFactory.DEFAULT_MAPPING_NAME)
- .setScroll(TIMEOUT)
- .setQuery(queryBuilder)
- .setBatchSize(batchSize);
-
- client.deleteByQueryAsync(request, RequestOptions.DEFAULT, new ListenerToFuture<>());
- }
-
- private void checkArgument(String content) {
- Preconditions.checkArgument(content != null, "content should be provided");
- }
-}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java
deleted file mode 100644
index 17c51c7..0000000
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es.v6;
-
-import static org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions;
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-
-import java.io.IOException;
-
-import javax.inject.Inject;
-
-import org.elasticsearch.ElasticsearchStatusException;
-import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
-import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.indices.CreateIndexRequest;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.github.fge.lambdas.Throwing;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-public class IndexCreationFactory {
-
- public static class AliasSpecificationStep {
- private final int nbShards;
- private final int nbReplica;
- private final IndexName indexName;
- private final ImmutableList.Builder<AliasName> aliases;
-
- AliasSpecificationStep(int nbShards, int nbReplica, IndexName indexName) {
- this.nbShards = nbShards;
- this.nbReplica = nbReplica;
- this.indexName = indexName;
- this.aliases = ImmutableList.builder();
- }
-
- public AliasSpecificationStep addAlias(AliasName aliasName) {
- Preconditions.checkNotNull(aliasName);
- this.aliases.add(aliasName);
- return this;
- }
-
- public RestHighLevelClient createIndexAndAliases(RestHighLevelClient client) {
- return new IndexCreationPerformer(nbShards, nbReplica, indexName, aliases.build()).createIndexAndAliases(client);
- }
- }
-
- static class IndexCreationPerformer {
- private final int nbShards;
- private final int nbReplica;
- private final IndexName indexName;
- private final ImmutableList<AliasName> aliases;
-
- public IndexCreationPerformer(int nbShards, int nbReplica, IndexName indexName, ImmutableList<AliasName> aliases) {
- this.nbShards = nbShards;
- this.nbReplica = nbReplica;
- this.indexName = indexName;
- this.aliases = aliases;
- }
-
- public RestHighLevelClient createIndexAndAliases(RestHighLevelClient client) {
- Preconditions.checkNotNull(indexName);
- try {
- createIndexIfNeeded(client, indexName, generateSetting(nbShards, nbReplica));
- aliases.forEach(Throwing.consumer(alias -> createAliasIfNeeded(client, indexName, alias)));
- } catch (IOException e) {
- LOGGER.error("Error while creating index : ", e);
- }
- return client;
- }
-
- private void createAliasIfNeeded(RestHighLevelClient client, IndexName indexName, AliasName aliasName) throws IOException {
- if (!aliasExist(client, aliasName)) {
- client.indices()
- .updateAliases(
- new IndicesAliasesRequest().addAliasAction(
- new AliasActions(AliasActions.Type.ADD)
- .index(indexName.getValue())
- .alias(aliasName.getValue())),
- RequestOptions.DEFAULT);
- }
- }
-
- private boolean aliasExist(RestHighLevelClient client, AliasName aliasName) throws IOException {
- return client.indices()
- .existsAlias(new GetAliasesRequest().aliases(aliasName.getValue()),
- RequestOptions.DEFAULT);
- }
-
- private void createIndexIfNeeded(RestHighLevelClient client, IndexName indexName, XContentBuilder settings) throws IOException {
- try {
- client.indices()
- .create(
- new CreateIndexRequest(indexName.getValue())
- .source(settings),
- RequestOptions.DEFAULT);
- } catch (ElasticsearchStatusException exception) {
- if (exception.getMessage().contains(INDEX_ALREADY_EXISTS_EXCEPTION_MESSAGE)) {
- LOGGER.info("Index [{}] already exist", indexName);
- } else {
- throw exception;
- }
- }
- }
-
- private XContentBuilder generateSetting(int nbShards, int nbReplica) throws IOException {
- return jsonBuilder()
- .startObject()
- .startObject("settings")
- .field("number_of_shards", nbShards)
- .field("number_of_replicas", nbReplica)
- .startObject("analysis")
- .startObject("normalizer")
- .startObject(CASE_INSENSITIVE)
- .field("type", "custom")
- .startArray("char_filter")
- .endArray()
- .startArray("filter")
- .value("lowercase")
- .value("asciifolding")
- .endArray()
- .endObject()
- .endObject()
- .startObject("analyzer")
- .startObject(KEEP_MAIL_AND_URL)
- .field("tokenizer", "uax_url_email")
- .startArray("filter")
- .value("lowercase")
- .value("stop")
- .endArray()
- .endObject()
- .startObject(SNOWBALL_KEEP_MAIL_AND_URL)
- .field("tokenizer", "uax_url_email")
- .startArray("filter")
- .value("lowercase")
- .value("stop")
- .value(ENGLISH_SNOWBALL)
- .endArray()
- .endObject()
- .endObject()
- .startObject("filter")
- .startObject(ENGLISH_SNOWBALL)
- .field("type", "snowball")
- .field("language", "English")
- .endObject()
- .endObject()
- .endObject()
- .endObject()
- .endObject();
- }
- }
-
- private static final Logger LOGGER = LoggerFactory.getLogger(IndexCreationFactory.class);
- private static final String INDEX_ALREADY_EXISTS_EXCEPTION_MESSAGE = "type=resource_already_exists_exception";
-
- private final int nbShards;
- private final int nbReplica;
-
- public static final String CASE_INSENSITIVE = "case_insensitive";
- public static final String KEEP_MAIL_AND_URL = "keep_mail_and_url";
- public static final String SNOWBALL_KEEP_MAIL_AND_URL = "snowball_keep_mail_and_token";
- public static final String ENGLISH_SNOWBALL = "english_snowball";
-
- @Inject
- public IndexCreationFactory(ElasticSearchConfiguration configuration) {
- this.nbShards = configuration.getNbShards();
- this.nbReplica = configuration.getNbReplica();
- }
-
- public AliasSpecificationStep useIndex(IndexName indexName) {
- Preconditions.checkNotNull(indexName);
- return new AliasSpecificationStep(nbShards, nbReplica, indexName);
- }
-}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexName.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexName.java
deleted file mode 100644
index 39c3f28..0000000
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexName.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es.v6;
-
-import java.util.Objects;
-
-public class IndexName {
- private final String value;
-
- public IndexName(String value) {
- this.value = value;
- }
-
- public String getValue() {
- return value;
- }
-
- @Override
- public final boolean equals(Object o) {
- if (o instanceof IndexName) {
- IndexName indexName = (IndexName) o;
-
- return Objects.equals(this.value, indexName.value);
- }
- return false;
- }
-
- @Override
- public final int hashCode() {
- return Objects.hash(value);
- }
-}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/NodeMappingFactory.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/NodeMappingFactory.java
deleted file mode 100644
index 5fc2df1..0000000
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/NodeMappingFactory.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es.v6;
-
-import java.io.IOException;
-
-import org.apache.james.util.streams.Iterators;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.indices.GetMappingsRequest;
-import org.elasticsearch.client.indices.PutMappingRequest;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-
-public class NodeMappingFactory {
-
- public static final String DEFAULT_MAPPING_NAME = "_doc";
- public static final String BOOLEAN = "boolean";
- public static final String TYPE = "type";
- public static final String LONG = "long";
- public static final String DOUBLE = "double";
- public static final String INDEX = "index";
- public static final String NOT_ANALYZED = "not_analyzed";
- public static final String STRING = "string";
- public static final String TEXT = "text";
- public static final String KEYWORD = "keyword";
- public static final String PROPERTIES = "properties";
- public static final String DATE = "date";
- public static final String FORMAT = "format";
- public static final String NESTED = "nested";
- public static final String FIELDS = "fields";
- public static final String RAW = "raw";
- public static final String SPLIT_EMAIL = "splitEmail";
- public static final String ANALYZER = "analyzer";
- public static final String NORMALIZER = "normalizer";
- public static final String SEARCH_ANALYZER = "search_analyzer";
- public static final String SNOWBALL = "snowball";
- public static final String IGNORE_ABOVE = "ignore_above";
-
- public static RestHighLevelClient applyMapping(RestHighLevelClient client, IndexName indexName, XContentBuilder mappingsSources) throws IOException {
- if (!mappingAlreadyExist(client, indexName)) {
- createMapping(client, indexName, mappingsSources);
- }
- return client;
- }
-
- public static boolean mappingAlreadyExist(RestHighLevelClient client, IndexName indexName) throws IOException {
- return Iterators.toStream(client.indices()
- .getMapping(
- new GetMappingsRequest()
- .indices(indexName.getValue()),
- RequestOptions.DEFAULT)
- .mappings()
- .values()
- .iterator())
- .anyMatch(mappingMetaData -> !mappingMetaData.getSourceAsMap().isEmpty());
- }
-
- public static void createMapping(RestHighLevelClient client, IndexName indexName, XContentBuilder mappingsSources) throws IOException {
- client.indices().putMapping(
- new PutMappingRequest(indexName.getValue())
- .source(mappingsSources),
- RequestOptions.DEFAULT);
- }
-
-}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ReadAliasName.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ReadAliasName.java
deleted file mode 100644
index 8763664..0000000
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ReadAliasName.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es.v6;
-
-public class ReadAliasName extends AliasName {
- public ReadAliasName(String value) {
- super(value);
- }
-}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/UpdatedRepresentation.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/UpdatedRepresentation.java
deleted file mode 100644
index 1d919c9..0000000
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/UpdatedRepresentation.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-package org.apache.james.backends.es.v6;
-
-import java.util.Objects;
-
-import org.elasticsearch.common.Strings;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-
-public class UpdatedRepresentation {
- private final String id;
- private final String updatedDocumentPart;
-
- public UpdatedRepresentation(String id, String updatedDocumentPart) {
- Preconditions.checkArgument(!Strings.isNullOrEmpty(id), "Updated id must be specified " + id);
- Preconditions.checkArgument(!Strings.isNullOrEmpty(updatedDocumentPart), "Updated document must be specified");
- this.id = id;
- this.updatedDocumentPart = updatedDocumentPart;
- }
-
- public String getId() {
- return id;
- }
-
- public String getUpdatedDocumentPart() {
- return updatedDocumentPart;
- }
-
- @Override
- public final boolean equals(Object o) {
- if (o instanceof UpdatedRepresentation) {
- UpdatedRepresentation other = (UpdatedRepresentation) o;
- return Objects.equals(id, other.id)
- && Objects.equals(updatedDocumentPart, other.updatedDocumentPart);
- }
- return false;
- }
-
- @Override
- public final int hashCode() {
- return Objects.hash(id, updatedDocumentPart);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("id", id)
- .add("updatedDocumentPart", updatedDocumentPart)
- .toString();
- }
-}
\ No newline at end of file
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/WriteAliasName.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/WriteAliasName.java
deleted file mode 100644
index 0947ad7..0000000
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/WriteAliasName.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es.v6;
-
-public class WriteAliasName extends AliasName {
- public WriteAliasName(String value) {
- super(value);
- }
-}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ScrollIterable.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ScrollIterable.java
deleted file mode 100644
index 475343a..0000000
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ScrollIterable.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es.v6.search;
-
-import java.util.Iterator;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
-
-import org.apache.james.backends.es.v6.ListenerToFuture;
-import org.apache.james.util.streams.Iterators;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.search.SearchScrollRequest;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.common.unit.TimeValue;
-
-public class ScrollIterable implements Iterable<SearchResponse> {
- private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
-
- private final RestHighLevelClient client;
- private final SearchRequest searchRequest;
-
- public ScrollIterable(RestHighLevelClient client, SearchRequest searchRequest) {
- this.client = client;
- this.searchRequest = searchRequest;
- }
-
- @Override
- public Iterator<SearchResponse> iterator() {
- return new ScrollIterator(client, searchRequest);
- }
-
- public Stream<SearchResponse> stream() {
- return Iterators.toStream(iterator());
- }
-
- public static class ScrollIterator implements Iterator<SearchResponse> {
- private final RestHighLevelClient client;
- private CompletableFuture<SearchResponse> searchResponseFuture;
-
- ScrollIterator(RestHighLevelClient client, SearchRequest searchRequest) {
- this.client = client;
- ListenerToFuture<SearchResponse> listener = new ListenerToFuture<>();
- client.searchAsync(searchRequest, RequestOptions.DEFAULT, listener);
-
- this.searchResponseFuture = listener.getFuture();
- }
-
- @Override
- public boolean hasNext() {
- SearchResponse join = searchResponseFuture.join();
- return !allSearchResponsesConsumed(join);
- }
-
- @Override
- public SearchResponse next() {
- SearchResponse result = searchResponseFuture.join();
- ListenerToFuture<SearchResponse> listener = new ListenerToFuture<>();
- client.scrollAsync(
- new SearchScrollRequest()
- .scrollId(result.getScrollId())
- .scroll(TIMEOUT),
- RequestOptions.DEFAULT,
- listener);
- searchResponseFuture = listener.getFuture();
- return result;
- }
-
- private boolean allSearchResponsesConsumed(SearchResponse searchResponse) {
- return searchResponse.getHits().getHits().length == 0;
- }
- }
-
-}
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ClientProviderImplConnectionTest.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ClientProviderImplConnectionTest.java
deleted file mode 100644
index 0a36d6a..0000000
--- a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ClientProviderImplConnectionTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es.v6;
-
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.james.util.docker.DockerGenericContainer;
-import org.apache.james.util.docker.Images;
-import org.awaitility.Awaitility;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ClientProviderImplConnectionTest {
- private static final Logger LOGGER = LoggerFactory.getLogger(ClientProviderImplConnectionTest.class);
- private static final int ES_APPLICATIVE_PORT = 9200;
-
- @ClassRule
- public static DockerGenericContainer es1 = new DockerGenericContainer(Images.ELASTICSEARCH_6)
- .withEnv("discovery.type", "single-node")
- .withAffinityToContainer()
- .withExposedPorts(ES_APPLICATIVE_PORT);
-
- @Rule
- public DockerGenericContainer es2 = new DockerGenericContainer(Images.ELASTICSEARCH_6)
- .withEnv("discovery.type", "single-node")
- .withAffinityToContainer()
- .withExposedPorts(ES_APPLICATIVE_PORT);
-
- @Test
- public void connectingASingleServerShouldWork() {
- Awaitility.await()
- .atMost(1, TimeUnit.MINUTES)
- .pollInterval(5, TimeUnit.SECONDS)
- .until(() -> isConnected(ClientProviderImpl.forHost(es1.getContainerIp(), ES_APPLICATIVE_PORT, Optional.empty())));
- }
-
- @Test
- public void connectingAClusterShouldWork() {
- Awaitility.await()
- .atMost(1, TimeUnit.MINUTES)
- .pollInterval(5, TimeUnit.SECONDS)
- .until(() -> isConnected(
- ClientProviderImpl.fromHostsString(
- es1.getContainerIp() + ":" + ES_APPLICATIVE_PORT + ","
- + es2.getContainerIp() + ":" + ES_APPLICATIVE_PORT,
- Optional.empty())));
- }
-
- @Test
- public void connectingAClusterWithAFailedNodeShouldWork() {
- String es1Ip = es1.getContainerIp();
- String es2Ip = es2.getContainerIp();
- es2.stop();
-
- Awaitility.await()
- .atMost(1, TimeUnit.MINUTES)
- .pollInterval(5, TimeUnit.SECONDS)
- .until(() -> isConnected(
- ClientProviderImpl.fromHostsString(
- es1Ip + ":" + ES_APPLICATIVE_PORT + ","
- + es2Ip + ":" + ES_APPLICATIVE_PORT,
- Optional.empty())));
- }
-
- private boolean isConnected(ClientProvider clientProvider) {
- try (RestHighLevelClient client = clientProvider.get()) {
- client.search(
- new SearchRequest()
- .source(new SearchSourceBuilder().query(QueryBuilders.existsQuery("any"))),
- RequestOptions.DEFAULT);
- return true;
- } catch (Exception e) {
- LOGGER.info("Caught exception while trying to connect", e);
- return false;
- }
- }
-}
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ClientProviderImplTest.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ClientProviderImplTest.java
deleted file mode 100644
index 8079425..0000000
--- a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ClientProviderImplTest.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es.v6;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-import java.util.Optional;
-
-import org.elasticsearch.common.settings.Settings;
-import org.junit.Test;
-
-public class ClientProviderImplTest {
-
- @Test
- public void fromHostsStringShouldThrowOnNullString() {
- assertThatThrownBy(() -> ClientProviderImpl.fromHostsString(null, Optional.empty()))
- .isInstanceOf(NullPointerException.class);
- }
-
- @Test
- public void fromHostsStringShouldThrowOnEmptyString() {
- assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("", Optional.empty()))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void forHostShouldThrowOnNullHost() {
- assertThatThrownBy(() -> ClientProviderImpl.forHost(null, 9200, Optional.empty()))
- .isInstanceOf(NullPointerException.class);
- }
-
- @Test
- public void forHostShouldThrowOnEmptyHost() {
- assertThatThrownBy(() -> ClientProviderImpl.forHost("", 9200, Optional.empty()))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void forHostShouldThrowOnNegativePort() {
- assertThatThrownBy(() -> ClientProviderImpl.forHost("localhost", -1, Optional.empty()))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void forHostShouldThrowOnZeroPort() {
- assertThatThrownBy(() -> ClientProviderImpl.forHost("localhost", 0, Optional.empty()))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void forHostShouldThrowOnTooBigPort() {
- assertThatThrownBy(() -> ClientProviderImpl.forHost("localhost", 65536, Optional.empty()))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void fromHostsStringShouldEmptyAddress() {
- assertThatThrownBy(() -> ClientProviderImpl.fromHostsString(":9200", Optional.empty()))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void fromHostsStringShouldThrowOnAbsentPort() {
- assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost", Optional.empty()))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void fromHostsStringShouldThrowWhenTooMuchParts() {
- assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost:9200:9200", Optional.empty()))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void fromHostsStringShouldThrowOnEmptyPort() {
- assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost:", Optional.empty()))
- .isInstanceOf(NumberFormatException.class);
- }
-
- @Test
- public void fromHostsStringShouldThrowOnInvalidPort() {
- assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost:invalid", Optional.empty()))
- .isInstanceOf(NumberFormatException.class);
- }
-
- @Test
- public void fromHostsStringShouldThrowOnNegativePort() {
- assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost:-1", Optional.empty()))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void fromHostsStringShouldThrowOnZeroPort() {
- assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost:0", Optional.empty()))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void fromHostsStringShouldThrowOnTooBigPort() {
- assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost:65536", Optional.empty()))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void fromHostsStringShouldThrowIfOneHostIsInvalid() {
- assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost:9200,localhost", Optional.empty()))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void settingsShouldBeEmptyWhenClusterNameIsEmpty() {
- ClientProviderImpl clientProvider = ClientProviderImpl.fromHostsString("localhost:9200", Optional.empty());
-
- assertThat(clientProvider.settings()).isEqualTo(Settings.EMPTY);
- }
-
- @Test
- public void settingsShouldContainClusterNameSettingWhenClusterNameIsGiven() {
- String clusterName = "myClusterName";
- ClientProviderImpl clientProvider = ClientProviderImpl.fromHostsString("localhost:9200", Optional.of(clusterName));
-
- assertThat(clientProvider.settings().get("cluster.name")).isEqualTo(clusterName);
- }
-}
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/DockerElasticSearch.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/DockerElasticSearch.java
deleted file mode 100644
index 413e7ef..0000000
--- a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/DockerElasticSearch.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es.v6;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.util.Optional;
-
-import org.apache.http.HttpStatus;
-import org.apache.james.util.Host;
-import org.apache.james.util.docker.DockerGenericContainer;
-import org.apache.james.util.docker.Images;
-import org.apache.james.util.docker.RateLimiters;
-import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
-
-import com.google.common.collect.ImmutableList;
-
-import feign.Feign;
-import feign.Logger;
-import feign.RequestLine;
-import feign.Response;
-import feign.slf4j.Slf4jLogger;
-
-public class DockerElasticSearch {
-
- interface ElasticSearchAPI {
-
- static ElasticSearchAPI from(Host esHttpHost) {
- return Feign.builder()
- .logger(new Slf4jLogger(ElasticSearchAPI.class))
- .logLevel(Logger.Level.FULL)
- .target(ElasticSearchAPI.class, "http://" + esHttpHost.getHostName() + ":" + esHttpHost.getPort());
- }
-
- @RequestLine("DELETE /_all")
- Response deleteAllIndexes();
-
- @RequestLine("POST /_flush?force&wait_if_ongoing=true")
- Response flush();
- }
-
- private static final int ES_HTTP_PORT = 9200;
-
- private final DockerGenericContainer eSContainer;
-
- public DockerElasticSearch() {
- this.eSContainer = new DockerGenericContainer(Images.ELASTICSEARCH_6)
- .withEnv("discovery.type", "single-node")
- .withAffinityToContainer()
- .withExposedPorts(ES_HTTP_PORT)
- .waitingFor(new HostPortWaitStrategy().withRateLimiter(RateLimiters.TWENTIES_PER_SECOND));
- }
-
- public void start() {
- if (!eSContainer.isRunning()) {
- eSContainer.start();
- }
- }
-
- public void stop() {
- eSContainer.stop();
- }
-
- public int getHttpPort() {
- return eSContainer.getMappedPort(ES_HTTP_PORT);
- }
-
- public String getIp() {
- return eSContainer.getHostIp();
- }
-
- public Host getHttpHost() {
- return Host.from(getIp(), getHttpPort());
- }
-
- public void pause() {
- eSContainer.pause();
- }
-
- public void unpause() {
- eSContainer.unpause();
- }
-
- public void cleanUpData() {
- assertThat(esAPI().deleteAllIndexes().status())
- .isEqualTo(HttpStatus.SC_OK);
- }
-
- public void awaitForElasticSearch() {
- assertThat(esAPI().flush().status())
- .isEqualTo(HttpStatus.SC_OK);
- }
-
- public ClientProvider clientProvider() {
- Optional<String> noClusterName = Optional.empty();
- return ClientProviderImpl.fromHosts(ImmutableList.of(getHttpHost()), noClusterName);
- }
-
- private ElasticSearchAPI esAPI() {
- return ElasticSearchAPI.from(getHttpHost());
- }
-}
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/DockerElasticSearchRule.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/DockerElasticSearchRule.java
deleted file mode 100644
index 08d891b..0000000
--- a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/DockerElasticSearchRule.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es.v6;
-
-import org.junit.rules.ExternalResource;
-
-public class DockerElasticSearchRule extends ExternalResource {
-
- private final DockerElasticSearch dockerElasticSearch = DockerElasticSearchSingleton.INSTANCE;
-
- @Override
- protected void before() throws Throwable {
- dockerElasticSearch.start();
- }
-
- @Override
- protected void after() {
- dockerElasticSearch.cleanUpData();
- }
-
- public ClientProvider clientProvider() {
- return dockerElasticSearch.clientProvider();
- }
-
- public void awaitForElasticSearch() {
- dockerElasticSearch.awaitForElasticSearch();
- }
-
- public DockerElasticSearch getDockerElasticSearch() {
- return dockerElasticSearch;
- }
-}
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/DockerElasticSearchSingleton.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/DockerElasticSearchSingleton.java
deleted file mode 100644
index 4fa6677..0000000
--- a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/DockerElasticSearchSingleton.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es.v6;
-
-public class DockerElasticSearchSingleton {
- public static DockerElasticSearch INSTANCE = new DockerElasticSearch();
-
- static {
- INSTANCE.start();
- }
-}
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ElasticSearchConfigurationTest.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ElasticSearchConfigurationTest.java
deleted file mode 100644
index 1ba2a1a..0000000
--- a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ElasticSearchConfigurationTest.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es.v6;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-import java.util.Optional;
-
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.james.util.Host;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-
-import nl.jqno.equalsverifier.EqualsVerifier;
-
-public class ElasticSearchConfigurationTest {
-
- @Test
- public void elasticSearchConfigurationShouldRespectBeanContract() {
- EqualsVerifier.forClass(ElasticSearchConfiguration.class)
- .verify();
- }
-
- @Test
- public void getNbReplicaShouldReturnConfiguredValue() throws ConfigurationException {
- PropertiesConfiguration configuration = new PropertiesConfiguration();
- int value = 36;
- configuration.addProperty("elasticsearch.nb.replica", value);
- configuration.addProperty("elasticsearch.hosts", "127.0.0.1");
-
- ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
-
- assertThat(elasticSearchConfiguration.getNbReplica())
- .isEqualTo(value);
- }
-
- @Test
- public void getNbReplicaShouldReturnDefaultValueWhenMissing() throws ConfigurationException {
- PropertiesConfiguration configuration = new PropertiesConfiguration();
- configuration.addProperty("elasticsearch.hosts", "127.0.0.1");
-
- ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
-
- assertThat(elasticSearchConfiguration.getNbReplica())
- .isEqualTo(ElasticSearchConfiguration.DEFAULT_NB_REPLICA);
- }
-
- @Test
- public void getNbShardsShouldReturnConfiguredValue() throws ConfigurationException {
- PropertiesConfiguration configuration = new PropertiesConfiguration();
- int value = 36;
- configuration.addProperty("elasticsearch.nb.shards", value);
- configuration.addProperty("elasticsearch.hosts", "127.0.0.1");
-
- ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
-
- assertThat(elasticSearchConfiguration.getNbShards())
- .isEqualTo(value);
- }
-
- @Test
- public void getNbShardsShouldReturnDefaultValueWhenMissing() throws ConfigurationException {
- PropertiesConfiguration configuration = new PropertiesConfiguration();
- configuration.addProperty("elasticsearch.hosts", "127.0.0.1");
-
- ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
-
- assertThat(elasticSearchConfiguration.getNbShards())
- .isEqualTo(ElasticSearchConfiguration.DEFAULT_NB_SHARDS);
- }
-
- @Test
- public void getMaxRetriesShouldReturnConfiguredValue() throws ConfigurationException {
- PropertiesConfiguration configuration = new PropertiesConfiguration();
- int value = 36;
- configuration.addProperty("elasticsearch.retryConnection.maxRetries", value);
- configuration.addProperty("elasticsearch.hosts", "127.0.0.1");
-
- ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
-
- assertThat(elasticSearchConfiguration.getMaxRetries())
- .isEqualTo(value);
- }
-
- @Test
- public void getMaxRetriesShouldReturnDefaultValueWhenMissing() throws ConfigurationException {
- PropertiesConfiguration configuration = new PropertiesConfiguration();
- configuration.addProperty("elasticsearch.hosts", "127.0.0.1");
-
- ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
-
- assertThat(elasticSearchConfiguration.getMaxRetries())
- .isEqualTo(ElasticSearchConfiguration.DEFAULT_CONNECTION_MAX_RETRIES);
- }
-
- @Test
- public void getMinDelayShouldReturnConfiguredValue() throws ConfigurationException {
- PropertiesConfiguration configuration = new PropertiesConfiguration();
- int value = 36;
- configuration.addProperty("elasticsearch.retryConnection.minDelay", value);
- configuration.addProperty("elasticsearch.hosts", "127.0.0.1");
-
- ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
-
- assertThat(elasticSearchConfiguration.getMinDelay())
- .isEqualTo(value);
- }
-
- @Test
- public void getMinDelayShouldReturnDefaultValueWhenMissing() throws ConfigurationException {
- PropertiesConfiguration configuration = new PropertiesConfiguration();
- configuration.addProperty("elasticsearch.hosts", "127.0.0.1");
-
- ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
-
- assertThat(elasticSearchConfiguration.getMinDelay())
- .isEqualTo(ElasticSearchConfiguration.DEFAULT_CONNECTION_MIN_DELAY);
- }
-
- @Test
- public void getHostsShouldReturnConfiguredHostsWhenNoPort() throws ConfigurationException {
- PropertiesConfiguration configuration = new PropertiesConfiguration();
- String hostname = "myHost";
- configuration.addProperty("elasticsearch.hosts", hostname);
-
- ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
-
- assertThat(elasticSearchConfiguration.getHosts())
- .containsOnly(Host.from(hostname, ElasticSearchConfiguration.DEFAULT_PORT));
- }
-
- @Test
- public void getHostsShouldReturnConfiguredHostsWhenListIsUsed() throws ConfigurationException {
- String hostname = "myHost";
- String hostname2 = "myOtherHost";
- int port = 2154;
- PropertiesConfiguration configuration = new PropertiesConfiguration();
- configuration.addProperty("elasticsearch.hosts", hostname + "," + hostname2 + ":" + port);
-
- ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
-
- assertThat(elasticSearchConfiguration.getHosts())
- .containsOnly(Host.from(hostname, ElasticSearchConfiguration.DEFAULT_PORT),
- Host.from(hostname2, port));
- }
-
- @Test
- public void getHostsShouldReturnConfiguredHosts() throws ConfigurationException {
- PropertiesConfiguration configuration = new PropertiesConfiguration();
- String hostname = "myHost";
- int port = 2154;
- configuration.addProperty("elasticsearch.hosts", hostname + ":" + port);
-
- ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
-
- assertThat(elasticSearchConfiguration.getHosts())
- .containsOnly(Host.from(hostname, port));
- }
-
- @Test
- public void getHostsShouldReturnConfiguredMasterHost() throws ConfigurationException {
- PropertiesConfiguration configuration = new PropertiesConfiguration();
- String hostname = "myHost";
- configuration.addProperty("elasticsearch.masterHost", hostname);
- int port = 9300;
- configuration.addProperty("elasticsearch.port", port);
-
- ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
-
- assertThat(elasticSearchConfiguration.getHosts())
- .containsOnly(Host.from(hostname, port));
- }
-
- @Test
- public void clusterNameShouldBeEmptyWhenNotGiven() throws ConfigurationException {
- PropertiesConfiguration configuration = new PropertiesConfiguration();
- String hostname = "myHost";
- configuration.addProperty("elasticsearch.masterHost", hostname);
- int port = 9300;
- configuration.addProperty("elasticsearch.port", port);
-
- ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
-
- assertThat(elasticSearchConfiguration.getClusterName())
- .isEmpty();
- }
-
- @Test
- public void clusterNameShouldBeEmptyWhenNull() throws ConfigurationException {
- PropertiesConfiguration configuration = new PropertiesConfiguration();
- String hostname = "myHost";
- configuration.addProperty("elasticsearch.masterHost", hostname);
- int port = 9300;
- configuration.addProperty("elasticsearch.port", port);
- configuration.addProperty("elasticsearch.clusterName", null);
-
- ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
-
- assertThat(elasticSearchConfiguration.getClusterName())
- .isEmpty();
- }
-
- @Test
- public void clusterNameShouldKeepTheValueWhenGiven() throws ConfigurationException {
- PropertiesConfiguration configuration = new PropertiesConfiguration();
- String hostname = "myHost";
- configuration.addProperty("elasticsearch.masterHost", hostname);
- int port = 9300;
- configuration.addProperty("elasticsearch.port", port);
- String clusterName = "myClusterName";
- configuration.addProperty("elasticsearch.clusterName", clusterName);
-
- ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
-
- assertThat(elasticSearchConfiguration.getClusterName())
- .contains(clusterName);
- }
-
- @Test
- public void validateHostsConfigurationOptionsShouldThrowWhenNoHostSpecify() {
- assertThatThrownBy(() ->
- ElasticSearchConfiguration.validateHostsConfigurationOptions(
- Optional.empty(),
- Optional.empty(),
- ImmutableList.of()))
- .isInstanceOf(ConfigurationException.class)
- .hasMessage("You should specify either (" + ElasticSearchConfiguration.ELASTICSEARCH_MASTER_HOST +
- " and " + ElasticSearchConfiguration.ELASTICSEARCH_PORT +
- ") or " + ElasticSearchConfiguration.ELASTICSEARCH_HOSTS);
- }
-
- @Test
- public void validateHostsConfigurationOptionsShouldThrowWhenMonoAndMultiHostSpecified() {
- assertThatThrownBy(() ->
- ElasticSearchConfiguration.validateHostsConfigurationOptions(
- Optional.of("localhost"),
- Optional.of(9200),
- ImmutableList.of("localhost:9200")))
- .isInstanceOf(ConfigurationException.class)
- .hasMessage("You should choose between mono host set up and " + ElasticSearchConfiguration.ELASTICSEARCH_HOSTS);
- }
-
- @Test
- public void validateHostsConfigurationOptionsShouldThrowWhenMonoHostWithoutPort() {
- assertThatThrownBy(() ->
- ElasticSearchConfiguration.validateHostsConfigurationOptions(
- Optional.of("localhost"),
- Optional.empty(),
- ImmutableList.of()))
- .isInstanceOf(ConfigurationException.class)
- .hasMessage(ElasticSearchConfiguration.ELASTICSEARCH_MASTER_HOST +
- " and " + ElasticSearchConfiguration.ELASTICSEARCH_PORT + " should be specified together");
- }
-
- @Test
- public void validateHostsConfigurationOptionsShouldThrowWhenMonoHostWithoutAddress() {
- assertThatThrownBy(() ->
- ElasticSearchConfiguration.validateHostsConfigurationOptions(
- Optional.empty(),
- Optional.of(9200),
- ImmutableList.of()))
- .isInstanceOf(ConfigurationException.class)
- .hasMessage(ElasticSearchConfiguration.ELASTICSEARCH_MASTER_HOST + " and " +
- ElasticSearchConfiguration.ELASTICSEARCH_PORT + " should be specified together");
- }
-
- @Test
- public void validateHostsConfigurationOptionsShouldAcceptMonoHostConfiguration() throws Exception {
- ElasticSearchConfiguration.validateHostsConfigurationOptions(
- Optional.of("localhost"),
- Optional.of(9200),
- ImmutableList.of());
- }
-
- @Test
- public void validateHostsConfigurationOptionsShouldAcceptMultiHostConfiguration() throws Exception {
- ElasticSearchConfiguration.validateHostsConfigurationOptions(
- Optional.empty(),
- Optional.empty(),
- ImmutableList.of("localhost:9200"));
- }
-
-
- @Test
- public void nbReplicaShouldThrowWhenNegative() {
- assertThatThrownBy(() ->
- ElasticSearchConfiguration.builder()
- .nbReplica(-1))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void nbShardsShouldThrowWhenNegative() {
- assertThatThrownBy(() ->
- ElasticSearchConfiguration.builder()
- .nbShards(-1))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void nbShardsShouldThrowWhenZero() {
- assertThatThrownBy(() ->
- ElasticSearchConfiguration.builder()
- .nbShards(0))
- .isInstanceOf(IllegalArgumentException.class);
- }
-}
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ElasticSearchIndexerTest.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ElasticSearchIndexerTest.java
deleted file mode 100644
index 3ca61c1..0000000
--- a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ElasticSearchIndexerTest.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es.v6;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.awaitility.Awaitility.await;
-import static org.elasticsearch.index.query.QueryBuilders.termQuery;
-
-import org.awaitility.Duration;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-
-public class ElasticSearchIndexerTest {
-
- private static final int MINIMUM_BATCH_SIZE = 1;
- private static final IndexName INDEX_NAME = new IndexName("index_name");
- private static final WriteAliasName ALIAS_NAME = new WriteAliasName("alias_name");
-
- @Rule
- public DockerElasticSearchRule elasticSearch = new DockerElasticSearchRule();
- private ElasticSearchIndexer testee;
-
- @Before
- public void setup() {
- new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION)
- .useIndex(INDEX_NAME)
- .addAlias(ALIAS_NAME)
- .createIndexAndAliases(getESClient());
- testee = new ElasticSearchIndexer(getESClient(), ALIAS_NAME, MINIMUM_BATCH_SIZE);
- }
-
- private RestHighLevelClient getESClient() {
- return elasticSearch.clientProvider().get();
- }
-
- @Test
- public void indexMessageShouldWork() throws Exception {
- String messageId = "1";
- String content = "{\"message\": \"trying out Elasticsearch\"}";
-
- testee.index(messageId, content);
- elasticSearch.awaitForElasticSearch();
-
- try (RestHighLevelClient client = getESClient()) {
- SearchResponse searchResponse = client.search(
- new SearchRequest(INDEX_NAME.getValue())
- .source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("message", "trying"))),
- RequestOptions.DEFAULT);
- assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
- }
- }
-
- @Test
- public void indexMessageShouldThrowWhenJsonIsNull() {
- assertThatThrownBy(() -> testee.index("1", null))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void updateMessages() throws Exception {
- String messageId = "1";
- String content = "{\"message\": \"trying out Elasticsearch\",\"field\":\"Should be unchanged\"}";
-
- testee.index(messageId, content);
- elasticSearch.awaitForElasticSearch();
-
- testee.update(ImmutableList.of(new UpdatedRepresentation(messageId, "{\"message\": \"mastering out Elasticsearch\"}")));
- elasticSearch.awaitForElasticSearch();
-
- try (RestHighLevelClient client = getESClient()) {
- SearchResponse searchResponse = client.search(
- new SearchRequest(INDEX_NAME.getValue())
- .source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("message", "mastering"))),
- RequestOptions.DEFAULT);
- assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
- }
-
- try (RestHighLevelClient client = getESClient()) {
- SearchResponse searchResponse = client.search(
- new SearchRequest(INDEX_NAME.getValue())
- .source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("field", "unchanged"))),
- RequestOptions.DEFAULT);
- assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
- }
- }
-
- @Test
- public void updateMessageShouldThrowWhenJsonIsNull() {
- assertThatThrownBy(() -> testee.update(ImmutableList.of(new UpdatedRepresentation("1", null))))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void updateMessageShouldThrowWhenIdIsNull() {
- assertThatThrownBy(() -> testee.update(ImmutableList.of(new UpdatedRepresentation(null, "{\"message\": \"mastering out Elasticsearch\"}"))))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void updateMessageShouldThrowWhenJsonIsEmpty() {
- assertThatThrownBy(() -> testee.update(ImmutableList.of(new UpdatedRepresentation("1", ""))))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void updateMessageShouldThrowWhenIdIsEmpty() {
- assertThatThrownBy(() -> testee.update(ImmutableList.of(new UpdatedRepresentation("", "{\"message\": \"mastering out Elasticsearch\"}"))))
- .isInstanceOf(IllegalArgumentException.class);
- }
-
- @Test
- public void deleteByQueryShouldWorkOnSingleMessage() throws Exception {
- String messageId = "1:2";
- String content = "{\"message\": \"trying out Elasticsearch\", \"property\":\"1\"}";
-
- testee.index(messageId, content);
- elasticSearch.awaitForElasticSearch();
-
- testee.deleteAllMatchingQuery(termQuery("property", "1"));
- elasticSearch.awaitForElasticSearch();
-
- try (RestHighLevelClient client = getESClient()) {
- await().atMost(Duration.TEN_SECONDS)
- .until(() -> client.search(
- new SearchRequest(INDEX_NAME.getValue())
- .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
- RequestOptions.DEFAULT)
- .getHits().getTotalHits() == 0);
- }
- }
-
- @Test
- public void deleteByQueryShouldWorkWhenMultipleMessages() throws Exception {
- String messageId = "1:1";
- String content = "{\"message\": \"trying out Elasticsearch\", \"property\":\"1\"}";
-
- testee.index(messageId, content);
-
- String messageId2 = "1:2";
- String content2 = "{\"message\": \"trying out Elasticsearch 2\", \"property\":\"1\"}";
-
- testee.index(messageId2, content2);
-
- String messageId3 = "2:3";
- String content3 = "{\"message\": \"trying out Elasticsearch 3\", \"property\":\"2\"}";
-
- testee.index(messageId3, content3);
- elasticSearch.awaitForElasticSearch();
-
- testee.deleteAllMatchingQuery(termQuery("property", "1"));
- elasticSearch.awaitForElasticSearch();
-
- try (RestHighLevelClient client = getESClient()) {
- await().atMost(Duration.TEN_SECONDS)
- .until(() -> client.search(
- new SearchRequest(INDEX_NAME.getValue())
- .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
- RequestOptions.DEFAULT)
- .getHits().getTotalHits() == 1);
- }
- }
-
- @Test
- public void deleteMessage() throws Exception {
- String messageId = "1:2";
- String content = "{\"message\": \"trying out Elasticsearch\"}";
-
- testee.index(messageId, content);
- elasticSearch.awaitForElasticSearch();
-
- testee.delete(ImmutableList.of(messageId));
- elasticSearch.awaitForElasticSearch();
-
- try (RestHighLevelClient client = getESClient()) {
- SearchResponse searchResponse = client.search(
- new SearchRequest(INDEX_NAME.getValue())
- .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
- RequestOptions.DEFAULT);
- assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(0);
- }
- }
-
- @Test
- public void deleteShouldWorkWhenMultipleMessages() throws Exception {
- String messageId = "1:1";
- String content = "{\"message\": \"trying out Elasticsearch\", \"mailboxId\":\"1\"}";
-
- testee.index(messageId, content);
-
- String messageId2 = "1:2";
- String content2 = "{\"message\": \"trying out Elasticsearch 2\", \"mailboxId\":\"1\"}";
-
- testee.index(messageId2, content2);
-
- String messageId3 = "2:3";
- String content3 = "{\"message\": \"trying out Elasticsearch 3\", \"mailboxId\":\"2\"}";
-
- testee.index(messageId3, content3);
- elasticSearch.awaitForElasticSearch();
-
- testee.delete(ImmutableList.of(messageId, messageId3));
- elasticSearch.awaitForElasticSearch();
-
- try (RestHighLevelClient client = getESClient()) {
- SearchResponse searchResponse = client.search(
- new SearchRequest(INDEX_NAME.getValue())
- .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
- RequestOptions.DEFAULT);
- assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
- }
- }
-
- @Test
- public void updateMessagesShouldNotThrowWhenEmptyList() throws Exception {
- testee.update(ImmutableList.of());
- }
-
- @Test
- public void deleteMessagesShouldNotThrowWhenEmptyList() throws Exception {
- testee.delete(ImmutableList.of());
- }
-}
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/IndexCreationFactoryTest.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/IndexCreationFactoryTest.java
deleted file mode 100644
index 49be066..0000000
--- a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/IndexCreationFactoryTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es.v6;
-
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-public class IndexCreationFactoryTest {
- private static final IndexName INDEX_NAME = new IndexName("index");
- private static final ReadAliasName ALIAS_NAME = new ReadAliasName("alias");
-
- @Rule
- public DockerElasticSearchRule elasticSearch = new DockerElasticSearchRule();
- private ClientProvider clientProvider;
-
- @Before
- public void setUp() {
- clientProvider = elasticSearch.clientProvider();
- new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION)
- .useIndex(INDEX_NAME)
- .addAlias(ALIAS_NAME)
- .createIndexAndAliases(clientProvider.get());
- }
-
- @Test
- public void createIndexAndAliasShouldNotThrowWhenCalledSeveralTime() {
- new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION)
- .useIndex(INDEX_NAME)
- .addAlias(ALIAS_NAME)
- .createIndexAndAliases(clientProvider.get());
- }
-
- @Test
- public void useIndexShouldThrowWhenNull() {
- assertThatThrownBy(() ->
- new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION)
- .useIndex(null))
- .isInstanceOf(NullPointerException.class);
- }
-
- @Test
- public void addAliasShouldThrowWhenNull() {
- assertThatThrownBy(() ->
- new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION)
- .useIndex(INDEX_NAME)
- .addAlias(null))
- .isInstanceOf(NullPointerException.class);
- }
-}
\ No newline at end of file
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/NodeMappingFactoryTest.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/NodeMappingFactoryTest.java
deleted file mode 100644
index 86fad09..0000000
--- a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/NodeMappingFactoryTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es.v6;
-
-import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-public class NodeMappingFactoryTest {
- private static final String MESSAGE = "message";
- private static final IndexName INDEX_NAME = new IndexName("index");
- private static final ReadAliasName ALIAS_NAME = new ReadAliasName("alias");
-
- @Rule
- public DockerElasticSearchRule elasticSearch = new DockerElasticSearchRule();
- private ClientProvider clientProvider;
-
- @Before
- public void setUp() throws Exception {
- clientProvider = elasticSearch.clientProvider();
- new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION)
- .useIndex(INDEX_NAME)
- .addAlias(ALIAS_NAME)
- .createIndexAndAliases(clientProvider.get());
- NodeMappingFactory.applyMapping(clientProvider.get(),
- INDEX_NAME,
- getMappingsSources());
- }
-
- @Test
- public void applyMappingShouldNotThrowWhenCalledSeveralTime() throws Exception {
- NodeMappingFactory.applyMapping(clientProvider.get(),
- INDEX_NAME,
- getMappingsSources());
- }
-
- @Test
- public void applyMappingShouldNotThrowWhenIncrementalChanges() throws Exception {
- NodeMappingFactory.applyMapping(clientProvider.get(),
- INDEX_NAME,
- getMappingsSources());
-
- elasticSearch.awaitForElasticSearch();
-
- assertThatCode(() ->NodeMappingFactory.applyMapping(clientProvider.get(),
- INDEX_NAME,
- getOtherMappingsSources()))
- .doesNotThrowAnyException();
- }
-
- private XContentBuilder getMappingsSources() throws Exception {
- return jsonBuilder()
- .startObject()
- .startObject(NodeMappingFactory.PROPERTIES)
- .startObject(MESSAGE)
- .field(NodeMappingFactory.TYPE, NodeMappingFactory.TEXT)
- .endObject()
- .endObject()
- .endObject();
- }
-
- private XContentBuilder getOtherMappingsSources() throws Exception {
- return jsonBuilder()
- .startObject()
- .startObject(NodeMappingFactory.PROPERTIES)
- .startObject(MESSAGE)
- .field(NodeMappingFactory.TYPE, NodeMappingFactory.TEXT)
- .field(NodeMappingFactory.INDEX, false)
- .endObject()
- .endObject()
- .endObject();
- }
-}
\ No newline at end of file
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/search/ScrollIterableTest.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/search/ScrollIterableTest.java
deleted file mode 100644
index 2d239aa..0000000
--- a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/search/ScrollIterableTest.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es.v6.search;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.awaitility.Awaitility.await;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import org.apache.james.backends.es.v6.ClientProvider;
-import org.apache.james.backends.es.v6.DockerElasticSearchRule;
-import org.apache.james.backends.es.v6.ElasticSearchConfiguration;
-import org.apache.james.backends.es.v6.IndexCreationFactory;
-import org.apache.james.backends.es.v6.IndexName;
-import org.apache.james.backends.es.v6.NodeMappingFactory;
-import org.apache.james.backends.es.v6.ReadAliasName;
-import org.awaitility.Duration;
-import org.awaitility.core.ConditionFactory;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-
-public class ScrollIterableTest {
-
- private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
- private static final int SIZE = 2;
- private static final String MESSAGE = "message";
- private static final IndexName INDEX_NAME = new IndexName("index");
- private static final ReadAliasName ALIAS_NAME = new ReadAliasName("alias");
-
- private static final ConditionFactory WAIT_CONDITION = await().timeout(Duration.FIVE_SECONDS);
-
- @Rule
- public DockerElasticSearchRule elasticSearch = new DockerElasticSearchRule();
- private ClientProvider clientProvider;
-
- @Before
- public void setUp() {
- clientProvider = elasticSearch.clientProvider();
- new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION)
- .useIndex(INDEX_NAME)
- .addAlias(ALIAS_NAME)
- .createIndexAndAliases(clientProvider.get());
- elasticSearch.awaitForElasticSearch();
- }
-
- @Test
- public void scrollIterableShouldWorkWhenEmpty() throws Exception {
- try (RestHighLevelClient client = clientProvider.get()) {
- SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
- .scroll(TIMEOUT)
- .source(new SearchSourceBuilder()
- .query(QueryBuilders.matchAllQuery())
- .size(SIZE));
-
- assertThat(new ScrollIterable(client, searchRequest))
- .isEmpty();
- }
- }
-
- @Test
- public void scrollIterableShouldWorkWhenOneElement() throws Exception {
- try (RestHighLevelClient client = clientProvider.get()) {
- String id = "1";
- client.index(new IndexRequest(INDEX_NAME.getValue())
- .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
- .id(id)
- .source(MESSAGE, "Sample message"),
- RequestOptions.DEFAULT);
-
- elasticSearch.awaitForElasticSearch();
- WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id));
-
- SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
- .scroll(TIMEOUT)
- .source(new SearchSourceBuilder()
- .query(QueryBuilders.matchAllQuery())
- .size(SIZE));
-
- assertThat(convertToIdList(new ScrollIterable(client, searchRequest)))
- .containsOnly(id);
- }
- }
-
- @Test
- public void scrollIterableShouldWorkWhenSizeElement() throws Exception {
- try (RestHighLevelClient client = clientProvider.get()) {
- String id1 = "1";
- client.index(new IndexRequest(INDEX_NAME.getValue())
- .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
- .id(id1)
- .source(MESSAGE, "Sample message"),
- RequestOptions.DEFAULT);
-
- String id2 = "2";
- client.index(new IndexRequest(INDEX_NAME.getValue())
- .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
- .id(id2)
- .source(MESSAGE, "Sample message"),
- RequestOptions.DEFAULT);
-
- elasticSearch.awaitForElasticSearch();
- WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2));
-
- SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
- .scroll(TIMEOUT)
- .source(new SearchSourceBuilder()
- .query(QueryBuilders.matchAllQuery())
- .size(SIZE));
-
- assertThat(convertToIdList(new ScrollIterable(client, searchRequest)))
- .containsOnly(id1, id2);
- }
- }
-
- @Test
- public void scrollIterableShouldWorkWhenMoreThanSizeElement() throws Exception {
- try (RestHighLevelClient client = clientProvider.get()) {
- String id1 = "1";
- client.index(new IndexRequest(INDEX_NAME.getValue())
- .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
- .id(id1)
- .source(MESSAGE, "Sample message"),
- RequestOptions.DEFAULT);
-
- String id2 = "2";
- client.index(new IndexRequest(INDEX_NAME.getValue())
- .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
- .id(id2)
- .source(MESSAGE, "Sample message"),
- RequestOptions.DEFAULT);
-
- String id3 = "3";
- client.index(new IndexRequest(INDEX_NAME.getValue())
- .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
- .id(id3)
- .source(MESSAGE, "Sample message"),
- RequestOptions.DEFAULT);
-
- elasticSearch.awaitForElasticSearch();
- WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2, id3));
-
- SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
- .scroll(TIMEOUT)
- .source(new SearchSourceBuilder()
- .query(QueryBuilders.matchAllQuery())
- .size(SIZE));
-
- assertThat(convertToIdList(new ScrollIterable(client, searchRequest)))
- .containsOnly(id1, id2, id3);
- }
- }
-
- private List<String> convertToIdList(ScrollIterable scrollIterable) {
- return scrollIterable.stream()
- .flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits()))
- .map(SearchHit::getId)
- .collect(Collectors.toList());
- }
-
- private void hasIdsInIndex(RestHighLevelClient client, String... ids) throws IOException {
- SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
- .scroll(TIMEOUT)
- .source(new SearchSourceBuilder()
- .query(QueryBuilders.matchAllQuery()));
-
- SearchHit[] hits = client.search(searchRequest, RequestOptions.DEFAULT)
- .getHits()
- .getHits();
-
- assertThat(hits)
- .extracting(SearchHit::getId)
- .contains(ids);
- }
-}
diff --git a/backends-common/elasticsearch-v6/src/test/resources/logback-test.xml b/backends-common/elasticsearch-v6/src/test/resources/logback-test.xml
deleted file mode 100644
index dd2d81e..0000000
--- a/backends-common/elasticsearch-v6/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,12 +0,0 @@
-<configuration>
-
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>%date %-5level [%thread] - [%logger]- %msg%n</pattern>
- </encoder>
- </appender>
-
- <root level="WARN">
- <appender-ref ref="STDOUT" />
- </root>
-</configuration>
\ No newline at end of file
diff --git a/backends-common/elasticsearch/pom.xml b/backends-common/elasticsearch/pom.xml
index 1df2811..d5f5847 100644
--- a/backends-common/elasticsearch/pom.xml
+++ b/backends-common/elasticsearch/pom.xml
@@ -17,7 +17,9 @@
specific language governing permissions and limitations
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.james</groupId>
@@ -64,6 +66,11 @@
<artifactId>feign-slf4j</artifactId>
</dependency>
<dependency>
+ <groupId>nl.jqno.equalsverifier</groupId>
+ <artifactId>equalsverifier</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
@@ -73,16 +80,9 @@
<artifactId>awaitility</artifactId>
</dependency>
<dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch</artifactId>
- <version>2.2.1</version>
- </dependency>
- <dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch</artifactId>
- <version>2.2.1</version>
- <type>test-jar</type>
- <scope>test</scope>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>elasticsearch-rest-high-level-client</artifactId>
+ <version>6.7.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
@@ -99,4 +99,4 @@
<scope>test</scope>
</dependency>
</dependencies>
-</project>
+</project>
\ No newline at end of file
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java
index 81ed92f..2a3aba1 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java
@@ -18,9 +18,9 @@
****************************************************************/
package org.apache.james.backends.es;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RestHighLevelClient;
public interface ClientProvider {
- Client get();
+ RestHighLevelClient get();
}
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java
index 8d92ae9..3ba32f0 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java
@@ -18,17 +18,14 @@
****************************************************************/
package org.apache.james.backends.es;
-import java.net.InetAddress;
import java.util.Optional;
+import org.apache.http.HttpHost;
import org.apache.james.util.Host;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import com.github.fge.lambdas.Throwing;
-import com.github.fge.lambdas.consumers.ConsumerChainer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -50,6 +47,7 @@ public class ClientProviderImpl implements ClientProvider {
}
private static final String CLUSTER_NAME_SETTING = "cluster.name";
+ private static final String HTTP_HOST_SCHEME = "http";
private final ImmutableList<Host> hosts;
private final Optional<String> clusterName;
@@ -60,19 +58,15 @@ public class ClientProviderImpl implements ClientProvider {
this.clusterName = clusterName;
}
+ private HttpHost[] hostsToHttpHosts() {
+ return hosts.stream()
+ .map(host -> new HttpHost(host.getHostName(), host.getPort(), HTTP_HOST_SCHEME))
+ .toArray(HttpHost[]::new);
+ }
@Override
- public Client get() {
- TransportClient transportClient = TransportClient.builder()
- .settings(settings())
- .build();
- ConsumerChainer<Host> consumer = Throwing.consumer(host -> transportClient
- .addTransportAddress(
- new InetSocketTransportAddress(
- InetAddress.getByName(host.getHostName()),
- host.getPort())));
- hosts.forEach(consumer.sneakyThrow());
- return transportClient;
+ public RestHighLevelClient get() {
+ return new RestHighLevelClient(RestClient.builder(hostsToHttpHosts()));
}
@VisibleForTesting Settings settings() {
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java
deleted file mode 100644
index b465f96..0000000
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
-import org.apache.james.backends.es.search.ScrollIterable;
-import org.elasticsearch.action.ListenableActionFuture;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.search.SearchHit;
-
-import com.google.common.annotations.VisibleForTesting;
-
-public class DeleteByQueryPerformer {
- public static final TimeValue TIMEOUT = new TimeValue(60000);
-
- private final Client client;
- private final ExecutorService executor;
- private final int batchSize;
- private final WriteAliasName aliasName;
- private final TypeName typeName;
-
- @VisibleForTesting
- public DeleteByQueryPerformer(Client client, ExecutorService executor, int batchSize, WriteAliasName aliasName, TypeName typeName) {
- this.client = client;
- this.executor = executor;
- this.batchSize = batchSize;
- this.aliasName = aliasName;
- this.typeName = typeName;
- }
-
- public Future<Void> perform(QueryBuilder queryBuilder) {
- return executor.submit(() -> doDeleteByQuery(queryBuilder));
- }
-
- protected Void doDeleteByQuery(QueryBuilder queryBuilder) {
- new ScrollIterable(client,
- client.prepareSearch(aliasName.getValue())
- .setTypes(typeName.getValue())
- .setScroll(TIMEOUT)
- .setNoFields()
- .setQuery(queryBuilder)
- .setSize(batchSize))
- .stream()
- .map(searchResponse -> deleteRetrievedIds(client, searchResponse))
- .forEach(ListenableActionFuture::actionGet);
- return null;
- }
-
- private ListenableActionFuture<BulkResponse> deleteRetrievedIds(Client client, SearchResponse searchResponse) {
- BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
- for (SearchHit hit : searchResponse.getHits()) {
- bulkRequestBuilder.add(client.prepareDelete()
- .setIndex(aliasName.getValue())
- .setType(typeName.getValue())
- .setId(hit.getId()));
- }
- return bulkRequestBuilder.execute();
- }
-
-}
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchConfiguration.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchConfiguration.java
index 84d115c..2f5c6bb 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchConfiguration.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchConfiguration.java
@@ -1,21 +1,21 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
package org.apache.james.backends.es;
@@ -123,7 +123,7 @@ public class ElasticSearchConfiguration {
public static final int DEFAULT_CONNECTION_MIN_DELAY = 3000;
public static final int DEFAULT_NB_SHARDS = 5;
public static final int DEFAULT_NB_REPLICA = 1;
- public static final int DEFAULT_PORT = 9300;
+ public static final int DEFAULT_PORT = 9200;
private static final String LOCALHOST = "127.0.0.1";
public static final Optional<Integer> DEFAULT_PORT_AS_OPTIONAL = Optional.of(DEFAULT_PORT);
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java
index b5caad1..0231e09 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java
@@ -18,18 +18,24 @@
****************************************************************/
package org.apache.james.backends.es;
+import java.io.IOException;
import java.util.List;
import java.util.Optional;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.ValidationException;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,75 +45,79 @@ import com.google.common.base.Preconditions;
public class ElasticSearchIndexer {
private static final int DEBUG_MAX_LENGTH_CONTENT = 1000;
private static final int DEFAULT_BATCH_SIZE = 100;
+ private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchIndexer.class);
- private final Client client;
- private final DeleteByQueryPerformer deleteByQueryPerformer;
+ private final RestHighLevelClient client;
private final AliasName aliasName;
- private final TypeName typeName;
+ private final int batchSize;
- public ElasticSearchIndexer(Client client, ExecutorService executor,
- WriteAliasName aliasName,
- TypeName typeName) {
- this(client, executor, aliasName, typeName, DEFAULT_BATCH_SIZE);
+ public ElasticSearchIndexer(RestHighLevelClient client,
+ WriteAliasName aliasName) {
+ this(client, aliasName, DEFAULT_BATCH_SIZE);
}
@VisibleForTesting
- public ElasticSearchIndexer(Client client, ExecutorService executor,
+ public ElasticSearchIndexer(RestHighLevelClient client,
WriteAliasName aliasName,
- TypeName typeName,
int batchSize) {
this.client = client;
- this.deleteByQueryPerformer = new DeleteByQueryPerformer(client, executor, batchSize, aliasName, typeName);
this.aliasName = aliasName;
- this.typeName = typeName;
+ this.batchSize = batchSize;
}
- public IndexResponse index(String id, String content) {
+ public IndexResponse index(String id, String content) throws IOException {
checkArgument(content);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Indexing {}: {}", id, StringUtils.left(content, DEBUG_MAX_LENGTH_CONTENT));
}
- return client.prepareIndex(aliasName.getValue(), typeName.getValue(), id)
- .setSource(content)
- .get();
+ return client.index(
+ new IndexRequest(aliasName.getValue())
+ .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
+ .id(id)
+ .source(content, XContentType.JSON),
+ RequestOptions.DEFAULT);
}
- public Optional<BulkResponse> update(List<UpdatedRepresentation> updatedDocumentParts) {
+ public Optional<BulkResponse> update(List<UpdatedRepresentation> updatedDocumentParts) throws IOException {
try {
Preconditions.checkNotNull(updatedDocumentParts);
- BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
- updatedDocumentParts.forEach(updatedDocumentPart -> bulkRequestBuilder.add(
- client.prepareUpdate(
- aliasName.getValue(),
- typeName.getValue(),
+ BulkRequest request = new BulkRequest();
+ updatedDocumentParts.forEach(updatedDocumentPart -> request.add(
+ new UpdateRequest(aliasName.getValue(),
+ NodeMappingFactory.DEFAULT_MAPPING_NAME,
updatedDocumentPart.getId())
- .setDoc(updatedDocumentPart.getUpdatedDocumentPart())));
- return Optional.of(bulkRequestBuilder.get());
+ .doc(updatedDocumentPart.getUpdatedDocumentPart(), XContentType.JSON)));
+ return Optional.of(client.bulk(request, RequestOptions.DEFAULT));
} catch (ValidationException e) {
LOGGER.warn("Error while updating index", e);
return Optional.empty();
}
}
- public Optional<BulkResponse> delete(List<String> ids) {
+ public Optional<BulkResponse> delete(List<String> ids) throws IOException {
try {
- BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
- ids.forEach(id -> bulkRequestBuilder.add(
- client.prepareDelete(
- aliasName.getValue(),
- typeName.getValue(),
- id)));
- return Optional.of(bulkRequestBuilder.get());
+ BulkRequest request = new BulkRequest();
+ ids.forEach(id -> request.add(
+ new DeleteRequest(aliasName.getValue())
+ .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
+ .id(id)));
+ return Optional.of(client.bulk(request, RequestOptions.DEFAULT));
} catch (ValidationException e) {
LOGGER.warn("Error while deleting index", e);
return Optional.empty();
}
}
- public Future<Void> deleteAllMatchingQuery(QueryBuilder queryBuilder) {
- return deleteByQueryPerformer.perform(queryBuilder);
+ public void deleteAllMatchingQuery(QueryBuilder queryBuilder) {
+ DeleteByQueryRequest request = new DeleteByQueryRequest(aliasName.getValue())
+ .setDocTypes(NodeMappingFactory.DEFAULT_MAPPING_NAME)
+ .setScroll(TIMEOUT)
+ .setQuery(queryBuilder)
+ .setBatchSize(batchSize);
+
+ client.deleteByQueryAsync(request, RequestOptions.DEFAULT, new ListenerToFuture<>());
}
private void checkArgument(String content) {
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/IndexCreationFactory.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/IndexCreationFactory.java
index 586c3bf..e0d10e0 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/IndexCreationFactory.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/IndexCreationFactory.java
@@ -19,140 +19,177 @@
package org.apache.james.backends.es;
+import static org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import java.io.IOException;
-import java.util.ArrayList;
import javax.inject.Inject;
+import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.github.fge.lambdas.Throwing;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
public class IndexCreationFactory {
- private static final Logger LOGGER = LoggerFactory.getLogger(IndexCreationFactory.class);
- public static final String CASE_INSENSITIVE = "case_insensitive";
- public static final String KEEP_MAIL_AND_URL = "keep_mail_and_url";
- public static final String SNOWBALL_KEEP_MAIL_AND_URL = "snowball_keep_mail_and_token";
- public static final String ENGLISH_SNOWBALL = "english_snowball";
-
- private IndexName indexName;
- private ArrayList<AliasName> aliases;
- private int nbShards;
- private int nbReplica;
+ public static class AliasSpecificationStep {
+ private final int nbShards;
+ private final int nbReplica;
+ private final IndexName indexName;
+ private final ImmutableList.Builder<AliasName> aliases;
+
+ AliasSpecificationStep(int nbShards, int nbReplica, IndexName indexName) {
+ this.nbShards = nbShards;
+ this.nbReplica = nbReplica;
+ this.indexName = indexName;
+ this.aliases = ImmutableList.builder();
+ }
- @Inject
- public IndexCreationFactory(ElasticSearchConfiguration configuration) {
- indexName = null;
- aliases = new ArrayList<>();
- nbShards = configuration.getNbShards();
- nbReplica = configuration.getNbReplica();
- }
+ public AliasSpecificationStep addAlias(AliasName aliasName) {
+ Preconditions.checkNotNull(aliasName);
+ this.aliases.add(aliasName);
+ return this;
+ }
- public IndexCreationFactory useIndex(IndexName indexName) {
- Preconditions.checkNotNull(indexName);
- this.indexName = indexName;
- return this;
+ public RestHighLevelClient createIndexAndAliases(RestHighLevelClient client) {
+ return new IndexCreationPerformer(nbShards, nbReplica, indexName, aliases.build()).createIndexAndAliases(client);
+ }
}
- public IndexCreationFactory addAlias(AliasName aliasName) {
- Preconditions.checkNotNull(aliasName);
- this.aliases.add(aliasName);
- return this;
- }
+ static class IndexCreationPerformer {
+ private final int nbShards;
+ private final int nbReplica;
+ private final IndexName indexName;
+ private final ImmutableList<AliasName> aliases;
+
+ public IndexCreationPerformer(int nbShards, int nbReplica, IndexName indexName, ImmutableList<AliasName> aliases) {
+ this.nbShards = nbShards;
+ this.nbReplica = nbReplica;
+ this.indexName = indexName;
+ this.aliases = aliases;
+ }
- public Client createIndexAndAliases(Client client) {
- Preconditions.checkNotNull(indexName);
- try {
- createIndexIfNeeded(client, indexName, generateSetting(nbShards, nbReplica));
- aliases.forEach(alias -> createAliasIfNeeded(client, indexName, alias));
- } catch (IOException e) {
- LOGGER.error("Error while creating index : ", e);
+ public RestHighLevelClient createIndexAndAliases(RestHighLevelClient client) {
+ Preconditions.checkNotNull(indexName);
+ try {
+ createIndexIfNeeded(client, indexName, generateSetting(nbShards, nbReplica));
+ aliases.forEach(Throwing.consumer(alias -> createAliasIfNeeded(client, indexName, alias)));
+ } catch (IOException e) {
+ LOGGER.error("Error while creating index : ", e);
+ }
+ return client;
}
- return client;
- }
- private void createAliasIfNeeded(Client client, IndexName indexName, AliasName aliasName) {
- if (!aliasExist(client, aliasName)) {
- client.admin()
- .indices()
- .aliases(new IndicesAliasesRequest()
- .addAlias(aliasName.getValue(), indexName.getValue()))
- .actionGet();
+ private void createAliasIfNeeded(RestHighLevelClient client, IndexName indexName, AliasName aliasName) throws IOException {
+ if (!aliasExist(client, aliasName)) {
+ client.indices()
+ .updateAliases(
+ new IndicesAliasesRequest().addAliasAction(
+ new AliasActions(AliasActions.Type.ADD)
+ .index(indexName.getValue())
+ .alias(aliasName.getValue())),
+ RequestOptions.DEFAULT);
+ }
}
- }
- private boolean aliasExist(Client client, AliasName aliasName) {
- return client.admin()
- .indices()
- .aliasesExist(new GetAliasesRequest()
- .aliases(aliasName.getValue()))
- .actionGet()
- .exists();
- }
+ private boolean aliasExist(RestHighLevelClient client, AliasName aliasName) throws IOException {
+ return client.indices()
+ .existsAlias(new GetAliasesRequest().aliases(aliasName.getValue()),
+ RequestOptions.DEFAULT);
+ }
- private void createIndexIfNeeded(Client client, IndexName indexName, XContentBuilder settings) {
- try {
- client.admin()
- .indices()
- .prepareCreate(indexName.getValue())
- .setSettings(settings)
- .execute()
- .actionGet();
- } catch (IndexAlreadyExistsException exception) {
- LOGGER.info("Index [{}] already exist", indexName);
+ private void createIndexIfNeeded(RestHighLevelClient client, IndexName indexName, XContentBuilder settings) throws IOException {
+ try {
+ client.indices()
+ .create(
+ new CreateIndexRequest(indexName.getValue())
+ .source(settings),
+ RequestOptions.DEFAULT);
+ } catch (ElasticsearchStatusException exception) {
+ if (exception.getMessage().contains(INDEX_ALREADY_EXISTS_EXCEPTION_MESSAGE)) {
+ LOGGER.info("Index [{}] already exist", indexName);
+ } else {
+ throw exception;
+ }
+ }
}
- }
- private XContentBuilder generateSetting(int nbShards, int nbReplica) throws IOException {
- return jsonBuilder()
- .startObject()
- .field("number_of_shards", nbShards)
- .field("number_of_replicas", nbReplica)
- .startObject("analysis")
- .startObject("analyzer")
- .startObject(CASE_INSENSITIVE)
- .field("tokenizer", "keyword")
- .startArray("filter")
- .value("lowercase")
- .endArray()
+ private XContentBuilder generateSetting(int nbShards, int nbReplica) throws IOException {
+ return jsonBuilder()
+ .startObject()
+ .startObject("settings")
+ .field("number_of_shards", nbShards)
+ .field("number_of_replicas", nbReplica)
+ .startObject("analysis")
+ .startObject("normalizer")
+ .startObject(CASE_INSENSITIVE)
+ .field("type", "custom")
+ .startArray("char_filter")
+ .endArray()
+ .startArray("filter")
+ .value("lowercase")
+ .value("asciifolding")
+ .endArray()
+ .endObject()
+ .endObject()
+ .startObject("analyzer")
+ .startObject(KEEP_MAIL_AND_URL)
+ .field("tokenizer", "uax_url_email")
+ .startArray("filter")
+ .value("lowercase")
+ .value("stop")
+ .endArray()
+ .endObject()
+ .startObject(SNOWBALL_KEEP_MAIL_AND_URL)
+ .field("tokenizer", "uax_url_email")
+ .startArray("filter")
+ .value("lowercase")
+ .value("stop")
+ .value(ENGLISH_SNOWBALL)
+ .endArray()
+ .endObject()
+ .endObject()
+ .startObject("filter")
+ .startObject(ENGLISH_SNOWBALL)
+ .field("type", "snowball")
+ .field("language", "English")
+ .endObject()
+ .endObject()
.endObject()
.endObject()
- .startObject("analyzer")
- .startObject(KEEP_MAIL_AND_URL)
- .field("tokenizer", "uax_url_email")
- .startArray("filter")
- .value("lowercase")
- .value("stop")
- .endArray()
- .endObject()
- .endObject()
- .startObject("filter")
- .startObject(ENGLISH_SNOWBALL)
- .field("type", "snowball")
- .field("language", "English")
- .endObject()
- .endObject()
- .startObject("analyzer")
- .startObject(SNOWBALL_KEEP_MAIL_AND_URL)
- .field("tokenizer", "uax_url_email")
- .startArray("filter")
- .value("lowercase")
- .value("stop")
- .value(ENGLISH_SNOWBALL)
- .endArray()
- .endObject()
- .endObject()
- .endObject()
- .endObject();
+ .endObject();
+ }
}
+ private static final Logger LOGGER = LoggerFactory.getLogger(IndexCreationFactory.class);
+ private static final String INDEX_ALREADY_EXISTS_EXCEPTION_MESSAGE = "type=resource_already_exists_exception";
+
+ private final int nbShards;
+ private final int nbReplica;
+
+ public static final String CASE_INSENSITIVE = "case_insensitive";
+ public static final String KEEP_MAIL_AND_URL = "keep_mail_and_url";
+ public static final String SNOWBALL_KEEP_MAIL_AND_URL = "snowball_keep_mail_and_token";
+ public static final String ENGLISH_SNOWBALL = "english_snowball";
+
+ @Inject
+ public IndexCreationFactory(ElasticSearchConfiguration configuration) {
+ this.nbShards = configuration.getNbShards();
+ this.nbReplica = configuration.getNbReplica();
+ }
+
+ public AliasSpecificationStep useIndex(IndexName indexName) {
+ Preconditions.checkNotNull(indexName);
+ return new AliasSpecificationStep(nbShards, nbReplica, indexName);
+ }
}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ListenerToFuture.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ListenerToFuture.java
similarity index 97%
rename from backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ListenerToFuture.java
rename to backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ListenerToFuture.java
index 11f751e..1df57b0 100644
--- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ListenerToFuture.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ListenerToFuture.java
@@ -17,7 +17,7 @@
* under the License. *
****************************************************************/
-package org.apache.james.backends.es.v6;
+package org.apache.james.backends.es;
import java.util.concurrent.CompletableFuture;
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/NodeMappingFactory.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/NodeMappingFactory.java
index 36db27e..c7d0f82 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/NodeMappingFactory.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/NodeMappingFactory.java
@@ -19,12 +19,18 @@
package org.apache.james.backends.es;
+import java.io.IOException;
+
import org.apache.james.util.streams.Iterators;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.GetMappingsRequest;
+import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
public class NodeMappingFactory {
+ public static final String DEFAULT_MAPPING_NAME = "_doc";
public static final String BOOLEAN = "boolean";
public static final String TYPE = "type";
public static final String LONG = "long";
@@ -32,6 +38,8 @@ public class NodeMappingFactory {
public static final String INDEX = "index";
public static final String NOT_ANALYZED = "not_analyzed";
public static final String STRING = "string";
+ public static final String TEXT = "text";
+ public static final String KEYWORD = "keyword";
public static final String PROPERTIES = "properties";
public static final String DATE = "date";
public static final String FORMAT = "format";
@@ -40,36 +48,35 @@ public class NodeMappingFactory {
public static final String RAW = "raw";
public static final String SPLIT_EMAIL = "splitEmail";
public static final String ANALYZER = "analyzer";
+ public static final String NORMALIZER = "normalizer";
public static final String SEARCH_ANALYZER = "search_analyzer";
public static final String SNOWBALL = "snowball";
public static final String IGNORE_ABOVE = "ignore_above";
- public static Client applyMapping(Client client, IndexName indexName, TypeName typeName, XContentBuilder mappingsSources) {
- if (!mappingAlreadyExist(client, indexName, typeName)) {
- createMapping(client, indexName, typeName, mappingsSources);
+ public static RestHighLevelClient applyMapping(RestHighLevelClient client, IndexName indexName, XContentBuilder mappingsSources) throws IOException {
+ if (!mappingAlreadyExist(client, indexName)) {
+ createMapping(client, indexName, mappingsSources);
}
return client;
}
- public static boolean mappingAlreadyExist(Client client, IndexName indexName, TypeName typeName) {
- return Iterators.toStream(client.admin()
- .indices()
- .prepareGetMappings(indexName.getValue())
- .execute()
- .actionGet()
- .getMappings()
- .valuesIt())
- .anyMatch(mapping -> mapping.keys().contains(typeName.getValue()));
+ public static boolean mappingAlreadyExist(RestHighLevelClient client, IndexName indexName) throws IOException {
+ return Iterators.toStream(client.indices()
+ .getMapping(
+ new GetMappingsRequest()
+ .indices(indexName.getValue()),
+ RequestOptions.DEFAULT)
+ .mappings()
+ .values()
+ .iterator())
+ .anyMatch(mappingMetaData -> !mappingMetaData.getSourceAsMap().isEmpty());
}
- public static void createMapping(Client client, IndexName indexName, TypeName typeName, XContentBuilder mappingsSources) {
- client.admin()
- .indices()
- .preparePutMapping(indexName.getValue())
- .setType(typeName.getValue())
- .setSource(mappingsSources)
- .execute()
- .actionGet();
+ public static void createMapping(RestHighLevelClient client, IndexName indexName, XContentBuilder mappingsSources) throws IOException {
+ client.indices().putMapping(
+ new PutMappingRequest(indexName.getValue())
+ .source(mappingsSources),
+ RequestOptions.DEFAULT);
}
}
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/TypeName.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/TypeName.java
deleted file mode 100644
index e6c638b..0000000
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/TypeName.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es;
-
-public class TypeName {
- private final String value;
-
- public TypeName(String value) {
- this.value = value;
- }
-
- public String getValue() {
- return value;
- }
-}
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrollIterable.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrollIterable.java
index a93d3f1..ad8ce22 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrollIterable.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrollIterable.java
@@ -20,29 +20,32 @@
package org.apache.james.backends.es.search;
import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
+import org.apache.james.backends.es.ListenerToFuture;
import org.apache.james.util.streams.Iterators;
-import org.elasticsearch.action.ListenableActionFuture;
-import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
public class ScrollIterable implements Iterable<SearchResponse> {
+ private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
- private static final TimeValue TIMEOUT = new TimeValue(60000);
- private final Client client;
- private final SearchRequestBuilder searchRequestBuilder;
+ private final RestHighLevelClient client;
+ private final SearchRequest searchRequest;
- public ScrollIterable(Client client, SearchRequestBuilder searchRequestBuilder) {
+ public ScrollIterable(RestHighLevelClient client, SearchRequest searchRequest) {
this.client = client;
- this.searchRequestBuilder = searchRequestBuilder;
+ this.searchRequest = searchRequest;
}
@Override
public Iterator<SearchResponse> iterator() {
- return new ScrollIterator(client, searchRequestBuilder);
+ return new ScrollIterator(client, searchRequest);
}
public Stream<SearchResponse> stream() {
@@ -50,26 +53,34 @@ public class ScrollIterable implements Iterable<SearchResponse> {
}
public static class ScrollIterator implements Iterator<SearchResponse> {
+ private final RestHighLevelClient client;
+ private CompletableFuture<SearchResponse> searchResponseFuture;
- private final Client client;
- private ListenableActionFuture<SearchResponse> searchResponseFuture;
-
- public ScrollIterator(Client client, SearchRequestBuilder searchRequestBuilder) {
+ ScrollIterator(RestHighLevelClient client, SearchRequest searchRequest) {
this.client = client;
- this.searchResponseFuture = searchRequestBuilder.execute();
+ ListenerToFuture<SearchResponse> listener = new ListenerToFuture<>();
+ client.searchAsync(searchRequest, RequestOptions.DEFAULT, listener);
+
+ this.searchResponseFuture = listener.getFuture();
}
@Override
public boolean hasNext() {
- return !allSearchResponsesConsumed(searchResponseFuture.actionGet());
+ SearchResponse join = searchResponseFuture.join();
+ return !allSearchResponsesConsumed(join);
}
@Override
public SearchResponse next() {
- SearchResponse result = searchResponseFuture.actionGet();
- searchResponseFuture = client.prepareSearchScroll(result.getScrollId())
- .setScroll(TIMEOUT)
- .execute();
+ SearchResponse result = searchResponseFuture.join();
+ ListenerToFuture<SearchResponse> listener = new ListenerToFuture<>();
+ client.scrollAsync(
+ new SearchScrollRequest()
+ .scrollId(result.getScrollId())
+ .scroll(TIMEOUT),
+ RequestOptions.DEFAULT,
+ listener);
+ searchResponseFuture = listener.getFuture();
return result;
}
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/AliasNameTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/AliasNameTest.java
similarity index 97%
rename from backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/AliasNameTest.java
rename to backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/AliasNameTest.java
index 3dc2f0c..9acdd47 100644
--- a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/AliasNameTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/AliasNameTest.java
@@ -17,7 +17,7 @@
* under the License. *
****************************************************************/
-package org.apache.james.backends.es.v6;
+package org.apache.james.backends.es;
import org.junit.jupiter.api.Test;
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java
index 87d6f52..5072968 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionTest.java
@@ -23,41 +23,45 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.james.util.docker.DockerGenericContainer;
+import org.apache.james.util.docker.Images;
import org.awaitility.Awaitility;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
-import org.junit.Ignore;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Ignore("JAMES-1952")
public class ClientProviderImplConnectionTest {
private static final Logger LOGGER = LoggerFactory.getLogger(ClientProviderImplConnectionTest.class);
- private static final String DOCKER_ES_IMAGE = "elasticsearch:2.2.1";
- private static final int ES_APPLICATIVE_PORT = 9300;
+ private static final int ES_APPLICATIVE_PORT = 9200;
- @Rule
- public DockerGenericContainer es1 = new DockerGenericContainer(DOCKER_ES_IMAGE)
+ @ClassRule
+ public static DockerGenericContainer es1 = new DockerGenericContainer(Images.ELASTICSEARCH_6)
+ .withEnv("discovery.type", "single-node")
.withAffinityToContainer()
.withExposedPorts(ES_APPLICATIVE_PORT);
@Rule
- public DockerGenericContainer es2 = new DockerGenericContainer(DOCKER_ES_IMAGE)
+ public DockerGenericContainer es2 = new DockerGenericContainer(Images.ELASTICSEARCH_6)
+ .withEnv("discovery.type", "single-node")
.withAffinityToContainer()
.withExposedPorts(ES_APPLICATIVE_PORT);
@Test
- public void connectingASingleServerShouldWork() throws Exception {
+ public void connectingASingleServerShouldWork() {
Awaitility.await()
.atMost(1, TimeUnit.MINUTES)
.pollInterval(5, TimeUnit.SECONDS)
- .until(() -> isConnected(ClientProviderImpl.forHost(es1.getContainerIp(), 9300, Optional.empty())));
+ .until(() -> isConnected(ClientProviderImpl.forHost(es1.getContainerIp(), ES_APPLICATIVE_PORT, Optional.empty())));
}
@Test
- public void connectingAClusterShouldWork() throws Exception {
+ public void connectingAClusterShouldWork() {
Awaitility.await()
.atMost(1, TimeUnit.MINUTES)
.pollInterval(5, TimeUnit.SECONDS)
@@ -69,7 +73,9 @@ public class ClientProviderImplConnectionTest {
}
@Test
- public void connectingAClusterWithAFailedNodeShouldWork() throws Exception {
+ public void connectingAClusterWithAFailedNodeShouldWork() {
+ String es1Ip = es1.getContainerIp();
+ String es2Ip = es2.getContainerIp();
es2.stop();
Awaitility.await()
@@ -77,16 +83,17 @@ public class ClientProviderImplConnectionTest {
.pollInterval(5, TimeUnit.SECONDS)
.until(() -> isConnected(
ClientProviderImpl.fromHostsString(
- es1.getContainerIp() + ":" + ES_APPLICATIVE_PORT + ","
- + es2.getContainerIp() + ":" + ES_APPLICATIVE_PORT,
+ es1Ip + ":" + ES_APPLICATIVE_PORT + ","
+ + es2Ip + ":" + ES_APPLICATIVE_PORT,
Optional.empty())));
}
private boolean isConnected(ClientProvider clientProvider) {
- try (Client client = clientProvider.get()) {
- client.prepareSearch()
- .setQuery(QueryBuilders.existsQuery("any"))
- .get();
+ try (RestHighLevelClient client = clientProvider.get()) {
+ client.search(
+ new SearchRequest()
+ .source(new SearchSourceBuilder().query(QueryBuilders.existsQuery("any"))),
+ RequestOptions.DEFAULT);
return true;
} catch (Exception e) {
LOGGER.info("Caught exception while trying to connect", e);
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearch.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearch.java
index 0d93b6a..9c66865 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearch.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearch.java
@@ -57,26 +57,17 @@ public class DockerElasticSearch {
}
private static final int ES_HTTP_PORT = 9200;
- private static final int ES_TCP_PORT = 9300;
private final DockerGenericContainer eSContainer;
public DockerElasticSearch() {
- this(Images.ELASTICSEARCH_2);
- }
-
- public DockerElasticSearch(String imageName) {
- this.eSContainer = new DockerGenericContainer(imageName)
- .withExposedPorts(ES_HTTP_PORT, ES_TCP_PORT)
+ this.eSContainer = new DockerGenericContainer(Images.ELASTICSEARCH_6)
+ .withEnv("discovery.type", "single-node")
+ .withAffinityToContainer()
+ .withExposedPorts(ES_HTTP_PORT)
.waitingFor(new HostPortWaitStrategy().withRateLimiter(RateLimiters.TWENTIES_PER_SECOND));
}
- public DockerElasticSearch withEnv(String key, String value) {
- this.eSContainer
- .withEnv(key, value);
- return this;
- }
-
public void start() {
if (!eSContainer.isRunning()) {
eSContainer.start();
@@ -91,18 +82,10 @@ public class DockerElasticSearch {
return eSContainer.getMappedPort(ES_HTTP_PORT);
}
- public int getTcpPort() {
- return eSContainer.getMappedPort(ES_TCP_PORT);
- }
-
public String getIp() {
return eSContainer.getHostIp();
}
- public Host getTcpHost() {
- return Host.from(getIp(), getTcpPort());
- }
-
public Host getHttpHost() {
return Host.from(getIp(), getHttpPort());
}
@@ -127,7 +110,7 @@ public class DockerElasticSearch {
public ClientProvider clientProvider() {
Optional<String> noClusterName = Optional.empty();
- return ClientProviderImpl.fromHosts(ImmutableList.of(getTcpHost()), noClusterName);
+ return ClientProviderImpl.fromHosts(ImmutableList.of(getHttpHost()), noClusterName);
}
private ElasticSearchAPI esAPI() {
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearchRule.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearchRule.java
index 7e140dc..094c2ba 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearchRule.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearchRule.java
@@ -19,7 +19,6 @@
package org.apache.james.backends.es;
-import org.apache.james.util.Host;
import org.junit.rules.ExternalResource;
public class DockerElasticSearchRule extends ExternalResource {
@@ -44,7 +43,7 @@ public class DockerElasticSearchRule extends ExternalResource {
dockerElasticSearch.awaitForElasticSearch();
}
- public Host getTcpHost() {
- return dockerElasticSearch.getTcpHost();
+ public DockerElasticSearch getDockerElasticSearch() {
+ return dockerElasticSearch;
}
}
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchConfigurationTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchConfigurationTest.java
index 2397f4d..c53f759 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchConfigurationTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchConfigurationTest.java
@@ -1,21 +1,21 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
package org.apache.james.backends.es;
@@ -31,9 +31,17 @@ import org.junit.Test;
import com.google.common.collect.ImmutableList;
+import nl.jqno.equalsverifier.EqualsVerifier;
+
public class ElasticSearchConfigurationTest {
@Test
+ public void elasticSearchConfigurationShouldRespectBeanContract() {
+ EqualsVerifier.forClass(ElasticSearchConfiguration.class)
+ .verify();
+ }
+
+ @Test
public void getNbReplicaShouldReturnConfiguredValue() throws ConfigurationException {
PropertiesConfiguration configuration = new PropertiesConfiguration();
int value = 36;
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
index 4904ce6..f857f2c 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
@@ -21,14 +21,16 @@ package org.apache.james.backends.es;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.awaitility.Awaitility.await;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
-import java.util.concurrent.Executors;
-
-import org.apache.james.util.concurrent.NamedThreadFactory;
+import org.awaitility.Duration;
+import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -40,7 +42,6 @@ public class ElasticSearchIndexerTest {
private static final int MINIMUM_BATCH_SIZE = 1;
private static final IndexName INDEX_NAME = new IndexName("index_name");
private static final WriteAliasName ALIAS_NAME = new WriteAliasName("alias_name");
- private static final TypeName TYPE_NAME = new TypeName("type_name");
@Rule
public DockerElasticSearchRule elasticSearch = new DockerElasticSearchRule();
@@ -52,28 +53,26 @@ public class ElasticSearchIndexerTest {
.useIndex(INDEX_NAME)
.addAlias(ALIAS_NAME)
.createIndexAndAliases(getESClient());
- testee = new ElasticSearchIndexer(getESClient(),
- Executors.newSingleThreadExecutor(NamedThreadFactory.withClassName(getClass())),
- ALIAS_NAME, TYPE_NAME, MINIMUM_BATCH_SIZE);
+ testee = new ElasticSearchIndexer(getESClient(), ALIAS_NAME, MINIMUM_BATCH_SIZE);
}
- private Client getESClient() {
+ private RestHighLevelClient getESClient() {
return elasticSearch.clientProvider().get();
}
@Test
- public void indexMessageShouldWork() {
+ public void indexMessageShouldWork() throws Exception {
String messageId = "1";
String content = "{\"message\": \"trying out Elasticsearch\"}";
testee.index(messageId, content);
elasticSearch.awaitForElasticSearch();
- try (Client client = getESClient()) {
- SearchResponse searchResponse = client.prepareSearch(INDEX_NAME.getValue())
- .setTypes(TYPE_NAME.getValue())
- .setQuery(QueryBuilders.matchQuery("message", "trying"))
- .get();
+ try (RestHighLevelClient client = getESClient()) {
+ SearchResponse searchResponse = client.search(
+ new SearchRequest(INDEX_NAME.getValue())
+ .source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("message", "trying"))),
+ RequestOptions.DEFAULT);
assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
}
}
@@ -85,7 +84,7 @@ public class ElasticSearchIndexerTest {
}
@Test
- public void updateMessages() {
+ public void updateMessages() throws Exception {
String messageId = "1";
String content = "{\"message\": \"trying out Elasticsearch\",\"field\":\"Should be unchanged\"}";
@@ -95,19 +94,19 @@ public class ElasticSearchIndexerTest {
testee.update(ImmutableList.of(new UpdatedRepresentation(messageId, "{\"message\": \"mastering out Elasticsearch\"}")));
elasticSearch.awaitForElasticSearch();
- try (Client client = getESClient()) {
- SearchResponse searchResponse = client.prepareSearch(INDEX_NAME.getValue())
- .setTypes(TYPE_NAME.getValue())
- .setQuery(QueryBuilders.matchQuery("message", "mastering"))
- .get();
+ try (RestHighLevelClient client = getESClient()) {
+ SearchResponse searchResponse = client.search(
+ new SearchRequest(INDEX_NAME.getValue())
+ .source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("message", "mastering"))),
+ RequestOptions.DEFAULT);
assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
}
- try (Client client = getESClient()) {
- SearchResponse searchResponse = client.prepareSearch(INDEX_NAME.getValue())
- .setTypes(TYPE_NAME.getValue())
- .setQuery(QueryBuilders.matchQuery("field", "unchanged"))
- .get();
+ try (RestHighLevelClient client = getESClient()) {
+ SearchResponse searchResponse = client.search(
+ new SearchRequest(INDEX_NAME.getValue())
+ .source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("field", "unchanged"))),
+ RequestOptions.DEFAULT);
assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
}
}
@@ -144,15 +143,16 @@ public class ElasticSearchIndexerTest {
testee.index(messageId, content);
elasticSearch.awaitForElasticSearch();
- testee.deleteAllMatchingQuery(termQuery("property", "1")).get();
+ testee.deleteAllMatchingQuery(termQuery("property", "1"));
elasticSearch.awaitForElasticSearch();
- try (Client client = getESClient()) {
- SearchResponse searchResponse = client.prepareSearch(INDEX_NAME.getValue())
- .setTypes(TYPE_NAME.getValue())
- .setQuery(QueryBuilders.matchAllQuery())
- .get();
- assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(0);
+ try (RestHighLevelClient client = getESClient()) {
+ await().atMost(Duration.TEN_SECONDS)
+ .until(() -> client.search(
+ new SearchRequest(INDEX_NAME.getValue())
+ .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
+ RequestOptions.DEFAULT)
+ .getHits().getTotalHits() == 0);
}
}
@@ -174,20 +174,21 @@ public class ElasticSearchIndexerTest {
testee.index(messageId3, content3);
elasticSearch.awaitForElasticSearch();
- testee.deleteAllMatchingQuery(termQuery("property", "1")).get();
+ testee.deleteAllMatchingQuery(termQuery("property", "1"));
elasticSearch.awaitForElasticSearch();
- try (Client client = getESClient()) {
- SearchResponse searchResponse = client.prepareSearch(INDEX_NAME.getValue())
- .setTypes(TYPE_NAME.getValue())
- .setQuery(QueryBuilders.matchAllQuery())
- .get();
- assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
+ try (RestHighLevelClient client = getESClient()) {
+ await().atMost(Duration.TEN_SECONDS)
+ .until(() -> client.search(
+ new SearchRequest(INDEX_NAME.getValue())
+ .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
+ RequestOptions.DEFAULT)
+ .getHits().getTotalHits() == 1);
}
}
@Test
- public void deleteMessage() {
+ public void deleteMessage() throws Exception {
String messageId = "1:2";
String content = "{\"message\": \"trying out Elasticsearch\"}";
@@ -197,17 +198,17 @@ public class ElasticSearchIndexerTest {
testee.delete(ImmutableList.of(messageId));
elasticSearch.awaitForElasticSearch();
- try (Client client = getESClient()) {
- SearchResponse searchResponse = client.prepareSearch(INDEX_NAME.getValue())
- .setTypes(TYPE_NAME.getValue())
- .setQuery(QueryBuilders.matchAllQuery())
- .get();
+ try (RestHighLevelClient client = getESClient()) {
+ SearchResponse searchResponse = client.search(
+ new SearchRequest(INDEX_NAME.getValue())
+ .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
+ RequestOptions.DEFAULT);
assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(0);
}
}
@Test
- public void deleteShouldWorkWhenMultipleMessages() {
+ public void deleteShouldWorkWhenMultipleMessages() throws Exception {
String messageId = "1:1";
String content = "{\"message\": \"trying out Elasticsearch\", \"mailboxId\":\"1\"}";
@@ -227,22 +228,22 @@ public class ElasticSearchIndexerTest {
testee.delete(ImmutableList.of(messageId, messageId3));
elasticSearch.awaitForElasticSearch();
- try (Client client = getESClient()) {
- SearchResponse searchResponse = client.prepareSearch(INDEX_NAME.getValue())
- .setTypes(TYPE_NAME.getValue())
- .setQuery(QueryBuilders.matchAllQuery())
- .get();
+ try (RestHighLevelClient client = getESClient()) {
+ SearchResponse searchResponse = client.search(
+ new SearchRequest(INDEX_NAME.getValue())
+ .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
+ RequestOptions.DEFAULT);
assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
}
}
@Test
- public void updateMessagesShouldNotThrowWhenEmptyList() {
+ public void updateMessagesShouldNotThrowWhenEmptyList() throws Exception {
testee.update(ImmutableList.of());
}
@Test
- public void deleteMessagesShouldNotThrowWhenEmptyList() {
+ public void deleteMessagesShouldNotThrowWhenEmptyList() throws Exception {
testee.delete(ImmutableList.of());
}
}
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/IndexCreationFactoryTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/IndexCreationFactoryTest.java
index fe4421e..50babf5 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/IndexCreationFactoryTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/IndexCreationFactoryTest.java
@@ -26,8 +26,8 @@ import org.junit.Rule;
import org.junit.Test;
public class IndexCreationFactoryTest {
- public static final IndexName INDEX_NAME = new IndexName("index");
- public static final ReadAliasName ALIAS_NAME = new ReadAliasName("alias");
+ private static final IndexName INDEX_NAME = new IndexName("index");
+ private static final ReadAliasName ALIAS_NAME = new ReadAliasName("alias");
@Rule
public DockerElasticSearchRule elasticSearch = new DockerElasticSearchRule();
@@ -62,6 +62,7 @@ public class IndexCreationFactoryTest {
public void addAliasShouldThrowWhenNull() {
assertThatThrownBy(() ->
new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION)
+ .useIndex(INDEX_NAME)
.addAlias(null))
.isInstanceOf(NullPointerException.class);
}
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryTest.java
index 228b4ed..6ffa45b 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryTest.java
@@ -19,6 +19,7 @@
package org.apache.james.backends.es;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -27,10 +28,9 @@ import org.junit.Rule;
import org.junit.Test;
public class NodeMappingFactoryTest {
- public static final String MESSAGE = "message";
- public static final IndexName INDEX_NAME = new IndexName("index");
- public static final ReadAliasName ALIAS_NAME = new ReadAliasName("alias");
- public static final TypeName TYPE_NAME = new TypeName("type");
+ private static final String MESSAGE = "message";
+ private static final IndexName INDEX_NAME = new IndexName("index");
+ private static final ReadAliasName ALIAS_NAME = new ReadAliasName("alias");
@Rule
public DockerElasticSearchRule elasticSearch = new DockerElasticSearchRule();
@@ -45,7 +45,6 @@ public class NodeMappingFactoryTest {
.createIndexAndAliases(clientProvider.get());
NodeMappingFactory.applyMapping(clientProvider.get(),
INDEX_NAME,
- TYPE_NAME,
getMappingsSources());
}
@@ -53,33 +52,29 @@ public class NodeMappingFactoryTest {
public void applyMappingShouldNotThrowWhenCalledSeveralTime() throws Exception {
NodeMappingFactory.applyMapping(clientProvider.get(),
INDEX_NAME,
- TYPE_NAME,
getMappingsSources());
}
@Test
- public void applyMappingShouldNotThrowWhenIndexerChanges() throws Exception {
+ public void applyMappingShouldNotThrowWhenIncrementalChanges() throws Exception {
NodeMappingFactory.applyMapping(clientProvider.get(),
INDEX_NAME,
- TYPE_NAME,
getMappingsSources());
elasticSearch.awaitForElasticSearch();
- NodeMappingFactory.applyMapping(clientProvider.get(),
+ assertThatCode(() ->NodeMappingFactory.applyMapping(clientProvider.get(),
INDEX_NAME,
- TYPE_NAME,
- getOtherMappingsSources());
+ getOtherMappingsSources()))
+ .doesNotThrowAnyException();
}
private XContentBuilder getMappingsSources() throws Exception {
return jsonBuilder()
.startObject()
- .startObject(TYPE_NAME.getValue())
- .startObject(NodeMappingFactory.PROPERTIES)
- .startObject(MESSAGE)
- .field(NodeMappingFactory.TYPE, NodeMappingFactory.STRING)
- .endObject()
+ .startObject(NodeMappingFactory.PROPERTIES)
+ .startObject(MESSAGE)
+ .field(NodeMappingFactory.TYPE, NodeMappingFactory.TEXT)
.endObject()
.endObject()
.endObject();
@@ -88,12 +83,10 @@ public class NodeMappingFactoryTest {
private XContentBuilder getOtherMappingsSources() throws Exception {
return jsonBuilder()
.startObject()
- .startObject(TYPE_NAME.getValue())
- .startObject(NodeMappingFactory.PROPERTIES)
- .startObject(MESSAGE)
- .field(NodeMappingFactory.TYPE, NodeMappingFactory.STRING)
- .field(NodeMappingFactory.INDEX, NodeMappingFactory.NOT_ANALYZED)
- .endObject()
+ .startObject(NodeMappingFactory.PROPERTIES)
+ .startObject(MESSAGE)
+ .field(NodeMappingFactory.TYPE, NodeMappingFactory.TEXT)
+ .field(NodeMappingFactory.INDEX, false)
.endObject()
.endObject()
.endObject();
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrollIterableTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrollIterableTest.java
index 392b3e6..a6507a8 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrollIterableTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrollIterableTest.java
@@ -21,8 +21,6 @@ package org.apache.james.backends.es.search;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import java.io.IOException;
import java.util.Arrays;
@@ -36,27 +34,27 @@ import org.apache.james.backends.es.IndexCreationFactory;
import org.apache.james.backends.es.IndexName;
import org.apache.james.backends.es.NodeMappingFactory;
import org.apache.james.backends.es.ReadAliasName;
-import org.apache.james.backends.es.TypeName;
import org.awaitility.Duration;
import org.awaitility.core.ConditionFactory;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
public class ScrollIterableTest {
- private static final TimeValue TIMEOUT = new TimeValue(6000);
+ private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
private static final int SIZE = 2;
private static final String MESSAGE = "message";
private static final IndexName INDEX_NAME = new IndexName("index");
private static final ReadAliasName ALIAS_NAME = new ReadAliasName("alias");
- private static final TypeName TYPE_NAME = new TypeName("messages");
private static final ConditionFactory WAIT_CONDITION = await().timeout(Duration.FIVE_SECONDS);
@@ -65,120 +63,118 @@ public class ScrollIterableTest {
private ClientProvider clientProvider;
@Before
- public void setUp() throws Exception {
+ public void setUp() {
clientProvider = elasticSearch.clientProvider();
new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION)
.useIndex(INDEX_NAME)
.addAlias(ALIAS_NAME)
.createIndexAndAliases(clientProvider.get());
elasticSearch.awaitForElasticSearch();
- NodeMappingFactory.applyMapping(clientProvider.get(), INDEX_NAME, TYPE_NAME, getMappingsSources());
- }
-
- private XContentBuilder getMappingsSources() throws IOException {
- return jsonBuilder()
- .startObject()
- .startObject(TYPE_NAME.getValue())
- .startObject(NodeMappingFactory.PROPERTIES)
- .startObject(MESSAGE)
- .field(NodeMappingFactory.TYPE, NodeMappingFactory.STRING)
- .endObject()
- .endObject()
- .endObject()
- .endObject();
}
@Test
- public void scrollIterableShouldWorkWhenEmpty() {
- try (Client client = clientProvider.get()) {
- SearchRequestBuilder searchRequestBuilder = client.prepareSearch(INDEX_NAME.getValue())
- .setTypes(TYPE_NAME.getValue())
- .setScroll(TIMEOUT)
- .setQuery(matchAllQuery())
- .setSize(SIZE);
-
- assertThat(new ScrollIterable(client, searchRequestBuilder))
+ public void scrollIterableShouldWorkWhenEmpty() throws Exception {
+ try (RestHighLevelClient client = clientProvider.get()) {
+ SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+ .scroll(TIMEOUT)
+ .source(new SearchSourceBuilder()
+ .query(QueryBuilders.matchAllQuery())
+ .size(SIZE));
+
+ assertThat(new ScrollIterable(client, searchRequest))
.isEmpty();
}
}
@Test
- public void scrollIterableShouldWorkWhenOneElement() {
- try (Client client = clientProvider.get()) {
+ public void scrollIterableShouldWorkWhenOneElement() throws Exception {
+ try (RestHighLevelClient client = clientProvider.get()) {
String id = "1";
- client.prepareIndex(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id)
- .setSource(MESSAGE, "Sample message")
- .execute();
+ client.index(new IndexRequest(INDEX_NAME.getValue())
+ .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
+ .id(id)
+ .source(MESSAGE, "Sample message"),
+ RequestOptions.DEFAULT);
elasticSearch.awaitForElasticSearch();
WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id));
- SearchRequestBuilder searchRequestBuilder = client.prepareSearch(INDEX_NAME.getValue())
- .setTypes(TYPE_NAME.getValue())
- .setScroll(TIMEOUT)
- .setQuery(matchAllQuery())
- .setSize(SIZE);
+ SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+ .scroll(TIMEOUT)
+ .source(new SearchSourceBuilder()
+ .query(QueryBuilders.matchAllQuery())
+ .size(SIZE));
- assertThat(convertToIdList(new ScrollIterable(client, searchRequestBuilder)))
+ assertThat(convertToIdList(new ScrollIterable(client, searchRequest)))
.containsOnly(id);
}
}
@Test
- public void scrollIterableShouldWorkWhenSizeElement() {
- try (Client client = clientProvider.get()) {
+ public void scrollIterableShouldWorkWhenSizeElement() throws Exception {
+ try (RestHighLevelClient client = clientProvider.get()) {
String id1 = "1";
- client.prepareIndex(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id1)
- .setSource(MESSAGE, "Sample message")
- .execute();
+ client.index(new IndexRequest(INDEX_NAME.getValue())
+ .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
+ .id(id1)
+ .source(MESSAGE, "Sample message"),
+ RequestOptions.DEFAULT);
String id2 = "2";
- client.prepareIndex(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id2)
- .setSource(MESSAGE, "Sample message")
- .execute();
+ client.index(new IndexRequest(INDEX_NAME.getValue())
+ .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
+ .id(id2)
+ .source(MESSAGE, "Sample message"),
+ RequestOptions.DEFAULT);
elasticSearch.awaitForElasticSearch();
WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2));
- SearchRequestBuilder searchRequestBuilder = client.prepareSearch(INDEX_NAME.getValue())
- .setTypes(TYPE_NAME.getValue())
- .setScroll(TIMEOUT)
- .setQuery(matchAllQuery())
- .setSize(SIZE);
+ SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+ .scroll(TIMEOUT)
+ .source(new SearchSourceBuilder()
+ .query(QueryBuilders.matchAllQuery())
+ .size(SIZE));
- assertThat(convertToIdList(new ScrollIterable(client, searchRequestBuilder)))
+ assertThat(convertToIdList(new ScrollIterable(client, searchRequest)))
.containsOnly(id1, id2);
}
}
@Test
- public void scrollIterableShouldWorkWhenMoreThanSizeElement() {
- try (Client client = clientProvider.get()) {
+ public void scrollIterableShouldWorkWhenMoreThanSizeElement() throws Exception {
+ try (RestHighLevelClient client = clientProvider.get()) {
String id1 = "1";
- client.prepareIndex(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id1)
- .setSource(MESSAGE, "Sample message")
- .execute();
+ client.index(new IndexRequest(INDEX_NAME.getValue())
+ .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
+ .id(id1)
+ .source(MESSAGE, "Sample message"),
+ RequestOptions.DEFAULT);
String id2 = "2";
- client.prepareIndex(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id2)
- .setSource(MESSAGE, "Sample message")
- .execute();
+ client.index(new IndexRequest(INDEX_NAME.getValue())
+ .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
+ .id(id2)
+ .source(MESSAGE, "Sample message"),
+ RequestOptions.DEFAULT);
String id3 = "3";
- client.prepareIndex(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id3)
- .setSource(MESSAGE, "Sample message")
- .execute();
+ client.index(new IndexRequest(INDEX_NAME.getValue())
+ .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
+ .id(id3)
+ .source(MESSAGE, "Sample message"),
+ RequestOptions.DEFAULT);
elasticSearch.awaitForElasticSearch();
WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2, id3));
- SearchRequestBuilder searchRequestBuilder = client.prepareSearch(INDEX_NAME.getValue())
- .setTypes(TYPE_NAME.getValue())
- .setScroll(TIMEOUT)
- .setQuery(matchAllQuery())
- .setSize(SIZE);
+ SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+ .scroll(TIMEOUT)
+ .source(new SearchSourceBuilder()
+ .query(QueryBuilders.matchAllQuery())
+ .size(SIZE));
- assertThat(convertToIdList(new ScrollIterable(client, searchRequestBuilder)))
+ assertThat(convertToIdList(new ScrollIterable(client, searchRequest)))
.containsOnly(id1, id2, id3);
}
}
@@ -190,13 +186,15 @@ public class ScrollIterableTest {
.collect(Collectors.toList());
}
- private void hasIdsInIndex(Client client, String... ids) {
- SearchHit[] hits = client.prepareSearch(INDEX_NAME.getValue())
- .setQuery(QueryBuilders.idsQuery(TYPE_NAME.getValue()).addIds(ids))
- .execute()
- .actionGet()
+ private void hasIdsInIndex(RestHighLevelClient client, String... ids) throws IOException {
+ SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+ .scroll(TIMEOUT)
+ .source(new SearchSourceBuilder()
+ .query(QueryBuilders.matchAllQuery()));
+
+ SearchHit[] hits = client.search(searchRequest, RequestOptions.DEFAULT)
.getHits()
- .hits();
+ .getHits();
assertThat(hits)
.extracting(SearchHit::getId)
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/utils/TestingClientProvider.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/utils/TestingClientProvider.java
deleted file mode 100644
index 2de4fe5..0000000
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/utils/TestingClientProvider.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-package org.apache.james.backends.es.utils;
-
-import org.apache.james.backends.es.ClientProvider;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.node.Node;
-
-public class TestingClientProvider implements ClientProvider {
-
- private final Node node;
-
- public TestingClientProvider(Node node) {
- this.node = node;
- }
-
- @Override
- public Client get() {
- return node.client();
- }
-}
diff --git a/backends-common/pom.xml b/backends-common/pom.xml
index f6a96ff..5cab887 100644
--- a/backends-common/pom.xml
+++ b/backends-common/pom.xml
@@ -38,7 +38,6 @@
<module>elasticsearch</module>
<module>jpa</module>
<module>rabbitmq</module>
- <module>elasticsearch-v6</module>
</modules>
</project>
diff --git a/pom.xml b/pom.xml
index 933c8f8..7100134 100644
--- a/pom.xml
+++ b/pom.xml
@@ -698,17 +698,6 @@
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
- <artifactId>apache-james-backends-es-v6</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>${james.groupId}</groupId>
- <artifactId>apache-james-backends-es-v6</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>${james.groupId}</groupId>
<artifactId>apache-james-backends-jpa</artifactId>
<version>${project.version}</version>
</dependency>
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org