You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/10/03 16:55:01 UTC

[camel] 01/01: CAMEL-18580: camel-elasticsearch - Propose an async producer

This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch CAMEL-18580/elastic-async-producer
in repository https://gitbox.apache.org/repos/asf/camel.git

commit fd3133751204600b3c9fd83201c57d5a317903c2
Author: Nicolas Filotto <nf...@talend.com>
AuthorDate: Mon Oct 3 18:54:35 2022 +0200

    CAMEL-18580: camel-elasticsearch - Propose an async producer
---
 .../camel/component/es/ElasticsearchProducer.java  | 543 ++++++++++++++-------
 1 file changed, 380 insertions(+), 163 deletions(-)

diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java
index 51f62962a2c..5cafd7933c5 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java
@@ -22,29 +22,42 @@ import java.nio.file.Paths;
 import java.security.KeyStore;
 import java.security.cert.Certificate;
 import java.security.cert.CertificateFactory;
+import java.util.concurrent.CompletableFuture;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManagerFactory;
 
+import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
 import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch._types.WriteResponseBase;
 import co.elastic.clients.elasticsearch.core.BulkRequest;
+import co.elastic.clients.elasticsearch.core.BulkResponse;
 import co.elastic.clients.elasticsearch.core.DeleteRequest;
+import co.elastic.clients.elasticsearch.core.DeleteResponse;
 import co.elastic.clients.elasticsearch.core.GetRequest;
 import co.elastic.clients.elasticsearch.core.IndexRequest;
 import co.elastic.clients.elasticsearch.core.MgetRequest;
+import co.elastic.clients.elasticsearch.core.MgetResponse;
 import co.elastic.clients.elasticsearch.core.MsearchRequest;
+import co.elastic.clients.elasticsearch.core.MsearchResponse;
 import co.elastic.clients.elasticsearch.core.SearchRequest;
+import co.elastic.clients.elasticsearch.core.SearchResponse;
 import co.elastic.clients.elasticsearch.core.UpdateRequest;
+import co.elastic.clients.elasticsearch.core.UpdateResponse;
 import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
+import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
 import co.elastic.clients.elasticsearch.indices.ExistsRequest;
 import co.elastic.clients.json.jackson.JacksonJsonpMapper;
 import co.elastic.clients.transport.ElasticsearchTransport;
+import co.elastic.clients.transport.endpoints.BooleanResponse;
 import co.elastic.clients.transport.rest_client.RestClientTransport;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
-import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.util.IOHelper;
 import org.apache.http.HttpHost;
 import org.apache.http.auth.AuthScope;
@@ -64,13 +77,14 @@ import static org.apache.camel.component.es.ElasticsearchConstants.PARAM_SCROLL_
 /**
  * Represents an Elasticsearch producer.
  */
-public class ElasticsearchProducer extends DefaultProducer {
+class ElasticsearchProducer extends DefaultAsyncProducer {
 
     private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchProducer.class);
 
     protected final ElasticsearchConfiguration configuration;
+    private final Object mutex = new Object();
     private volatile RestClient client;
-    private volatile Sniffer sniffer;
+    private Sniffer sniffer;
 
     public ElasticsearchProducer(ElasticsearchEndpoint endpoint, ElasticsearchConfiguration configuration) {
         super(endpoint);
@@ -116,188 +130,338 @@ public class ElasticsearchProducer extends DefaultProducer {
         }
         if (operationConfig == null) {
             throw new IllegalArgumentException(
-                    ElasticsearchConstants.PARAM_OPERATION + " value '" + operationConfig + "' is not supported");
+                    ElasticsearchConstants.PARAM_OPERATION + " value is mandatory");
         }
         return operationConfig;
     }
 
     @Override
