You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/10/16 18:50:41 UTC

[2/7] camel git commit: CAMEL-11868: New ElasticSearch5 REST component

CAMEL-11868: New ElasticSearch5 REST component


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/928f185f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/928f185f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/928f185f

Branch: refs/heads/master
Commit: 928f185f9c5fcb1eb48d8f626b7f569210bca5f7
Parents: 24ae294
Author: fharms <fl...@gmail.com>
Authored: Mon Oct 16 19:26:43 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Oct 16 20:00:24 2017 +0200

----------------------------------------------------------------------
 components/camel-elasticsearch5-rest/pom.xml    | 120 ++++++++
 .../elasticsearch5/ElasticsearchComponent.java  | 233 +++++++++++++++
 .../ElasticsearchConfiguration.java             | 245 +++++++++++++++
 .../elasticsearch5/ElasticsearchConstants.java  |  37 +++
 .../elasticsearch5/ElasticsearchEndpoint.java   |  60 ++++
 .../elasticsearch5/ElasticsearchOperation.java  |  55 ++++
 .../elasticsearch5/ElasticsearchProducer.java   | 298 +++++++++++++++++++
 .../BulkRequestAggregationStrategy.java         |  52 ++++
 .../ElasticsearchActionRequestConverter.java    | 206 +++++++++++++
 .../apache/camel/component/elasticsearch5-rest  |  18 ++
 .../elasticsearch5/ElasticsearchBaseTest.java   | 124 ++++++++
 .../elasticsearch5/ElasticsearchBulkTest.java   | 113 +++++++
 .../ElasticsearchClusterBaseTest.java           | 120 ++++++++
 .../ElasticsearchClusterIndexTest.java          |  90 ++++++
 ...icsearchGetSearchDeleteExistsUpdateTest.java | 288 ++++++++++++++++++
 .../elasticsearch5/ElasticsearchIndexTest.java  |  90 ++++++
 .../src/test/resources/log4j2.properties        |   7 +
 components/pom.xml                              |   1 +
 parent/pom.xml                                  |   3 +-
 .../spring-boot/components-starter/pom.xml      |   1 +
 20 files changed, 2160 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/pom.xml b/components/camel-elasticsearch5-rest/pom.xml
