You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/10/03 16:55:00 UTC

[camel] branch CAMEL-18580/elastic-async-producer created (now fd313375120)

This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a change to branch CAMEL-18580/elastic-async-producer
in repository https://gitbox.apache.org/repos/asf/camel.git


      at fd313375120 CAMEL-18580: camel-elasticsearch - Propose an async producer

This branch includes the following new commits:

     new fd313375120 CAMEL-18580: camel-elasticsearch - Propose an async producer

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel] 01/01: CAMEL-18580: camel-elasticsearch - Propose an async producer

Posted by nf...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch CAMEL-18580/elastic-async-producer
in repository https://gitbox.apache.org/repos/asf/camel.git

commit fd3133751204600b3c9fd83201c57d5a317903c2
Author: Nicolas Filotto <nf...@talend.com>
AuthorDate: Mon Oct 3 18:54:35 2022 +0200

    CAMEL-18580: camel-elasticsearch - Propose an async producer
---
 .../camel/component/es/ElasticsearchProducer.java  | 543 ++++++++++++++-------
 1 file changed, 380 insertions(+), 163 deletions(-)

diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java
index 51f62962a2c..5cafd7933c5 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java
@@ -22,29 +22,42 @@ import java.nio.file.Paths;
 import java.security.KeyStore;
 import java.security.cert.Certificate;
 import java.security.cert.CertificateFactory;
+import java.util.concurrent.CompletableFuture;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManagerFactory;
 
+import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
 import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch._types.WriteResponseBase;
 import co.elastic.clients.elasticsearch.core.BulkRequest;
+import co.elastic.clients.elasticsearch.core.BulkResponse;
 import co.elastic.clients.elasticsearch.core.DeleteRequest;
+import co.elastic.clients.elasticsearch.core.DeleteResponse;
 import co.elastic.clients.elasticsearch.core.GetRequest;
 import co.elastic.clients.elasticsearch.core.IndexRequest;
 import co.elastic.clients.elasticsearch.core.MgetRequest;
+import co.elastic.clients.elasticsearch.core.MgetResponse;
 import co.elastic.clients.elasticsearch.core.MsearchRequest;
+import co.elastic.clients.elasticsearch.core.MsearchResponse;
 import co.elastic.clients.elasticsearch.core.SearchRequest;
+import co.elastic.clients.elasticsearch.core.SearchResponse;
 import co.elastic.clients.elasticsearch.core.UpdateRequest;
+import co.elastic.clients.elasticsearch.core.UpdateResponse;
 import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
+import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
 import co.elastic.clients.elasticsearch.indices.ExistsRequest;
 import co.elastic.clients.json.jackson.JacksonJsonpMapper;
 import co.elastic.clients.transport.ElasticsearchTransport;
+import co.elastic.clients.transport.endpoints.BooleanResponse;
 import co.elastic.clients.transport.rest_client.RestClientTransport;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
-import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.util.IOHelper;
 import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthScope;
@@ -64,13 +77,14 @@ import static org.apache.camel.component.es.ElasticsearchConstants.PARAM_SCROLL_
 /**
  * Represents an Elasticsearch producer.
  */
-public class ElasticsearchProducer extends DefaultProducer {
+class ElasticsearchProducer extends DefaultAsyncProducer {
 
     private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchProducer.class);
 
     protected final ElasticsearchConfiguration configuration;
+    private final Object mutex = new Object();
     private volatile RestClient client;
-    private volatile Sniffer sniffer;
+    private Sniffer sniffer;
 
     public ElasticsearchProducer(ElasticsearchEndpoint endpoint, ElasticsearchConfiguration configuration) {
         super(endpoint);
@@ -116,188 +130,338 @@ public class ElasticsearchProducer extends DefaultProducer {
         }
         if (operationConfig == null) {
             throw new IllegalArgumentException(
-                    ElasticsearchConstants.PARAM_OPERATION + " value '" + operationConfig + "' is not supported");
+                    ElasticsearchConstants.PARAM_OPERATION + " value is mandatory");
         }
         return operationConfig;
     }
 
     @Override