-    public void process(Exchange exchange) throws Exception {
-        if (configuration.isDisconnect() && client == null) {
-            startClient();
-        }
-        final ObjectMapper mapper = new ObjectMapper();
-        mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
-        ElasticsearchTransport transport = new RestClientTransport(client, new JacksonJsonpMapper(mapper));
-        ElasticsearchClient esClient = new ElasticsearchClient(transport);
-        // 2. Index and type will be set by:
-        // a. If the incoming body is already an action request
-        // b. If the body is not an action request we will use headers if they
-        // are set.
-        // c. If the body is not an action request and the headers aren't set we
-        // will use the configuration.
-        // No error is thrown by the component in the event none of the above
-        // conditions are met. The java es client
-        // will throw.
-
-        Message message = exchange.getIn();
-        final ElasticsearchOperation operation = resolveOperation(exchange);
-
-        // Set the index/type headers on the exchange if necessary. This is used
-        // for type conversion.
-        boolean configIndexName = false;
-        String indexName = message.getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class);
-        if (indexName == null) {
-            message.setHeader(ElasticsearchConstants.PARAM_INDEX_NAME, configuration.getIndexName());
-            configIndexName = true;
-        }
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            if (configuration.isDisconnect() && client == null) {
+                startClient();
+            }
+            final ObjectMapper mapper = new ObjectMapper();
+            mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+            ElasticsearchTransport transport = new RestClientTransport(client, new JacksonJsonpMapper(mapper));
+            // 2. Index and type will be set by:
+            // a. If the incoming body is already an action request
+            // b. If the body is not an action request we will use headers if they
+            // are set.
+            // c. If the body is not an action request and the headers aren't set we
+            // will use the configuration.
+            // No error is thrown by the component in the event none of the above
+            // conditions are met. The java es client
+            // will throw.
 
-        Integer size = message.getHeader(ElasticsearchConstants.PARAM_SIZE, Integer.class);
-        if (size == null) {
-            message.setHeader(ElasticsearchConstants.PARAM_SIZE, configuration.getSize());
-        }
+            Message message = exchange.getIn();
+            final ElasticsearchOperation operation = resolveOperation(exchange);
 
-        Integer from = message.getHeader(ElasticsearchConstants.PARAM_FROM, Integer.class);
-        if (from == null) {
-            message.setHeader(ElasticsearchConstants.PARAM_FROM, configuration.getFrom());
-        }
+            // Set the index/type headers on the exchange if necessary. This is used
+            // for type conversion.
+            boolean configIndexName = false;
+            String indexName = message.getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class);
+            if (indexName == null) {
+                message.setHeader(ElasticsearchConstants.PARAM_INDEX_NAME, configuration.getIndexName());
+                configIndexName = true;
+            }
 
-        boolean configWaitForActiveShards = false;
-        Integer waitForActiveShards = message.getHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, Integer.class);
-        if (waitForActiveShards == null) {
-            message.setHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, configuration.getWaitForActiveShards());
-            configWaitForActiveShards = true;
-        }
+            Integer size = message.getHeader(ElasticsearchConstants.PARAM_SIZE, Integer.class);
+            if (size == null) {
+                message.setHeader(ElasticsearchConstants.PARAM_SIZE, configuration.getSize());
+            }
 
-        Class<?> documentClass = message.getHeader(ElasticsearchConstants.PARAM_DOCUMENT_CLASS, Class.class);
-        if (documentClass == null) {
-            documentClass = configuration.getDocumentClass();
-        }
+            Integer from = message.getHeader(ElasticsearchConstants.PARAM_FROM, Integer.class);
+            if (from == null) {
+                message.setHeader(ElasticsearchConstants.PARAM_FROM, configuration.getFrom());
+            }
 
-        switch (operation) {
-            case Index: {
-                IndexRequest.Builder<?> indexRequestBuilder = message.getBody(IndexRequest.Builder.class);
-                message.setBody(esClient.index(indexRequestBuilder.build()).id());
-                break;
+            boolean configWaitForActiveShards = false;
+            Integer waitForActiveShards = message.getHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, Integer.class);
+            if (waitForActiveShards == null) {
+                message.setHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, configuration.getWaitForActiveShards());
+                configWaitForActiveShards = true;
             }
