You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/05/16 08:48:28 UTC
[james-project] 13/23: JAMES-2719 Copy of backends-es module to a
new backends-es-v6 module
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 89d880ca5438274144782fda3d6e52038a7266d7
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Mon May 13 17:59:31 2019 +0700
JAMES-2719 Copy of backends-es module to a new backends-es-v6 module
---
backends-common/elasticsearch-v6/pom.xml | 104 +++++++
.../org/apache/james/backends/es/v6/AliasName.java | 49 ++++
.../james/backends/es/v6/ClientProvider.java | 26 ++
.../james/backends/es/v6/ClientProviderImpl.java | 86 ++++++
.../backends/es/v6/DeleteByQueryPerformer.java | 84 ++++++
.../backends/es/v6/ElasticSearchConfiguration.java | 240 ++++++++++++++++
.../james/backends/es/v6/ElasticSearchIndexer.java | 116 ++++++++
.../james/backends/es/v6/IndexCreationFactory.java | 158 ++++++++++
.../org/apache/james/backends/es/v6/IndexName.java | 49 ++++
.../james/backends/es/v6/NodeMappingFactory.java | 75 +++++
.../apache/james/backends/es/v6/ReadAliasName.java | 26 ++
.../org/apache/james/backends/es/v6/TypeName.java | 32 +++
.../backends/es/v6/UpdatedRepresentation.java | 69 +++++
.../james/backends/es/v6/WriteAliasName.java | 26 ++
.../backends/es/v6/search/ScrollIterable.java | 81 ++++++
.../es/v6/ClientProviderImplConnectionTest.java | 96 +++++++
.../backends/es/v6/ClientProviderImplTest.java | 142 +++++++++
.../james/backends/es/v6/DockerElasticSearch.java | 126 ++++++++
.../backends/es/v6/DockerElasticSearchRule.java | 50 ++++
.../es/v6/DockerElasticSearchSingleton.java | 28 ++
.../es/v6/ElasticSearchConfigurationTest.java | 319 +++++++++++++++++++++
.../backends/es/v6/ElasticSearchIndexerTest.java | 248 ++++++++++++++++
.../backends/es/v6/IndexCreationFactoryTest.java | 68 +++++
.../backends/es/v6/NodeMappingFactoryTest.java | 101 +++++++
.../backends/es/v6/search/ScrollIterableTest.java | 205 +++++++++++++
.../es/v6/utils/TestingClientProvider.java | 37 +++
.../src/test/resources/logback-test.xml | 12 +
backends-common/pom.xml | 1 +
28 files changed, 2654 insertions(+)
diff --git a/backends-common/elasticsearch-v6/pom.xml b/backends-common/elasticsearch-v6/pom.xml
new file mode 100644
index 0000000..cc661e3
--- /dev/null
+++ b/backends-common/elasticsearch-v6/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.james</groupId>
+ <artifactId>james-backends-common</artifactId>
+ <version>3.4.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>apache-james-backends-es-v6</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>james-server-util</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>james-server-testing</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.github.fge</groupId>
+ <artifactId>throwing-lambdas</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.github.openfeign</groupId>
+ <artifactId>feign-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.github.openfeign</groupId>
+ <artifactId>feign-slf4j</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>2.2.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>2.2.1</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/AliasName.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/AliasName.java
new file mode 100644
index 0000000..754ac6e
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/AliasName.java
@@ -0,0 +1,49 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6;
+
+import java.util.Objects;
+
+public class AliasName {
+ private final String value;
+
+ protected AliasName(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public final boolean equals(Object o) {
+ if (o instanceof AliasName) {
+ AliasName aliasName = (AliasName) o;
+
+ return Objects.equals(this.value, aliasName.value);
+ }
+ return false;
+ }
+
+ @Override
+ public final int hashCode() {
+ return Objects.hash(value);
+ }
+}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProvider.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProvider.java
new file mode 100644
index 0000000..0145d0a
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProvider.java
@@ -0,0 +1,26 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.backends.es.v6;
+
+import org.elasticsearch.client.Client;
+
+public interface ClientProvider {
+
+ Client get();
+}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProviderImpl.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProviderImpl.java
new file mode 100644
index 0000000..aac59a5
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ClientProviderImpl.java
@@ -0,0 +1,86 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.backends.es.v6;
+
+import java.net.InetAddress;
+import java.util.Optional;
+
+import org.apache.james.util.Host;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+
+import com.github.fge.lambdas.Throwing;
+import com.github.fge.lambdas.consumers.ConsumerChainer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+public class ClientProviderImpl implements ClientProvider {
+
+ public static ClientProviderImpl forHost(String address, Integer port, Optional<String> clusterName) {
+ return new ClientProviderImpl(ImmutableList.of(Host.from(address, port)), clusterName);
+ }
+
+ public static ClientProviderImpl fromHostsString(String hostsString, Optional<String> clusterName) {
+ Preconditions.checkNotNull(hostsString, "HostString should not be null");
+ return new ClientProviderImpl(Host.parseHosts(hostsString), clusterName);
+ }
+
+ public static ClientProviderImpl fromHosts(ImmutableList<Host> hosts, Optional<String> clusterName) {
+ Preconditions.checkNotNull(hosts, "Hosts should not be null");
+ return new ClientProviderImpl(hosts, clusterName);
+ }
+
+ private static final String CLUSTER_NAME_SETTING = "cluster.name";
+
+ private final ImmutableList<Host> hosts;
+ private final Optional<String> clusterName;
+
+ private ClientProviderImpl(ImmutableList<Host> hosts, Optional<String> clusterName) {
+ Preconditions.checkArgument(!hosts.isEmpty(), "You should provide at least one host");
+ this.hosts = hosts;
+ this.clusterName = clusterName;
+ }
+
+
+ @Override
+ public Client get() {
+ TransportClient transportClient = TransportClient.builder()
+ .settings(settings())
+ .build();
+ ConsumerChainer<Host> consumer = Throwing.consumer(host -> transportClient
+ .addTransportAddress(
+ new InetSocketTransportAddress(
+ InetAddress.getByName(host.getHostName()),
+ host.getPort())));
+ hosts.forEach(consumer.sneakyThrow());
+ return transportClient;
+ }
+
+ @VisibleForTesting Settings settings() {
+ if (clusterName.isPresent()) {
+ return Settings.builder()
+ .put(CLUSTER_NAME_SETTING, clusterName.get())
+ .build();
+ }
+ return Settings.EMPTY;
+ }
+}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryPerformer.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryPerformer.java
new file mode 100644
index 0000000..05fd04e
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/DeleteByQueryPerformer.java
@@ -0,0 +1,84 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.james.backends.es.v6.search.ScrollIterable;
+import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class DeleteByQueryPerformer {
+ public static final TimeValue TIMEOUT = new TimeValue(60000);
+
+ private final Client client;
+ private final ExecutorService executor;
+ private final int batchSize;
+ private final WriteAliasName aliasName;
+ private final TypeName typeName;
+
+ @VisibleForTesting
+ public DeleteByQueryPerformer(Client client, ExecutorService executor, int batchSize, WriteAliasName aliasName, TypeName typeName) {
+ this.client = client;
+ this.executor = executor;
+ this.batchSize = batchSize;
+ this.aliasName = aliasName;
+ this.typeName = typeName;
+ }
+
+ public Future<Void> perform(QueryBuilder queryBuilder) {
+ return executor.submit(() -> doDeleteByQuery(queryBuilder));
+ }
+
+ protected Void doDeleteByQuery(QueryBuilder queryBuilder) {
+ new ScrollIterable(client,
+ client.prepareSearch(aliasName.getValue())
+ .setTypes(typeName.getValue())
+ .setScroll(TIMEOUT)
+ .setNoFields()
+ .setQuery(queryBuilder)
+ .setSize(batchSize))
+ .stream()
+ .map(searchResponse -> deleteRetrievedIds(client, searchResponse))
+ .forEach(ListenableActionFuture::actionGet);
+ return null;
+ }
+
+ private ListenableActionFuture<BulkResponse> deleteRetrievedIds(Client client, SearchResponse searchResponse) {
+ BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
+ for (SearchHit hit : searchResponse.getHits()) {
+ bulkRequestBuilder.add(client.prepareDelete()
+ .setIndex(aliasName.getValue())
+ .setType(typeName.getValue())
+ .setId(hit.getId()));
+ }
+ return bulkRequestBuilder.execute();
+ }
+
+}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchConfiguration.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchConfiguration.java
new file mode 100644
index 0000000..f490941
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchConfiguration.java
@@ -0,0 +1,240 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.commons.configuration.AbstractConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.james.util.Host;
+
+import com.github.steveash.guavate.Guavate;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+public class ElasticSearchConfiguration {
+
+
+ public static class Builder {
+
+ private final ImmutableList.Builder<Host> hosts;
+ private Optional<String> clusterName;
+ private Optional<Integer> nbShards;
+ private Optional<Integer> nbReplica;
+ private Optional<Integer> minDelay;
+ private Optional<Integer> maxRetries;
+
+ public Builder() {
+ hosts = ImmutableList.builder();
+ clusterName = Optional.empty();
+ nbShards = Optional.empty();
+ nbReplica = Optional.empty();
+ minDelay = Optional.empty();
+ maxRetries = Optional.empty();
+ }
+
+ public Builder addHost(Host host) {
+ this.hosts.add(host);
+ return this;
+ }
+
+ public Builder clusterName(String clusterName) {
+ this.clusterName = Optional.ofNullable(clusterName);
+ return this;
+ }
+
+ public Builder addHosts(Collection<Host> hosts) {
+ this.hosts.addAll(hosts);
+ return this;
+ }
+
+ public Builder nbShards(int nbShards) {
+ Preconditions.checkArgument(nbShards > 0, "You need the number of shards to be strictly positive");
+ this.nbShards = Optional.of(nbShards);
+ return this;
+ }
+
+ public Builder nbReplica(int nbReplica) {
+ Preconditions.checkArgument(nbReplica >= 0, "You need the number of replica to be positive");
+ this.nbReplica = Optional.of(nbReplica);
+ return this;
+ }
+
+ public Builder minDelay(Optional<Integer> minDelay) {
+ this.minDelay = minDelay;
+ return this;
+ }
+
+ public Builder maxRetries(Optional<Integer> maxRetries) {
+ this.maxRetries = maxRetries;
+ return this;
+ }
+
+ public ElasticSearchConfiguration build() {
+ ImmutableList<Host> hosts = this.hosts.build();
+ Preconditions.checkState(!hosts.isEmpty(), "You need to specify ElasticSearch host");
+ return new ElasticSearchConfiguration(
+ hosts,
+ clusterName,
+ nbShards.orElse(DEFAULT_NB_SHARDS),
+ nbReplica.orElse(DEFAULT_NB_REPLICA),
+ minDelay.orElse(DEFAULT_CONNECTION_MIN_DELAY),
+ maxRetries.orElse(DEFAULT_CONNECTION_MAX_RETRIES));
+ }
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static final String ELASTICSEARCH_HOSTS = "elasticsearch.hosts";
+ public static final String ELASTICSEARCH_CLUSTER_NAME = "elasticsearch.clusterName";
+ public static final String ELASTICSEARCH_MASTER_HOST = "elasticsearch.masterHost";
+ public static final String ELASTICSEARCH_PORT = "elasticsearch.port";
+ public static final String ELASTICSEARCH_NB_REPLICA = "elasticsearch.nb.replica";
+ public static final String ELASTICSEARCH_NB_SHARDS = "elasticsearch.nb.shards";
+ public static final String ELASTICSEARCH_RETRY_CONNECTION_MIN_DELAY = "elasticsearch.retryConnection.minDelay";
+ public static final String ELASTICSEARCH_RETRY_CONNECTION_MAX_RETRIES = "elasticsearch.retryConnection.maxRetries";
+
+ public static final int DEFAULT_CONNECTION_MAX_RETRIES = 7;
+ public static final int DEFAULT_CONNECTION_MIN_DELAY = 3000;
+ public static final int DEFAULT_NB_SHARDS = 5;
+ public static final int DEFAULT_NB_REPLICA = 1;
+ public static final int DEFAULT_PORT = 9300;
+ private static final String LOCALHOST = "127.0.0.1";
+ public static final Optional<Integer> DEFAULT_PORT_AS_OPTIONAL = Optional.of(DEFAULT_PORT);
+
+ public static final ElasticSearchConfiguration DEFAULT_CONFIGURATION = builder()
+ .addHost(Host.from(LOCALHOST, DEFAULT_PORT))
+ .build();
+
+ public static ElasticSearchConfiguration fromProperties(Configuration configuration) throws ConfigurationException {
+ return builder()
+ .addHosts(getHosts(configuration))
+ .clusterName(configuration.getString(ELASTICSEARCH_CLUSTER_NAME))
+ .nbShards(configuration.getInteger(ELASTICSEARCH_NB_SHARDS, DEFAULT_NB_SHARDS))
+ .nbReplica(configuration.getInteger(ELASTICSEARCH_NB_REPLICA, DEFAULT_NB_REPLICA))
+ .minDelay(Optional.ofNullable(configuration.getInteger(ELASTICSEARCH_RETRY_CONNECTION_MIN_DELAY, null)))
+ .maxRetries(Optional.ofNullable(configuration.getInteger(ELASTICSEARCH_RETRY_CONNECTION_MAX_RETRIES, null)))
+ .build();
+ }
+
+ private static ImmutableList<Host> getHosts(Configuration propertiesReader) throws ConfigurationException {
+ AbstractConfiguration.setDefaultListDelimiter(',');
+ Optional<String> masterHost = Optional.ofNullable(
+ propertiesReader.getString(ELASTICSEARCH_MASTER_HOST, null));
+ Optional<Integer> masterPort = Optional.ofNullable(
+ propertiesReader.getInteger(ELASTICSEARCH_PORT, null));
+ List<String> multiHosts = Arrays.asList(propertiesReader.getStringArray(ELASTICSEARCH_HOSTS));
+
+ validateHostsConfigurationOptions(masterHost, masterPort, multiHosts);
+
+ if (masterHost.isPresent()) {
+ return ImmutableList.of(
+ Host.from(masterHost.get(),
+ masterPort.get()));
+ } else {
+ return multiHosts.stream()
+ .map(ipAndPort -> Host.parse(ipAndPort, DEFAULT_PORT_AS_OPTIONAL))
+ .collect(Guavate.toImmutableList());
+ }
+ }
+
+ @VisibleForTesting
+ static void validateHostsConfigurationOptions(Optional<String> masterHost,
+ Optional<Integer> masterPort,
+ List<String> multiHosts) throws ConfigurationException {
+ if (masterHost.isPresent() != masterPort.isPresent()) {
+ throw new ConfigurationException(ELASTICSEARCH_MASTER_HOST + " and " + ELASTICSEARCH_PORT + " should be specified together");
+ }
+ if (!multiHosts.isEmpty() && masterHost.isPresent()) {
+ throw new ConfigurationException("You should choose between mono host set up and " + ELASTICSEARCH_HOSTS);
+ }
+ if (multiHosts.isEmpty() && !masterHost.isPresent()) {
+ throw new ConfigurationException("You should specify either (" + ELASTICSEARCH_MASTER_HOST + " and " + ELASTICSEARCH_PORT + ") or " + ELASTICSEARCH_HOSTS);
+ }
+ }
+
+ private final ImmutableList<Host> hosts;
+ private final Optional<String> clusterName;
+ private final int nbShards;
+ private final int nbReplica;
+ private final int minDelay;
+ private final int maxRetries;
+
+ private ElasticSearchConfiguration(ImmutableList<Host> hosts, Optional<String> clusterName, int nbShards, int nbReplica, int minDelay, int maxRetries) {
+ this.hosts = hosts;
+ this.clusterName = clusterName;
+ this.nbShards = nbShards;
+ this.nbReplica = nbReplica;
+ this.minDelay = minDelay;
+ this.maxRetries = maxRetries;
+ }
+
+ public ImmutableList<Host> getHosts() {
+ return hosts;
+ }
+
+ public Optional<String> getClusterName() {
+ return clusterName;
+ }
+
+ public int getNbShards() {
+ return nbShards;
+ }
+
+ public int getNbReplica() {
+ return nbReplica;
+ }
+
+ public int getMinDelay() {
+ return minDelay;
+ }
+
+ public int getMaxRetries() {
+ return maxRetries;
+ }
+
+ @Override
+ public final boolean equals(Object o) {
+ if (o instanceof ElasticSearchConfiguration) {
+ ElasticSearchConfiguration that = (ElasticSearchConfiguration) o;
+
+ return Objects.equals(this.nbShards, that.nbShards)
+ && Objects.equals(this.clusterName, that.clusterName)
+ && Objects.equals(this.nbReplica, that.nbReplica)
+ && Objects.equals(this.minDelay, that.minDelay)
+ && Objects.equals(this.maxRetries, that.maxRetries)
+ && Objects.equals(this.hosts, that.hosts);
+ }
+ return false;
+ }
+
+ @Override
+ public final int hashCode() {
+ return Objects.hash(hosts, clusterName, nbShards, nbReplica, minDelay, maxRetries);
+ }
+}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchIndexer.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchIndexer.java
new file mode 100644
index 0000000..8572e4d
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ElasticSearchIndexer.java
@@ -0,0 +1,116 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.backends.es.v6;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang3.StringUtils;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.ValidationException;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+public class ElasticSearchIndexer {
+ private static final int DEBUG_MAX_LENGTH_CONTENT = 1000;
+ private static final int DEFAULT_BATCH_SIZE = 100;
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchIndexer.class);
+
+ private final Client client;
+ private final DeleteByQueryPerformer deleteByQueryPerformer;
+ private final AliasName aliasName;
+ private final TypeName typeName;
+
+ public ElasticSearchIndexer(Client client, ExecutorService executor,
+ WriteAliasName aliasName,
+ TypeName typeName) {
+ this(client, executor, aliasName, typeName, DEFAULT_BATCH_SIZE);
+ }
+
+ @VisibleForTesting
+ public ElasticSearchIndexer(Client client, ExecutorService executor,
+ WriteAliasName aliasName,
+ TypeName typeName,
+ int batchSize) {
+ this.client = client;
+ this.deleteByQueryPerformer = new DeleteByQueryPerformer(client, executor, batchSize, aliasName, typeName);
+ this.aliasName = aliasName;
+ this.typeName = typeName;
+ }
+
+ public IndexResponse index(String id, String content) {
+ checkArgument(content);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Indexing {}: {}", id, StringUtils.left(content, DEBUG_MAX_LENGTH_CONTENT));
+ }
+ return client.prepareIndex(aliasName.getValue(), typeName.getValue(), id)
+ .setSource(content)
+ .get();
+ }
+
+ public Optional<BulkResponse> update(List<UpdatedRepresentation> updatedDocumentParts) {
+ try {
+ Preconditions.checkNotNull(updatedDocumentParts);
+ BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
+ updatedDocumentParts.forEach(updatedDocumentPart -> bulkRequestBuilder.add(
+ client.prepareUpdate(
+ aliasName.getValue(),
+ typeName.getValue(),
+ updatedDocumentPart.getId())
+ .setDoc(updatedDocumentPart.getUpdatedDocumentPart())));
+ return Optional.of(bulkRequestBuilder.get());
+ } catch (ValidationException e) {
+ LOGGER.warn("Error while updating index", e);
+ return Optional.empty();
+ }
+ }
+
+ public Optional<BulkResponse> delete(List<String> ids) {
+ try {
+ BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
+ ids.forEach(id -> bulkRequestBuilder.add(
+ client.prepareDelete(
+ aliasName.getValue(),
+ typeName.getValue(),
+ id)));
+ return Optional.of(bulkRequestBuilder.get());
+ } catch (ValidationException e) {
+ LOGGER.warn("Error while deleting index", e);
+ return Optional.empty();
+ }
+ }
+
+ public Future<Void> deleteAllMatchingQuery(QueryBuilder queryBuilder) {
+ return deleteByQueryPerformer.perform(queryBuilder);
+ }
+
+ private void checkArgument(String content) {
+ Preconditions.checkArgument(content != null, "content should be provided");
+ }
+}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java
new file mode 100644
index 0000000..bdce72c
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java
@@ -0,0 +1,158 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import javax.inject.Inject;
+
+import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
+import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.indices.IndexAlreadyExistsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class IndexCreationFactory {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(IndexCreationFactory.class);
+ public static final String CASE_INSENSITIVE = "case_insensitive";
+ public static final String KEEP_MAIL_AND_URL = "keep_mail_and_url";
+ public static final String SNOWBALL_KEEP_MAIL_AND_URL = "snowball_keep_mail_and_token";
+ public static final String ENGLISH_SNOWBALL = "english_snowball";
+
+ private IndexName indexName;
+ private ArrayList<AliasName> aliases;
+ private int nbShards;
+ private int nbReplica;
+
+ @Inject
+ public IndexCreationFactory(ElasticSearchConfiguration configuration) {
+ indexName = null;
+ aliases = new ArrayList<>();
+ nbShards = configuration.getNbShards();
+ nbReplica = configuration.getNbReplica();
+ }
+
+ public IndexCreationFactory useIndex(IndexName indexName) {
+ Preconditions.checkNotNull(indexName);
+ this.indexName = indexName;
+ return this;
+ }
+
+ public IndexCreationFactory addAlias(AliasName aliasName) {
+ Preconditions.checkNotNull(aliasName);
+ this.aliases.add(aliasName);
+ return this;
+ }
+
+ public Client createIndexAndAliases(Client client) {
+ Preconditions.checkNotNull(indexName);
+ try {
+ createIndexIfNeeded(client, indexName, generateSetting(nbShards, nbReplica));
+ aliases.forEach(alias -> createAliasIfNeeded(client, indexName, alias));
+ } catch (IOException e) {
+ LOGGER.error("Error while creating index : ", e);
+ }
+ return client;
+ }
+
+ private void createAliasIfNeeded(Client client, IndexName indexName, AliasName aliasName) {
+ if (!aliasExist(client, aliasName)) {
+ client.admin()
+ .indices()
+ .aliases(new IndicesAliasesRequest()
+ .addAlias(aliasName.getValue(), indexName.getValue()))
+ .actionGet();
+ }
+ }
+
+ private boolean aliasExist(Client client, AliasName aliasName) {
+ return client.admin()
+ .indices()
+ .aliasesExist(new GetAliasesRequest()
+ .aliases(aliasName.getValue()))
+ .actionGet()
+ .exists();
+ }
+
+ private void createIndexIfNeeded(Client client, IndexName indexName, XContentBuilder settings) {
+ try {
+ client.admin()
+ .indices()
+ .prepareCreate(indexName.getValue())
+ .setSettings(settings)
+ .execute()
+ .actionGet();
+ } catch (IndexAlreadyExistsException exception) {
+ LOGGER.info("Index [{}] already exist", indexName);
+ }
+ }
+
+ private XContentBuilder generateSetting(int nbShards, int nbReplica) throws IOException {
+ return jsonBuilder()
+ .startObject()
+ .field("number_of_shards", nbShards)
+ .field("number_of_replicas", nbReplica)
+ .startObject("analysis")
+ .startObject("analyzer")
+ .startObject(CASE_INSENSITIVE)
+ .field("tokenizer", "keyword")
+ .startArray("filter")
+ .value("lowercase")
+ .endArray()
+ .endObject()
+ .endObject()
+ .startObject("analyzer")
+ .startObject(KEEP_MAIL_AND_URL)
+ .field("tokenizer", "uax_url_email")
+ .startArray("filter")
+ .value("lowercase")
+ .value("stop")
+ .endArray()
+ .endObject()
+ .endObject()
+ .startObject("filter")
+ .startObject(ENGLISH_SNOWBALL)
+ .field("type", "snowball")
+ .field("language", "English")
+ .endObject()
+ .endObject()
+ .startObject("analyzer")
+ .startObject(SNOWBALL_KEEP_MAIL_AND_URL)
+ .field("tokenizer", "uax_url_email")
+ .startArray("filter")
+ .value("lowercase")
+ .value("stop")
+ .value(ENGLISH_SNOWBALL)
+ .endArray()
+ .endObject()
+ .endObject()
+ .endObject()
+ .endObject();
+ }
+
+}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexName.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexName.java
new file mode 100644
index 0000000..39c3f28
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexName.java
@@ -0,0 +1,49 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6;
+
+import java.util.Objects;
+
+public class IndexName {
+ private final String value;
+
+ public IndexName(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public final boolean equals(Object o) {
+ if (o instanceof IndexName) {
+ IndexName indexName = (IndexName) o;
+
+ return Objects.equals(this.value, indexName.value);
+ }
+ return false;
+ }
+
+ @Override
+ public final int hashCode() {
+ return Objects.hash(value);
+ }
+}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/NodeMappingFactory.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/NodeMappingFactory.java
new file mode 100644
index 0000000..01b0bf0
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/NodeMappingFactory.java
@@ -0,0 +1,75 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6;
+
+import org.apache.james.util.streams.Iterators;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+public class NodeMappingFactory {
+
+ public static final String BOOLEAN = "boolean";
+ public static final String TYPE = "type";
+ public static final String LONG = "long";
+ public static final String DOUBLE = "double";
+ public static final String INDEX = "index";
+ public static final String NOT_ANALYZED = "not_analyzed";
+ public static final String STRING = "string";
+ public static final String PROPERTIES = "properties";
+ public static final String DATE = "date";
+ public static final String FORMAT = "format";
+ public static final String NESTED = "nested";
+ public static final String FIELDS = "fields";
+ public static final String RAW = "raw";
+ public static final String SPLIT_EMAIL = "splitEmail";
+ public static final String ANALYZER = "analyzer";
+ public static final String SEARCH_ANALYZER = "search_analyzer";
+ public static final String SNOWBALL = "snowball";
+ public static final String IGNORE_ABOVE = "ignore_above";
+
+ public static Client applyMapping(Client client, IndexName indexName, TypeName typeName, XContentBuilder mappingsSources) {
+ if (!mappingAlreadyExist(client, indexName, typeName)) {
+ createMapping(client, indexName, typeName, mappingsSources);
+ }
+ return client;
+ }
+
+ public static boolean mappingAlreadyExist(Client client, IndexName indexName, TypeName typeName) {
+ return Iterators.toStream(client.admin()
+ .indices()
+ .prepareGetMappings(indexName.getValue())
+ .execute()
+ .actionGet()
+ .getMappings()
+ .valuesIt())
+ .anyMatch(mapping -> mapping.keys().contains(typeName.getValue()));
+ }
+
+ public static void createMapping(Client client, IndexName indexName, TypeName typeName, XContentBuilder mappingsSources) {
+ client.admin()
+ .indices()
+ .preparePutMapping(indexName.getValue())
+ .setType(typeName.getValue())
+ .setSource(mappingsSources)
+ .execute()
+ .actionGet();
+ }
+
+}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ReadAliasName.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ReadAliasName.java
new file mode 100644
index 0000000..8763664
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/ReadAliasName.java
@@ -0,0 +1,26 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6;
+
+public class ReadAliasName extends AliasName {
+ public ReadAliasName(String value) {
+ super(value);
+ }
+}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/TypeName.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/TypeName.java
new file mode 100644
index 0000000..7f3dbf7
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/TypeName.java
@@ -0,0 +1,32 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6;
+
+public class TypeName {
+ private final String value;
+
+ public TypeName(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return value;
+ }
+}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/UpdatedRepresentation.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/UpdatedRepresentation.java
new file mode 100644
index 0000000..1d919c9
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/UpdatedRepresentation.java
@@ -0,0 +1,69 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.backends.es.v6;
+
+import java.util.Objects;
+
+import org.elasticsearch.common.Strings;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+
+public class UpdatedRepresentation {
+ private final String id;
+ private final String updatedDocumentPart;
+
+ public UpdatedRepresentation(String id, String updatedDocumentPart) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(id), "Updated id must be specified " + id);
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(updatedDocumentPart), "Updated document must be specified");
+ this.id = id;
+ this.updatedDocumentPart = updatedDocumentPart;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public String getUpdatedDocumentPart() {
+ return updatedDocumentPart;
+ }
+
+ @Override
+ public final boolean equals(Object o) {
+ if (o instanceof UpdatedRepresentation) {
+ UpdatedRepresentation other = (UpdatedRepresentation) o;
+ return Objects.equals(id, other.id)
+ && Objects.equals(updatedDocumentPart, other.updatedDocumentPart);
+ }
+ return false;
+ }
+
+ @Override
+ public final int hashCode() {
+ return Objects.hash(id, updatedDocumentPart);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("id", id)
+ .add("updatedDocumentPart", updatedDocumentPart)
+ .toString();
+ }
+}
\ No newline at end of file
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/WriteAliasName.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/WriteAliasName.java
new file mode 100644
index 0000000..0947ad7
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/WriteAliasName.java
@@ -0,0 +1,26 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6;
+
+public class WriteAliasName extends AliasName {
+ public WriteAliasName(String value) {
+ super(value);
+ }
+}
diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ScrollIterable.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ScrollIterable.java
new file mode 100644
index 0000000..eca5bae
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ScrollIterable.java
@@ -0,0 +1,81 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6.search;
+
+import java.util.Iterator;
+import java.util.stream.Stream;
+
+import org.apache.james.util.streams.Iterators;
+import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.TimeValue;
+
+public class ScrollIterable implements Iterable<SearchResponse> {
+
+ private static final TimeValue TIMEOUT = new TimeValue(60000);
+ private final Client client;
+ private final SearchRequestBuilder searchRequestBuilder;
+
+ public ScrollIterable(Client client, SearchRequestBuilder searchRequestBuilder) {
+ this.client = client;
+ this.searchRequestBuilder = searchRequestBuilder;
+ }
+
+ @Override
+ public Iterator<SearchResponse> iterator() {
+ return new ScrollIterator(client, searchRequestBuilder);
+ }
+
+ public Stream<SearchResponse> stream() {
+ return Iterators.toStream(iterator());
+ }
+
+ public static class ScrollIterator implements Iterator<SearchResponse> {
+
+ private final Client client;
+ private ListenableActionFuture<SearchResponse> searchResponseFuture;
+
+ public ScrollIterator(Client client, SearchRequestBuilder searchRequestBuilder) {
+ this.client = client;
+ this.searchResponseFuture = searchRequestBuilder.execute();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !allSearchResponsesConsumed(searchResponseFuture.actionGet());
+ }
+
+ @Override
+ public SearchResponse next() {
+ SearchResponse result = searchResponseFuture.actionGet();
+ searchResponseFuture = client.prepareSearchScroll(result.getScrollId())
+ .setScroll(TIMEOUT)
+ .execute();
+ return result;
+ }
+
+ private boolean allSearchResponsesConsumed(SearchResponse searchResponse) {
+ return searchResponse.getHits().getHits().length == 0;
+ }
+ }
+
+}
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ClientProviderImplConnectionTest.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ClientProviderImplConnectionTest.java
new file mode 100644
index 0000000..a30d96e
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ClientProviderImplConnectionTest.java
@@ -0,0 +1,96 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.james.util.docker.DockerGenericContainer;
+import org.awaitility.Awaitility;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Ignore("JAMES-1952")
+public class ClientProviderImplConnectionTest {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ClientProviderImplConnectionTest.class);
+ private static final String DOCKER_ES_IMAGE = "elasticsearch:2.2.1";
+ private static final int ES_APPLICATIVE_PORT = 9300;
+
+ @Rule
+ public DockerGenericContainer es1 = new DockerGenericContainer(DOCKER_ES_IMAGE)
+ .withAffinityToContainer()
+ .withExposedPorts(ES_APPLICATIVE_PORT);
+
+ @Rule
+ public DockerGenericContainer es2 = new DockerGenericContainer(DOCKER_ES_IMAGE)
+ .withAffinityToContainer()
+ .withExposedPorts(ES_APPLICATIVE_PORT);
+
+ @Test
+ public void connectingASingleServerShouldWork() throws Exception {
+ Awaitility.await()
+ .atMost(1, TimeUnit.MINUTES)
+ .pollInterval(5, TimeUnit.SECONDS)
+ .until(() -> isConnected(ClientProviderImpl.forHost(es1.getContainerIp(), 9300, Optional.empty())));
+ }
+
+ @Test
+ public void connectingAClusterShouldWork() throws Exception {
+ Awaitility.await()
+ .atMost(1, TimeUnit.MINUTES)
+ .pollInterval(5, TimeUnit.SECONDS)
+ .until(() -> isConnected(
+ ClientProviderImpl.fromHostsString(
+ es1.getContainerIp() + ":" + ES_APPLICATIVE_PORT + ","
+ + es2.getContainerIp() + ":" + ES_APPLICATIVE_PORT,
+ Optional.empty())));
+ }
+
+ @Test
+ public void connectingAClusterWithAFailedNodeShouldWork() throws Exception {
+ es2.stop();
+
+ Awaitility.await()
+ .atMost(1, TimeUnit.MINUTES)
+ .pollInterval(5, TimeUnit.SECONDS)
+ .until(() -> isConnected(
+ ClientProviderImpl.fromHostsString(
+ es1.getContainerIp() + ":" + ES_APPLICATIVE_PORT + ","
+ + es2.getContainerIp() + ":" + ES_APPLICATIVE_PORT,
+ Optional.empty())));
+ }
+
+ private boolean isConnected(ClientProvider clientProvider) {
+ try (Client client = clientProvider.get()) {
+ client.prepareSearch()
+ .setQuery(QueryBuilders.existsQuery("any"))
+ .get();
+ return true;
+ } catch (Exception e) {
+ LOGGER.info("Caught exception while trying to connect", e);
+ return false;
+ }
+ }
+}
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ClientProviderImplTest.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ClientProviderImplTest.java
new file mode 100644
index 0000000..8079425
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ClientProviderImplTest.java
@@ -0,0 +1,142 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.Optional;
+
+import org.elasticsearch.common.settings.Settings;
+import org.junit.Test;
+
+public class ClientProviderImplTest {
+
+ @Test
+ public void fromHostsStringShouldThrowOnNullString() {
+ assertThatThrownBy(() -> ClientProviderImpl.fromHostsString(null, Optional.empty()))
+ .isInstanceOf(NullPointerException.class);
+ }
+
+ @Test
+ public void fromHostsStringShouldThrowOnEmptyString() {
+ assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("", Optional.empty()))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void forHostShouldThrowOnNullHost() {
+ assertThatThrownBy(() -> ClientProviderImpl.forHost(null, 9200, Optional.empty()))
+ .isInstanceOf(NullPointerException.class);
+ }
+
+ @Test
+ public void forHostShouldThrowOnEmptyHost() {
+ assertThatThrownBy(() -> ClientProviderImpl.forHost("", 9200, Optional.empty()))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void forHostShouldThrowOnNegativePort() {
+ assertThatThrownBy(() -> ClientProviderImpl.forHost("localhost", -1, Optional.empty()))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void forHostShouldThrowOnZeroPort() {
+ assertThatThrownBy(() -> ClientProviderImpl.forHost("localhost", 0, Optional.empty()))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void forHostShouldThrowOnTooBigPort() {
+ assertThatThrownBy(() -> ClientProviderImpl.forHost("localhost", 65536, Optional.empty()))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void fromHostsStringShouldEmptyAddress() {
+ assertThatThrownBy(() -> ClientProviderImpl.fromHostsString(":9200", Optional.empty()))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void fromHostsStringShouldThrowOnAbsentPort() {
+ assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost", Optional.empty()))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void fromHostsStringShouldThrowWhenTooMuchParts() {
+ assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost:9200:9200", Optional.empty()))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void fromHostsStringShouldThrowOnEmptyPort() {
+ assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost:", Optional.empty()))
+ .isInstanceOf(NumberFormatException.class);
+ }
+
+ @Test
+ public void fromHostsStringShouldThrowOnInvalidPort() {
+ assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost:invalid", Optional.empty()))
+ .isInstanceOf(NumberFormatException.class);
+ }
+
+ @Test
+ public void fromHostsStringShouldThrowOnNegativePort() {
+ assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost:-1", Optional.empty()))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void fromHostsStringShouldThrowOnZeroPort() {
+ assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost:0", Optional.empty()))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void fromHostsStringShouldThrowOnTooBigPort() {
+ assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost:65536", Optional.empty()))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void fromHostsStringShouldThrowIfOneHostIsInvalid() {
+ assertThatThrownBy(() -> ClientProviderImpl.fromHostsString("localhost:9200,localhost", Optional.empty()))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void settingsShouldBeEmptyWhenClusterNameIsEmpty() {
+ ClientProviderImpl clientProvider = ClientProviderImpl.fromHostsString("localhost:9200", Optional.empty());
+
+ assertThat(clientProvider.settings()).isEqualTo(Settings.EMPTY);
+ }
+
+ @Test
+ public void settingsShouldContainClusterNameSettingWhenClusterNameIsGiven() {
+ String clusterName = "myClusterName";
+ ClientProviderImpl clientProvider = ClientProviderImpl.fromHostsString("localhost:9200", Optional.of(clusterName));
+
+ assertThat(clientProvider.settings().get("cluster.name")).isEqualTo(clusterName);
+ }
+}
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/DockerElasticSearch.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/DockerElasticSearch.java
new file mode 100644
index 0000000..6e159f7
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/DockerElasticSearch.java
@@ -0,0 +1,126 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Optional;
+
+import org.apache.http.HttpStatus;
+import org.apache.james.util.Host;
+import org.apache.james.util.docker.DockerGenericContainer;
+import org.apache.james.util.docker.Images;
+import org.apache.james.util.docker.RateLimiters;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+
+import com.google.common.collect.ImmutableList;
+
+import feign.Feign;
+import feign.Logger;
+import feign.RequestLine;
+import feign.Response;
+import feign.slf4j.Slf4jLogger;
+
+public class DockerElasticSearch {
+
+ interface ElasticSearchAPI {
+
+ static ElasticSearchAPI from(Host esHttpHost) {
+ return Feign.builder()
+ .logger(new Slf4jLogger(ElasticSearchAPI.class))
+ .logLevel(Logger.Level.FULL)
+ .target(ElasticSearchAPI.class, "http://" + esHttpHost.getHostName() + ":" + esHttpHost.getPort());
+ }
+
+ @RequestLine("DELETE /_all")
+ Response deleteAllIndexes();
+
+ @RequestLine("POST /_flush?force&wait_if_ongoing=true")
+ Response flush();
+ }
+
+ private static final int ES_HTTP_PORT = 9200;
+ private static final int ES_TCP_PORT = 9300;
+
+ private final DockerGenericContainer eSContainer;
+
+ public DockerElasticSearch() {
+ this.eSContainer = new DockerGenericContainer(Images.ELASTICSEARCH_2)
+ .withExposedPorts(ES_HTTP_PORT, ES_TCP_PORT)
+ .waitingFor(new HostPortWaitStrategy().withRateLimiter(RateLimiters.TWENTIES_PER_SECOND));
+ }
+
+ public void start() {
+ if (!eSContainer.isRunning()) {
+ eSContainer.start();
+ }
+ }
+
+ public void stop() {
+ eSContainer.stop();
+ }
+
+ public int getHttpPort() {
+ return eSContainer.getMappedPort(ES_HTTP_PORT);
+ }
+
+ public int getTcpPort() {
+ return eSContainer.getMappedPort(ES_TCP_PORT);
+ }
+
+ public String getIp() {
+ return eSContainer.getHostIp();
+ }
+
+ public Host getTcpHost() {
+ return Host.from(getIp(), getTcpPort());
+ }
+
+ public Host getHttpHost() {
+ return Host.from(getIp(), getHttpPort());
+ }
+
+ public void pause() {
+ eSContainer.pause();
+ }
+
+ public void unpause() {
+ eSContainer.unpause();
+ }
+
+ public void cleanUpData() {
+ assertThat(esAPI().deleteAllIndexes().status())
+ .isEqualTo(HttpStatus.SC_OK);
+ }
+
+ public void awaitForElasticSearch() {
+ assertThat(esAPI().flush().status())
+ .isEqualTo(HttpStatus.SC_OK);
+ }
+
+ public ClientProvider clientProvider() {
+ Optional<String> noClusterName = Optional.empty();
+ return ClientProviderImpl.fromHosts(ImmutableList.of(getTcpHost()), noClusterName);
+ }
+
+ private ElasticSearchAPI esAPI() {
+ return ElasticSearchAPI.from(getHttpHost());
+ }
+}
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/DockerElasticSearchRule.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/DockerElasticSearchRule.java
new file mode 100644
index 0000000..c5b19e5
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/DockerElasticSearchRule.java
@@ -0,0 +1,50 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6;
+
+import org.apache.james.util.Host;
+import org.junit.rules.ExternalResource;
+
+public class DockerElasticSearchRule extends ExternalResource {
+
+ private final DockerElasticSearch dockerElasticSearch = DockerElasticSearchSingleton.INSTANCE;
+
+ @Override
+ protected void before() throws Throwable {
+ dockerElasticSearch.start();
+ }
+
+ @Override
+ protected void after() {
+ dockerElasticSearch.cleanUpData();
+ }
+
+ public ClientProvider clientProvider() {
+ return dockerElasticSearch.clientProvider();
+ }
+
+ public void awaitForElasticSearch() {
+ dockerElasticSearch.awaitForElasticSearch();
+ }
+
+ public Host getTcpHost() {
+ return dockerElasticSearch.getTcpHost();
+ }
+}
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/DockerElasticSearchSingleton.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/DockerElasticSearchSingleton.java
new file mode 100644
index 0000000..4fa6677
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/DockerElasticSearchSingleton.java
@@ -0,0 +1,28 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6;
+
+public class DockerElasticSearchSingleton {
+ public static DockerElasticSearch INSTANCE = new DockerElasticSearch();
+
+ static {
+ INSTANCE.start();
+ }
+}
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ElasticSearchConfigurationTest.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ElasticSearchConfigurationTest.java
new file mode 100644
index 0000000..43d80ef
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ElasticSearchConfigurationTest.java
@@ -0,0 +1,319 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.Optional;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.james.util.Host;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+public class ElasticSearchConfigurationTest {
+
+ @Test
+ public void getNbReplicaShouldReturnConfiguredValue() throws ConfigurationException {
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ int value = 36;
+ configuration.addProperty("elasticsearch.nb.replica", value);
+ configuration.addProperty("elasticsearch.hosts", "127.0.0.1");
+
+ ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
+
+ assertThat(elasticSearchConfiguration.getNbReplica())
+ .isEqualTo(value);
+ }
+
+ @Test
+ public void getNbReplicaShouldReturnDefaultValueWhenMissing() throws ConfigurationException {
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ configuration.addProperty("elasticsearch.hosts", "127.0.0.1");
+
+ ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
+
+ assertThat(elasticSearchConfiguration.getNbReplica())
+ .isEqualTo(ElasticSearchConfiguration.DEFAULT_NB_REPLICA);
+ }
+
+ @Test
+ public void getNbShardsShouldReturnConfiguredValue() throws ConfigurationException {
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ int value = 36;
+ configuration.addProperty("elasticsearch.nb.shards", value);
+ configuration.addProperty("elasticsearch.hosts", "127.0.0.1");
+
+ ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
+
+ assertThat(elasticSearchConfiguration.getNbShards())
+ .isEqualTo(value);
+ }
+
+ @Test
+ public void getNbShardsShouldReturnDefaultValueWhenMissing() throws ConfigurationException {
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ configuration.addProperty("elasticsearch.hosts", "127.0.0.1");
+
+ ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
+
+ assertThat(elasticSearchConfiguration.getNbShards())
+ .isEqualTo(ElasticSearchConfiguration.DEFAULT_NB_SHARDS);
+ }
+
+ @Test
+ public void getMaxRetriesShouldReturnConfiguredValue() throws ConfigurationException {
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ int value = 36;
+ configuration.addProperty("elasticsearch.retryConnection.maxRetries", value);
+ configuration.addProperty("elasticsearch.hosts", "127.0.0.1");
+
+ ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
+
+ assertThat(elasticSearchConfiguration.getMaxRetries())
+ .isEqualTo(value);
+ }
+
+ @Test
+ public void getMaxRetriesShouldReturnDefaultValueWhenMissing() throws ConfigurationException {
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ configuration.addProperty("elasticsearch.hosts", "127.0.0.1");
+
+ ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
+
+ assertThat(elasticSearchConfiguration.getMaxRetries())
+ .isEqualTo(ElasticSearchConfiguration.DEFAULT_CONNECTION_MAX_RETRIES);
+ }
+
+ @Test
+ public void getMinDelayShouldReturnConfiguredValue() throws ConfigurationException {
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ int value = 36;
+ configuration.addProperty("elasticsearch.retryConnection.minDelay", value);
+ configuration.addProperty("elasticsearch.hosts", "127.0.0.1");
+
+ ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
+
+ assertThat(elasticSearchConfiguration.getMinDelay())
+ .isEqualTo(value);
+ }
+
+ @Test
+ public void getMinDelayShouldReturnDefaultValueWhenMissing() throws ConfigurationException {
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ configuration.addProperty("elasticsearch.hosts", "127.0.0.1");
+
+ ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
+
+ assertThat(elasticSearchConfiguration.getMinDelay())
+ .isEqualTo(ElasticSearchConfiguration.DEFAULT_CONNECTION_MIN_DELAY);
+ }
+
+ @Test
+ public void getHostsShouldReturnConfiguredHostsWhenNoPort() throws ConfigurationException {
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ String hostname = "myHost";
+ configuration.addProperty("elasticsearch.hosts", hostname);
+
+ ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
+
+ assertThat(elasticSearchConfiguration.getHosts())
+ .containsOnly(Host.from(hostname, ElasticSearchConfiguration.DEFAULT_PORT));
+ }
+
+ @Test
+ public void getHostsShouldReturnConfiguredHostsWhenListIsUsed() throws ConfigurationException {
+ String hostname = "myHost";
+ String hostname2 = "myOtherHost";
+ int port = 2154;
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ configuration.addProperty("elasticsearch.hosts", hostname + "," + hostname2 + ":" + port);
+
+ ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
+
+ assertThat(elasticSearchConfiguration.getHosts())
+ .containsOnly(Host.from(hostname, ElasticSearchConfiguration.DEFAULT_PORT),
+ Host.from(hostname2, port));
+ }
+
+ @Test
+ public void getHostsShouldReturnConfiguredHosts() throws ConfigurationException {
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ String hostname = "myHost";
+ int port = 2154;
+ configuration.addProperty("elasticsearch.hosts", hostname + ":" + port);
+
+ ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
+
+ assertThat(elasticSearchConfiguration.getHosts())
+ .containsOnly(Host.from(hostname, port));
+ }
+
+ @Test
+ public void getHostsShouldReturnConfiguredMasterHost() throws ConfigurationException {
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ String hostname = "myHost";
+ configuration.addProperty("elasticsearch.masterHost", hostname);
+ int port = 9300;
+ configuration.addProperty("elasticsearch.port", port);
+
+ ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
+
+ assertThat(elasticSearchConfiguration.getHosts())
+ .containsOnly(Host.from(hostname, port));
+ }
+
+ @Test
+ public void clusterNameShouldBeEmptyWhenNotGiven() throws ConfigurationException {
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ String hostname = "myHost";
+ configuration.addProperty("elasticsearch.masterHost", hostname);
+ int port = 9300;
+ configuration.addProperty("elasticsearch.port", port);
+
+ ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
+
+ assertThat(elasticSearchConfiguration.getClusterName())
+ .isEmpty();
+ }
+
+ @Test
+ public void clusterNameShouldBeEmptyWhenNull() throws ConfigurationException {
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ String hostname = "myHost";
+ configuration.addProperty("elasticsearch.masterHost", hostname);
+ int port = 9300;
+ configuration.addProperty("elasticsearch.port", port);
+ configuration.addProperty("elasticsearch.clusterName", null);
+
+ ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
+
+ assertThat(elasticSearchConfiguration.getClusterName())
+ .isEmpty();
+ }
+
+ @Test
+ public void clusterNameShouldKeepTheValueWhenGiven() throws ConfigurationException {
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ String hostname = "myHost";
+ configuration.addProperty("elasticsearch.masterHost", hostname);
+ int port = 9300;
+ configuration.addProperty("elasticsearch.port", port);
+ String clusterName = "myClusterName";
+ configuration.addProperty("elasticsearch.clusterName", clusterName);
+
+ ElasticSearchConfiguration elasticSearchConfiguration = ElasticSearchConfiguration.fromProperties(configuration);
+
+ assertThat(elasticSearchConfiguration.getClusterName())
+ .contains(clusterName);
+ }
+
+ @Test
+ public void validateHostsConfigurationOptionsShouldThrowWhenNoHostSpecify() {
+ assertThatThrownBy(() ->
+ ElasticSearchConfiguration.validateHostsConfigurationOptions(
+ Optional.empty(),
+ Optional.empty(),
+ ImmutableList.of()))
+ .isInstanceOf(ConfigurationException.class)
+ .hasMessage("You should specify either (" + ElasticSearchConfiguration.ELASTICSEARCH_MASTER_HOST +
+ " and " + ElasticSearchConfiguration.ELASTICSEARCH_PORT +
+ ") or " + ElasticSearchConfiguration.ELASTICSEARCH_HOSTS);
+ }
+
+ @Test
+ public void validateHostsConfigurationOptionsShouldThrowWhenMonoAndMultiHostSpecified() {
+ assertThatThrownBy(() ->
+ ElasticSearchConfiguration.validateHostsConfigurationOptions(
+ Optional.of("localhost"),
+ Optional.of(9200),
+ ImmutableList.of("localhost:9200")))
+ .isInstanceOf(ConfigurationException.class)
+ .hasMessage("You should choose between mono host set up and " + ElasticSearchConfiguration.ELASTICSEARCH_HOSTS);
+ }
+
+ @Test
+ public void validateHostsConfigurationOptionsShouldThrowWhenMonoHostWithoutPort() {
+ assertThatThrownBy(() ->
+ ElasticSearchConfiguration.validateHostsConfigurationOptions(
+ Optional.of("localhost"),
+ Optional.empty(),
+ ImmutableList.of()))
+ .isInstanceOf(ConfigurationException.class)
+ .hasMessage(ElasticSearchConfiguration.ELASTICSEARCH_MASTER_HOST +
+ " and " + ElasticSearchConfiguration.ELASTICSEARCH_PORT + " should be specified together");
+ }
+
+ @Test
+ public void validateHostsConfigurationOptionsShouldThrowWhenMonoHostWithoutAddress() {
+ assertThatThrownBy(() ->
+ ElasticSearchConfiguration.validateHostsConfigurationOptions(
+ Optional.empty(),
+ Optional.of(9200),
+ ImmutableList.of()))
+ .isInstanceOf(ConfigurationException.class)
+ .hasMessage(ElasticSearchConfiguration.ELASTICSEARCH_MASTER_HOST + " and " +
+ ElasticSearchConfiguration.ELASTICSEARCH_PORT + " should be specified together");
+ }
+
+ @Test
+ public void validateHostsConfigurationOptionsShouldAcceptMonoHostConfiguration() throws Exception {
+ ElasticSearchConfiguration.validateHostsConfigurationOptions(
+ Optional.of("localhost"),
+ Optional.of(9200),
+ ImmutableList.of());
+ }
+
+ @Test
+ public void validateHostsConfigurationOptionsShouldAcceptMultiHostConfiguration() throws Exception {
+ ElasticSearchConfiguration.validateHostsConfigurationOptions(
+ Optional.empty(),
+ Optional.empty(),
+ ImmutableList.of("localhost:9200"));
+ }
+
+
+ @Test
+ public void nbReplicaShouldThrowWhenNegative() {
+ assertThatThrownBy(() ->
+ ElasticSearchConfiguration.builder()
+ .nbReplica(-1))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void nbShardsShouldThrowWhenNegative() {
+ assertThatThrownBy(() ->
+ ElasticSearchConfiguration.builder()
+ .nbShards(-1))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void nbShardsShouldThrowWhenZero() {
+ assertThatThrownBy(() ->
+ ElasticSearchConfiguration.builder()
+ .nbShards(0))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+}
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ElasticSearchIndexerTest.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ElasticSearchIndexerTest.java
new file mode 100644
index 0000000..4346958
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/ElasticSearchIndexerTest.java
@@ -0,0 +1,248 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.elasticsearch.index.query.QueryBuilders.termQuery;
+
+import java.util.concurrent.Executors;
+
+import org.apache.james.util.concurrent.NamedThreadFactory;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+public class ElasticSearchIndexerTest {
+
+ private static final int MINIMUM_BATCH_SIZE = 1;
+ private static final IndexName INDEX_NAME = new IndexName("index_name");
+ private static final WriteAliasName ALIAS_NAME = new WriteAliasName("alias_name");
+ private static final TypeName TYPE_NAME = new TypeName("type_name");
+
+ @Rule
+ public DockerElasticSearchRule elasticSearch = new DockerElasticSearchRule();
+ private ElasticSearchIndexer testee;
+
+ @Before
+ public void setup() {
+ new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION)
+ .useIndex(INDEX_NAME)
+ .addAlias(ALIAS_NAME)
+ .createIndexAndAliases(getESClient());
+ testee = new ElasticSearchIndexer(getESClient(),
+ Executors.newSingleThreadExecutor(NamedThreadFactory.withClassName(getClass())),
+ ALIAS_NAME, TYPE_NAME, MINIMUM_BATCH_SIZE);
+ }
+
+ private Client getESClient() {
+ return elasticSearch.clientProvider().get();
+ }
+
+ @Test
+ public void indexMessageShouldWork() {
+ String messageId = "1";
+ String content = "{\"message\": \"trying out Elasticsearch\"}";
+
+ testee.index(messageId, content);
+ elasticSearch.awaitForElasticSearch();
+
+ try (Client client = getESClient()) {
+ SearchResponse searchResponse = client.prepareSearch(INDEX_NAME.getValue())
+ .setTypes(TYPE_NAME.getValue())
+ .setQuery(QueryBuilders.matchQuery("message", "trying"))
+ .get();
+ assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
+ }
+ }
+
+ @Test
+ public void indexMessageShouldThrowWhenJsonIsNull() {
+ assertThatThrownBy(() -> testee.index("1", null))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void updateMessages() {
+ String messageId = "1";
+ String content = "{\"message\": \"trying out Elasticsearch\",\"field\":\"Should be unchanged\"}";
+
+ testee.index(messageId, content);
+ elasticSearch.awaitForElasticSearch();
+
+ testee.update(ImmutableList.of(new UpdatedRepresentation(messageId, "{\"message\": \"mastering out Elasticsearch\"}")));
+ elasticSearch.awaitForElasticSearch();
+
+ try (Client client = getESClient()) {
+ SearchResponse searchResponse = client.prepareSearch(INDEX_NAME.getValue())
+ .setTypes(TYPE_NAME.getValue())
+ .setQuery(QueryBuilders.matchQuery("message", "mastering"))
+ .get();
+ assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
+ }
+
+ try (Client client = getESClient()) {
+ SearchResponse searchResponse = client.prepareSearch(INDEX_NAME.getValue())
+ .setTypes(TYPE_NAME.getValue())
+ .setQuery(QueryBuilders.matchQuery("field", "unchanged"))
+ .get();
+ assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
+ }
+ }
+
+ @Test
+ public void updateMessageShouldThrowWhenJsonIsNull() {
+ assertThatThrownBy(() -> testee.update(ImmutableList.of(new UpdatedRepresentation("1", null))))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void updateMessageShouldThrowWhenIdIsNull() {
+ assertThatThrownBy(() -> testee.update(ImmutableList.of(new UpdatedRepresentation(null, "{\"message\": \"mastering out Elasticsearch\"}"))))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void updateMessageShouldThrowWhenJsonIsEmpty() {
+ assertThatThrownBy(() -> testee.update(ImmutableList.of(new UpdatedRepresentation("1", ""))))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void updateMessageShouldThrowWhenIdIsEmpty() {
+ assertThatThrownBy(() -> testee.update(ImmutableList.of(new UpdatedRepresentation("", "{\"message\": \"mastering out Elasticsearch\"}"))))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void deleteByQueryShouldWorkOnSingleMessage() throws Exception {
+ String messageId = "1:2";
+ String content = "{\"message\": \"trying out Elasticsearch\", \"property\":\"1\"}";
+
+ testee.index(messageId, content);
+ elasticSearch.awaitForElasticSearch();
+
+ testee.deleteAllMatchingQuery(termQuery("property", "1")).get();
+ elasticSearch.awaitForElasticSearch();
+
+ try (Client client = getESClient()) {
+ SearchResponse searchResponse = client.prepareSearch(INDEX_NAME.getValue())
+ .setTypes(TYPE_NAME.getValue())
+ .setQuery(QueryBuilders.matchAllQuery())
+ .get();
+ assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(0);
+ }
+ }
+
+ @Test
+ public void deleteByQueryShouldWorkWhenMultipleMessages() throws Exception {
+ String messageId = "1:1";
+ String content = "{\"message\": \"trying out Elasticsearch\", \"property\":\"1\"}";
+
+ testee.index(messageId, content);
+
+ String messageId2 = "1:2";
+ String content2 = "{\"message\": \"trying out Elasticsearch 2\", \"property\":\"1\"}";
+
+ testee.index(messageId2, content2);
+
+ String messageId3 = "2:3";
+ String content3 = "{\"message\": \"trying out Elasticsearch 3\", \"property\":\"2\"}";
+
+ testee.index(messageId3, content3);
+ elasticSearch.awaitForElasticSearch();
+
+ testee.deleteAllMatchingQuery(termQuery("property", "1")).get();
+ elasticSearch.awaitForElasticSearch();
+
+ try (Client client = getESClient()) {
+ SearchResponse searchResponse = client.prepareSearch(INDEX_NAME.getValue())
+ .setTypes(TYPE_NAME.getValue())
+ .setQuery(QueryBuilders.matchAllQuery())
+ .get();
+ assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
+ }
+ }
+
+ @Test
+ public void deleteMessage() {
+ String messageId = "1:2";
+ String content = "{\"message\": \"trying out Elasticsearch\"}";
+
+ testee.index(messageId, content);
+ elasticSearch.awaitForElasticSearch();
+
+ testee.delete(ImmutableList.of(messageId));
+ elasticSearch.awaitForElasticSearch();
+
+ try (Client client = getESClient()) {
+ SearchResponse searchResponse = client.prepareSearch(INDEX_NAME.getValue())
+ .setTypes(TYPE_NAME.getValue())
+ .setQuery(QueryBuilders.matchAllQuery())
+ .get();
+ assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(0);
+ }
+ }
+
+ @Test
+ public void deleteShouldWorkWhenMultipleMessages() {
+ String messageId = "1:1";
+ String content = "{\"message\": \"trying out Elasticsearch\", \"mailboxId\":\"1\"}";
+
+ testee.index(messageId, content);
+
+ String messageId2 = "1:2";
+ String content2 = "{\"message\": \"trying out Elasticsearch 2\", \"mailboxId\":\"1\"}";
+
+ testee.index(messageId2, content2);
+
+ String messageId3 = "2:3";
+ String content3 = "{\"message\": \"trying out Elasticsearch 3\", \"mailboxId\":\"2\"}";
+
+ testee.index(messageId3, content3);
+ elasticSearch.awaitForElasticSearch();
+
+ testee.delete(ImmutableList.of(messageId, messageId3));
+ elasticSearch.awaitForElasticSearch();
+
+ try (Client client = getESClient()) {
+ SearchResponse searchResponse = client.prepareSearch(INDEX_NAME.getValue())
+ .setTypes(TYPE_NAME.getValue())
+ .setQuery(QueryBuilders.matchAllQuery())
+ .get();
+ assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
+ }
+ }
+
+ @Test
+ public void updateMessagesShouldNotThrowWhenEmptyList() {
+ testee.update(ImmutableList.of());
+ }
+
+ @Test
+ public void deleteMessagesShouldNotThrowWhenEmptyList() {
+ testee.delete(ImmutableList.of());
+ }
+}
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/IndexCreationFactoryTest.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/IndexCreationFactoryTest.java
new file mode 100644
index 0000000..a5417eb
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/IndexCreationFactoryTest.java
@@ -0,0 +1,68 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class IndexCreationFactoryTest {
+ public static final IndexName INDEX_NAME = new IndexName("index");
+ public static final ReadAliasName ALIAS_NAME = new ReadAliasName("alias");
+
+ @Rule
+ public DockerElasticSearchRule elasticSearch = new DockerElasticSearchRule();
+ private ClientProvider clientProvider;
+
+ @Before
+ public void setUp() {
+ clientProvider = elasticSearch.clientProvider();
+ new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION)
+ .useIndex(INDEX_NAME)
+ .addAlias(ALIAS_NAME)
+ .createIndexAndAliases(clientProvider.get());
+ }
+
+ @Test
+ public void createIndexAndAliasShouldNotThrowWhenCalledSeveralTime() {
+ new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION)
+ .useIndex(INDEX_NAME)
+ .addAlias(ALIAS_NAME)
+ .createIndexAndAliases(clientProvider.get());
+ }
+
+ @Test
+ public void useIndexShouldThrowWhenNull() {
+ assertThatThrownBy(() ->
+ new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION)
+ .useIndex(null))
+ .isInstanceOf(NullPointerException.class);
+ }
+
+ @Test
+ public void addAliasShouldThrowWhenNull() {
+ assertThatThrownBy(() ->
+ new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION)
+ .addAlias(null))
+ .isInstanceOf(NullPointerException.class);
+ }
+}
\ No newline at end of file
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/NodeMappingFactoryTest.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/NodeMappingFactoryTest.java
new file mode 100644
index 0000000..bcbefef
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/NodeMappingFactoryTest.java
@@ -0,0 +1,101 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class NodeMappingFactoryTest {
+ public static final String MESSAGE = "message";
+ public static final IndexName INDEX_NAME = new IndexName("index");
+ public static final ReadAliasName ALIAS_NAME = new ReadAliasName("alias");
+ public static final TypeName TYPE_NAME = new TypeName("type");
+
+ @Rule
+ public DockerElasticSearchRule elasticSearch = new DockerElasticSearchRule();
+ private ClientProvider clientProvider;
+
+ @Before
+ public void setUp() throws Exception {
+ clientProvider = elasticSearch.clientProvider();
+ new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION)
+ .useIndex(INDEX_NAME)
+ .addAlias(ALIAS_NAME)
+ .createIndexAndAliases(clientProvider.get());
+ NodeMappingFactory.applyMapping(clientProvider.get(),
+ INDEX_NAME,
+ TYPE_NAME,
+ getMappingsSources());
+ }
+
+ @Test
+ public void applyMappingShouldNotThrowWhenCalledSeveralTime() throws Exception {
+ NodeMappingFactory.applyMapping(clientProvider.get(),
+ INDEX_NAME,
+ TYPE_NAME,
+ getMappingsSources());
+ }
+
+ @Test
+ public void applyMappingShouldNotThrowWhenIndexerChanges() throws Exception {
+ NodeMappingFactory.applyMapping(clientProvider.get(),
+ INDEX_NAME,
+ TYPE_NAME,
+ getMappingsSources());
+
+ elasticSearch.awaitForElasticSearch();
+
+ NodeMappingFactory.applyMapping(clientProvider.get(),
+ INDEX_NAME,
+ TYPE_NAME,
+ getOtherMappingsSources());
+ }
+
+ private XContentBuilder getMappingsSources() throws Exception {
+ return jsonBuilder()
+ .startObject()
+ .startObject(TYPE_NAME.getValue())
+ .startObject(NodeMappingFactory.PROPERTIES)
+ .startObject(MESSAGE)
+ .field(NodeMappingFactory.TYPE, NodeMappingFactory.STRING)
+ .endObject()
+ .endObject()
+ .endObject()
+ .endObject();
+ }
+
+ private XContentBuilder getOtherMappingsSources() throws Exception {
+ return jsonBuilder()
+ .startObject()
+ .startObject(TYPE_NAME.getValue())
+ .startObject(NodeMappingFactory.PROPERTIES)
+ .startObject(MESSAGE)
+ .field(NodeMappingFactory.TYPE, NodeMappingFactory.STRING)
+ .field(NodeMappingFactory.INDEX, NodeMappingFactory.NOT_ANALYZED)
+ .endObject()
+ .endObject()
+ .endObject()
+ .endObject();
+ }
+}
\ No newline at end of file
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/search/ScrollIterableTest.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/search/ScrollIterableTest.java
new file mode 100644
index 0000000..4810975
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/search/ScrollIterableTest.java
@@ -0,0 +1,205 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es.v6.search;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.james.backends.es.v6.ClientProvider;
+import org.apache.james.backends.es.v6.DockerElasticSearchRule;
+import org.apache.james.backends.es.v6.ElasticSearchConfiguration;
+import org.apache.james.backends.es.v6.IndexCreationFactory;
+import org.apache.james.backends.es.v6.IndexName;
+import org.apache.james.backends.es.v6.NodeMappingFactory;
+import org.apache.james.backends.es.v6.ReadAliasName;
+import org.apache.james.backends.es.v6.TypeName;
+import org.awaitility.Duration;
+import org.awaitility.core.ConditionFactory;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class ScrollIterableTest {
+
+ private static final TimeValue TIMEOUT = new TimeValue(6000);
+ private static final int SIZE = 2;
+ private static final String MESSAGE = "message";
+ private static final IndexName INDEX_NAME = new IndexName("index");
+ private static final ReadAliasName ALIAS_NAME = new ReadAliasName("alias");
+ private static final TypeName TYPE_NAME = new TypeName("messages");
+
+ private static final ConditionFactory WAIT_CONDITION = await().timeout(Duration.FIVE_SECONDS);
+
+ @Rule
+ public DockerElasticSearchRule elasticSearch = new DockerElasticSearchRule();
+ private ClientProvider clientProvider;
+
+ @Before
+ public void setUp() throws Exception {
+ clientProvider = elasticSearch.clientProvider();
+ new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION)
+ .useIndex(INDEX_NAME)
+ .addAlias(ALIAS_NAME)
+ .createIndexAndAliases(clientProvider.get());
+ elasticSearch.awaitForElasticSearch();
+ NodeMappingFactory.applyMapping(clientProvider.get(), INDEX_NAME, TYPE_NAME, getMappingsSources());
+ }
+
+ private XContentBuilder getMappingsSources() throws IOException {
+ return jsonBuilder()
+ .startObject()
+ .startObject(TYPE_NAME.getValue())
+ .startObject(NodeMappingFactory.PROPERTIES)
+ .startObject(MESSAGE)
+ .field(NodeMappingFactory.TYPE, NodeMappingFactory.STRING)
+ .endObject()
+ .endObject()
+ .endObject()
+ .endObject();
+ }
+
+ @Test
+ public void scrollIterableShouldWorkWhenEmpty() {
+ try (Client client = clientProvider.get()) {
+ SearchRequestBuilder searchRequestBuilder = client.prepareSearch(INDEX_NAME.getValue())
+ .setTypes(TYPE_NAME.getValue())
+ .setScroll(TIMEOUT)
+ .setQuery(matchAllQuery())
+ .setSize(SIZE);
+
+ assertThat(new ScrollIterable(client, searchRequestBuilder))
+ .isEmpty();
+ }
+ }
+
+ @Test
+ public void scrollIterableShouldWorkWhenOneElement() {
+ try (Client client = clientProvider.get()) {
+ String id = "1";
+ client.prepareIndex(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id)
+ .setSource(MESSAGE, "Sample message")
+ .execute();
+
+ elasticSearch.awaitForElasticSearch();
+ WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id));
+
+ SearchRequestBuilder searchRequestBuilder = client.prepareSearch(INDEX_NAME.getValue())
+ .setTypes(TYPE_NAME.getValue())
+ .setScroll(TIMEOUT)
+ .setQuery(matchAllQuery())
+ .setSize(SIZE);
+
+ assertThat(convertToIdList(new ScrollIterable(client, searchRequestBuilder)))
+ .containsOnly(id);
+ }
+ }
+
+ @Test
+ public void scrollIterableShouldWorkWhenSizeElement() {
+ try (Client client = clientProvider.get()) {
+ String id1 = "1";
+ client.prepareIndex(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id1)
+ .setSource(MESSAGE, "Sample message")
+ .execute();
+
+ String id2 = "2";
+ client.prepareIndex(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id2)
+ .setSource(MESSAGE, "Sample message")
+ .execute();
+
+ elasticSearch.awaitForElasticSearch();
+ WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2));
+
+ SearchRequestBuilder searchRequestBuilder = client.prepareSearch(INDEX_NAME.getValue())
+ .setTypes(TYPE_NAME.getValue())
+ .setScroll(TIMEOUT)
+ .setQuery(matchAllQuery())
+ .setSize(SIZE);
+
+ assertThat(convertToIdList(new ScrollIterable(client, searchRequestBuilder)))
+ .containsOnly(id1, id2);
+ }
+ }
+
+ @Test
+ public void scrollIterableShouldWorkWhenMoreThanSizeElement() {
+ try (Client client = clientProvider.get()) {
+ String id1 = "1";
+ client.prepareIndex(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id1)
+ .setSource(MESSAGE, "Sample message")
+ .execute();
+
+ String id2 = "2";
+ client.prepareIndex(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id2)
+ .setSource(MESSAGE, "Sample message")
+ .execute();
+
+ String id3 = "3";
+ client.prepareIndex(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id3)
+ .setSource(MESSAGE, "Sample message")
+ .execute();
+
+ elasticSearch.awaitForElasticSearch();
+ WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2, id3));
+
+ SearchRequestBuilder searchRequestBuilder = client.prepareSearch(INDEX_NAME.getValue())
+ .setTypes(TYPE_NAME.getValue())
+ .setScroll(TIMEOUT)
+ .setQuery(matchAllQuery())
+ .setSize(SIZE);
+
+ assertThat(convertToIdList(new ScrollIterable(client, searchRequestBuilder)))
+ .containsOnly(id1, id2, id3);
+ }
+ }
+
+ private List<String> convertToIdList(ScrollIterable scrollIterable) {
+ return scrollIterable.stream()
+ .flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits()))
+ .map(SearchHit::getId)
+ .collect(Collectors.toList());
+ }
+
+ private void hasIdsInIndex(Client client, String... ids) {
+ SearchHit[] hits = client.prepareSearch(INDEX_NAME.getValue())
+ .setQuery(QueryBuilders.idsQuery(TYPE_NAME.getValue()).addIds(ids))
+ .execute()
+ .actionGet()
+ .getHits()
+ .hits();
+
+ assertThat(hits)
+ .extracting(SearchHit::getId)
+ .contains(ids);
+ }
+}
diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/utils/TestingClientProvider.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/utils/TestingClientProvider.java
new file mode 100644
index 0000000..967b80e
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/utils/TestingClientProvider.java
@@ -0,0 +1,37 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.backends.es.v6.utils;
+
+import org.apache.james.backends.es.v6.ClientProvider;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.node.Node;
+
+public class TestingClientProvider implements ClientProvider {
+
+ private final Node node;
+
+ public TestingClientProvider(Node node) {
+ this.node = node;
+ }
+
+ @Override
+ public Client get() {
+ return node.client();
+ }
+}
diff --git a/backends-common/elasticsearch-v6/src/test/resources/logback-test.xml b/backends-common/elasticsearch-v6/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..dd2d81e
--- /dev/null
+++ b/backends-common/elasticsearch-v6/src/test/resources/logback-test.xml
@@ -0,0 +1,12 @@
+<configuration>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%date %-5level [%thread] - [%logger]- %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT" />
+ </root>
+</configuration>
\ No newline at end of file
diff --git a/backends-common/pom.xml b/backends-common/pom.xml
index 5cab887..f6a96ff 100644
--- a/backends-common/pom.xml
+++ b/backends-common/pom.xml
@@ -38,6 +38,7 @@
<module>elasticsearch</module>
<module>jpa</module>
<module>rabbitmq</module>
+ <module>elasticsearch-v6</module>
</modules>
</project>
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org