-    public void process(Exchange exchange) throws Exception {
-        if (configuration.isDisconnect() && client == null) {
-            startClient();
-        }
-        final ObjectMapper mapper = new ObjectMapper();
-        mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
-        ElasticsearchTransport transport = new RestClientTransport(client, new JacksonJsonpMapper(mapper));
-        ElasticsearchClient esClient = new ElasticsearchClient(transport);
-        // 2. Index and type will be set by:
-        // a. If the incoming body is already an action request
-        // b. If the body is not an action request we will use headers if they
-        // are set.
-        // c. If the body is not an action request and the headers aren't set we
-        // will use the configuration.
-        // No error is thrown by the component in the event none of the above
-        // conditions are met. The java es client
-        // will throw.
-
-        Message message = exchange.getIn();
-        final ElasticsearchOperation operation = resolveOperation(exchange);
-
-        // Set the index/type headers on the exchange if necessary. This is used
-        // for type conversion.
-        boolean configIndexName = false;
-        String indexName = message.getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class);
-        if (indexName == null) {
-            message.setHeader(ElasticsearchConstants.PARAM_INDEX_NAME, configuration.getIndexName());
-            configIndexName = true;
-        }
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            if (configuration.isDisconnect() && client == null) {
+                startClient();
+            }
+            final ObjectMapper mapper = new ObjectMapper();
+            mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+            ElasticsearchTransport transport = new RestClientTransport(client, new JacksonJsonpMapper(mapper));
+            // 2. Index and type will be set by:
+            // a. If the incoming body is already an action request
+            // b. If the body is not an action request we will use headers if they
+            // are set.
+            // c. If the body is not an action request and the headers aren't set we
+            // will use the configuration.
+            // No error is thrown by the component in the event none of the above
+            // conditions are met. The java es client
+            // will throw.
 
-        Integer size = message.getHeader(ElasticsearchConstants.PARAM_SIZE, Integer.class);
-        if (size == null) {
-            message.setHeader(ElasticsearchConstants.PARAM_SIZE, configuration.getSize());
-        }
+            Message message = exchange.getIn();
+            final ElasticsearchOperation operation = resolveOperation(exchange);
 
-        Integer from = message.getHeader(ElasticsearchConstants.PARAM_FROM, Integer.class);
-        if (from == null) {
-            message.setHeader(ElasticsearchConstants.PARAM_FROM, configuration.getFrom());
-        }
+            // Set the index/type headers on the exchange if necessary. This is used
+            // for type conversion.
+            boolean configIndexName = false;
+            String indexName = message.getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class);
+            if (indexName == null) {
+                message.setHeader(ElasticsearchConstants.PARAM_INDEX_NAME, configuration.getIndexName());
+                configIndexName = true;
+            }
 
-        boolean configWaitForActiveShards = false;
-        Integer waitForActiveShards = message.getHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, Integer.class);
-        if (waitForActiveShards == null) {
-            message.setHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, configuration.getWaitForActiveShards());
-            configWaitForActiveShards = true;
-        }
+            Integer size = message.getHeader(ElasticsearchConstants.PARAM_SIZE, Integer.class);
+            if (size == null) {
+                message.setHeader(ElasticsearchConstants.PARAM_SIZE, configuration.getSize());
+            }
 
-        Class<?> documentClass = message.getHeader(ElasticsearchConstants.PARAM_DOCUMENT_CLASS, Class.class);
-        if (documentClass == null) {
-            documentClass = configuration.getDocumentClass();
-        }
+            Integer from = message.getHeader(ElasticsearchConstants.PARAM_FROM, Integer.class);
+            if (from == null) {
+                message.setHeader(ElasticsearchConstants.PARAM_FROM, configuration.getFrom());
+            }
 
-        switch (operation) {
-            case Index: {
-                IndexRequest.Builder<?> indexRequestBuilder = message.getBody(IndexRequest.Builder.class);
-                message.setBody(esClient.index(indexRequestBuilder.build()).id());
-                break;
+            boolean configWaitForActiveShards = false;
+            Integer waitForActiveShards = message.getHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, Integer.class);
+            if (waitForActiveShards == null) {
+                message.setHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, configuration.getWaitForActiveShards());
+                configWaitForActiveShards = true;
             }
