You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/10/16 18:50:41 UTC
[2/7] camel git commit: CAMEL-11868: New ElasticSearch5 REST component
CAMEL-11868: New ElasticSearch5 REST component
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/928f185f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/928f185f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/928f185f
Branch: refs/heads/master
Commit: 928f185f9c5fcb1eb48d8f626b7f569210bca5f7
Parents: 24ae294
Author: fharms <fl...@gmail.com>
Authored: Mon Oct 16 19:26:43 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Oct 16 20:00:24 2017 +0200
----------------------------------------------------------------------
components/camel-elasticsearch5-rest/pom.xml | 120 ++++++++
.../elasticsearch5/ElasticsearchComponent.java | 233 +++++++++++++++
.../ElasticsearchConfiguration.java | 245 +++++++++++++++
.../elasticsearch5/ElasticsearchConstants.java | 37 +++
.../elasticsearch5/ElasticsearchEndpoint.java | 60 ++++
.../elasticsearch5/ElasticsearchOperation.java | 55 ++++
.../elasticsearch5/ElasticsearchProducer.java | 298 +++++++++++++++++++
.../BulkRequestAggregationStrategy.java | 52 ++++
.../ElasticsearchActionRequestConverter.java | 206 +++++++++++++
.../apache/camel/component/elasticsearch5-rest | 18 ++
.../elasticsearch5/ElasticsearchBaseTest.java | 124 ++++++++
.../elasticsearch5/ElasticsearchBulkTest.java | 113 +++++++
.../ElasticsearchClusterBaseTest.java | 120 ++++++++
.../ElasticsearchClusterIndexTest.java | 90 ++++++
...icsearchGetSearchDeleteExistsUpdateTest.java | 288 ++++++++++++++++++
.../elasticsearch5/ElasticsearchIndexTest.java | 90 ++++++
.../src/test/resources/log4j2.properties | 7 +
components/pom.xml | 1 +
parent/pom.xml | 3 +-
.../spring-boot/components-starter/pom.xml | 1 +
20 files changed, 2160 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/pom.xml b/components/camel-elasticsearch5-rest/pom.xml
new file mode 100644
index 0000000..77c4516
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>components</artifactId>
+ <version>2.20.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-elasticsearch5-rest</artifactId>
+ <packaging>jar</packaging>
+ <name>Camel :: ElasticSearch5 :: REST</name>
+ <description>Camel ElasticSearch 5.x REST support</description>
+
+  <properties>
+    <elasticsearch.version>${elasticsearch5-version}</elasticsearch.version>
+    <camel.osgi.export.pkg>org.apache.camel.component.elasticsearch5.*;${camel.osgi.version}</camel.osgi.export.pkg>
+    <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=elasticsearch5-rest</camel.osgi.export.service>
+  </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>elasticsearch-rest-high-level-client</artifactId>
+ <version>${elasticsearch5-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>elasticsearch-rest-client-sniffer</artifactId>
+ <version>${elasticsearch-sniffer-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson2-version}</version>
+ </dependency>
+
+ <!-- for testing -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>transport</artifactId>
+ <version>${elasticsearch5-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.codelibs</groupId>
+ <artifactId>elasticsearch-cluster-runner</artifactId>
+ <version>${elasticsearch5-cluster-runner-version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- logging -->
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-1.2-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <es.path.data>target/data</es.path.data>
+ </systemPropertyVariables>
+ <forkCount>1</forkCount>
+ <reuseForks>false</reuseForks>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchComponent.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchComponent.java
new file mode 100644
index 0000000..79eca0b
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchComponent.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch5;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.spi.Metadata;
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.RestClient;
+
+/**
+ * Represents the component that manages {@link ElasticsearchEndpoint}.
+ */
+public class ElasticsearchComponent extends DefaultComponent {
+
+    @Metadata(label = "advanced")
+    private RestClient client; // optional pre-configured client, passed on to every endpoint this component creates
+    @Metadata(label = "advanced")
+    private String hostAddresses; // comma separated host[:port] list
+    @Metadata(label = "advanced", defaultValue = "" + ElasticsearchConstants.DEFAULT_SOCKET_TIMEOUT)
+    private int socketTimeout = ElasticsearchConstants.DEFAULT_SOCKET_TIMEOUT;
+    @Metadata(label = "advanced", defaultValue = "" + ElasticsearchConstants.MAX_RETRY_TIMEOUT)
+    private int maxRetryTimeout = ElasticsearchConstants.MAX_RETRY_TIMEOUT;
+    @Metadata(label = "advanced", defaultValue = "" + ElasticsearchConstants.DEFAULT_CONNECTION_TIMEOUT)
+    private int connectionTimeout = ElasticsearchConstants.DEFAULT_CONNECTION_TIMEOUT;
+
+    @Metadata(label = "advanced")
+    private String user;
+    @Metadata(secret = true)
+    private String password;
+    @Metadata(label = "advanced", defaultValue = "false")
+    private boolean enableSSL;
+    @Metadata(label = "advanced", defaultValue = "false")
+    private boolean enableSniffer;
+    @Metadata(label = "advanced", defaultValue = "" + ElasticsearchConstants.DEFAULT_SNIFFER_INTERVAL)
+    private int snifferInterval = ElasticsearchConstants.DEFAULT_SNIFFER_INTERVAL; // milliseconds
+    @Metadata(label = "advanced", defaultValue = "" + ElasticsearchConstants.DEFAULT_AFTER_FAILURE_DELAY)
+    private int sniffAfterFailureDelay = ElasticsearchConstants.DEFAULT_AFTER_FAILURE_DELAY; // milliseconds
+
+ public ElasticsearchComponent() {
+ super();
+ }
+
+ public ElasticsearchComponent(CamelContext context) {
+ super(context);
+ }
+
+ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+ ElasticsearchConfiguration config = new ElasticsearchConfiguration();
+ config.setHostAddresses(this.getHostAddresses());
+ config.setSocketTimeout(this.getSocketTimeout());
+ config.setMaxRetryTimeout(this.getMaxRetryTimeout());
+ config.setConnectionTimeout(this.getConnectionTimeout());
+ config.setUser(this.getUser());
+ config.setEnableSSL(this.getEnableSSL());
+ config.setPassword(this.getPassword());
+ config.setEnableSniffer(this.getEnableSniffer());
+ config.setSnifferInterval(this.getSnifferInterval());
+ config.setSniffAfterFailureDelay(this.getSniffAfterFailureDelay());
+ config.setClusterName(remaining);
+
+ setProperties(config, parameters);
+ config.setHostAddressesList(parseHostAddresses(config.getHostAddresses(), config));
+
+ Endpoint endpoint = new ElasticsearchEndpoint(uri, this, config, client);
+ return endpoint;
+ }
+
+    private List<HttpHost> parseHostAddresses(String ipsString, ElasticsearchConfiguration config) throws UnknownHostException {
+        // Turns "host[:port],host[:port]" into HttpHost entries; returns null when no addresses are configured.
+        if (ipsString == null || ipsString.isEmpty()) {
+            return null;
+        }
+        String scheme = config.getEnableSSL() ? "HTTPS" : "HTTP";
+        List<String> addressesStr = Arrays.asList(ipsString.split(","));
+        List<HttpHost> addressesTrAd = new ArrayList<>(addressesStr.size());
+        for (String address : addressesStr) {
+            // Tolerate blanks around entries ("a:9200, b:9200"); an entry without a host name is rejected.
+            String[] split = address.trim().split(":");
+            if (split.length == 0 || split[0].trim().isEmpty()) {
+                throw new IllegalArgumentException("Invalid host address '" + address + "' in hostAddresses: " + ipsString);
+            }
+            // Entries without an explicit port fall back to ElasticsearchConstants.DEFAULT_PORT.
+            int port = split.length > 1 ? Integer.parseInt(split[1].trim()) : ElasticsearchConstants.DEFAULT_PORT;
+            addressesTrAd.add(new HttpHost(split[0].trim(), port, scheme));
+        }
+        return addressesTrAd;
+    }
+
+ public RestClient getClient() {
+ return client;
+ }
+
+ /**
+ * To use an existing configured Elasticsearch client, instead of creating a client per endpoint.
+ * This allows the client to be customized with specific settings.
+ */
+ public void setClient(RestClient client) {
+ this.client = client;
+ }
+ /**
+ * Comma separated list of host:port addresses of the Elasticsearch nodes to connect to.
+ * When an entry has no port, the default port (ElasticsearchConstants.DEFAULT_PORT) is used.
+ */
+ public String getHostAddresses() {
+ return hostAddresses;
+ }
+
+ public void setHostAddresses(String hostAddresses) {
+ this.hostAddresses = hostAddresses;
+ }
+
+ /**
+ * The timeout in ms to wait before the socket will timeout.
+ */
+ public int getSocketTimeout() {
+ return socketTimeout;
+ }
+
+ public void setSocketTimeout(int socketTimeout) {
+ this.socketTimeout = socketTimeout;
+ }
+
+ /**
+ * The time in ms to wait before connection will timeout.
+ */
+ public int getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ public void setConnectionTimeout(int connectionTimeout) {
+ this.connectionTimeout = connectionTimeout;
+ }
+
+ /**
+ * Basic authenticate user
+ */
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ /**
+ * Password for authenticate
+ */
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ /**
+ * Enable SSL
+ */
+ public Boolean getEnableSSL() {
+ return enableSSL;
+ }
+
+ public void setEnableSSL(Boolean enableSSL) {
+ this.enableSSL = enableSSL;
+ }
+
+ /**
+ * The time in ms before retry
+ */
+ public int getMaxRetryTimeout() {
+ return maxRetryTimeout;
+ }
+
+ public void setMaxRetryTimeout(int maxRetryTimeout) {
+ this.maxRetryTimeout = maxRetryTimeout;
+ }
+
+ /**
+ * Enable automatically discover nodes from a running Elasticsearch cluster
+ */
+ public Boolean getEnableSniffer() {
+ return enableSniffer;
+ }
+
+ public void setEnableSniffer(Boolean enableSniffer) {
+ this.enableSniffer = enableSniffer;
+ }
+
+ /**
+ * The interval between consecutive ordinary sniff executions in milliseconds. Will be honoured when
+ * sniffOnFailure is disabled or when there are no failures between consecutive sniff executions
+ */
+ public int getSnifferInterval() {
+ return snifferInterval;
+ }
+
+ public void setSnifferInterval(int snifferInterval) {
+ this.snifferInterval = snifferInterval;
+ }
+
+ /**
+ * The delay of a sniff execution scheduled after a failure (in milliseconds)
+ */
+ public int getSniffAfterFailureDelay() {
+ return sniffAfterFailureDelay;
+ }
+
+ public void setSniffAfterFailureDelay(int sniffAfterFailureDelay) {
+ this.sniffAfterFailureDelay = sniffAfterFailureDelay;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConfiguration.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConfiguration.java
new file mode 100644
index 0000000..e1f109b
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConfiguration.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch5;
+
+import java.util.List;
+
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+import org.apache.camel.spi.UriPath;
+import org.apache.http.HttpHost;
+
+@UriParams
+public class ElasticsearchConfiguration {
+
+ private List<HttpHost> hostAddressesList;
+
+ @UriPath @Metadata(required = "true")
+ private String clusterName;
+ @UriParam
+ private ElasticsearchOperation operation;
+ @UriParam
+ private String indexName;
+ @UriParam
+ private String indexType;
+ @UriParam(defaultValue = "" + ElasticsearchConstants.DEFAULT_FOR_WAIT_ACTIVE_SHARDS)
+ private int waitForActiveShards = ElasticsearchConstants.DEFAULT_FOR_WAIT_ACTIVE_SHARDS;
+ @UriParam @Metadata(required = "true")
+ private String hostAddresses;
+ @UriParam(defaultValue = "" + ElasticsearchConstants.DEFAULT_SOCKET_TIMEOUT)
+ private int socketTimeout = ElasticsearchConstants.DEFAULT_SOCKET_TIMEOUT;
+ @UriParam(defaultValue = "" + ElasticsearchConstants.MAX_RETRY_TIMEOUT)
+ private int maxRetryTimeout = ElasticsearchConstants.MAX_RETRY_TIMEOUT;
+ @UriParam(defaultValue = "" + ElasticsearchConstants.DEFAULT_CONNECTION_TIMEOUT)
+ private int connectionTimeout = ElasticsearchConstants.DEFAULT_CONNECTION_TIMEOUT;
+ @UriParam(defaultValue = "false")
+ private boolean disconnect;
+
+ private String user;
+ private String password;
+ private boolean enableSSL;
+ //Sniffer parameter.
+ private boolean enableSniffer;
+ private int snifferInterval = ElasticsearchConstants.DEFAULT_SNIFFER_INTERVAL;
+ private int sniffAfterFailureDelay = ElasticsearchConstants.DEFAULT_AFTER_FAILURE_DELAY;
+ /**
+ * Name of the cluster
+ */
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ /**
+ * What operation to perform
+ */
+ public ElasticsearchOperation getOperation() {
+ return operation;
+ }
+
+ public void setOperation(ElasticsearchOperation operation) {
+ this.operation = operation;
+ }
+
+ /**
+ * The name of the index to act against
+ */
+ public String getIndexName() {
+ return indexName;
+ }
+
+ public void setIndexName(String indexName) {
+ this.indexName = indexName;
+ }
+
+ /**
+ * The type of the index to act against
+ */
+ public String getIndexType() {
+ return indexType;
+ }
+
+ public void setIndexType(String indexType) {
+ this.indexType = indexType;
+ }
+
+ /**
+ * Comma separated list of host:port addresses of the Elasticsearch nodes to connect to.
+ * When an entry has no port, the default port (ElasticsearchConstants.DEFAULT_PORT) is used.
+ */
+ public String getHostAddresses() {
+ return hostAddresses;
+ }
+
+ public void setHostAddresses(String hostAddresses) {
+ this.hostAddresses = hostAddresses;
+ }
+
+ /**
+ * Index creation waits for the write consistency number of shards to be available
+ */
+ public int getWaitForActiveShards() {
+ return waitForActiveShards;
+ }
+
+ public void setWaitForActiveShards(int waitForActiveShards) {
+ this.waitForActiveShards = waitForActiveShards;
+ }
+
+ public List<HttpHost> getHostAddressesList() {
+ return hostAddressesList;
+ }
+
+ public void setHostAddressesList(List<HttpHost> hostAddressesList) {
+ this.hostAddressesList = hostAddressesList;
+ }
+
+ /**
+ * The timeout in ms to wait before the socket will timeout.
+ */
+ public int getSocketTimeout() {
+ return socketTimeout;
+ }
+
+ public void setSocketTimeout(int socketTimeout) {
+ this.socketTimeout = socketTimeout;
+ }
+
+ /**
+ * The time in ms to wait before connection will timeout.
+ */
+ public int getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ public void setConnectionTimeout(int connectionTimeout) {
+ this.connectionTimeout = connectionTimeout;
+ }
+
+ /**
+ * Basic authenticate user
+ */
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ /**
+ * Password for authenticate
+ */
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ /**
+ * Enable SSL
+ */
+ public Boolean getEnableSSL() {
+ return enableSSL;
+ }
+
+ public void setEnableSSL(Boolean enableSSL) {
+ this.enableSSL = enableSSL;
+ }
+
+ /**
+ * The time in ms before retry
+ */
+ public int getMaxRetryTimeout() {
+ return maxRetryTimeout;
+ }
+
+ public void setMaxRetryTimeout(int maxRetryTimeout) {
+ this.maxRetryTimeout = maxRetryTimeout;
+ }
+
+ /**
+ * Disconnect after the producer has finished its call
+ */
+ public Boolean getDisconnect() {
+ return disconnect;
+ }
+
+ public void setDisconnect(Boolean disconnect) {
+ this.disconnect = disconnect;
+ }
+
+ /**
+ * Enable automatically discover nodes from a running Elasticsearch cluster
+ */
+ public Boolean getEnableSniffer() {
+ return enableSniffer;
+ }
+
+ public void setEnableSniffer(Boolean enableSniffer) {
+ this.enableSniffer = enableSniffer;
+ }
+
+ /**
+ * The interval between consecutive ordinary sniff executions in milliseconds. Will be honoured when
+ * sniffOnFailure is disabled or when there are no failures between consecutive sniff executions
+ */
+ public int getSnifferInterval() {
+ return snifferInterval;
+ }
+
+ public void setSnifferInterval(int snifferInterval) {
+ this.snifferInterval = snifferInterval;
+ }
+
+ /**
+ * The delay of a sniff execution scheduled after a failure (in milliseconds)
+ */
+ public int getSniffAfterFailureDelay() {
+ return sniffAfterFailureDelay;
+ }
+
+ public void setSniffAfterFailureDelay(int sniffAfterFailureDelay) {
+ this.sniffAfterFailureDelay = sniffAfterFailureDelay;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConstants.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConstants.java
new file mode 100644
index 0000000..9f2d493
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConstants.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.elasticsearch5;
+
+
+public interface ElasticsearchConstants {
+
+ String PARAM_OPERATION = "operation";
+ String PARAM_INDEX_ID = "indexId";
+ String PARAM_INDEX_NAME = "indexName";
+ String PARAM_INDEX_TYPE = "indexType";
+ String PARAM_WAIT_FOR_ACTIVE_SHARDS = "waitForActiveShards";
+
+ int DEFAULT_PORT = 9200; // Port used for a host address that does not specify one
+ int DEFAULT_FOR_WAIT_ACTIVE_SHARDS = 1; // Meaning only wait for the primary shard
+ int DEFAULT_SOCKET_TIMEOUT = 30000; // Time in ms to wait before the socket times out
+ int MAX_RETRY_TIMEOUT = 30000; // Time in ms before retrying
+ int DEFAULT_CONNECTION_TIMEOUT = 30000; // Time in ms to wait before establishing a connection times out
+ int DEFAULT_SNIFFER_INTERVAL = 60000 * 5; // Interval in ms (5 minutes) between ordinary sniff executions for elasticsearch nodes
+ int DEFAULT_AFTER_FAILURE_DELAY = 60000; // Delay in ms of the sniff execution scheduled after a failure
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchEndpoint.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchEndpoint.java
new file mode 100644
index 0000000..37bafd6
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchEndpoint.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch5;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.elasticsearch.client.RestClient;
+
+/**
+ * The elasticsearch component is used for interfacing with ElasticSearch server using 5.x REST API.
+ */
+@UriEndpoint(firstVersion = "2.21.0", scheme = "elasticsearch5-rest", title = "Elasticsearch Rest or Elasticsearch 5 Rest",
+    syntax = "elasticsearch5-rest:clusterName", producerOnly = true, label = "monitoring,search")
+public class ElasticsearchEndpoint extends DefaultEndpoint {
+
+    @UriParam
+    protected final ElasticsearchConfiguration configuration;
+
+    private final RestClient client; // client supplied by the creator of this endpoint; assigned once in the constructor
+
+    public ElasticsearchEndpoint(String uri, ElasticsearchComponent component, ElasticsearchConfiguration config, RestClient client) throws Exception {
+        super(uri, component);
+        this.configuration = config;
+        this.client = client;
+    }
+
+    public Producer createProducer() throws Exception {
+        return new ElasticsearchProducer(this, configuration); // the producer shares this endpoint's configuration
+    }
+
+    public Consumer createConsumer(Processor processor) throws Exception {
+        throw new UnsupportedOperationException("Cannot consume from an ElasticsearchEndpoint: " + getEndpointUri());
+    }
+
+    public boolean isSingleton() {
+        return true;
+    }
+
+    public RestClient getClient() {
+        return client;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchOperation.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchOperation.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchOperation.java
new file mode 100644
index 0000000..11b57f8
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchOperation.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch5;
+
+/**
+ * The ElasticSearch server operations that are implemented
+ *
+ * Index - Index a document associated with a given index and type
+ * Update - Updates a document based on a script
+ * Bulk - Executes a bulk of index / delete operations
+ * BulkIndex - Executes a bulk of index / delete operations
+ * GetById - Gets the document that was indexed from an index with a type and id
+ * MultiGet - Multiple get documents
+ * Delete - Deletes a document from the index based on the index, type and id
+ * DeleteIndex - Deletes an index
+ * Search - Search across one or more indices and one or more types with a query
+ * Exists - Checks the index exists or not (using search with size=0 and terminate_after=1 parameters)
+ */
+public enum ElasticsearchOperation {
+ Index("Index"),
+ Update("Update"),
+ Bulk("Bulk"),
+ BulkIndex("BulkIndex"),
+ GetById("GetById"),
+ MultiGet("MultiGet"),
+ Delete("Delete"),
+ DeleteIndex("DeleteIndex"),
+ Search("Search"),
+ Exists("Exists");
+
+ private final String text; // value returned by toString(); every constant passes its own name
+
+ ElasticsearchOperation(final String text) {
+ this.text = text;
+ }
+
+ @Override
+ public String toString() {
+ return text;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchProducer.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchProducer.java
new file mode 100644
index 0000000..f87fb54
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchProducer.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch5;
+
+import java.lang.reflect.InvocationTargetException;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.component.elasticsearch5.converter.ElasticsearchActionRequestConverter;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.IOHelper;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.MultiGetRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.sniff.Sniffer;
+import org.elasticsearch.client.sniff.SnifferBuilder;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Represents an Elasticsearch producer.
+ */
+public class ElasticsearchProducer extends DefaultProducer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchProducer.class);
+
+ protected final ElasticsearchConfiguration configuration;
+ private RestClient client;
+ private Sniffer sniffer;
+
+ /**
+ * Creates a producer bound to the given endpoint and configuration.
+ * The REST client is initialised from {@code endpoint.getClient()}; when that is {@code null},
+ * {@code startClient()} builds one later.
+ *
+ * @param endpoint the endpoint this producer belongs to
+ * @param configuration the configuration used to resolve operations and to build the client
+ */
+ public ElasticsearchProducer(ElasticsearchEndpoint endpoint, ElasticsearchConfiguration configuration) {
+ super(endpoint);
+ this.configuration = configuration;
+ this.client = endpoint.getClient();
+ }
+
+    /**
+     * Resolves the operation to execute for the given exchange.
+     * <p>
+     * The operation is derived, in order of preference, from:
+     * <ol>
+     * <li>the type of the message body, when it is one of the known Elasticsearch request types;</li>
+     * <li>the {@link ElasticsearchConstants#PARAM_OPERATION} header;</li>
+     * <li>the operation set on the endpoint configuration.</li>
+     * </ol>
+     *
+     * @param exchange the exchange whose in-message body and headers are inspected
+     * @return the resolved operation, never {@code null}
+     * @throws IllegalArgumentException if the operation cannot be derived from any of the sources above
+     */
+    private ElasticsearchOperation resolveOperation(Exchange exchange) {
+        Object request = exchange.getIn().getBody();
+        if (request instanceof IndexRequest) {
+            return ElasticsearchOperation.Index;
+        } else if (request instanceof GetRequest) {
+            return ElasticsearchOperation.GetById;
+        } else if (request instanceof MultiGetRequest) {
+            return ElasticsearchOperation.MultiGet;
+        } else if (request instanceof UpdateRequest) {
+            return ElasticsearchOperation.Update;
+        } else if (request instanceof BulkRequest) {
+            // A BulkRequest body serves both bulk flavours; only the configuration tells them apart.
+            return configuration.getOperation() == ElasticsearchOperation.BulkIndex
+                ? ElasticsearchOperation.BulkIndex
+                : ElasticsearchOperation.Bulk;
+        } else if (request instanceof DeleteRequest) {
+            return ElasticsearchOperation.Delete;
+        } else if (request instanceof SearchRequest) {
+            return ElasticsearchOperation.Search;
+        } else if (request instanceof DeleteIndexRequest) {
+            return ElasticsearchOperation.DeleteIndex;
+        }
+
+        // The body did not identify the operation: fall back to the header, then to the configuration.
+        ElasticsearchOperation operationConfig = exchange.getIn().getHeader(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchOperation.class);
+        if (operationConfig == null) {
+            operationConfig = configuration.getOperation();
+        }
+        if (operationConfig == null) {
+            throw new IllegalArgumentException(ElasticsearchConstants.PARAM_OPERATION + " value '" + operationConfig + "' is not supported");
+        }
+        return operationConfig;
+    }
+
+    /**
+     * Executes the Elasticsearch operation resolved for the exchange and replaces the in-message body
+     * with the result of that operation:
+     * <ul>
+     * <li>Index / Update - the id of the document</li>
+     * <li>GetById - the get response</li>
+     * <li>Bulk - the bulk item responses</li>
+     * <li>BulkIndex - the list of ids of the bulk items</li>
+     * <li>Delete - the delete result</li>
+     * <li>DeleteIndex - the HTTP status code of the delete call</li>
+     * <li>Exists - {@code true} or {@code false}</li>
+     * <li>Search - the search hits</li>
+     * </ul>
+     * Index name, index type and wait-for-active-shards are taken from the message headers; when a header is
+     * missing the value from the endpoint configuration is used for the duration of this call only.
+     *
+     * @param exchange the exchange carrying the request body and headers
+     * @throws Exception if the request cannot be built or the call to Elasticsearch fails
+     * @throws IllegalArgumentException if the resolved operation is not handled by this producer
+     */
+    public void process(Exchange exchange) throws Exception {
+        if (configuration.getDisconnect() && client == null) {
+            startClient();
+        }
+
+        Message message = exchange.getIn();
+        boolean configIndexName = false;
+        boolean configIndexType = false;
+        boolean configWaitForActiveShards = false;
+
+        try {
+            RestHighLevelClient restHighLevelClient = new RestHighLevelClient(client);
+            final ElasticsearchOperation operation = resolveOperation(exchange);
+
+            // Set the index/type headers on the exchange if necessary. This is used
+            // for type conversion. No error is raised here when neither header nor
+            // configuration provides a value; the Elasticsearch client will complain.
+            String indexName = message.getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class);
+            if (indexName == null) {
+                message.setHeader(ElasticsearchConstants.PARAM_INDEX_NAME, configuration.getIndexName());
+                configIndexName = true;
+            }
+
+            String indexType = message.getHeader(ElasticsearchConstants.PARAM_INDEX_TYPE, String.class);
+            if (indexType == null) {
+                message.setHeader(ElasticsearchConstants.PARAM_INDEX_TYPE, configuration.getIndexType());
+                configIndexType = true;
+            }
+
+            Integer waitForActiveShards = message.getHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, Integer.class);
+            if (waitForActiveShards == null) {
+                message.setHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, configuration.getWaitForActiveShards());
+                configWaitForActiveShards = true;
+            }
+
+            if (operation == ElasticsearchOperation.Index) {
+                IndexRequest indexRequest = ElasticsearchActionRequestConverter.toIndexRequest(message.getBody(), exchange);
+                message.setBody(restHighLevelClient.index(indexRequest).getId());
+            } else if (operation == ElasticsearchOperation.Update) {
+                // A ready-made UpdateRequest must be handed over as is: it is not convertible to a Map.
+                Object updateBody = message.getBody();
+                if (!(updateBody instanceof UpdateRequest)) {
+                    updateBody = message.getBody(Map.class);
+                }
+                UpdateRequest updateRequest = ElasticsearchActionRequestConverter.toUpdateRequest(updateBody, exchange);
+                message.setBody(restHighLevelClient.update(updateRequest).getId());
+            } else if (operation == ElasticsearchOperation.GetById) {
+                GetRequest getRequest = ElasticsearchActionRequestConverter.toGetRequest(message.getBody(), exchange);
+                message.setBody(restHighLevelClient.get(getRequest));
+            } else if (operation == ElasticsearchOperation.Bulk) {
+                BulkRequest bulkRequest = message.getBody(BulkRequest.class);
+                message.setBody(restHighLevelClient.bulk(bulkRequest).getItems());
+            } else if (operation == ElasticsearchOperation.BulkIndex) {
+                BulkRequest bulkRequest = ElasticsearchActionRequestConverter.toBulkRequest(message.getBody(), exchange);
+                List<String> indexedIds = Arrays.stream(restHighLevelClient.bulk(bulkRequest).getItems())
+                    .map(BulkItemResponse::getId)
+                    .collect(Collectors.toList());
+                message.setBody(indexedIds);
+            } else if (operation == ElasticsearchOperation.Delete) {
+                DeleteRequest deleteRequest = ElasticsearchActionRequestConverter.toDeleteRequest(message.getBody(), exchange);
+                message.setBody(restHighLevelClient.delete(deleteRequest).getResult());
+            } else if (operation == ElasticsearchOperation.DeleteIndex) {
+                // resolveOperation() selects DeleteIndex for a DeleteIndexRequest body, so that body type
+                // has to be accepted here; every other body goes through the delete request conversion.
+                Object deleteIndexBody = message.getBody();
+                String indexToDelete;
+                if (deleteIndexBody instanceof DeleteIndexRequest) {
+                    indexToDelete = String.join(",", ((DeleteIndexRequest) deleteIndexBody).indices());
+                } else {
+                    indexToDelete = ElasticsearchActionRequestConverter.toDeleteRequest(deleteIndexBody, exchange).index();
+                }
+                message.setBody(client.performRequest("Delete", indexToDelete).getStatusLine().getStatusCode());
+            } else if (operation == ElasticsearchOperation.Exists) {
+                // ExistsRequest API is deprecated, using SearchRequest instead with size=0 and terminate_after=1
+                SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
+                sourceBuilder.size(0);
+                sourceBuilder.terminateAfter(1);
+                SearchRequest searchRequest = new SearchRequest(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class));
+                searchRequest.source(sourceBuilder);
+                try {
+                    restHighLevelClient.search(searchRequest);
+                    message.setBody(true);
+                } catch (ElasticsearchStatusException e) {
+                    if (e.status().equals(RestStatus.NOT_FOUND)) {
+                        message.setBody(false);
+                    } else {
+                        throw new IllegalStateException(e);
+                    }
+                }
+            } else if (operation == ElasticsearchOperation.Search) {
+                SearchRequest searchRequest = ElasticsearchActionRequestConverter.toSearchRequest(message.getBody(), exchange);
+                message.setBody(restHighLevelClient.search(searchRequest).getHits());
+            } else {
+                throw new IllegalArgumentException(ElasticsearchConstants.PARAM_OPERATION + " value '" + operation + "' is not supported");
+            }
+        } finally {
+            // If we set params via the configuration on this exchange, remove them
+            // now, also when the operation failed. This preserves legacy behavior for
+            // this component and enables a use case where one message can be sent to
+            // multiple elasticsearch endpoints where the user is relying on the
+            // endpoint configuration (index/type) rather than header values. If we do
+            // not clear this out, sending the same message (index request, for example)
+            // to multiple elasticsearch endpoints would have the effect of overriding
+            // any subsequent endpoint index/type with the first endpoint index/type.
+            if (configIndexName) {
+                message.removeHeader(ElasticsearchConstants.PARAM_INDEX_NAME);
+            }
+            if (configIndexType) {
+                message.removeHeader(ElasticsearchConstants.PARAM_INDEX_TYPE);
+            }
+            if (configWaitForActiveShards) {
+                message.removeHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS);
+            }
+            // In disconnect mode the client lives for a single exchange only.
+            if (configuration.getDisconnect()) {
+                IOHelper.close(client);
+                client = null;
+                if (configuration.getEnableSniffer()) {
+                    IOHelper.close(sniffer);
+                    sniffer = null;
+                }
+            }
+        }
+    }
+
+    /**
+     * Starts the producer. Unless the endpoint is configured to disconnect after
+     * every exchange, the REST client is created right away.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    protected void doStart() throws Exception {
+        super.doStart();
+        if (configuration.getDisconnect()) {
+            // the client is created on demand by process() instead
+            return;
+        }
+        startClient();
+    }
+
+    /**
+     * Creates the REST client if there is none yet.
+     * When no host addresses are configured only a warning is logged and the client stays unset.
+     */
+    private void startClient() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException, UnknownHostException {
+        if (client != null) {
+            return;
+        }
+        LOG.info("Connecting to the ElasticSearch cluster: {}", configuration.getClusterName());
+        if (configuration.getHostAddressesList() != null
+            && !configuration.getHostAddressesList().isEmpty()) {
+            client = createClient();
+        } else {
+            LOG.warn("Incorrect ip address and port parameters settings for ElasticSearch cluster");
+        }
+    }
+
+    /**
+     * Builds a REST client for the configured host addresses, applying the configured retry, connect and
+     * socket timeouts, and basic authentication when both user and password are set.
+     * When sniffing is enabled a sniffer is built on top of the new client and stored in {@code sniffer}.
+     *
+     * @return the newly built REST client
+     */
+    private RestClient createClient() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
+        final HttpHost[] hosts = configuration.getHostAddressesList().toArray(new HttpHost[0]);
+        final RestClientBuilder clientBuilder = RestClient.builder(hosts);
+        clientBuilder.setMaxRetryTimeoutMillis(configuration.getMaxRetryTimeout());
+        clientBuilder.setRequestConfigCallback(requestConfig -> requestConfig
+            .setConnectTimeout(configuration.getConnectionTimeout())
+            .setSocketTimeout(configuration.getSocketTimeout()));
+
+        final boolean hasCredentials = configuration.getUser() != null && configuration.getPassword() != null;
+        if (hasCredentials) {
+            final CredentialsProvider credentials = new BasicCredentialsProvider();
+            credentials.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(configuration.getUser(), configuration.getPassword()));
+            clientBuilder.setHttpClientConfigCallback(httpClient -> {
+                httpClient.setDefaultCredentialsProvider(credentials);
+                return httpClient;
+            });
+        }
+
+        final RestClient builtClient = clientBuilder.build();
+        if (configuration.getEnableSniffer()) {
+            final SnifferBuilder sniffing = Sniffer.builder(builtClient);
+            sniffing.setSniffIntervalMillis(configuration.getSnifferInterval());
+            sniffing.setSniffAfterFailureDelayMillis(configuration.getSniffAfterFailureDelay());
+            sniffer = sniffing.build();
+        }
+        return builtClient;
+    }
+
+
+    /**
+     * Stops the producer and releases the REST client together with its sniffer.
+     * The sniffer is closed before the client it observes, and both fields are cleared so that a later
+     * start creates a fresh client instead of reusing a closed one.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        if (client != null) {
+            LOG.info("Disconnecting from ElasticSearch cluster: {}", configuration.getClusterName());
+            if (sniffer != null) {
+                IOHelper.close(sniffer);
+                sniffer = null;
+            }
+            try {
+                client.close();
+            } finally {
+                client = null;
+            }
+        }
+        super.doStop();
+    }
+
+ /**
+ * Returns the REST client currently held by this producer.
+ *
+ * @return the client, or {@code null} when none has been created or it has been released
+ */
+ public RestClient getClient() {
+ return client;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/aggregation/BulkRequestAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/aggregation/BulkRequestAggregationStrategy.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/aggregation/BulkRequestAggregationStrategy.java
new file mode 100644
index 0000000..9f60eb4
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/aggregation/BulkRequestAggregationStrategy.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch5.aggregation;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadRuntimeException;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkRequest;
+
+/**
+ * Aggregates arrays of {@link DocWriteRequest}s (for example index, update or delete
+ * {@link ActionRequest}s) into a single {@link BulkRequest}.
+ * <p>
+ * The body of every incoming exchange must be a {@code DocWriteRequest[]}. The first exchange gets
+ * its body replaced by a new bulk request; later exchanges add their requests to that bulk request.
+ */
+public class BulkRequestAggregationStrategy implements AggregationStrategy {
+
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        // Read the raw body: getBody(Class<T>) would let a type converter coerce it, which is not wanted here.
+        final Object incoming = newExchange.getIn().getBody();
+        if (incoming instanceof DocWriteRequest[]) {
+            final DocWriteRequest[] requests = (DocWriteRequest[]) incoming;
+            final boolean firstExchange = oldExchange == null;
+            final BulkRequest bulk = firstExchange
+                ? new BulkRequest()
+                : oldExchange.getIn().getBody(BulkRequest.class);
+            bulk.add(requests);
+            if (firstExchange) {
+                newExchange.getIn().setBody(bulk);
+                return newExchange;
+            }
+            return oldExchange;
+        }
+        throw new InvalidPayloadRuntimeException(newExchange, DocWriteRequest[].class);
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/converter/ElasticsearchActionRequestConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/converter/ElasticsearchActionRequestConverter.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/converter/ElasticsearchActionRequestConverter.java
new file mode 100644
index 0000000..bec9583
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/converter/ElasticsearchActionRequestConverter.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch5.converter;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.camel.Converter;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.elasticsearch5.ElasticsearchConstants;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.MultiSearchRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class ElasticsearchActionRequestConverter {
+ private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchActionRequestConverter.class);
+
+ private static final String ES_QUERY_DSL_PREFIX = "query";
+ private static final String PARENT = "parent";
+
+ // Private constructor: the class only exposes static conversion methods and is not meant to be instantiated.
+ private ElasticsearchActionRequestConverter() {
+ }
+
+    /**
+     * Builds an update request from the given document.
+     * An {@link UpdateRequest} is returned unchanged. A {@code byte[]}, {@link Map}, {@link String} or
+     * {@link XContentBuilder} becomes the partial document of a new request whose index, type, id, parent
+     * and wait-for-active-shards settings are read from the message headers.
+     *
+     * @param document the update request itself or the partial document
+     * @param exchange the exchange providing the headers
+     * @return the update request, never {@code null}
+     * @throws IllegalArgumentException if the document is of an unsupported type
+     */
+    @SuppressWarnings("unchecked")
+    private static UpdateRequest createUpdateRequest(Object document, Exchange exchange) {
+        if (document instanceof UpdateRequest) {
+            return (UpdateRequest) document;
+        }
+        UpdateRequest updateRequest = new UpdateRequest();
+        if (document instanceof byte[]) {
+            updateRequest.doc((byte[]) document);
+        } else if (document instanceof Map) {
+            updateRequest.doc((Map<String, Object>) document);
+        } else if (document instanceof String) {
+            updateRequest.doc((String) document);
+        } else if (document instanceof XContentBuilder) {
+            updateRequest.doc((XContentBuilder) document);
+        } else {
+            // Fail with a clear message instead of returning null, which callers dereference right away.
+            throw new IllegalArgumentException(
+                "Wrong body type. Only UpdateRequest, byte[], Map, String or XContentBuilder is allowed as a type");
+        }
+
+        return updateRequest
+            .waitForActiveShards(exchange.getIn().getHeader(
+                ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, Integer.class))
+            .parent(exchange.getIn().getHeader(
+                PARENT, String.class))
+            .index(exchange.getIn().getHeader(
+                ElasticsearchConstants.PARAM_INDEX_NAME, String.class))
+            .type(exchange.getIn().getHeader(
+                ElasticsearchConstants.PARAM_INDEX_TYPE, String.class))
+            .id(exchange.getIn().getHeader(
+                ElasticsearchConstants.PARAM_INDEX_ID, String.class));
+    }
+
+    /**
+     * Builds an index request from the given document.
+     * An {@link IndexRequest} is returned unchanged. A {@code byte[]}, {@link Map}, {@link String} or
+     * {@link XContentBuilder} becomes the source of a new request whose index, type, parent and
+     * wait-for-active-shards settings are read from the message headers. The id is not set here.
+     *
+     * @param document the index request itself or the document source
+     * @param exchange the exchange providing the headers
+     * @return the index request, never {@code null}
+     * @throws IllegalArgumentException if the document is of an unsupported type
+     */
+    @SuppressWarnings("unchecked")
+    private static IndexRequest createIndexRequest(Object document, Exchange exchange) {
+        if (document instanceof IndexRequest) {
+            return (IndexRequest) document;
+        }
+        IndexRequest indexRequest = new IndexRequest();
+        if (document instanceof byte[]) {
+            indexRequest.source((byte[]) document, XContentFactory.xContentType((byte[]) document));
+        } else if (document instanceof Map) {
+            indexRequest.source((Map<String, Object>) document);
+        } else if (document instanceof String) {
+            indexRequest.source((String) document, XContentFactory.xContentType((String) document));
+        } else if (document instanceof XContentBuilder) {
+            indexRequest.source((XContentBuilder) document);
+        } else {
+            // Fail with a clear message instead of returning null, which callers dereference right away.
+            throw new IllegalArgumentException(
+                "Wrong body type. Only IndexRequest, byte[], Map, String or XContentBuilder is allowed as a type");
+        }
+
+        return indexRequest
+            .waitForActiveShards(exchange.getIn().getHeader(
+                ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, Integer.class))
+            .parent(exchange.getIn().getHeader(
+                PARENT, String.class))
+            .index(exchange.getIn().getHeader(
+                ElasticsearchConstants.PARAM_INDEX_NAME, String.class))
+            .type(exchange.getIn().getHeader(
+                ElasticsearchConstants.PARAM_INDEX_TYPE, String.class));
+    }
+
+ public static IndexRequest toIndexRequest(Object document, Exchange exchange) {
+ return createIndexRequest(document, exchange)
+ .id(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_ID, String.class));
+ }
+
+ public static UpdateRequest toUpdateRequest(Object document, Exchange exchange) {
+ return createUpdateRequest(document, exchange)
+ .id(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_ID, String.class));
+ }
+
+    /**
+     * Converts the given document into a get request.
+     * A {@link GetRequest} is returned unchanged; any other document is cast to {@link String} and used
+     * as the id, with index name and type taken from the message headers.
+     *
+     * @param document the get request itself or the document id
+     * @param exchange the exchange providing the headers
+     * @return the get request
+     */
+    public static GetRequest toGetRequest(Object document, Exchange exchange) {
+        if (!(document instanceof GetRequest)) {
+            final String indexName = exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class);
+            final GetRequest getRequest = new GetRequest(indexName);
+            final String indexType = exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_TYPE, String.class);
+            getRequest.type(indexType);
+            return getRequest.id((String) document);
+        }
+        return (GetRequest) document;
+    }
+
+    /**
+     * Converts the given document into a delete request.
+     * A {@link DeleteRequest} is returned unchanged; a {@link String} is used as the id of a new request
+     * whose index name and type are taken from the message headers.
+     *
+     * @param document the delete request itself or the document id
+     * @param exchange the exchange providing the headers
+     * @return the delete request
+     * @throws IllegalArgumentException if the document is neither a delete request nor a string
+     */
+    public static DeleteRequest toDeleteRequest(Object document, Exchange exchange) {
+        if (document instanceof DeleteRequest) {
+            return (DeleteRequest) document;
+        }
+        if (!(document instanceof String)) {
+            throw new IllegalArgumentException("Wrong body type. Only DeleteRequest or String is allowed as a type");
+        }
+        final DeleteRequest deleteRequest = new DeleteRequest();
+        deleteRequest.index(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class));
+        deleteRequest.type(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_TYPE, String.class));
+        return deleteRequest.id((String) document);
+    }
+
+    /**
+     * Converts the given query into a search request for the index name and type found in the message
+     * headers.
+     * The query may be a {@link Map} or a JSON {@link String}; a top-level {@code query} element is
+     * unwrapped for backward compatibility.
+     *
+     * @param queryObject the query as a map or a JSON string
+     * @param exchange the exchange providing the headers
+     * @return the search request, or {@code null} when the query is neither a map nor a string
+     * @throws IOException if the query cannot be rendered or parsed as JSON
+     */
+    @SuppressWarnings("unchecked")
+    public static SearchRequest toSearchRequest(Object queryObject, Exchange exchange) throws IOException {
+        SearchRequest searchRequest = new SearchRequest(exchange.getIn()
+            .getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class))
+            .types(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_TYPE, String.class));
+
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        String queryText;
+
+        if (queryObject instanceof Map<?, ?>) {
+            Map<String, Object> mapQuery = (Map<String, Object>) queryObject;
+            // Remove 'query' prefix from the query object for backward compatibility
+            if (mapQuery.containsKey(ES_QUERY_DSL_PREFIX)) {
+                mapQuery = (Map<String, Object>) mapQuery.get(ES_QUERY_DSL_PREFIX);
+            }
+            // A failure to render the map is propagated to the caller; continuing without a
+            // query text would only fail later with a less helpful error.
+            XContentBuilder contentBuilder = XContentFactory.contentBuilder(XContentType.JSON);
+            queryText = contentBuilder.map(mapQuery).string();
+        } else if (queryObject instanceof String) {
+            queryText = (String) queryObject;
+            ObjectMapper mapper = new ObjectMapper();
+            JsonNode jsonTextObject = mapper.readValue(queryText, JsonNode.class);
+            JsonNode parentJsonNode = jsonTextObject.get(ES_QUERY_DSL_PREFIX);
+            if (parentJsonNode != null) {
+                queryText = parentJsonNode.toString();
+            }
+        } else {
+            // Cannot convert the queryObject into SearchRequest
+            return null;
+        }
+
+        searchSourceBuilder.query(QueryBuilders.wrapperQuery(queryText));
+        searchRequest.source(searchSourceBuilder);
+
+        return searchRequest;
+    }
+
+    /**
+     * Converts the given documents into a bulk request.
+     * A {@link BulkRequest} is returned unchanged; every element of a {@link List} is turned into an
+     * index request and added to a new bulk request.
+     *
+     * @param documents the bulk request itself or the list of documents to index
+     * @param exchange the exchange providing the headers
+     * @return the bulk request
+     * @throws IllegalArgumentException if the documents are neither a bulk request nor a list
+     */
+    public static BulkRequest toBulkRequest(Object documents, Exchange exchange) {
+        if (documents instanceof BulkRequest) {
+            return (BulkRequest) documents;
+        }
+        if (!(documents instanceof List)) {
+            throw new IllegalArgumentException("Wrong body type. Only BulkRequest or List is allowed as a type");
+        }
+        final BulkRequest bulk = new BulkRequest();
+        for (Iterator<?> items = ((List<?>) documents).iterator(); items.hasNext();) {
+            bulk.add(createIndexRequest(items.next(), exchange));
+        }
+        return bulk;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/resources/META-INF/services/org/apache/camel/component/elasticsearch5-rest
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/main/resources/META-INF/services/org/apache/camel/component/elasticsearch5-rest b/components/camel-elasticsearch5-rest/src/main/resources/META-INF/services/org/apache/camel/component/elasticsearch5-rest
new file mode 100644
index 0000000..13cc909
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/main/resources/META-INF/services/org/apache/camel/component/elasticsearch5-rest
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.elasticsearch5.ElasticsearchComponent
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBaseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBaseTest.java b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBaseTest.java
new file mode 100644
index 0000000..f0496e5
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBaseTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch5;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.http.HttpHost;
+import org.codelibs.elasticsearch.runner.ElasticsearchClusterRunner;
+import org.elasticsearch.client.RestClient;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import static org.codelibs.elasticsearch.runner.ElasticsearchClusterRunner.newConfigs;
+
+public class ElasticsearchBaseTest extends CamelTestSupport {
+
+
+ public static ElasticsearchClusterRunner runner;
+ public static String clusterName;
+ public static RestClient client;
+
+ protected static final int ES_BASE_TRANSPORT_PORT = AvailablePortFinder.getNextAvailable();
+ protected static final int ES_BASE_HTTP_PORT = AvailablePortFinder.getNextAvailable(ES_BASE_TRANSPORT_PORT+1);
+
+ /**
+ * Runs once before all tests of the class: wipes the {@code target/testcluster/} folder, starts a
+ * single-node Elasticsearch cluster with a unique cluster name, waits for it to become green and
+ * creates the shared REST client.
+ */
+ @SuppressWarnings("resource")
+ @BeforeClass
+ public static void cleanupOnce() throws Exception {
+ deleteDirectory("target/testcluster/");
+ clusterName = "es-cl-run-" + System.currentTimeMillis();
+
+ runner = new ElasticsearchClusterRunner();
+ // create ES nodes
+ runner.onBuild((number, settingsBuilder) -> {
+ settingsBuilder.put("http.cors.enabled", true);
+ settingsBuilder.put("http.cors.allow-origin", "*");
+ }).build(newConfigs()
+ .clusterName(clusterName)
+ .numOfNode(1)
+ // NOTE(review): the base HTTP port is the *transport* port constant while the client below
+ // connects to ES_BASE_HTTP_PORT; presumably the runner assigns base + node number - confirm.
+ .baseHttpPort(ES_BASE_TRANSPORT_PORT)
+ .basePath("target/testcluster/"));
+
+ // wait for green status
+ runner.ensureGreen();
+ client = RestClient.builder(new HttpHost(InetAddress.getByName("localhost"),ES_BASE_HTTP_PORT)).build();
+ }
+
+    /**
+     * Runs once after all tests of the class: closes the shared REST client and then the cluster runner.
+     * The runner is closed even when closing the client fails.
+     */
+    @AfterClass
+    public static void teardownOnce() throws IOException {
+        try {
+            if (client != null) {
+                client.close();
+            }
+        } finally {
+            if (runner != null) {
+                runner.close();
+            }
+        }
+    }
+
+ /**
+ * Requests one Camel context per test class rather than per test method.
+ *
+ * @return always {@code true}
+ */
+ @Override
+ public boolean isCreateCamelContextPerClass() {
+ // let's speed up the tests using the same context
+ return true;
+ }
+
+    /**
+     * Creates the Camel context and registers the component under the {@code elasticsearch5-rest}
+     * scheme, pointing it at the local test cluster.
+     */
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        final CamelContext camelContext = super.createCamelContext();
+        final ElasticsearchComponent component = new ElasticsearchComponent();
+        component.setHostAddresses("localhost:" + ES_BASE_HTTP_PORT);
+        camelContext.addComponent("elasticsearch5-rest", component);
+        return camelContext;
+    }
+
+ /**
+ * As we don't delete the {@code target/data} folder for <b>each</b> test
+ * below (otherwise they would run much slower), we need to make sure
+ * there's no side effect of the same used data through creating unique
+ * indexes.
+ */
+ Map<String, String> createIndexedData(String... additionalPrefixes) {
+ String prefix = createPrefix();
+
+ // take over any potential prefixes we may have been asked for
+ if (additionalPrefixes.length > 0) {
+ StringBuilder sb = new StringBuilder(prefix);
+ for (String additionalPrefix : additionalPrefixes) {
+ sb.append(additionalPrefix).append("-");
+ }
+ prefix = sb.toString();
+ }
+
+ String key = prefix + "key";
+ String value = prefix + "value";
+ log.info("Creating indexed data using the key/value pair {} => {}", key, value);
+
+ Map<String, String> map = new HashMap<String, String>();
+ map.put(key, value);
+ return map;
+ }
+
+    /**
+     * Creates a prefix from the lower-cased name of the running test method, followed by a dash.
+     * Lower-casing uses the root locale so the prefix does not depend on the machine's default locale.
+     *
+     * @return the test specific prefix
+     */
+    String createPrefix() {
+        // make use of the test method name to avoid collision
+        return getTestMethodName().toLowerCase(java.util.Locale.ROOT) + "-";
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBulkTest.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBulkTest.java b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBulkTest.java
new file mode 100644
index 0000000..979c71c
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBulkTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch5;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.notNullValue;
+
+/**
+ * Sends bulk requests through the {@code direct:bulk_index} and
+ * {@code direct:bulk} routes configured in {@link #createRouteBuilder()} and
+ * checks the ids / item responses that come back.
+ */
+public class ElasticsearchBulkTest extends ElasticsearchBaseTest {
+
+    /** Sends a list of two generated documents and expects one returned id per document. */
+    @Test
+    public void testBulkIndex() throws Exception {
+        List<Map<String, String>> documents = new ArrayList<>();
+        documents.add(createIndexedData("1"));
+        documents.add(createIndexedData("2"));
+
+        List<?> indexIds = template.requestBody("direct:bulk_index", documents, List.class);
+        assertNotNull("indexIds should be set", indexIds);
+        assertCollectionSize("Indexed documents should match the size of documents", indexIds, documents.size());
+    }
+
+    /** Sends a list holding a single map and expects exactly one returned id. */
+    @Test
+    public void bulkIndexListRequestBody() throws Exception {
+        String prefix = createPrefix();
+
+        // given
+        Map<String, String> document = new HashMap<>();
+        document.put("id", prefix + "baz");
+        document.put("content", prefix + "hello");
+        List<Map<String, String>> request = new ArrayList<>();
+        request.add(document);
+
+        // when
+        @SuppressWarnings("unchecked")
+        List<String> indexedDocumentIds = template.requestBody("direct:bulk_index", request, List.class);
+
+        // then
+        assertThat(indexedDocumentIds, notNullValue());
+        assertThat(indexedDocumentIds.size(), equalTo(1));
+    }
+
+    /** Sends a ready-made {@link BulkRequest} and expects the id of its only index request back. */
+    @Test
+    public void bulkIndexRequestBody() throws Exception {
+        String prefix = createPrefix();
+
+        // given
+        BulkRequest request = singleDocumentBulkRequest(prefix);
+
+        // when
+        @SuppressWarnings("unchecked")
+        List<String> indexedDocumentIds = template.requestBody("direct:bulk_index", request, List.class);
+
+        // then
+        assertThat(indexedDocumentIds, notNullValue());
+        assertThat(indexedDocumentIds.size(), equalTo(1));
+        assertThat(indexedDocumentIds, hasItem(prefix + "baz"));
+    }
+
+    /** Sends a ready-made {@link BulkRequest} to the {@code Bulk} route and inspects the first item response. */
+    @Test
+    public void bulkRequestBody() throws Exception {
+        String prefix = createPrefix();
+
+        // given
+        BulkRequest request = singleDocumentBulkRequest(prefix);
+
+        // when
+        Object reply = template.requestBody("direct:bulk", request);
+        BulkItemResponse[] response = (BulkItemResponse[]) reply;
+
+        // then
+        assertThat(response, notNullValue());
+        assertEquals(prefix + "baz", response[0].getResponse().getId());
+    }
+
+    /**
+     * Wraps a single index request in a bulk request; index, type, id and the
+     * JSON content of that index request all carry the given prefix.
+     */
+    private static BulkRequest singleDocumentBulkRequest(String prefix) {
+        IndexRequest indexRequest = new IndexRequest(prefix + "foo", prefix + "bar", prefix + "baz")
+            .source("{\"" + prefix + "content\": \"" + prefix + "hello\"}");
+
+        BulkRequest bulkRequest = new BulkRequest();
+        bulkRequest.add(indexRequest);
+        return bulkRequest;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                // this endpoint sets no hostAddresses option of its own
+                String bulkIndexEndpoint = "elasticsearch5-rest://elasticsearch?operation=BulkIndex&indexName=twitter&indexType=tweet";
+                // this endpoint names the host address explicitly
+                String bulkEndpoint = "elasticsearch5-rest://elasticsearch?operation=Bulk&indexName=twitter&indexType=tweet&hostAddresses=localhost:" + ES_BASE_HTTP_PORT;
+
+                from("direct:bulk_index").to(bulkIndexEndpoint);
+                from("direct:bulk").to(bulkEndpoint);
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterBaseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterBaseTest.java b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterBaseTest.java
new file mode 100644
index 0000000..6ecd3f9
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterBaseTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch5;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.http.HttpHost;
+import org.codelibs.elasticsearch.runner.ElasticsearchClusterRunner;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import static org.codelibs.elasticsearch.runner.ElasticsearchClusterRunner.newConfigs;
+
+/**
+ * Base class for tests that run against a three-node Elasticsearch cluster.
+ * <p>
+ * The cluster is started once per test class through
+ * {@link ElasticsearchClusterRunner} and shut down again after the last test;
+ * a low-level and a high-level REST client are made available to subclasses
+ * through static fields.
+ */
+public class ElasticsearchClusterBaseTest extends CamelTestSupport {
+
+    public static ElasticsearchClusterRunner runner;
+    public static String clusterName;
+    public static RestClient restclient;
+    public static RestHighLevelClient client;
+
+    protected static final int ES_BASE_HTTP_PORT = AvailablePortFinder.getNextAvailable();
+    // NOTE(review): despite its name this port is passed to HttpHost below, i.e. it is
+    // used for REST/HTTP traffic. Presumably the runner binds the first node's HTTP
+    // port right after the base HTTP port - confirm against the cluster runner.
+    protected static final int ES_FIRST_NODE_TRANSPORT_PORT = AvailablePortFinder.getNextAvailable(ES_BASE_HTTP_PORT + 1);
+
+    /**
+     * Removes the data of previous runs, starts a fresh three-node cluster
+     * under {@code target/testcluster/}, waits for green status and creates
+     * the REST clients.
+     */
+    @SuppressWarnings("resource")
+    @BeforeClass
+    public static void cleanUpOnce() throws Exception {
+        deleteDirectory("target/testcluster/");
+        // a time based name keeps this cluster apart from clusters of other runs
+        clusterName = "es-cl-run-" + System.currentTimeMillis();
+
+        // create runner instance
+        runner = new ElasticsearchClusterRunner();
+        // create ES nodes
+        runner.onBuild((number, settingsBuilder) -> {
+            settingsBuilder.put("http.cors.enabled", true);
+            settingsBuilder.put("http.cors.allow-origin", "*");
+            // NOTE(review): fixed host list; assumes the nodes' transport ports are 9301-9303 - confirm
+            settingsBuilder.put("discovery.zen.ping.unicast.hosts", "127.0.0.1:9301,127.0.0.1:9302,127.0.0.1:9303");
+        }).build(newConfigs()
+            .clusterName(clusterName)
+            .numOfNode(3)
+            .baseHttpPort(ES_BASE_HTTP_PORT)
+            .basePath("target/testcluster/")
+            .disableESLogger());
+
+        // wait for green status
+        runner.ensureGreen();
+        restclient = RestClient.builder(new HttpHost(InetAddress.getByName("localhost"), ES_FIRST_NODE_TRANSPORT_PORT)).build();
+        client = new RestHighLevelClient(restclient);
+    }
+
+    /**
+     * Closes the REST client and shuts the cluster down.
+     * <p>
+     * The cluster is closed and cleaned even when closing the REST client
+     * throws, so that a failing client cannot leave cluster nodes running.
+     */
+    @AfterClass
+    public static void teardownOnce() throws Exception {
+        try {
+            if (restclient != null) {
+                restclient.close();
+            }
+        } finally {
+            if (runner != null) {
+                // close runner
+                runner.close();
+                // delete all files
+                runner.clean();
+            }
+        }
+    }
+
+    @Override
+    public boolean isCreateCamelContextPerClass() {
+        // let's speed up the tests using the same context
+        return true;
+    }
+
+    /**
+     * Builds a one-entry document whose key and value are unique to the
+     * calling test.
+     * <p>
+     * The cluster is started only once per test class, so the tests of a class
+     * share its data; every key/value pair therefore starts with the prefix
+     * from {@link #createPrefix()} to avoid side effects between tests.
+     *
+     * @param additionalPrefixes optional extra fragments; each one is appended
+     *            to the base prefix followed by a {@code "-"}
+     * @return a new map holding the single {@code <prefix>key => <prefix>value} entry
+     */
+    Map<String, String> createIndexedData(String... additionalPrefixes) {
+        // take over any potential prefixes we may have been asked for
+        StringBuilder sb = new StringBuilder(createPrefix());
+        for (String additionalPrefix : additionalPrefixes) {
+            sb.append(additionalPrefix).append("-");
+        }
+        String prefix = sb.toString();
+
+        String key = prefix + "key";
+        String value = prefix + "value";
+        log.info("Creating indexed data using the key/value pair {} => {}", key, value);
+
+        Map<String, String> map = new HashMap<>();
+        map.put(key, value);
+        return map;
+    }
+
+    /**
+     * Derives the prefix used to keep the data of one test apart from the
+     * data of the others.
+     *
+     * @return the lower-cased name of the running test method followed by {@code "-"}
+     */
+    String createPrefix() {
+        // make use of the test method name to avoid collision; lower-case with
+        // Locale.ROOT so the result does not change with the JVM default locale
+        return getTestMethodName().toLowerCase(java.util.Locale.ROOT) + "-";
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterIndexTest.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterIndexTest.java b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterIndexTest.java
new file mode 100644
index 0000000..b0430d0
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterIndexTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch5;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.http.impl.client.BasicResponseHandler;
+import org.elasticsearch.action.get.GetRequest;
+import org.junit.Test;
+
+/**
+ * Indexes documents into the test cluster, once through an endpoint that only
+ * names a host address and once through an endpoint with
+ * {@code enableSniffer=true}, and verifies the documents through the REST
+ * clients of the base class.
+ */
+public class ElasticsearchClusterIndexTest extends ElasticsearchClusterBaseTest {
+
+    /** Indexes the same document under two different types and checks that both can be fetched. */
+    @Test
+    public void indexWithIpAndPort() throws Exception {
+        Map<String, String> map = createIndexedData();
+
+        String indexId = template.requestBodyAndHeaders("direct:indexWithIpAndPort", map, indexHeaders("twitter", "tweet", "1"), String.class);
+        assertNotNull("indexId should be set", indexId);
+
+        indexId = template.requestBodyAndHeaders("direct:indexWithIpAndPort", map, indexHeaders("twitter", "status", "2"), String.class);
+        assertNotNull("indexId should be set", indexId);
+
+        // expected value goes first, otherwise a failure reports the two values the wrong way round
+        assertEquals("Cluster must be of three nodes", 3, runner.getNodeSize());
+        assertTrue("Index id 1 must exists", client.get(new GetRequest("twitter", "tweet", "1")).isExists());
+        assertTrue("Index id 2 must exists", client.get(new GetRequest("twitter", "status", "2")).isExists());
+    }
+
+    /** Indexes one document through the sniffer-enabled endpoint and checks the reported number of data nodes. */
+    @Test
+    public void indexWithSnifferEnable() throws Exception {
+        Map<String, String> map = createIndexedData();
+
+        String indexId = template.requestBodyAndHeaders("direct:indexWithSniffer", map, indexHeaders("facebook", "post", "4"), String.class);
+        assertNotNull("indexId should be set", indexId);
+
+        // expected value goes first, otherwise a failure reports the two values the wrong way round
+        assertEquals("Cluster must be of three nodes", 3, runner.getNodeSize());
+        assertTrue("Index id 4 must exists", client.get(new GetRequest("facebook", "post", "4")).isExists());
+
+        final BasicResponseHandler responseHandler = new BasicResponseHandler();
+        String body = responseHandler.handleEntity(restclient.performRequest("GET", "/_cluster/health?pretty").getEntity());
+        assertStringContains(body, "\"number_of_data_nodes\" : 3");
+    }
+
+    /**
+     * Builds the message headers for an {@code Index} operation.
+     *
+     * @param indexName value for the index name header
+     * @param indexType value for the index type header
+     * @param indexId value for the index id header
+     * @return a new, mutable header map
+     */
+    private static Map<String, Object> indexHeaders(String indexName, String indexType, String indexId) {
+        Map<String, Object> headers = new HashMap<>();
+        headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchOperation.Index);
+        headers.put(ElasticsearchConstants.PARAM_INDEX_NAME, indexName);
+        headers.put(ElasticsearchConstants.PARAM_INDEX_TYPE, indexType);
+        headers.put(ElasticsearchConstants.PARAM_INDEX_ID, indexId);
+        return headers;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:indexWithIpAndPort")
+                    .to("elasticsearch5-rest://" + clusterName + "?operation=Index&indexName=twitter&indexType=tweet&hostAddresses=localhost:" + ES_FIRST_NODE_TRANSPORT_PORT);
+                from("direct:indexWithSniffer")
+                    .to("elasticsearch5-rest://" + clusterName + "?operation=Index&indexName=twitter&indexType=tweet&enableSniffer=true&hostAddresses=localhost:" + ES_FIRST_NODE_TRANSPORT_PORT);
+            }
+        };
+    }
+}