-            case Update: {
-                UpdateRequest.Builder updateRequestBuilder = message.getBody(UpdateRequest.Builder.class);
-                message.setBody(esClient.update(updateRequestBuilder.build(), documentClass).id());
-                break;
+
+            Class<?> documentClass = message.getHeader(ElasticsearchConstants.PARAM_DOCUMENT_CLASS, Class.class);
+            if (documentClass == null) {
+                documentClass = configuration.getDocumentClass();
             }
-            case GetById: {
-                GetRequest.Builder getRequestBuilder = message.getBody(GetRequest.Builder.class);
-                if (getRequestBuilder == null) {
-                    throw new IllegalArgumentException(
-                            "Wrong body type. Only String or GetRequest.Builder is allowed as a type");
+
+            ActionContext ctx = new ActionContext(exchange, callback, transport, configIndexName, configWaitForActiveShards);
+
+            switch (operation) {
+                case Index: {
+                    processIndexAsync(ctx);
+                    break;
                 }
-                message.setBody(esClient.get(getRequestBuilder.build(), documentClass));
-                break;
-            }
-            case Bulk: {
-                BulkRequest.Builder bulkRequestBuilder = message.getBody(BulkRequest.Builder.class);
-                if (bulkRequestBuilder == null) {
-                    throw new IllegalArgumentException(
-                            "Wrong body type. Only Iterable or BulkRequest.Builder is allowed as a type");
+                case Update: {
+                    processUpdateAsync(ctx, documentClass);
+                    break;
                 }
-                message.setBody(esClient.bulk(bulkRequestBuilder.build()).items());
-                break;
-            }
-            case Delete: {
-                DeleteRequest.Builder deleteRequestBuilder = message.getBody(DeleteRequest.Builder.class);
-                if (deleteRequestBuilder == null) {
-                    throw new IllegalArgumentException(
-                            "Wrong body type. Only String or DeleteRequest.Builder is allowed as a type");
+                case GetById: {
+                    processGetByIdAsync(ctx, documentClass);
+                    break;
                 }
-                message.setBody(esClient.delete(deleteRequestBuilder.build()).result());
-                break;
-            }
-            case DeleteIndex: {
-                DeleteIndexRequest.Builder deleteIndexRequestBuilder = message.getBody(DeleteIndexRequest.Builder.class);
-                if (deleteIndexRequestBuilder == null) {
-                    throw new IllegalArgumentException(
-                            "Wrong body type. Only String or DeleteIndexRequest.Builder is allowed as a type");
+                case Bulk: {
+                    processBulkAsync(ctx);
+                    break;
                 }
-                message.setBody(esClient.indices().delete(deleteIndexRequestBuilder.build()).acknowledged());
-                break;
-            }
-            case Exists: {
-                ExistsRequest.Builder builder = new ExistsRequest.Builder();
-                builder.index(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class));
-                message.setBody(esClient.indices().exists(builder.build()).value());
-                break;
-            }
-            case Search: {
-                SearchRequest.Builder searchRequestBuilder = message.getBody(SearchRequest.Builder.class);
-                if (searchRequestBuilder == null) {
-                    throw new IllegalArgumentException(
-                            "Wrong body type. Only Map, String or SearchRequest.Builder is allowed as a type");
+                case Delete: {
+                    processDeleteAsync(ctx);
+                    break;
                 }
-                // is it a scroll request ?
-                boolean useScroll = message.getHeader(PARAM_SCROLL, configuration.isUseScroll(), Boolean.class);
-                if (useScroll) {
-                    int scrollKeepAliveMs
-                            = message.getHeader(PARAM_SCROLL_KEEP_ALIVE_MS, configuration.getScrollKeepAliveMs(),
-                                    Integer.class);
-                    ElasticsearchScrollRequestIterator<?> scrollRequestIterator = new ElasticsearchScrollRequestIterator<>(
-                            searchRequestBuilder, esClient, scrollKeepAliveMs, exchange, documentClass);
-                    exchange.getIn().setBody(scrollRequestIterator);
-                } else {
-                    message.setBody(esClient.search(searchRequestBuilder.build(), documentClass).hits());
+                case DeleteIndex: {
+                    processDeleteIndexAsync(ctx);
+                    break;
                 }
-                break;
-            }
-            case MultiSearch: {
-                MsearchRequest.Builder msearchRequestBuilder = message.getBody(MsearchRequest.Builder.class);
-                if (msearchRequestBuilder == null) {
-                    throw new IllegalArgumentException("Wrong body type. Only MsearchRequest.Builder is allowed as a type");
+                case Exists: {
+                    processExistsAsync(ctx);
+                    break;
                 }
-                message.setBody(esClient.msearch(msearchRequestBuilder.build(), documentClass).responses());
-                break;
-            }
-            case MultiGet: {
-                MgetRequest.Builder mgetRequestBuilder = message.getBody(MgetRequest.Builder.class);
-                if (mgetRequestBuilder == null) {
-                    throw new IllegalArgumentException("Wrong body type. Only MgetRequest.Builder is allowed as a type");
+                case Search: {
+                    SearchRequest.Builder searchRequestBuilder = message.getBody(SearchRequest.Builder.class);
+                    if (searchRequestBuilder == null) {
+                        throw new IllegalArgumentException(
+                                "Wrong body type. Only Map, String or SearchRequest.Builder is allowed as a type");
+                    }
+                    // is it a scroll request ?
+                    boolean useScroll = message.getHeader(PARAM_SCROLL, configuration.isUseScroll(), Boolean.class);
+                    if (useScroll) {
+                        // As a scroll request is expected, for the sake of simplicity, the synchronous mode is preserved
+                        int scrollKeepAliveMs
+                                = message.getHeader(PARAM_SCROLL_KEEP_ALIVE_MS, configuration.getScrollKeepAliveMs(),
+                                        Integer.class);
+                        ElasticsearchScrollRequestIterator<?> scrollRequestIterator = new ElasticsearchScrollRequestIterator<>(
+                                searchRequestBuilder, new ElasticsearchClient(transport), scrollKeepAliveMs, exchange,
+                                documentClass);
+                        exchange.getIn().setBody(scrollRequestIterator);
+                        cleanup(ctx);
+                        callback.done(true);
+                        return true;
+                    } else {
+                        onComplete(
+                                new ElasticsearchAsyncClient(transport).search(searchRequestBuilder.build(), documentClass)
+                                        .thenApply(SearchResponse::hits),
+                                ctx);
+                    }
+                    break;
+                }
+                case MultiSearch: {
+                    processMultiSearchAsync(ctx, documentClass);
+                    break;
+                }
+                case MultiGet: {
+                    processMultiGetAsync(ctx, documentClass);
+                    break;
+                }
+                case Ping: {
+                    processPingAsync(ctx);
+                    break;
+                }
+                default: {
+                    throw new IllegalArgumentException(
+                            ElasticsearchConstants.PARAM_OPERATION + " value '" + operation + "' is not supported");
                 }
-                message.setBody(esClient.mget(mgetRequestBuilder.build(), documentClass).docs());
-                break;
-            }
-            case Ping: {
-                message.setBody(esClient.ping().value());
-                break;
-            }
-            default: {
-                throw new IllegalArgumentException(
-                        ElasticsearchConstants.PARAM_OPERATION + " value '" + operation + "' is not supported");
             }
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
         }
+        return false;
+    }
+
+    /**
+     * Pings asynchronously the Elastic cluster and hands the boolean value of the response over to
+     * {@link #onComplete(CompletableFuture, ActionContext)}.
+     *
+     * @param ctx the context of the asynchronous task.
+     */
+    private void processPingAsync(ActionContext ctx) {
+        final CompletableFuture<Boolean> pingResult = ctx.getClient().ping().thenApply(BooleanResponse::value);
+        onComplete(pingResult, ctx);
+    }
+
+    /**
+     * Sends asynchronously the multi-get request built from the message body and hands the documents of the
+     * response over to {@link #onComplete(CompletableFuture, ActionContext)}.
+     *
+     * @param  ctx                      the context of the asynchronous task.
+     * @param  documentClass            the document class given to the client along with the request.
+     * @throws IllegalArgumentException if the body cannot be retrieved as a {@code MgetRequest.Builder}.
+     */
+    private void processMultiGetAsync(ActionContext ctx, Class<?> documentClass) {
+        final MgetRequest.Builder requestBuilder = ctx.getMessage().getBody(MgetRequest.Builder.class);
+        if (requestBuilder != null) {
+            onComplete(
+                    ctx.getClient().mget(requestBuilder.build(), documentClass).thenApply(MgetResponse::docs),
+                    ctx);
+            return;
+        }
+        throw new IllegalArgumentException("Wrong body type. Only MgetRequest.Builder is allowed as a type");
+    }
 
-        // If we set params via the configuration on this exchange, remove them
-        // now. This preserves legacy behavior for this component and enables a
-        // use case where one message can be sent to multiple elasticsearch
-        // endpoints where the user is relying on the endpoint configuration
-        // (index/type) rather than header values. If we do not clear this out
-        // sending the same message (index request, for example) to multiple
-        // elasticsearch endpoints would have the effect overriding any
-        // subsequent endpoint index/type with the first endpoint index/type.
-        if (configIndexName) {
-            message.removeHeader(ElasticsearchConstants.PARAM_INDEX_NAME);
+    /**
+     * Executes asynchronously a multi-search request.
+     * <p>
+     * The request is built from the {@code MsearchRequest.Builder} retrieved from the message body, and the
+     * {@code responses()} of the result are handed over to {@code onComplete}.
+     *
+     * @param  ctx                      the context of the asynchronous task.
+     * @param  documentClass            the document class given to the client along with the request.
+     * @throws IllegalArgumentException if the body cannot be retrieved as a {@code MsearchRequest.Builder}.
+     */
+    private void processMultiSearchAsync(ActionContext ctx, Class<?> documentClass) {
+        MsearchRequest.Builder msearchRequestBuilder = ctx.getMessage().getBody(MsearchRequest.Builder.class);
+        if (msearchRequestBuilder == null) {
+            throw new IllegalArgumentException("Wrong body type. Only MsearchRequest.Builder is allowed as a type");
         }
+        onComplete(
+                ctx.getClient().msearch(msearchRequestBuilder.build(), documentClass)
+                        .thenApply(MsearchResponse::responses),
+                ctx);
+    }
+
+    /**
+     * Asks asynchronously whether the index named by the {@code ElasticsearchConstants.PARAM_INDEX_NAME} header
+     * exists and hands the boolean value of the response over to
+     * {@link #onComplete(CompletableFuture, ActionContext)}.
+     *
+     * @param ctx the context of the asynchronous task.
+     */
+    private void processExistsAsync(ActionContext ctx) {
+        final String indexName = ctx.getMessage().getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class);
+        final ExistsRequest request = new ExistsRequest.Builder().index(indexName).build();
+        onComplete(ctx.getClient().indices().exists(request).thenApply(BooleanResponse::value), ctx);
+    }
 
-        if (configWaitForActiveShards) {
-            message.removeHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS);
+    /**
+     * Deletes asynchronously an index.
+     * <p>
+     * The request is built from the {@code DeleteIndexRequest.Builder} retrieved from the message body, and the
+     * {@code acknowledged()} flag of the result is handed over to {@code onComplete}.
+     *
+     * @param  ctx                      the context of the asynchronous task.
+     * @throws IllegalArgumentException if the body cannot be retrieved as a {@code DeleteIndexRequest.Builder}.
+     */
+    private void processDeleteIndexAsync(ActionContext ctx) {
+        DeleteIndexRequest.Builder deleteIndexRequestBuilder = ctx.getMessage().getBody(DeleteIndexRequest.Builder.class);
+        if (deleteIndexRequestBuilder == null) {
+            throw new IllegalArgumentException(
+                    "Wrong body type. Only String or DeleteIndexRequest.Builder is allowed as a type");
         }
-        if (configuration.isDisconnect()) {
-            IOHelper.close(transport);
-            IOHelper.close(client);
-            client = null;
-            if (configuration.isEnableSniffer()) {
-                IOHelper.close(sniffer);
-                sniffer = null;
-            }
+        onComplete(
+                ctx.getClient().indices().delete(deleteIndexRequestBuilder.build())
+                        .thenApply(DeleteIndexResponse::acknowledged),
+                ctx);
+    }
+
+    /**
+     * Sends asynchronously the delete request built from the message body and hands the {@code result()} of the
+     * response over to {@link #onComplete(CompletableFuture, ActionContext)}.
+     *
+     * @param  ctx                      the context of the asynchronous task.
+     * @throws IllegalArgumentException if the body cannot be retrieved as a {@code DeleteRequest.Builder}.
+     */
+    private void processDeleteAsync(ActionContext ctx) {
+        final DeleteRequest.Builder requestBuilder = ctx.getMessage().getBody(DeleteRequest.Builder.class);
+        if (requestBuilder == null) {
+            throw new IllegalArgumentException(
+                    "Wrong body type. Only String or DeleteRequest.Builder is allowed as a type");
+        }
+        final DeleteRequest request = requestBuilder.build();
+        onComplete(ctx.getClient().delete(request).thenApply(DeleteResponse::result), ctx);
+    }
+
+    /**
+     * Executes asynchronously bulk operations.
+     * <p>
+     * The request is built from the {@code BulkRequest.Builder} retrieved from the message body, and the
+     * {@code items()} of the result are handed over to {@code onComplete}.
+     *
+     * @param  ctx                      the context of the asynchronous task.
+     * @throws IllegalArgumentException if the body cannot be retrieved as a {@code BulkRequest.Builder}.
+     */
+    private void processBulkAsync(ActionContext ctx) {
+        BulkRequest.Builder bulkRequestBuilder = ctx.getMessage().getBody(BulkRequest.Builder.class);
+        if (bulkRequestBuilder == null) {
+            throw new IllegalArgumentException(
+                    "Wrong body type. Only Iterable or BulkRequest.Builder is allowed as a type");
         }
+        onComplete(
+                ctx.getClient().bulk(bulkRequestBuilder.build())
+                        .thenApply(BulkResponse::items),
+                ctx);
+    }
+
+    /**
+     * Sends asynchronously the get request built from the message body and hands the whole response over to
+     * {@link #onComplete(CompletableFuture, ActionContext)}.
+     *
+     * @param  ctx                      the context of the asynchronous task.
+     * @param  documentClass            the document class given to the client along with the request.
+     * @throws IllegalArgumentException if the body cannot be retrieved as a {@code GetRequest.Builder}.
+     */
+    private void processGetByIdAsync(ActionContext ctx, Class<?> documentClass) {
+        final GetRequest.Builder requestBuilder = ctx.getMessage().getBody(GetRequest.Builder.class);
+        if (requestBuilder == null) {
+            throw new IllegalArgumentException(
+                    "Wrong body type. Only String or GetRequest.Builder is allowed as a type");
+        }
+        final GetRequest request = requestBuilder.build();
+        onComplete(ctx.getClient().get(request, documentClass), ctx);
+    }
+
+    /**
+     * Updates asynchronously a document.
+     * <p>
+     * The request is built from the {@code UpdateRequest.Builder} retrieved from the message body, and the id
+     * carried by the response is handed over to {@link #onComplete(CompletableFuture, ActionContext)}.
+     *
+     * @param  ctx                      the context of the asynchronous task.
+     * @param  documentClass            the document class given to the client along with the request.
+     * @throws IllegalArgumentException if the body cannot be retrieved as an {@code UpdateRequest.Builder}.
+     */
+    private void processUpdateAsync(ActionContext ctx, Class<?> documentClass) {
+        UpdateRequest.Builder updateRequestBuilder = ctx.getMessage().getBody(UpdateRequest.Builder.class);
+        if (updateRequestBuilder == null) {
+            // Fail with an explicit message, like the other operations, rather than with a bare
+            // NullPointerException when build() is called on a missing builder.
+            throw new IllegalArgumentException(
+                    "Wrong body type. The body must be convertible to UpdateRequest.Builder");
+        }
+        onComplete(
+                ctx.getClient().update(updateRequestBuilder.build(), documentClass)
+                        // the builder is a raw type, so the response must be cast before reading its id
+                        .thenApply(r -> ((UpdateResponse<?>) r).id()),
+                ctx);
+    }
+
+    /**
+     * Indexes asynchronously a document.
+     * <p>
+     * The request is built from the {@code IndexRequest.Builder} retrieved from the message body, and the id
+     * carried by the response is handed over to {@link #onComplete(CompletableFuture, ActionContext)}.
+     *
+     * @param  ctx                      the context of the asynchronous task.
+     * @throws IllegalArgumentException if the body cannot be retrieved as an {@code IndexRequest.Builder}.
+     */
+    private void processIndexAsync(ActionContext ctx) {
+        IndexRequest.Builder<?> indexRequestBuilder = ctx.getMessage().getBody(IndexRequest.Builder.class);
+        if (indexRequestBuilder == null) {
+            // Fail with an explicit message, like the other operations, rather than with a bare
+            // NullPointerException when build() is called on a missing builder.
+            throw new IllegalArgumentException(
+                    "Wrong body type. The body must be convertible to IndexRequest.Builder");
+        }
+        onComplete(
+                ctx.getClient().index(indexRequestBuilder.build())
+                        .thenApply(WriteResponseBase::id),
+                ctx);
+    }
+
+    /**
+     * Add actions to perform once the given future is complete.
+     * <p>
+     * On success the result of the future becomes the body of the message. On failure the error is wrapped into a
+     * {@link CamelExchangeException} and set on the exchange. In both cases the cleanup task is executed and the
+     * callback is then notified of the asynchronous completion.
+     *
+     * @param future the future to complete with specific actions.
+     * @param ctx    the context of the asynchronous task.
+     * @param <T>    the result type returned by the future.
+     */
+    private <T> void onComplete(CompletableFuture<T> future, ActionContext ctx) {
+        final Exchange exchange = ctx.getExchange();
+        future.thenAccept(r -> exchange.getIn().setBody(r))
+                .whenComplete(
+                        (r, e) -> {
+                            try {
+                                if (e != null) {
+                                    exchange.setException(new CamelExchangeException(
+                                            "An error occurred while executing the action", exchange, e));
+                                }
+                                // The cleanup must not be chained with thenAccept as that stage is skipped when
+                                // the future fails, which would leave the headers set from the configuration on
+                                // the message and the resources unclosed in disconnect mode.
+                                cleanup(ctx);
+                            } finally {
+                                // Always notify the callback, otherwise the exchange would never be continued
+                                ctx.getCallback().done(false);
+                            }
+                        });
+    }
+
+    /**
+     * The cleanup task to execute once everything is done.
+     * <p>
+     * It removes from the message the index name and "wait for active shards" headers when the context indicates
+     * that they were set from the endpoint configuration. When the configuration asks to disconnect, it also
+     * closes the transport of the context, the sniffer (if enabled) and the client, and resets both fields to
+     * {@code null}. Any exception is logged as a warning and not propagated.
+     * <p>
+     * NOTE(review): the {@code client} and {@code sniffer} fields are closed and reset here without holding the
+     * {@code mutex} used when starting the client -- confirm that this is safe when several exchanges are
+     * processed concurrently in disconnect mode.
+     *
+     * @param ctx the context of the asynchronous task.
+     */
+    private void cleanup(ActionContext ctx) {
+
+        try {
+            Message message = ctx.getMessage();
+
+            // If we set params via the configuration on this exchange, remove them
+            // now. This preserves legacy behavior for this component and enables a
+            // use case where one message can be sent to multiple elasticsearch
+            // endpoints where the user is relying on the endpoint configuration
+            // (index/type) rather than header values. If we do not clear this out
+            // sending the same message (index request, for example) to multiple
+            // elasticsearch endpoints would have the effect overriding any
+            // subsequent endpoint index/type with the first endpoint index/type.
+            if (ctx.isConfigIndexName()) {
+                message.removeHeader(ElasticsearchConstants.PARAM_INDEX_NAME);
+            }

+            if (ctx.isConfigWaitForActiveShards()) {
+                message.removeHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS);
+            }
+            if (configuration.isDisconnect()) {
+                IOHelper.close(ctx.getTransport());
+                if (configuration.isEnableSniffer()) {
+                    IOHelper.close(sniffer);
+                    sniffer = null;
+                }
+                IOHelper.close(client);
+                client = null;
+            }
+        } catch (Exception e) {
+            LOG.warn("Could not execute the cleanup task", e);
+        }
     }
 
     @Override
@@ -310,12 +474,16 @@ public class ElasticsearchProducer extends DefaultProducer {
 
     private void startClient() {
         if (client == null) {
-            LOG.info("Connecting to the ElasticSearch cluster: {}", configuration.getClusterName());
-            if (configuration.getHostAddressesList() != null
-                    && !configuration.getHostAddressesList().isEmpty()) {
-                client = createClient();
-            } else {
-                LOG.warn("Incorrect ip address and port parameters settings for ElasticSearch cluster");
+            synchronized (mutex) {
+                if (client == null) {
+                    LOG.info("Connecting to the ElasticSearch cluster: {}", configuration.getClusterName());
+                    if (configuration.getHostAddressesList() != null
+                            && !configuration.getHostAddressesList().isEmpty()) {
+                        client = createClient();
+                    } else {
+                        LOG.warn("Incorrect ip address and port parameters settings for ElasticSearch cluster");
+                    }
+                }
             }
         }
     }
@@ -388,4 +556,53 @@ public class ElasticsearchProducer extends DefaultProducer {
             throw new RuntimeException(e);
         }
     }
+
+    /**
+     * Immutable holder of what an asynchronous action needs: the exchange being processed, the callback to notify,
+     * the transport to use and the flags telling which headers were set from the endpoint configuration.
+     */
+    private static class ActionContext {
+
+        /** Indicates whether the index name header was set from the configuration. */
+        private final boolean configIndexName;
+        /** Indicates whether the "wait for active shards" header was set from the configuration. */
+        private final boolean configWaitForActiveShards;
+        /** The exchange being processed. */
+        private final Exchange exchange;
+        /** The callback to notify once the action is done. */
+        private final AsyncCallback callback;
+        /** The transport used to create the clients. */
+        private final ElasticsearchTransport transport;
+
+        ActionContext(Exchange exchange, AsyncCallback callback, ElasticsearchTransport transport, boolean configIndexName,
+                      boolean configWaitForActiveShards) {
+            this.configIndexName = configIndexName;
+            this.configWaitForActiveShards = configWaitForActiveShards;
+            this.exchange = exchange;
+            this.callback = callback;
+            this.transport = transport;
+        }
+
+        /**
+         * @return the exchange being processed.
+         */
+        Exchange getExchange() {
+            return exchange;
+        }
+
+        /**
+         * @return the "in" message of the exchange being processed.
+         */
+        Message getMessage() {
+            return exchange.getIn();
+        }
+
+        /**
+         * @return the callback to notify once the action is done.
+         */
+        AsyncCallback getCallback() {
+            return callback;
+        }
+
+        /**
+         * @return the transport given at construction time.
+         */
+        ElasticsearchTransport getTransport() {
+            return transport;
+        }
+
+        /**
+         * @return a new asynchronous client on top of the transport, created at each call.
+         */
+        ElasticsearchAsyncClient getClient() {
+            return new ElasticsearchAsyncClient(transport);
+        }
+
+        /**
+         * @return {@code true} if the index name header was set from the configuration, {@code false} otherwise.
+         */
+        boolean isConfigIndexName() {
+            return configIndexName;
+        }
+
+        /**
+         * @return {@code true} if the "wait for active shards" header was set from the configuration, {@code false}
+         *         otherwise.
+         */
+        boolean isConfigWaitForActiveShards() {
+            return configWaitForActiveShards;
+        }
+    }
 }