-            case Update: {
-                UpdateRequest.Builder updateRequestBuilder = message.getBody(UpdateRequest.Builder.class);
-                message.setBody(esClient.update(updateRequestBuilder.build(), documentClass).id());
-                break;
+
+            Class<?> documentClass = message.getHeader(ElasticsearchConstants.PARAM_DOCUMENT_CLASS, Class.class);
+            if (documentClass == null) {
+                documentClass = configuration.getDocumentClass();
             }
-            case GetById: {
-                GetRequest.Builder getRequestBuilder = message.getBody(GetRequest.Builder.class);
-                if (getRequestBuilder == null) {
-                    throw new IllegalArgumentException(
-                            "Wrong body type. Only String or GetRequest.Builder is allowed as a type");
+
+            ActionContext ctx = new ActionContext(exchange, callback, transport, configIndexName, configWaitForActiveShards);
+
+            switch (operation) {
+                case Index: {
+                    processIndexAsync(ctx);
+                    break;
                 }
-                message.setBody(esClient.get(getRequestBuilder.build(), documentClass));
-                break;
-            }
-            case Bulk: {
-                BulkRequest.Builder bulkRequestBuilder = message.getBody(BulkRequest.Builder.class);
-                if (bulkRequestBuilder == null) {
-                    throw new IllegalArgumentException(
-                            "Wrong body type. Only Iterable or BulkRequest.Builder is allowed as a type");
+                case Update: {
+                    processUpdateAsync(ctx, documentClass);
+                    break;
                 }
-                message.setBody(esClient.bulk(bulkRequestBuilder.build()).items());
-                break;
-            }
-            case Delete: {
-                DeleteRequest.Builder deleteRequestBuilder = message.getBody(DeleteRequest.Builder.class);
-                if (deleteRequestBuilder == null) {
-                    throw new IllegalArgumentException(
-                            "Wrong body type. Only String or DeleteRequest.Builder is allowed as a type");
+                case GetById: {
+                    processGetByIdAsync(ctx, documentClass);
+                    break;
                 }
-                message.setBody(esClient.delete(deleteRequestBuilder.build()).result());
-                break;
-            }
-            case DeleteIndex: {
-                DeleteIndexRequest.Builder deleteIndexRequestBuilder = message.getBody(DeleteIndexRequest.Builder.class);
-                if (deleteIndexRequestBuilder == null) {
-                    throw new IllegalArgumentException(
-                            "Wrong body type. Only String or DeleteIndexRequest.Builder is allowed as a type");
+                case Bulk: {
+                    processBulkAsync(ctx);
+                    break;
                 }
-                message.setBody(esClient.indices().delete(deleteIndexRequestBuilder.build()).acknowledged());
-                break;
-            }
-            case Exists: {
-                ExistsRequest.Builder builder = new ExistsRequest.Builder();
-                builder.index(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class));
-                message.setBody(esClient.indices().exists(builder.build()).value());
-                break;
-            }
-            case Search: {
-                SearchRequest.Builder searchRequestBuilder = message.getBody(SearchRequest.Builder.class);
-                if (searchRequestBuilder == null) {
-                    throw new IllegalArgumentException(
-                            "Wrong body type. Only Map, String or SearchRequest.Builder is allowed as a type");
+                case Delete: {
+                    processDeleteAsync(ctx);
+                    break;
                 }
-                // is it a scroll request ?
-                boolean useScroll = message.getHeader(PARAM_SCROLL, configuration.isUseScroll(), Boolean.class);
-                if (useScroll) {
-                    int scrollKeepAliveMs
-                            = message.getHeader(PARAM_SCROLL_KEEP_ALIVE_MS, configuration.getScrollKeepAliveMs(),
-                                    Integer.class);
-                    ElasticsearchScrollRequestIterator<?> scrollRequestIterator = new ElasticsearchScrollRequestIterator<>(
-                            searchRequestBuilder, esClient, scrollKeepAliveMs, exchange, documentClass);
-                    exchange.getIn().setBody(scrollRequestIterator);
-                } else {
-                    message.setBody(esClient.search(searchRequestBuilder.build(), documentClass).hits());
+                case DeleteIndex: {
+                    processDeleteIndexAsync(ctx);
+                    break;
                 }
-                break;
-            }
-            case MultiSearch: {
-                MsearchRequest.Builder msearchRequestBuilder = message.getBody(MsearchRequest.Builder.class);
-                if (msearchRequestBuilder == null) {
-                    throw new IllegalArgumentException("Wrong body type. Only MsearchRequest.Builder is allowed as a type");
+                case Exists: {
+                    processExistsAsync(ctx);
+                    break;
                 }
-                message.setBody(esClient.msearch(msearchRequestBuilder.build(), documentClass).responses());
-                break;
-            }
-            case MultiGet: {
-                MgetRequest.Builder mgetRequestBuilder = message.getBody(MgetRequest.Builder.class);
-                if (mgetRequestBuilder == null) {
-                    throw new IllegalArgumentException("Wrong body type. Only MgetRequest.Builder is allowed as a type");
+                case Search: {
+                    SearchRequest.Builder searchRequestBuilder = message.getBody(SearchRequest.Builder.class);
+                    if (searchRequestBuilder == null) {
+                        throw new IllegalArgumentException(
+                                "Wrong body type. Only Map, String or SearchRequest.Builder is allowed as a type");
+                    }
+                    // is it a scroll request ?
+                    boolean useScroll = message.getHeader(PARAM_SCROLL, configuration.isUseScroll(), Boolean.class);
+                    if (useScroll) {
+                        // As a scroll request is expected, for the sake of simplicity, the synchronous mode is preserved
+                        int scrollKeepAliveMs
+                                = message.getHeader(PARAM_SCROLL_KEEP_ALIVE_MS, configuration.getScrollKeepAliveMs(),
+                                        Integer.class);
+                        ElasticsearchScrollRequestIterator<?> scrollRequestIterator = new ElasticsearchScrollRequestIterator<>(
+                                searchRequestBuilder, new ElasticsearchClient(transport), scrollKeepAliveMs, exchange,
+                                documentClass);
+                        exchange.getIn().setBody(scrollRequestIterator);
+                        cleanup(ctx);
+                        callback.done(true);
+                        return true;
+                    } else {
+                        onComplete(
+                                new ElasticsearchAsyncClient(transport).search(searchRequestBuilder.build(), documentClass)
+                                        .thenApply(SearchResponse::hits),
+                                ctx);
+                    }
+                    break;
+                }
+                case MultiSearch: {
+                    processMultiSearchAsync(ctx, documentClass);
+                    break;
+                }
+                case MultiGet: {
+                    processMultiGetAsync(ctx, documentClass);
+                    break;
+                }
+                case Ping: {
+                    processPingAsync(ctx);
+                    break;
+                }
+                default: {
+                    throw new IllegalArgumentException(
+                            ElasticsearchConstants.PARAM_OPERATION + " value '" + operation + "' is not supported");
                 }
-                message.setBody(esClient.mget(mgetRequestBuilder.build(), documentClass).docs());
-                break;
-            }
-            case Ping: {
-                message.setBody(esClient.ping().value());
-                break;
-            }
-            default: {
-                throw new IllegalArgumentException(
-                        ElasticsearchConstants.PARAM_OPERATION + " value '" + operation + "' is not supported");
             }
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
         }
+        return false;
+    }
+
+    /**
+     * Asynchronously executes a ping to the Elastic cluster.
+     */
+    private void processPingAsync(ActionContext ctx) {
+        onComplete(
+                ctx.getClient().ping()
+                        .thenApply(BooleanResponse::value),
+                ctx);
+    }
+
+    /**
+     * Asynchronously executes a multi-get request.
+     */
+    private void processMultiGetAsync(ActionContext ctx, Class<?> documentClass) {
+        MgetRequest.Builder mgetRequestBuilder = ctx.getMessage().getBody(MgetRequest.Builder.class);
+        if (mgetRequestBuilder == null) {
+            throw new IllegalArgumentException("Wrong body type. Only MgetRequest.Builder is allowed as a type");
+        }
+        onComplete(
+                ctx.getClient().mget(mgetRequestBuilder.build(), documentClass)
+                        .thenApply(MgetResponse::docs),
+                ctx);
+    }
 
-        // If we set params via the configuration on this exchange, remove them
-        // now. This preserves legacy behavior for this component and enables a
-        // use case where one message can be sent to multiple elasticsearch
-        // endpoints where the user is relying on the endpoint configuration
-        // (index/type) rather than header values. If we do not clear this out
-        // sending the same message (index request, for example) to multiple
-        // elasticsearch endpoints would have the effect overriding any
-        // subsequent endpoint index/type with the first endpoint index/type.
-        if (configIndexName) {
-            message.removeHeader(ElasticsearchConstants.PARAM_INDEX_NAME);
+    /**
+     * Asynchronously executes a multi-search request.
+     */
+    private void processMultiSearchAsync(ActionContext ctx, Class<?> documentClass) {
+        MsearchRequest.Builder msearchRequestBuilder = ctx.getMessage().getBody(MsearchRequest.Builder.class);
+        if (msearchRequestBuilder == null) {
+            throw new IllegalArgumentException("Wrong body type. Only MsearchRequest.Builder is allowed as a type");
         }
+        onComplete(
+                ctx.getClient().msearch(msearchRequestBuilder.build(), documentClass)
+                        .thenApply(MsearchResponse::responses),
+                ctx);
+    }
+
+    /**
+     * Asynchronously checks whether a given index exists.
+     */
+    private void processExistsAsync(ActionContext ctx) {
+        ExistsRequest.Builder builder = new ExistsRequest.Builder();
+        builder.index(ctx.getMessage().getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class));
+        onComplete(
+                ctx.getClient().indices().exists(builder.build())
+                        .thenApply(BooleanResponse::value),
+                ctx);
+    }
 
-        if (configWaitForActiveShards) {
-            message.removeHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS);
+    /**
+     * Asynchronously deletes an index.
+     */
+    private void processDeleteIndexAsync(ActionContext ctx) {
+        DeleteIndexRequest.Builder deleteIndexRequestBuilder = ctx.getMessage().getBody(DeleteIndexRequest.Builder.class);
+        if (deleteIndexRequestBuilder == null) {
+            throw new IllegalArgumentException(
+                    "Wrong body type. Only String or DeleteIndexRequest.Builder is allowed as a type");
         }
-        if (configuration.isDisconnect()) {
-            IOHelper.close(transport);
-            IOHelper.close(client);
-            client = null;
-            if (configuration.isEnableSniffer()) {
-                IOHelper.close(sniffer);
-                sniffer = null;
-            }
+        onComplete(
+                ctx.getClient().indices().delete(deleteIndexRequestBuilder.build())
+                        .thenApply(DeleteIndexResponse::acknowledged),
+                ctx);
+    }
+
+    /**
+     * Asynchronously deletes a document.
+     */
+    private void processDeleteAsync(ActionContext ctx) {
+        DeleteRequest.Builder deleteRequestBuilder = ctx.getMessage().getBody(DeleteRequest.Builder.class);
+        if (deleteRequestBuilder == null) {
+            throw new IllegalArgumentException(
+                    "Wrong body type. Only String or DeleteRequest.Builder is allowed as a type");
+        }
+        onComplete(
+                ctx.getClient().delete(deleteRequestBuilder.build())
+                        .thenApply(DeleteResponse::result),
+                ctx);
+    }
+
+    /**
+     * Asynchronously executes bulk operations.
+     */
+    private void processBulkAsync(ActionContext ctx) {
+        BulkRequest.Builder bulkRequestBuilder = ctx.getMessage().getBody(BulkRequest.Builder.class);
+        if (bulkRequestBuilder == null) {
+            throw new IllegalArgumentException(
+                    "Wrong body type. Only Iterable or BulkRequest.Builder is allowed as a type");
         }
+        onComplete(
+                ctx.getClient().bulk(bulkRequestBuilder.build())
+                        .thenApply(BulkResponse::items),
+                ctx);
+    }
+
+    /**
+     * Asynchronously finds a document by id.
+     */
+    private void processGetByIdAsync(ActionContext ctx, Class<?> documentClass) {
+        GetRequest.Builder getRequestBuilder = ctx.getMessage().getBody(GetRequest.Builder.class);
+        if (getRequestBuilder == null) {
+            throw new IllegalArgumentException(
+                    "Wrong body type. Only String or GetRequest.Builder is allowed as a type");
+        }
+        onComplete(
+                ctx.getClient().get(getRequestBuilder.build(), documentClass),
+                ctx);
+    }
+
+    /**
+     * Asynchronously updates a document.
+     */
+    private void processUpdateAsync(ActionContext ctx, Class<?> documentClass) {
+        UpdateRequest.Builder updateRequestBuilder = ctx.getMessage().getBody(UpdateRequest.Builder.class);
+        onComplete(
+                ctx.getClient().update(updateRequestBuilder.build(), documentClass)
+                        .thenApply(r -> ((UpdateResponse<?>) r).id()),
+                ctx);
+    }
+
+    /**
+     * Asynchronously indexes a document.
+     */
+    private void processIndexAsync(ActionContext ctx) {
+        IndexRequest.Builder<?> indexRequestBuilder = ctx.getMessage().getBody(IndexRequest.Builder.class);
+        onComplete(
+                ctx.getClient().index(indexRequestBuilder.build())
+                        .thenApply(WriteResponseBase::id),
+                ctx);
+    }
+
+    /**
+     * Adds the actions to perform once the given future is complete.
+     *
+     * @param future the future to which the completion actions are attached.
+     * @param ctx    the context of the asynchronous task.
+     * @param <T>    the result type returned by the future.
+     */
+    private <T> void onComplete(CompletableFuture<T> future, ActionContext ctx) {
+        final Exchange exchange = ctx.getExchange();
+        future.thenAccept(r -> exchange.getIn().setBody(r))
+                .thenAccept(r -> cleanup(ctx))
+                .whenComplete(
+                        (r, e) -> {
+                            try {
+                                if (e != null) {
+                                    exchange.setException(new CamelExchangeException(
+                                            "An error occurred while executing the action", exchange, e));
+                                }
+                            } finally {
+                                ctx.getCallback().done(false);
+                            }
+                        });
+    }
+
+    /**
+     * The cleanup task to execute once everything is done.
+     */
+    private void cleanup(ActionContext ctx) {
+
+        try {
+            Message message = ctx.getMessage();
+
+            // If we set params via the configuration on this exchange, remove them
+            // now. This preserves legacy behavior for this component and enables a
+            // use case where one message can be sent to multiple elasticsearch
+            // endpoints where the user is relying on the endpoint configuration
+            // (index/type) rather than header values. If we do not clear this out
+            // sending the same message (index request, for example) to multiple
+            // elasticsearch endpoints would have the effect of overriding any
+            // subsequent endpoint index/type with the first endpoint index/type.
+            if (ctx.isConfigIndexName()) {
+                message.removeHeader(ElasticsearchConstants.PARAM_INDEX_NAME);
+            }
 
+            if (ctx.isConfigWaitForActiveShards()) {
+                message.removeHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS);
+            }
+            if (configuration.isDisconnect()) {
+                IOHelper.close(ctx.getTransport());
+                if (configuration.isEnableSniffer()) {
+                    IOHelper.close(sniffer);
+                    sniffer = null;
+                }
+                IOHelper.close(client);
+                client = null;
+            }
+        } catch (Exception e) {
+            LOG.warn("Could not execute the cleanup task", e);
+        }
     }
 
     @Override
@@ -310,12 +474,16 @@ public class ElasticsearchProducer extends DefaultProducer {
 
     private void startClient() {
         if (client == null) {
-            LOG.info("Connecting to the ElasticSearch cluster: {}", configuration.getClusterName());
-            if (configuration.getHostAddressesList() != null
-                    && !configuration.getHostAddressesList().isEmpty()) {
-                client = createClient();
-            } else {
-                LOG.warn("Incorrect ip address and port parameters settings for ElasticSearch cluster");
+            synchronized (mutex) {
+                if (client == null) {
+                    LOG.info("Connecting to the ElasticSearch cluster: {}", configuration.getClusterName());
+                    if (configuration.getHostAddressesList() != null
+                            && !configuration.getHostAddressesList().isEmpty()) {
+                        client = createClient();
+                    } else {
+                        LOG.warn("Incorrect ip address and port parameters settings for ElasticSearch cluster");
+                    }
+                }
             }
         }
     }
@@ -388,4 +556,53 @@ public class ElasticsearchProducer extends DefaultProducer {
             throw new RuntimeException(e);
         }
     }
+
+    /**
+     * A static nested class providing all the information that an asynchronous action could need.
+     */
+    private static class ActionContext {
+
+        private final Exchange exchange;
+        private final AsyncCallback callback;
+        private final ElasticsearchTransport transport;
+        private final boolean configIndexName;
+        private final boolean configWaitForActiveShards;
+
+        ActionContext(Exchange exchange, AsyncCallback callback, ElasticsearchTransport transport, boolean configIndexName,
+                      boolean configWaitForActiveShards) {
+            this.exchange = exchange;
+            this.callback = callback;
+            this.transport = transport;
+            this.configIndexName = configIndexName;
+            this.configWaitForActiveShards = configWaitForActiveShards;
+        }
+
+        ElasticsearchTransport getTransport() {
+            return transport;
+        }
+
+        ElasticsearchAsyncClient getClient() {
+            return new ElasticsearchAsyncClient(transport);
+        }
+
+        boolean isConfigIndexName() {
+            return configIndexName;
+        }
+
+        boolean isConfigWaitForActiveShards() {
+            return configWaitForActiveShards;
+        }
+
+        Exchange getExchange() {
+            return exchange;
+        }
+
+        AsyncCallback getCallback() {
+            return callback;
+        }
+
+        Message getMessage() {
+            return exchange.getIn();
+        }
+    }
 }