new file mode 100644
index 0000000..77c4516
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>components</artifactId>
+    <version>2.20.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.camel</groupId>
+  <artifactId>camel-elasticsearch5-rest</artifactId>
+  <packaging>jar</packaging>
+  <name>Camel :: ElasticSearch5 :: REST</name>
+  <description>Camel ElasticSearch 5.x REST support</description>
+
+  <properties>
+    <elasticsearch.version>${elasticsearch5-version}</elasticsearch.version>
+    <camel.osgi.export.pkg>org.apache.camel.component.elasticsearch5.*;${camel.osgi.version}</camel.osgi.export.pkg>
+    <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=elasticsearch5-rest</camel.osgi.export.service>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.elasticsearch.client</groupId>
+      <artifactId>elasticsearch-rest-high-level-client</artifactId>
+      <version>${elasticsearch5-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.elasticsearch.client</groupId>
+      <artifactId>elasticsearch-rest-client-sniffer</artifactId>
+      <version>${elasticsearch-sniffer-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jackson2-version}</version>
+    </dependency>
+
+    <!-- for testing -->
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.elasticsearch.client</groupId>
+      <artifactId>transport</artifactId>
+      <version>${elasticsearch5-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.codelibs</groupId>
+      <artifactId>elasticsearch-cluster-runner</artifactId>
+      <version>${elasticsearch5-cluster-runner-version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- logging -->
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-1.2-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemPropertyVariables>
+            <es.path.data>target/data</es.path.data>
+          </systemPropertyVariables>
+          <forkCount>1</forkCount>
+          <reuseForks>false</reuseForks>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchComponent.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchComponent.java
new file mode 100644
index 0000000..79eca0b
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchComponent.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch5;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.spi.Metadata;
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.RestClient;
+
+/**
+ * Represents the component that manages {@link ElasticsearchEndpoint}.
+ */
+public class ElasticsearchComponent extends DefaultComponent {
+
+    @Metadata(label = "advanced")
+    private RestClient client;
+    @Metadata(label = "advanced")
+    private String hostAddresses;
+    @Metadata(label = "advanced", defaultValue = "" + ElasticsearchConstants.DEFAULT_SOCKET_TIMEOUT)
+    private int socketTimeout = ElasticsearchConstants.DEFAULT_SOCKET_TIMEOUT;
+    @Metadata(label = "advanced", defaultValue = "" + ElasticsearchConstants.MAX_RETRY_TIMEOUT)
+    private int maxRetryTimeout = ElasticsearchConstants.MAX_RETRY_TIMEOUT;
+    @Metadata(label = "advanced", defaultValue = "" + ElasticsearchConstants.DEFAULT_CONNECTION_TIMEOUT)
+    private int connectionTimeout = ElasticsearchConstants.DEFAULT_CONNECTION_TIMEOUT;
+
+    @Metadata(label = "advanced")
+    private String user;
+    @Metadata(secret = true)
+    private String password;
+    @Metadata(label = "advanced", defaultValue = "false")
+    private boolean enableSSL;
+    @Metadata(label = "advanced", defaultValue = "false")
+    private boolean enableSniffer;
+    @Metadata(label = "advanced", defaultValue = "" + ElasticsearchConstants.DEFAULT_SNIFFER_INTERVAL)
+    private int snifferInterval = ElasticsearchConstants.DEFAULT_SNIFFER_INTERVAL;
+    @Metadata(label = "advanced", defaultValue = "" +  ElasticsearchConstants.DEFAULT_AFTER_FAILURE_DELAY)
+    private int sniffAfterFailureDelay = ElasticsearchConstants.DEFAULT_AFTER_FAILURE_DELAY;
+
+    public ElasticsearchComponent() {
+        super();
+    }
+
+    public ElasticsearchComponent(CamelContext context) {
+        super(context);
+    }
+
+    // Seeds the endpoint configuration with the component level values, then lets the URI parameters override them.
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        ElasticsearchConfiguration configuration = new ElasticsearchConfiguration();
+        configuration.setClusterName(remaining);
+        configuration.setHostAddresses(getHostAddresses());
+        configuration.setConnectionTimeout(getConnectionTimeout());
+        configuration.setSocketTimeout(getSocketTimeout());
+        configuration.setMaxRetryTimeout(getMaxRetryTimeout());
+        configuration.setUser(getUser());
+        configuration.setPassword(getPassword());
+        configuration.setEnableSSL(getEnableSSL());
+        configuration.setEnableSniffer(getEnableSniffer());
+        configuration.setSnifferInterval(getSnifferInterval());
+        configuration.setSniffAfterFailureDelay(getSniffAfterFailureDelay());
+
+        setProperties(configuration, parameters);
+        configuration.setHostAddressesList(parseHostAddresses(configuration.getHostAddresses(), configuration));
+
+        return new ElasticsearchEndpoint(uri, this, configuration, client);
+    }
+    
+    // Parses the comma separated "host[:port]" list into HttpHost entries; returns null when no addresses are configured.
+    private List<HttpHost> parseHostAddresses(String ipsString, ElasticsearchConfiguration config) throws UnknownHostException {
+        if (ipsString == null || ipsString.isEmpty()) {
+            return null;
+        }
+        String scheme = config.getEnableSSL() ? "HTTPS" : "HTTP";
+        List<String> addressesStr = Arrays.asList(ipsString.split(","));
+        List<HttpHost> addressesTrAd = new ArrayList<>(addressesStr.size());
+        for (String address : addressesStr) {
+            // trim each token so that "host1:9200, host2" is accepted despite the space after the comma
+            String[] split = address.trim().split(":");
+            if (split.length == 0 || split[0].trim().isEmpty()) {
+                throw new IllegalArgumentException("Invalid host address '" + address + "' in hostAddresses: " + ipsString);
+            }
+            int port = split.length > 1 ? Integer.parseInt(split[1].trim()) : ElasticsearchConstants.DEFAULT_PORT;
+            addressesTrAd.add(new HttpHost(split[0].trim(), port, scheme));
+        }
+        return addressesTrAd;
+    }
+
+    public RestClient getClient() {
+        return client;
+    }
+
+    /**
+     * To use an existing configured Elasticsearch client, instead of creating a client per endpoint.
+     * This allows the client to be customized with specific settings.
+     */
+    public void setClient(RestClient client) {
+        this.client = client;
+    }
+    /**
+     * Comma separated list with ip:port formatted remote transport addresses to use.
+     * The ip and port options must be left blank for hostAddresses to be considered instead.
+     */
+    public String getHostAddresses() {
+        return hostAddresses;
+    }
+
+    public void setHostAddresses(String hostAddresses) {
+        this.hostAddresses = hostAddresses;
+    }
+
+    /**
+     * The timeout in ms to wait before the socket will timeout.
+     */
+    public int getSocketTimeout() {
+        return socketTimeout;
+    }
+
+    public void setSocketTimeout(int socketTimeout) {
+        this.socketTimeout = socketTimeout;
+    }
+
+    /**
+     *  The time in ms to wait before connection will timeout.
+     */
+    public int getConnectionTimeout() {
+        return connectionTimeout;
+    }
+
+    public void setConnectionTimeout(int connectionTimeout) {
+        this.connectionTimeout = connectionTimeout;
+    }
+
+    /**
+     *  User name for basic authentication
+     */
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    /**
+     *  Password for authentication
+     */
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    /**
+     * Enable SSL
+     */
+    public Boolean getEnableSSL() {
+        return enableSSL;
+    }
+
+    public void setEnableSSL(Boolean enableSSL) {
+        this.enableSSL = enableSSL;
+    }
+
+    /**
+     * The time in ms before retry
+     */
+    public int getMaxRetryTimeout() {
+        return maxRetryTimeout;
+    }
+
+    public void setMaxRetryTimeout(int maxRetryTimeout) {
+        this.maxRetryTimeout = maxRetryTimeout;
+    }
+
+    /**
+     * Enable automatic discovery of nodes from a running Elasticsearch cluster
+     */
+    public Boolean getEnableSniffer() {
+        return enableSniffer;
+    }
+
+    public void setEnableSniffer(Boolean enableSniffer) {
+        this.enableSniffer = enableSniffer;
+    }
+
+    /**
+     * The interval between consecutive ordinary sniff executions in milliseconds. Will be honoured when
+     * sniffOnFailure is disabled or when there are no failures between consecutive sniff executions
+     */
+    public int getSnifferInterval() {
+        return snifferInterval;
+    }
+
+    public void setSnifferInterval(int snifferInterval) {
+        this.snifferInterval = snifferInterval;
+    }
+
+    /**
+     * The delay of a sniff execution scheduled after a failure (in milliseconds)
+     */
+    public int getSniffAfterFailureDelay() {
+        return sniffAfterFailureDelay;
+    }
+
+    public void setSniffAfterFailureDelay(int sniffAfterFailureDelay) {
+        this.sniffAfterFailureDelay = sniffAfterFailureDelay;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConfiguration.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConfiguration.java
new file mode 100644
index 0000000..e1f109b
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConfiguration.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch5;
+
+import java.util.List;
+
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+import org.apache.camel.spi.UriPath;
+import org.apache.http.HttpHost;
+
+@UriParams
+public class ElasticsearchConfiguration {
+
+    private List<HttpHost> hostAddressesList;
+
+    @UriPath @Metadata(required = "true")
+    private String clusterName;
+    @UriParam
+    private ElasticsearchOperation operation;
+    @UriParam
+    private String indexName;
+    @UriParam
+    private String indexType;
+    @UriParam(defaultValue = "" + ElasticsearchConstants.DEFAULT_FOR_WAIT_ACTIVE_SHARDS)
+    private int waitForActiveShards = ElasticsearchConstants.DEFAULT_FOR_WAIT_ACTIVE_SHARDS;
+    @UriParam @Metadata(required = "true")
+    private String hostAddresses;
+    @UriParam(defaultValue = "" + ElasticsearchConstants.DEFAULT_SOCKET_TIMEOUT)
+    private int socketTimeout = ElasticsearchConstants.DEFAULT_SOCKET_TIMEOUT;
+    @UriParam(defaultValue = "" + ElasticsearchConstants.MAX_RETRY_TIMEOUT)
+    private int maxRetryTimeout = ElasticsearchConstants.MAX_RETRY_TIMEOUT;
+    @UriParam(defaultValue = "" + ElasticsearchConstants.DEFAULT_CONNECTION_TIMEOUT)
+    private int connectionTimeout = ElasticsearchConstants.DEFAULT_CONNECTION_TIMEOUT;
+    @UriParam(defaultValue = "false")
+    private boolean disconnect;
+
+    private String user;
+    private String password;
+    private boolean enableSSL;
+    //Sniffer parameter.
+    private boolean enableSniffer;
+    private int snifferInterval = ElasticsearchConstants.DEFAULT_SNIFFER_INTERVAL;
+    private int sniffAfterFailureDelay = ElasticsearchConstants.DEFAULT_AFTER_FAILURE_DELAY;
+    /**
+     * Name of the cluster
+     */
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    /**
+     * What operation to perform
+     */
+    public ElasticsearchOperation getOperation() {
+        return operation;
+    }
+
+    public void setOperation(ElasticsearchOperation operation) {
+        this.operation = operation;
+    }
+
+    /**
+     * The name of the index to act against
+     */
+    public String getIndexName() {
+        return indexName;
+    }
+
+    public void setIndexName(String indexName) {
+        this.indexName = indexName;
+    }
+
+    /**
+     * The type of the index to act against
+     */
+    public String getIndexType() {
+        return indexType;
+    }
+
+    public void setIndexType(String indexType) {
+        this.indexType = indexType;
+    }
+
+    /**
+     * Comma separated list with ip:port formatted remote transport addresses to use.
+     * The ip and port options must be left blank for hostAddresses to be considered instead.
+     */
+    public String getHostAddresses() {
+        return hostAddresses;
+    }
+
+    public void setHostAddresses(String hostAddresses) {
+        this.hostAddresses = hostAddresses;
+    }
+
+    /**
+     * Index creation waits for the write consistency number of shards to be available
+     */
+    public int getWaitForActiveShards() {
+        return waitForActiveShards;
+    }
+
+    public void setWaitForActiveShards(int waitForActiveShards) {
+        this.waitForActiveShards = waitForActiveShards;
+    }
+
+    public List<HttpHost> getHostAddressesList() {
+        return hostAddressesList;
+    }
+
+    public void setHostAddressesList(List<HttpHost> hostAddressesList) {
+        this.hostAddressesList = hostAddressesList;
+    }
+
+    /**
+     * The timeout in ms to wait before the socket will timeout.
+     */
+    public int getSocketTimeout() {
+        return socketTimeout;
+    }
+
+    public void setSocketTimeout(int socketTimeout) {
+        this.socketTimeout = socketTimeout;
+    }
+
+    /**
+     *  The time in ms to wait before connection will timeout.
+     */
+    public int getConnectionTimeout() {
+        return connectionTimeout;
+    }
+
+    public void setConnectionTimeout(int connectionTimeout) {
+        this.connectionTimeout = connectionTimeout;
+    }
+
+    /**
+     *  User name for basic authentication
+     */
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    /**
+     *  Password for authentication
+     */
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    /**
+     * Enable SSL
+     */
+    public Boolean getEnableSSL() {
+        return enableSSL;
+    }
+
+    public void setEnableSSL(Boolean enableSSL) {
+        this.enableSSL = enableSSL;
+    }
+
+    /**
+     * The time in ms before retry
+     */
+    public int getMaxRetryTimeout() {
+        return maxRetryTimeout;
+    }
+
+    public void setMaxRetryTimeout(int maxRetryTimeout) {
+        this.maxRetryTimeout = maxRetryTimeout;
+    }
+
+    /**
+     * Disconnect after the call to the producer has finished
+     */
+    public Boolean getDisconnect() {
+        return disconnect;
+    }
+
+    public void setDisconnect(Boolean disconnect) {
+        this.disconnect = disconnect;
+    }
+
+    /**
+     * Enable automatic discovery of nodes from a running Elasticsearch cluster
+     */
+    public Boolean getEnableSniffer() {
+        return enableSniffer;
+    }
+
+    public void setEnableSniffer(Boolean enableSniffer) {
+        this.enableSniffer = enableSniffer;
+    }
+
+    /**
+     * The interval between consecutive ordinary sniff executions in milliseconds. Will be honoured when
+     * sniffOnFailure is disabled or when there are no failures between consecutive sniff executions
+     */
+    public int getSnifferInterval() {
+        return snifferInterval;
+    }
+
+    public void setSnifferInterval(int snifferInterval) {
+        this.snifferInterval = snifferInterval;
+    }
+
+    /**
+     * The delay of a sniff execution scheduled after a failure (in milliseconds)
+     */
+    public int getSniffAfterFailureDelay() {
+        return sniffAfterFailureDelay;
+    }
+
+    public void setSniffAfterFailureDelay(int sniffAfterFailureDelay) {
+        this.sniffAfterFailureDelay = sniffAfterFailureDelay;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConstants.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConstants.java
new file mode 100644
index 0000000..9f2d493
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchConstants.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.elasticsearch5;
+
+
+public interface ElasticsearchConstants {
+
+    String PARAM_OPERATION = "operation";
+    String PARAM_INDEX_ID = "indexId";
+    String PARAM_INDEX_NAME = "indexName";
+    String PARAM_INDEX_TYPE = "indexType";
+    String PARAM_WAIT_FOR_ACTIVE_SHARDS = "waitForActiveShards";
+
+    int    DEFAULT_PORT = 9200;
+    int    DEFAULT_FOR_WAIT_ACTIVE_SHARDS = 1; // Meaning only wait for the primary shard
+    int    DEFAULT_SOCKET_TIMEOUT = 30000; // Time in milliseconds to wait before the socket times out
+    int    MAX_RETRY_TIMEOUT = 30000; // Time in milliseconds to wait before retrying
+    int    DEFAULT_CONNECTION_TIMEOUT = 30000; // Time in milliseconds (not seconds) to wait before a connection attempt times out
+    int    DEFAULT_SNIFFER_INTERVAL = 60000 * 5; // Interval in milliseconds between consecutive ordinary sniff executions
+    int    DEFAULT_AFTER_FAILURE_DELAY = 60000; // Delay in milliseconds of the sniff execution scheduled after a failure
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchEndpoint.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchEndpoint.java
new file mode 100644
index 0000000..37bafd6
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchEndpoint.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch5;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.elasticsearch.client.RestClient;
+
+/**
+ * The elasticsearch component is used for interfacing with ElasticSearch server using 5.x REST API.
+ */
+@UriEndpoint(firstVersion = "2.21.0", scheme = "elasticsearch5-rest", title = "Elasticsearch Rest or Elasticsearch 5 Rest",
+    syntax = "elasticsearch5-rest:clusterName", producerOnly = true, label = "monitoring,search")
+public class ElasticsearchEndpoint extends DefaultEndpoint {
+
+    @UriParam
+    protected final ElasticsearchConfiguration configuration;
+
+    private RestClient client;
+
+    public ElasticsearchEndpoint(String uri, ElasticsearchComponent component, ElasticsearchConfiguration config, RestClient client) throws Exception {
+        super(uri, component);
+        this.configuration = config;
+        this.client = client;
+    }
+
+    public Producer createProducer() throws Exception {
+        return new ElasticsearchProducer(this, configuration);
+    }
+
+    public Consumer createConsumer(Processor processor) throws Exception {
+        throw new UnsupportedOperationException("Cannot consume from an ElasticsearchEndpoint: " + getEndpointUri());
+    }
+    
+    public boolean isSingleton() {
+        return true;
+    }
+
+    public RestClient getClient() {
+        return client;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchOperation.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchOperation.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchOperation.java
new file mode 100644
index 0000000..11b57f8
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchOperation.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch5;
+
+/**
+ * The list of ElasticSearch server operations that are implemented:
+ *
+ * Index        - Index a document associated with a given index and type
+ * Update       - Updates a document based on a script
+ * Bulk         - Executes a bulk of index / delete operations
+ * BulkIndex    - Executes a bulk of index / delete operations
+ * GetById      - Gets the document that was indexed from an index with a type and id
+ * MultiGet     - Multiple get documents
+ * Delete       - Deletes a document from the index based on the index, type and id
+ * Search       - Search across one or more indices and one or more types with a query
+ * Exists       - Checks whether the index exists (using search with size=0 and terminate_after=1 parameters)
+ * NOTE(review): the DeleteIndex constant declared below is not described in this list.
+ */
+public enum ElasticsearchOperation {
+    Index("Index"),
+    Update("Update"),
+    Bulk("Bulk"),
+    BulkIndex("BulkIndex"),
+    GetById("GetById"),
+    MultiGet("MultiGet"),
+    Delete("Delete"),
+    DeleteIndex("DeleteIndex"),
+    Search("Search"),
+    Exists("Exists");
+
+    private final String text;
+
+    ElasticsearchOperation(final String text) {
+        this.text = text;
+    }
+
+    @Override
+    public String toString() {
+        return text;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchProducer.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchProducer.java
new file mode 100644
index 0000000..f87fb54
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/ElasticsearchProducer.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch5;
+
+import java.lang.reflect.InvocationTargetException;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.component.elasticsearch5.converter.ElasticsearchActionRequestConverter;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.IOHelper;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.MultiGetRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.sniff.Sniffer;
+import org.elasticsearch.client.sniff.SnifferBuilder;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Represents an Elasticsearch producer.
+ */
+public class ElasticsearchProducer extends DefaultProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchProducer.class);
+
+    // Configuration consulted for the operation, the index/type defaults and the connection settings
+    protected final ElasticsearchConfiguration configuration;
+    // Low-level REST client: taken from the endpoint in the constructor, otherwise created by startClient()
+    private RestClient client;
+    // Node sniffer; only assigned by createClient() when the sniffer option is enabled
+    private Sniffer sniffer;
+
+    /**
+     * Creates a producer for the given endpoint.
+     *
+     * @param endpoint      the owning endpoint; whatever its getClient() returns becomes the
+     *                      initial REST client of this producer
+     * @param configuration the configuration this producer reads its settings from
+     */
+    public ElasticsearchProducer(ElasticsearchEndpoint endpoint, ElasticsearchConfiguration configuration) {
+        super(endpoint);
+        this.configuration = configuration;
+        this.client = endpoint.getClient();
+    }
+
+    /**
+     * Determines which operation has to be executed for the exchange.
+     * <p>
+     * The operation is driven by, in order of preference:
+     * <ol>
+     * <li>the type of the message body, when it is one of the recognised request types;</li>
+     * <li>the {@link ElasticsearchConstants#PARAM_OPERATION} header;</li>
+     * <li>the operation set on the configuration.</li>
+     * </ol>
+     *
+     * @param exchange the exchange whose in-message body and headers are inspected
+     * @return the resolved operation, never null
+     * @throws IllegalArgumentException if none of the three sources yields an operation
+     */
+    private ElasticsearchOperation resolveOperation(Exchange exchange) {
+        Object request = exchange.getIn().getBody();
+        if (request instanceof IndexRequest) {
+            return ElasticsearchOperation.Index;
+        } else if (request instanceof GetRequest) {
+            return ElasticsearchOperation.GetById;
+        } else if (request instanceof MultiGetRequest) {
+            return ElasticsearchOperation.MultiGet;
+        } else if (request instanceof UpdateRequest) {
+            return ElasticsearchOperation.Update;
+        } else if (request instanceof BulkRequest) {
+            // A BulkRequest body fits both bulk flavours: only an explicit BulkIndex
+            // configuration selects BulkIndex, anything else (including no
+            // configured operation at all) means Bulk.
+            return configuration.getOperation() == ElasticsearchOperation.BulkIndex
+                ? ElasticsearchOperation.BulkIndex
+                : ElasticsearchOperation.Bulk;
+        } else if (request instanceof DeleteRequest) {
+            return ElasticsearchOperation.Delete;
+        } else if (request instanceof SearchRequest) {
+            return ElasticsearchOperation.Search;
+        } else if (request instanceof DeleteIndexRequest) {
+            return ElasticsearchOperation.DeleteIndex;
+        }
+
+        // The body does not identify the operation: fall back to the header, then the configuration.
+        ElasticsearchOperation operationConfig = exchange.getIn().getHeader(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchOperation.class);
+        if (operationConfig == null) {
+            operationConfig = configuration.getOperation();
+        }
+        if (operationConfig == null) {
+            throw new IllegalArgumentException(ElasticsearchConstants.PARAM_OPERATION + " value '" + operationConfig + "' is not supported");
+        }
+        return operationConfig;
+    }
+
+    /**
+     * Executes the Elasticsearch operation resolved for the exchange and replaces the
+     * in-message body with its result.
+     * <p>
+     * Index name, index type and wait-for-active-shards values that are missing from the
+     * message headers are temporarily filled in from the configuration so that the request
+     * converters can read them. Those temporary headers are removed again before this
+     * method returns, also when the operation fails. When the disconnect option is enabled
+     * the client (and the sniffer, if enabled) is closed after every exchange.
+     *
+     * @param exchange the exchange whose in-message carries the request body and headers
+     * @throws IllegalArgumentException if the resolved operation is not supported
+     * @throws Exception if the client cannot be started or the request fails
+     */
+    public void process(Exchange exchange) throws Exception {
+        if (configuration.getDisconnect() && client == null) {
+            startClient();
+        }
+
+        Message message = exchange.getIn();
+        boolean configIndexName = false;
+        boolean configIndexType = false;
+        boolean configWaitForActiveShards = false;
+
+        try {
+            RestHighLevelClient restHighLevelClient = new RestHighLevelClient(client);
+            // 2. Index and type will be set by:
+            // a. If the incoming body is already an action request
+            // b. If the body is not an action request we will use headers if they
+            // are set.
+            // c. If the body is not an action request and the headers aren't set we
+            // will use the configuration.
+            // No error is thrown by the component in the event none of the above
+            // conditions are met. The java es client
+            // will throw.
+            final ElasticsearchOperation operation = resolveOperation(exchange);
+
+            // Set the index/type headers on the exchange if necessary. This is used
+            // for type conversion.
+            String indexName = message.getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class);
+            if (indexName == null) {
+                message.setHeader(ElasticsearchConstants.PARAM_INDEX_NAME, configuration.getIndexName());
+                configIndexName = true;
+            }
+
+            String indexType = message.getHeader(ElasticsearchConstants.PARAM_INDEX_TYPE, String.class);
+            if (indexType == null) {
+                message.setHeader(ElasticsearchConstants.PARAM_INDEX_TYPE, configuration.getIndexType());
+                configIndexType = true;
+            }
+
+            Integer waitForActiveShards = message.getHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, Integer.class);
+            if (waitForActiveShards == null) {
+                message.setHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, configuration.getWaitForActiveShards());
+                configWaitForActiveShards = true;
+            }
+
+            if (operation == ElasticsearchOperation.Index) {
+                IndexRequest indexRequest = ElasticsearchActionRequestConverter.toIndexRequest(message.getBody(), exchange);
+                message.setBody(restHighLevelClient.index(indexRequest).getId());
+            } else if (operation == ElasticsearchOperation.Update) {
+                UpdateRequest updateRequest = ElasticsearchActionRequestConverter.toUpdateRequest(message.getBody(Map.class), exchange);
+                message.setBody(restHighLevelClient.update(updateRequest).getId());
+            } else if (operation == ElasticsearchOperation.GetById) {
+                GetRequest getRequest = ElasticsearchActionRequestConverter.toGetRequest(message.getBody(), exchange);
+                message.setBody(restHighLevelClient.get(getRequest));
+            } else if (operation == ElasticsearchOperation.Bulk) {
+                BulkRequest bulkRequest = message.getBody(BulkRequest.class);
+                message.setBody(restHighLevelClient.bulk(bulkRequest).getItems());
+            } else if (operation == ElasticsearchOperation.BulkIndex) {
+                BulkRequest bulkRequest = ElasticsearchActionRequestConverter.toBulkRequest(message.getBody(), exchange);
+                List<String> indexedIds = Arrays.stream(restHighLevelClient.bulk(bulkRequest).getItems())
+                    .map(BulkItemResponse::getId)
+                    .collect(Collectors.toList());
+                message.setBody(indexedIds);
+            } else if (operation == ElasticsearchOperation.Delete) {
+                DeleteRequest deleteRequest = ElasticsearchActionRequestConverter.toDeleteRequest(message.getBody(), exchange);
+                message.setBody(restHighLevelClient.delete(deleteRequest).getResult());
+            } else if (operation == ElasticsearchOperation.DeleteIndex) {
+                // resolveOperation() selects DeleteIndex for a DeleteIndexRequest body, which the
+                // DeleteRequest converter rejects, so take the index name(s) from it directly.
+                Object body = message.getBody();
+                String indexToDelete;
+                if (body instanceof DeleteIndexRequest) {
+                    indexToDelete = String.join(",", ((DeleteIndexRequest) body).indices());
+                } else {
+                    indexToDelete = ElasticsearchActionRequestConverter.toDeleteRequest(body, exchange).index();
+                }
+                message.setBody(client.performRequest("Delete", indexToDelete).getStatusLine().getStatusCode());
+            } else if (operation == ElasticsearchOperation.Exists) {
+                // ExistsRequest API is deprecated, using SearchRequest instead with size=0 and terminate_after=1
+                SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
+                sourceBuilder.size(0);
+                sourceBuilder.terminateAfter(1);
+                SearchRequest searchRequest = new SearchRequest(message.getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class));
+                searchRequest.source(sourceBuilder);
+                try {
+                    restHighLevelClient.search(searchRequest);
+                    message.setBody(true);
+                } catch (ElasticsearchStatusException e) {
+                    // A NOT_FOUND status means the index is missing; anything else is a real failure
+                    if (e.status().equals(RestStatus.NOT_FOUND)) {
+                        message.setBody(false);
+                    } else {
+                        throw new IllegalStateException(e);
+                    }
+                }
+            } else if (operation == ElasticsearchOperation.Search) {
+                SearchRequest searchRequest = ElasticsearchActionRequestConverter.toSearchRequest(message.getBody(), exchange);
+                message.setBody(restHighLevelClient.search(searchRequest).getHits());
+            } else {
+                throw new IllegalArgumentException(ElasticsearchConstants.PARAM_OPERATION + " value '" + operation + "' is not supported");
+            }
+        } finally {
+            // If we set params via the configuration on this exchange, remove them
+            // now. This preserves legacy behavior for this component and enables a
+            // use case where one message can be sent to multiple elasticsearch
+            // endpoints where the user is relying on the endpoint configuration
+            // (index/type) rather than header values. If we do not clear this out
+            // sending the same message (index request, for example) to multiple
+            // elasticsearch endpoints would have the effect overriding any
+            // subsequent endpoint index/type with the first endpoint index/type.
+            // Done in a finally block so a failed (and possibly redelivered)
+            // exchange does not keep the configuration values as headers.
+            if (configIndexName) {
+                message.removeHeader(ElasticsearchConstants.PARAM_INDEX_NAME);
+            }
+
+            if (configIndexType) {
+                message.removeHeader(ElasticsearchConstants.PARAM_INDEX_TYPE);
+            }
+
+            if (configWaitForActiveShards) {
+                message.removeHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS);
+            }
+
+            if (configuration.getDisconnect()) {
+                // The sniffer works on top of the client, so it is closed first
+                if (configuration.getEnableSniffer()) {
+                    IOHelper.close(sniffer);
+                    sniffer = null;
+                }
+                IOHelper.close(client);
+                client = null;
+            }
+        }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected void doStart() throws Exception {
+        super.doStart();
+        if (!configuration.getDisconnect()) {
+            startClient();
+        }
+    }
+
+    /**
+     * Creates the REST client if this producer does not have one yet.
+     * <p>
+     * A client is only created when the configuration provides at least one host
+     * address; otherwise a warning is logged and the client stays unset.
+     */
+    private void startClient() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException, UnknownHostException {
+        if (client != null) {
+            // Already connected (or a client was handed over by the endpoint)
+            return;
+        }
+        LOG.info("Connecting to the ElasticSearch cluster: {}", configuration.getClusterName());
+        if (configuration.getHostAddressesList() != null
+            && !configuration.getHostAddressesList().isEmpty()) {
+            client = createClient();
+        } else {
+            LOG.warn("Incorrect ip address and port parameters settings for ElasticSearch cluster");
+        }
+    }
+
+    /**
+     * Builds a new REST client from the configured host addresses, timeouts and,
+     * when both user and password are set, basic credentials. If the sniffer option
+     * is enabled a sniffer for the new client is built as well and stored in the
+     * sniffer field.
+     *
+     * @return the newly built REST client
+     */
+    private RestClient createClient() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
+        final HttpHost[] hosts = configuration.getHostAddressesList().toArray(new HttpHost[0]);
+        final RestClientBuilder clientBuilder = RestClient.builder(hosts);
+        clientBuilder.setMaxRetryTimeoutMillis(configuration.getMaxRetryTimeout());
+        clientBuilder.setRequestConfigCallback(requestConfig -> requestConfig
+            .setConnectTimeout(configuration.getConnectionTimeout())
+            .setSocketTimeout(configuration.getSocketTimeout()));
+
+        // Credentials are only installed when both halves are configured
+        if (configuration.getUser() != null && configuration.getPassword() != null) {
+            final CredentialsProvider credentials = new BasicCredentialsProvider();
+            final UsernamePasswordCredentials userAndPassword =
+                new UsernamePasswordCredentials(configuration.getUser(), configuration.getPassword());
+            credentials.setCredentials(AuthScope.ANY, userAndPassword);
+            clientBuilder.setHttpClientConfigCallback(httpClient -> httpClient.setDefaultCredentialsProvider(credentials));
+        }
+
+        final RestClient builtClient = clientBuilder.build();
+        if (!configuration.getEnableSniffer()) {
+            return builtClient;
+        }
+
+        sniffer = Sniffer.builder(builtClient)
+            .setSniffIntervalMillis(configuration.getSnifferInterval())
+            .setSniffAfterFailureDelayMillis(configuration.getSniffAfterFailureDelay())
+            .build();
+        return builtClient;
+    }
+
+
+    /**
+     * Stops the producer and releases the Elasticsearch resources it holds.
+     * <p>
+     * The sniffer is closed before the client it was built on. Both references are
+     * cleared afterwards so that a later start creates a fresh client instead of
+     * reusing a closed one, and the superclass is stopped even if closing fails.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        try {
+            if (sniffer != null) {
+                sniffer.close();
+            }
+        } finally {
+            sniffer = null;
+            try {
+                if (client != null) {
+                    LOG.info("Disconnecting from ElasticSearch cluster: {}", configuration.getClusterName());
+                    client.close();
+                }
+            } finally {
+                client = null;
+                super.doStop();
+            }
+        }
+    }
+
+    /**
+     * Returns the REST client currently held by this producer.
+     *
+     * @return the client, or null when none is set (for example after it was closed
+     *         in disconnect mode)
+     */
+    public RestClient getClient() {
+        return client;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/aggregation/BulkRequestAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/aggregation/BulkRequestAggregationStrategy.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/aggregation/BulkRequestAggregationStrategy.java
new file mode 100644
index 0000000..9f60eb4
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/aggregation/BulkRequestAggregationStrategy.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch5.aggregation;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadRuntimeException;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkRequest;
+
+/**
+ * Aggregates write requests (bodies of type {@link DocWriteRequest}[], rather than
+ * arbitrary {@link ActionRequest}s) into a single {@link BulkRequest}.
+ */
+public class BulkRequestAggregationStrategy implements AggregationStrategy {
+
+    /**
+     * Adds the write requests of the new exchange to the bulk request being built.
+     *
+     * @param oldExchange the exchange already holding the {@link BulkRequest}, or null for the first message
+     * @param newExchange the incoming exchange; its body must be a {@link DocWriteRequest} array
+     * @return the new exchange (now carrying a fresh bulk request) for the first message,
+     *         otherwise the old exchange
+     * @throws InvalidPayloadRuntimeException if the incoming body is not a {@link DocWriteRequest} array
+     */
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        // Read the raw body on purpose: getBody(Class<T>) would let a type converter coerce it.
+        final Object incoming = newExchange.getIn().getBody();
+        if (!(incoming instanceof DocWriteRequest[])) {
+            throw new InvalidPayloadRuntimeException(newExchange, DocWriteRequest[].class);
+        }
+        final DocWriteRequest[] writeRequests = (DocWriteRequest[]) incoming;
+
+        if (oldExchange != null) {
+            // Aggregation already under way: append to the existing bulk request
+            final BulkRequest pending = oldExchange.getIn().getBody(BulkRequest.class);
+            pending.add(writeRequests);
+            return oldExchange;
+        }
+
+        // First message: start a new bulk request and carry it on the new exchange
+        final BulkRequest bulk = new BulkRequest();
+        bulk.add(writeRequests);
+        newExchange.getIn().setBody(bulk);
+        return newExchange;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/converter/ElasticsearchActionRequestConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/converter/ElasticsearchActionRequestConverter.java b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/converter/ElasticsearchActionRequestConverter.java
new file mode 100644
index 0000000..bec9583
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/main/java/org/apache/camel/component/elasticsearch5/converter/ElasticsearchActionRequestConverter.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch5.converter;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.camel.Converter;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.elasticsearch5.ElasticsearchConstants;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.MultiSearchRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class ElasticsearchActionRequestConverter {
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchActionRequestConverter.class);
+
+    // Top-level key that is stripped from Map / JSON search queries before they are wrapped
+    private static final String ES_QUERY_DSL_PREFIX = "query";
+    // Name of the message header read for the parent of index and update requests
+    private static final String PARENT = "parent";
+
+    // Static helpers only; not meant to be instantiated
+    private ElasticsearchActionRequestConverter() {
+    }
+
+    // Update requests
+    private static UpdateRequest createUpdateRequest(Object document, Exchange exchange) {
+        if (document instanceof UpdateRequest) {
+            return (UpdateRequest) document;
+        }
+        UpdateRequest updateRequest = new UpdateRequest();
+        if (document instanceof byte[]) {
+            updateRequest.doc((byte[]) document);
+        } else if (document instanceof Map) {
+            updateRequest.doc((Map<String, Object>) document);
+        } else if (document instanceof String) {
+            updateRequest.doc((String) document);
+        } else if (document instanceof XContentBuilder) {
+            updateRequest.doc((XContentBuilder) document);
+        } else {
+            return null;
+        }
+
+        return updateRequest
+            .waitForActiveShards(exchange.getIn().getHeader(
+                ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, Integer.class))
+            .parent(exchange.getIn().getHeader(
+                PARENT, String.class))
+            .index(exchange.getIn().getHeader(
+                ElasticsearchConstants.PARAM_INDEX_NAME, String.class))
+            .type(exchange.getIn().getHeader(
+                ElasticsearchConstants.PARAM_INDEX_TYPE, String.class))
+            .id(exchange.getIn().getHeader(
+                ElasticsearchConstants.PARAM_INDEX_ID, String.class));
+    }
+
+    // Index requests
+    private static IndexRequest createIndexRequest(Object document, Exchange exchange) {
+        if (document instanceof IndexRequest) {
+            return (IndexRequest) document;
+        }
+        IndexRequest indexRequest = new IndexRequest();
+        if (document instanceof byte[]) {
+            indexRequest.source((byte[]) document, XContentFactory.xContentType((byte[]) document));
+        } else if (document instanceof Map) {
+            indexRequest.source((Map<String, Object>) document);
+        } else if (document instanceof String) {
+            indexRequest.source((String) document, XContentFactory.xContentType((String) document));
+        } else if (document instanceof XContentBuilder) {
+            indexRequest.source((XContentBuilder) document);
+        } else {
+            return null;
+        }
+
+        return indexRequest
+            .waitForActiveShards(exchange.getIn().getHeader(
+                ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, Integer.class))
+            .parent(exchange.getIn().getHeader(
+                PARENT, String.class))
+            .index(exchange.getIn().getHeader(
+                ElasticsearchConstants.PARAM_INDEX_NAME, String.class))
+            .type(exchange.getIn().getHeader(
+                ElasticsearchConstants.PARAM_INDEX_TYPE, String.class));
+    }
+
+    public static IndexRequest toIndexRequest(Object document, Exchange exchange) {
+        return createIndexRequest(document, exchange)
+            .id(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_ID, String.class));
+    }
+
+    public static UpdateRequest toUpdateRequest(Object document, Exchange exchange) {
+        return createUpdateRequest(document, exchange)
+            .id(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_ID, String.class));
+    }
+
+    /**
+     * Converts the document into a {@link GetRequest}.
+     * <p>
+     * A {@link GetRequest} body is returned unchanged. Any other body is cast to
+     * String and used as the document id, with index and type read from the headers.
+     *
+     * @param document a GetRequest, or the id of the document to fetch
+     * @param exchange the exchange providing the index name and type headers
+     * @return the get request
+     */
+    public static GetRequest toGetRequest(Object document, Exchange exchange) {
+        if (document instanceof GetRequest) {
+            return (GetRequest) document;
+        }
+
+        final String indexName = exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class);
+        final GetRequest getRequest = new GetRequest(indexName);
+        getRequest.type(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_TYPE, String.class));
+        getRequest.id((String) document);
+        return getRequest;
+    }
+
+    /**
+     * Converts the document into a {@link DeleteRequest}.
+     * <p>
+     * A {@link DeleteRequest} body is returned unchanged. A String body is used as
+     * the document id, with index and type read from the headers.
+     *
+     * @param document a DeleteRequest, or the id of the document to delete
+     * @param exchange the exchange providing the index name and type headers
+     * @return the delete request
+     * @throws IllegalArgumentException if the document is neither a DeleteRequest nor a String
+     */
+    public static DeleteRequest toDeleteRequest(Object document, Exchange exchange) {
+        if (document instanceof DeleteRequest) {
+            return (DeleteRequest) document;
+        }
+        if (!(document instanceof String)) {
+            throw new IllegalArgumentException("Wrong body type. Only DeleteRequest or String is allowed as a type");
+        }
+
+        final DeleteRequest deleteRequest = new DeleteRequest();
+        deleteRequest.index(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class));
+        deleteRequest.type(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_TYPE, String.class));
+        deleteRequest.id((String) document);
+        return deleteRequest;
+    }
+
+    /**
+     * Converts the query object into a {@link SearchRequest}.
+     * <p>
+     * A {@link SearchRequest} body is returned unchanged. A Map or JSON String body is
+     * turned into a wrapper query for the index and type given by the headers; a
+     * top-level "query" element, if present, is unwrapped first for backward compatibility.
+     *
+     * @param queryObject a SearchRequest, a Map or a JSON String describing the query
+     * @param exchange    the exchange providing the index name and type headers
+     * @return the search request, never null
+     * @throws IOException              if the query cannot be serialised or parsed as JSON
+     * @throws IllegalArgumentException if the query object type is not supported
+     */
+    public static SearchRequest toSearchRequest(Object queryObject, Exchange exchange) throws IOException {
+        if (queryObject instanceof SearchRequest) {
+            // Already a request: nothing to build from headers
+            return (SearchRequest) queryObject;
+        }
+
+        SearchRequest searchRequest = new SearchRequest(exchange.getIn()
+            .getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class))
+            .types(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_TYPE, String.class));
+
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        String queryText;
+
+        if (queryObject instanceof Map<?, ?>) {
+            Map<String, Object> mapQuery = (Map<String, Object>) queryObject;
+            // Remove 'query' prefix from the query object for backward compatibility
+            if (mapQuery.containsKey(ES_QUERY_DSL_PREFIX)) {
+                mapQuery = (Map<String, Object>) mapQuery.get(ES_QUERY_DSL_PREFIX);
+            }
+            // Let a serialisation failure propagate instead of continuing with a null query
+            queryText = XContentFactory.contentBuilder(XContentType.JSON).map(mapQuery).string();
+        } else if (queryObject instanceof String) {
+            queryText = (String) queryObject;
+            ObjectMapper mapper = new ObjectMapper();
+            JsonNode jsonTextObject = mapper.readValue(queryText, JsonNode.class);
+            JsonNode parentJsonNode = jsonTextObject.get(ES_QUERY_DSL_PREFIX);
+            if (parentJsonNode != null) {
+                queryText = parentJsonNode.toString();
+            }
+        } else {
+            // Cannot convert the queryObject into SearchRequest
+            throw new IllegalArgumentException("Wrong body type. Only SearchRequest, Map or String is allowed as a type");
+        }
+
+        searchSourceBuilder.query(QueryBuilders.wrapperQuery(queryText));
+        searchRequest.source(searchSourceBuilder);
+
+        return searchRequest;
+    }
+
+    /**
+     * Converts the documents into a {@link BulkRequest} of index requests.
+     * <p>
+     * A {@link BulkRequest} body is returned unchanged. A List body is converted
+     * element by element into index requests configured from the exchange headers.
+     *
+     * @param documents a BulkRequest, or a List whose elements are IndexRequest, byte[],
+     *                  Map, String or XContentBuilder
+     * @param exchange  the exchange providing the headers
+     * @return the bulk request
+     * @throws IllegalArgumentException if the body is neither a BulkRequest nor a List,
+     *                                  or if a list element has an unsupported type
+     */
+    public static BulkRequest toBulkRequest(Object documents, Exchange exchange) {
+        if (documents instanceof BulkRequest) {
+            return (BulkRequest) documents;
+        }
+        if (documents instanceof List) {
+            BulkRequest request = new BulkRequest();
+            for (Object document : (List<?>) documents) {
+                IndexRequest indexRequest = createIndexRequest(document, exchange);
+                if (indexRequest == null) {
+                    // Fail with a descriptive message rather than a NullPointerException from BulkRequest.add
+                    throw new IllegalArgumentException("Wrong bulk item type: "
+                        + (document == null ? null : document.getClass().getName())
+                        + ". Only IndexRequest, byte[], Map, String or XContentBuilder is allowed as a type");
+                }
+                request.add(indexRequest);
+            }
+            return request;
+        } else {
+            throw new IllegalArgumentException("Wrong body type. Only BulkRequest or List is allowed as a type");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/main/resources/META-INF/services/org/apache/camel/component/elasticsearch5-rest
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/main/resources/META-INF/services/org/apache/camel/component/elasticsearch5-rest b/components/camel-elasticsearch5-rest/src/main/resources/META-INF/services/org/apache/camel/component/elasticsearch5-rest
new file mode 100644
index 0000000..13cc909
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/main/resources/META-INF/services/org/apache/camel/component/elasticsearch5-rest
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.elasticsearch5.ElasticsearchComponent
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBaseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBaseTest.java b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBaseTest.java
new file mode 100644
index 0000000..f0496e5
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBaseTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch5;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.http.HttpHost;
+import org.codelibs.elasticsearch.runner.ElasticsearchClusterRunner;
+import org.elasticsearch.client.RestClient;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import static org.codelibs.elasticsearch.runner.ElasticsearchClusterRunner.newConfigs;
+
+/** Base class for tests that run against a single embedded Elasticsearch node reached over REST. */
+public class ElasticsearchBaseTest extends CamelTestSupport {
+
+    public static ElasticsearchClusterRunner runner;
+    public static String clusterName;
+    public static RestClient client;
+
+    // NOTE(review): despite its name this value is passed to baseHttpPort(...) below, while
+    // ES_BASE_HTTP_PORT is the port the clients connect to - confirm that this is intended.
+    protected static final int ES_BASE_TRANSPORT_PORT = AvailablePortFinder.getNextAvailable();
+    protected static final int ES_BASE_HTTP_PORT = AvailablePortFinder.getNextAvailable(ES_BASE_TRANSPORT_PORT + 1);
+
+    /** Wipes the cluster directory, boots a one-node cluster and opens the shared REST client. */
+    @SuppressWarnings("resource")
+    @BeforeClass
+    public static void cleanupOnce() throws Exception {
+        deleteDirectory("target/testcluster/");
+        clusterName = "es-cl-run-" + System.currentTimeMillis();
+        runner = new ElasticsearchClusterRunner();
+        // create ES nodes
+        runner.onBuild((number, settingsBuilder) -> {
+            settingsBuilder.put("http.cors.enabled", true);
+            settingsBuilder.put("http.cors.allow-origin", "*");
+        }).build(newConfigs()
+            .clusterName(clusterName)
+            .numOfNode(1)
+            .baseHttpPort(ES_BASE_TRANSPORT_PORT)
+            .basePath("target/testcluster/"));
+
+        // wait for green status
+        runner.ensureGreen();
+        client = RestClient.builder(new HttpHost(InetAddress.getByName("localhost"), ES_BASE_HTTP_PORT)).build();
+    }
+
+    /** Closes the REST client and always stops the cluster, even when closing the client fails. */
+    @AfterClass
+    public static void teardownOnce() throws IOException {
+        try {
+            if (client != null) {
+                client.close();
+            }
+        } finally {
+            if (runner != null) {
+                runner.close();
+            }
+        }
+    }
+
+    @Override
+    public boolean isCreateCamelContextPerClass() {
+        // let's speed up the tests using the same context
+        return true;
+    }
+
+    /** Registers the component under test, pointed at the embedded node, on a fresh context. */
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        final ElasticsearchComponent elasticsearchComponent = new ElasticsearchComponent();
+        elasticsearchComponent.setHostAddresses("localhost:" + ES_BASE_HTTP_PORT);
+        context.addComponent("elasticsearch5-rest", elasticsearchComponent);
+        return context;
+    }
+
+    /**
+     * Builds a one-entry document whose key and value carry a per-test prefix. The cluster data
+     * folder is not wiped between individual tests, so unique keys keep tests from colliding.
+     * @param additionalPrefixes optional parts appended to the prefix, each followed by "-"
+     * @return a map holding the single pair {@code <prefix>key => <prefix>value}
+     */
+    Map<String, String> createIndexedData(String... additionalPrefixes) {
+        StringBuilder sb = new StringBuilder(createPrefix());
+        for (String additionalPrefix : additionalPrefixes) {
+            sb.append(additionalPrefix).append("-");
+        }
+        String prefix = sb.toString();
+        String key = prefix + "key";
+        String value = prefix + "value";
+        log.info("Creating indexed data using the key/value pair {} => {}", key, value);
+
+        Map<String, String> map = new HashMap<>();
+        map.put(key, value);
+        return map;
+    }
+
+    /** Returns the lower-cased test method name plus "-"; Locale.ROOT keeps it locale independent. */
+    String createPrefix() {
+        return getTestMethodName().toLowerCase(java.util.Locale.ROOT) + "-";
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBulkTest.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBulkTest.java b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBulkTest.java
new file mode 100644
index 0000000..979c71c
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchBulkTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch5;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.notNullValue;
+
+/** Exercises the Bulk and BulkIndex operations of the elasticsearch5-rest component. */
+public class ElasticsearchBulkTest extends ElasticsearchBaseTest {
+
+    @Test
+    public void testBulkIndex() throws Exception {
+        List<Map<String, String>> docs = new ArrayList<>();
+        docs.add(createIndexedData("1"));
+        docs.add(createIndexedData("2"));
+
+        List<?> ids = template.requestBody("direct:bulk_index", docs, List.class);
+        assertNotNull("indexIds should be set", ids);
+        assertCollectionSize("Indexed documents should match the size of documents", ids, docs.size());
+    }
+
+    @Test
+    public void bulkIndexListRequestBody() throws Exception {
+        final String prefix = createPrefix();
+
+        // given: a list body holding one map-based document
+        Map<String, String> fields = new HashMap<>();
+        fields.put("id", prefix + "baz");
+        fields.put("content", prefix + "hello");
+        List<Map<String, String>> body = new ArrayList<>();
+        body.add(fields);
+        // when
+        @SuppressWarnings("unchecked")
+        List<String> ids = template.requestBody("direct:bulk_index", body, List.class);
+
+        // then: exactly one id comes back
+        assertThat(ids, notNullValue());
+        assertThat(ids.size(), equalTo(1));
+    }
+
+    @Test
+    public void bulkIndexRequestBody() throws Exception {
+        final String prefix = createPrefix();
+        final String json = "{\"" + prefix + "content\": \"" + prefix + "hello\"}";
+
+        // given: a ready-made BulkRequest with a single index action
+        BulkRequest bulk = new BulkRequest();
+        bulk.add(new IndexRequest(prefix + "foo", prefix + "bar", prefix + "baz").source(json));
+
+        // when
+        @SuppressWarnings("unchecked")
+        List<String> ids = template.requestBody("direct:bulk_index", bulk, List.class);
+
+        // then: the explicit document id is part of the result
+        assertThat(ids, notNullValue());
+        assertThat(ids.size(), equalTo(1));
+        assertThat(ids, hasItem(prefix + "baz"));
+    }
+
+    @Test
+    public void bulkRequestBody() throws Exception {
+        final String prefix = createPrefix();
+        final String json = "{\"" + prefix + "content\": \"" + prefix + "hello\"}";
+
+        // given: the same single index action as above
+        BulkRequest bulk = new BulkRequest();
+        bulk.add(new IndexRequest(prefix + "foo", prefix + "bar", prefix + "baz").source(json));
+
+        // when: the Bulk route is used and the body is read as an array of item responses
+        BulkItemResponse[] items = (BulkItemResponse[]) template.requestBody("direct:bulk", bulk);
+
+        // then
+        assertThat(items, notNullValue());
+        assertEquals(prefix + "baz", items[0].getResponse().getId());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:bulk_index").to("elasticsearch5-rest://elasticsearch?operation=BulkIndex&indexName=twitter&indexType=tweet");
+                from("direct:bulk").to("elasticsearch5-rest://elasticsearch?operation=Bulk&indexName=twitter&indexType=tweet&hostAddresses=localhost:" + ES_BASE_HTTP_PORT);
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterBaseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterBaseTest.java b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterBaseTest.java
new file mode 100644
index 0000000..6ecd3f9
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterBaseTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch5;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.http.HttpHost;
+import org.codelibs.elasticsearch.runner.ElasticsearchClusterRunner;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import static org.codelibs.elasticsearch.runner.ElasticsearchClusterRunner.newConfigs;
+
+public class ElasticsearchClusterBaseTest extends CamelTestSupport {
+
+    public static ElasticsearchClusterRunner runner;
+    public static String clusterName;
+    public static RestClient restclient;
+    public static RestHighLevelClient client;
+
+    protected static final int ES_BASE_HTTP_PORT = AvailablePortFinder.getNextAvailable();
+    protected static final int ES_FIRST_NODE_TRANSPORT_PORT = AvailablePortFinder.getNextAvailable(ES_BASE_HTTP_PORT + 1);
+
+    @SuppressWarnings("resource")
+    @BeforeClass
+    public static void cleanUpOnce() throws Exception {
+        deleteDirectory("target/testcluster/");
+        clusterName = "es-cl-run-" + System.currentTimeMillis();
+        // create runner instance
+
+        runner = new ElasticsearchClusterRunner();
+        // create ES nodes
+        runner.onBuild((number, settingsBuilder) -> {
+            settingsBuilder.put("http.cors.enabled", true);
+            settingsBuilder.put("http.cors.allow-origin", "*");
+            settingsBuilder.put("discovery.zen.ping.unicast.hosts", "127.0.0.1:9301,127.0.0.1:9302,127.0.0.1:9303");
+        }).build(newConfigs()
+                 .clusterName(clusterName)
+                 .numOfNode(3)
+                 .baseHttpPort(ES_BASE_HTTP_PORT)
+                 .basePath("target/testcluster/")
+                 .disableESLogger());
+
+        // wait for green status
+        runner.ensureGreen();
+        restclient = RestClient.builder(new HttpHost(InetAddress.getByName("localhost"),ES_FIRST_NODE_TRANSPORT_PORT)).build();
+        client = new RestHighLevelClient(restclient);
+    }
+
+    @AfterClass
+    public static void teardownOnce() throws Exception {
+        if (restclient != null) {
+            restclient.close();
+        }
+        if (runner != null) {
+            // close runner
+            runner.close();
+            // delete all files
+            runner.clean();
+        }
+    }
+
+    @Override
+    public boolean isCreateCamelContextPerClass() {
+        // let's speed up the tests using the same context
+        return true;
+    }
+
+    /**
+     * As we don't delete the {@code target/data} folder for <b>each</b> test
+     * below (otherwise they would run much slower), we need to make sure
+     * there's no side effect of the same used data through creating unique
+     * indexes.
+     */
+    Map<String, String> createIndexedData(String... additionalPrefixes) {
+        String prefix = createPrefix();
+
+        // take over any potential prefixes we may have been asked for
+        if (additionalPrefixes.length > 0) {
+            StringBuilder sb = new StringBuilder(prefix);
+            for (String additionalPrefix : additionalPrefixes) {
+                sb.append(additionalPrefix).append("-");
+            }
+            prefix = sb.toString();
+        }
+
+        String key = prefix + "key";
+        String value = prefix + "value";
+        log.info("Creating indexed data using the key/value pair {} => {}", key, value);
+
+        Map<String, String> map = new HashMap<>();
+        map.put(key, value);
+        return map;
+    }
+
+    String createPrefix() {
+        // make use of the test method name to avoid collision
+        return getTestMethodName().toLowerCase() + "-";
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/928f185f/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterIndexTest.java
----------------------------------------------------------------------
diff --git a/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterIndexTest.java b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterIndexTest.java
new file mode 100644
index 0000000..b0430d0
--- /dev/null
+++ b/components/camel-elasticsearch5-rest/src/test/java/org/apache/camel/component/elasticsearch5/ElasticsearchClusterIndexTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch5;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.http.impl.client.BasicResponseHandler;
+import org.elasticsearch.action.get.GetRequest;
+import org.junit.Test;
+
+public class ElasticsearchClusterIndexTest extends ElasticsearchClusterBaseTest {
+
+    @Test
+    public void indexWithIpAndPort() throws Exception {
+        Map<String, String> map = createIndexedData();
+        Map<String, Object> headers = new HashMap<>();
+        headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchOperation.Index);
+        headers.put(ElasticsearchConstants.PARAM_INDEX_NAME, "twitter");
+        headers.put(ElasticsearchConstants.PARAM_INDEX_TYPE, "tweet");
+        headers.put(ElasticsearchConstants.PARAM_INDEX_ID, "1");
+
+        String indexId = template.requestBodyAndHeaders("direct:indexWithIpAndPort", map, headers, String.class);
+        assertNotNull("indexId should be set", indexId);
+
+        headers.clear();
+
+        headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchOperation.Index);
+        headers.put(ElasticsearchConstants.PARAM_INDEX_NAME, "twitter");
+        headers.put(ElasticsearchConstants.PARAM_INDEX_TYPE, "status");
+        headers.put(ElasticsearchConstants.PARAM_INDEX_ID, "2");
+
+        indexId = template.requestBodyAndHeaders("direct:indexWithIpAndPort", map, headers, String.class);
+        assertNotNull("indexId should be set", indexId);
+
+        assertEquals("Cluster must be of three nodes", runner.getNodeSize(), 3);
+        assertEquals("Index id 1 must exists", true, client.get(new GetRequest("twitter", "tweet", "1")).isExists());
+        assertEquals("Index id 2 must exists", true, client.get(new GetRequest("twitter", "status", "2")).isExists());
+    }
+
+    @Test
+    public void indexWithSnifferEnable() throws Exception {
+        Map<String, String> map = createIndexedData();
+        Map<String, Object> headers = new HashMap<>();
+        headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchOperation.Index);
+        headers.put(ElasticsearchConstants.PARAM_INDEX_NAME, "facebook");
+        headers.put(ElasticsearchConstants.PARAM_INDEX_TYPE, "post");
+        headers.put(ElasticsearchConstants.PARAM_INDEX_ID, "4");
+
+        String indexId = template.requestBodyAndHeaders("direct:indexWithSniffer", map, headers, String.class);
+        assertNotNull("indexId should be set", indexId);
+
+        assertEquals("Cluster must be of three nodes", runner.getNodeSize(), 3);
+        assertEquals("Index id 4 must exists", true, client.get(new GetRequest("facebook", "post", "4")).isExists());
+
+        final BasicResponseHandler responseHandler = new BasicResponseHandler();
+        String body = responseHandler.handleEntity(restclient.performRequest("GET", "/_cluster/health?pretty").getEntity());
+        assertStringContains(body,"\"number_of_data_nodes\" : 3");
+
+    }
+
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:indexWithIpAndPort")
+                    .to("elasticsearch5-rest://" + clusterName + "?operation=Index&indexName=twitter&indexType=tweet&hostAddresses=localhost:" + ES_FIRST_NODE_TRANSPORT_PORT);
+                from("direct:indexWithSniffer")
+                    .to("elasticsearch5-rest://" + clusterName + "?operation=Index&indexName=twitter&indexType=tweet&enableSniffer=true&hostAddresses=localhost:" + ES_FIRST_NODE_TRANSPORT_PORT);
+            }
+        };
+    }
+}