You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/10/04 08:04:34 UTC
[camel] branch main updated: CAMEL-18580: camel-elasticsearch - Propose an async producer (#8459)
This is an automated email from the ASF dual-hosted git repository.
nfilotto pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new e47c0e34a40 CAMEL-18580: camel-elasticsearch - Propose an async producer (#8459)
e47c0e34a40 is described below
commit e47c0e34a4057481a96a2a6224dd0f1ee4c43699
Author: Nicolas Filotto <es...@users.noreply.github.com>
AuthorDate: Tue Oct 4 10:04:27 2022 +0200
CAMEL-18580: camel-elasticsearch - Propose an async producer (#8459)
## Motivation
In Camel 3.19, a new component based on the Elasticsearch Java API Client was introduced. This component provides a blocking producer, which can be improved by leveraging the asynchronous client to offer a non-blocking (asynchronous) producer instead.
## Modifications
* Rewrites the code of the process method to use the `ElasticsearchAsyncClient` instead of `ElasticsearchClient`
* Removes the `volatile` keyword from the field `sniffer`, as it is always written before a write to, and read after a read of, the volatile field `client` (visibility piggybacking)
* Uses a mutex to prevent concurrent initialization of the client
* Fixes the error message when no operation has been set
---
.../camel/component/es/ElasticsearchProducer.java | 543 ++++++++++++++-------
1 file changed, 380 insertions(+), 163 deletions(-)
diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java
index 51f62962a2c..5cafd7933c5 100644
--- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java
+++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/es/ElasticsearchProducer.java
@@ -22,29 +22,42 @@ import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
+import java.util.concurrent.CompletableFuture;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
+import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch._types.WriteResponseBase;
import co.elastic.clients.elasticsearch.core.BulkRequest;
+import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.DeleteRequest;
+import co.elastic.clients.elasticsearch.core.DeleteResponse;
import co.elastic.clients.elasticsearch.core.GetRequest;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch.core.MgetRequest;
+import co.elastic.clients.elasticsearch.core.MgetResponse;
import co.elastic.clients.elasticsearch.core.MsearchRequest;
+import co.elastic.clients.elasticsearch.core.MsearchResponse;
import co.elastic.clients.elasticsearch.core.SearchRequest;
+import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.UpdateRequest;
+import co.elastic.clients.elasticsearch.core.UpdateResponse;
import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
+import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
import co.elastic.clients.elasticsearch.indices.ExistsRequest;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
+import co.elastic.clients.transport.endpoints.BooleanResponse;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
-import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.util.IOHelper;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
@@ -64,13 +77,14 @@ import static org.apache.camel.component.es.ElasticsearchConstants.PARAM_SCROLL_
/**
* Represents an Elasticsearch producer.
*/
-public class ElasticsearchProducer extends DefaultProducer {
+class ElasticsearchProducer extends DefaultAsyncProducer {
private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchProducer.class);
protected final ElasticsearchConfiguration configuration;
+ private final Object mutex = new Object();
private volatile RestClient client;
- private volatile Sniffer sniffer;
+ private Sniffer sniffer;
public ElasticsearchProducer(ElasticsearchEndpoint endpoint, ElasticsearchConfiguration configuration) {
super(endpoint);
@@ -116,188 +130,338 @@ public class ElasticsearchProducer extends DefaultProducer {
}
if (operationConfig == null) {
throw new IllegalArgumentException(
- ElasticsearchConstants.PARAM_OPERATION + " value '" + operationConfig + "' is not supported");
+ ElasticsearchConstants.PARAM_OPERATION + " value is mandatory");
}
return operationConfig;
}
@Override
- public void process(Exchange exchange) throws Exception {
- if (configuration.isDisconnect() && client == null) {
- startClient();
- }
- final ObjectMapper mapper = new ObjectMapper();
- mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
- ElasticsearchTransport transport = new RestClientTransport(client, new JacksonJsonpMapper(mapper));
- ElasticsearchClient esClient = new ElasticsearchClient(transport);
- // 2. Index and type will be set by:
- // a. If the incoming body is already an action request
- // b. If the body is not an action request we will use headers if they
- // are set.
- // c. If the body is not an action request and the headers aren't set we
- // will use the configuration.
- // No error is thrown by the component in the event none of the above
- // conditions are met. The java es client
- // will throw.
-
- Message message = exchange.getIn();
- final ElasticsearchOperation operation = resolveOperation(exchange);
-
- // Set the index/type headers on the exchange if necessary. This is used
- // for type conversion.
- boolean configIndexName = false;
- String indexName = message.getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class);
- if (indexName == null) {
- message.setHeader(ElasticsearchConstants.PARAM_INDEX_NAME, configuration.getIndexName());
- configIndexName = true;
- }
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ try {
+ if (configuration.isDisconnect() && client == null) {
+ startClient();
+ }
+ final ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+ ElasticsearchTransport transport = new RestClientTransport(client, new JacksonJsonpMapper(mapper));
+ // 2. Index and type will be set by:
+ // a. If the incoming body is already an action request
+ // b. If the body is not an action request we will use headers if they
+ // are set.
+ // c. If the body is not an action request and the headers aren't set we
+ // will use the configuration.
+ // No error is thrown by the component in the event none of the above
+ // conditions are met. The java es client
+ // will throw.
- Integer size = message.getHeader(ElasticsearchConstants.PARAM_SIZE, Integer.class);
- if (size == null) {
- message.setHeader(ElasticsearchConstants.PARAM_SIZE, configuration.getSize());
- }
+ Message message = exchange.getIn();
+ final ElasticsearchOperation operation = resolveOperation(exchange);
- Integer from = message.getHeader(ElasticsearchConstants.PARAM_FROM, Integer.class);
- if (from == null) {
- message.setHeader(ElasticsearchConstants.PARAM_FROM, configuration.getFrom());
- }
+ // Set the index/type headers on the exchange if necessary. This is used
+ // for type conversion.
+ boolean configIndexName = false;
+ String indexName = message.getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class);
+ if (indexName == null) {
+ message.setHeader(ElasticsearchConstants.PARAM_INDEX_NAME, configuration.getIndexName());
+ configIndexName = true;
+ }
- boolean configWaitForActiveShards = false;
- Integer waitForActiveShards = message.getHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, Integer.class);
- if (waitForActiveShards == null) {
- message.setHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, configuration.getWaitForActiveShards());
- configWaitForActiveShards = true;
- }
+ Integer size = message.getHeader(ElasticsearchConstants.PARAM_SIZE, Integer.class);
+ if (size == null) {
+ message.setHeader(ElasticsearchConstants.PARAM_SIZE, configuration.getSize());
+ }
- Class<?> documentClass = message.getHeader(ElasticsearchConstants.PARAM_DOCUMENT_CLASS, Class.class);
- if (documentClass == null) {
- documentClass = configuration.getDocumentClass();
- }
+ Integer from = message.getHeader(ElasticsearchConstants.PARAM_FROM, Integer.class);
+ if (from == null) {
+ message.setHeader(ElasticsearchConstants.PARAM_FROM, configuration.getFrom());
+ }
- switch (operation) {
- case Index: {
- IndexRequest.Builder<?> indexRequestBuilder = message.getBody(IndexRequest.Builder.class);
- message.setBody(esClient.index(indexRequestBuilder.build()).id());
- break;
+ boolean configWaitForActiveShards = false;
+ Integer waitForActiveShards = message.getHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, Integer.class);
+ if (waitForActiveShards == null) {
+ message.setHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, configuration.getWaitForActiveShards());
+ configWaitForActiveShards = true;
}
- case Update: {
- UpdateRequest.Builder updateRequestBuilder = message.getBody(UpdateRequest.Builder.class);
- message.setBody(esClient.update(updateRequestBuilder.build(), documentClass).id());
- break;
+
+ Class<?> documentClass = message.getHeader(ElasticsearchConstants.PARAM_DOCUMENT_CLASS, Class.class);
+ if (documentClass == null) {
+ documentClass = configuration.getDocumentClass();
}
- case GetById: {
- GetRequest.Builder getRequestBuilder = message.getBody(GetRequest.Builder.class);
- if (getRequestBuilder == null) {
- throw new IllegalArgumentException(
- "Wrong body type. Only String or GetRequest.Builder is allowed as a type");
+
+ ActionContext ctx = new ActionContext(exchange, callback, transport, configIndexName, configWaitForActiveShards);
+
+ switch (operation) {
+ case Index: {
+ processIndexAsync(ctx);
+ break;
}
- message.setBody(esClient.get(getRequestBuilder.build(), documentClass));
- break;
- }
- case Bulk: {
- BulkRequest.Builder bulkRequestBuilder = message.getBody(BulkRequest.Builder.class);
- if (bulkRequestBuilder == null) {
- throw new IllegalArgumentException(
- "Wrong body type. Only Iterable or BulkRequest.Builder is allowed as a type");
+ case Update: {
+ processUpdateAsync(ctx, documentClass);
+ break;
}
- message.setBody(esClient.bulk(bulkRequestBuilder.build()).items());
- break;
- }
- case Delete: {
- DeleteRequest.Builder deleteRequestBuilder = message.getBody(DeleteRequest.Builder.class);
- if (deleteRequestBuilder == null) {
- throw new IllegalArgumentException(
- "Wrong body type. Only String or DeleteRequest.Builder is allowed as a type");
+ case GetById: {
+ processGetByIdAsync(ctx, documentClass);
+ break;
}
- message.setBody(esClient.delete(deleteRequestBuilder.build()).result());
- break;
- }
- case DeleteIndex: {
- DeleteIndexRequest.Builder deleteIndexRequestBuilder = message.getBody(DeleteIndexRequest.Builder.class);
- if (deleteIndexRequestBuilder == null) {
- throw new IllegalArgumentException(
- "Wrong body type. Only String or DeleteIndexRequest.Builder is allowed as a type");
+ case Bulk: {
+ processBulkAsync(ctx);
+ break;
}
- message.setBody(esClient.indices().delete(deleteIndexRequestBuilder.build()).acknowledged());
- break;
- }
- case Exists: {
- ExistsRequest.Builder builder = new ExistsRequest.Builder();
- builder.index(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class));
- message.setBody(esClient.indices().exists(builder.build()).value());
- break;
- }
- case Search: {
- SearchRequest.Builder searchRequestBuilder = message.getBody(SearchRequest.Builder.class);
- if (searchRequestBuilder == null) {
- throw new IllegalArgumentException(
- "Wrong body type. Only Map, String or SearchRequest.Builder is allowed as a type");
+ case Delete: {
+ processDeleteAsync(ctx);
+ break;
}
- // is it a scroll request ?
- boolean useScroll = message.getHeader(PARAM_SCROLL, configuration.isUseScroll(), Boolean.class);
- if (useScroll) {
- int scrollKeepAliveMs
- = message.getHeader(PARAM_SCROLL_KEEP_ALIVE_MS, configuration.getScrollKeepAliveMs(),
- Integer.class);
- ElasticsearchScrollRequestIterator<?> scrollRequestIterator = new ElasticsearchScrollRequestIterator<>(
- searchRequestBuilder, esClient, scrollKeepAliveMs, exchange, documentClass);
- exchange.getIn().setBody(scrollRequestIterator);
- } else {
- message.setBody(esClient.search(searchRequestBuilder.build(), documentClass).hits());
+ case DeleteIndex: {
+ processDeleteIndexAsync(ctx);
+ break;
}
- break;
- }
- case MultiSearch: {
- MsearchRequest.Builder msearchRequestBuilder = message.getBody(MsearchRequest.Builder.class);
- if (msearchRequestBuilder == null) {
- throw new IllegalArgumentException("Wrong body type. Only MsearchRequest.Builder is allowed as a type");
+ case Exists: {
+ processExistsAsync(ctx);
+ break;
}
- message.setBody(esClient.msearch(msearchRequestBuilder.build(), documentClass).responses());
- break;
- }
- case MultiGet: {
- MgetRequest.Builder mgetRequestBuilder = message.getBody(MgetRequest.Builder.class);
- if (mgetRequestBuilder == null) {
- throw new IllegalArgumentException("Wrong body type. Only MgetRequest.Builder is allowed as a type");
+ case Search: {
+ SearchRequest.Builder searchRequestBuilder = message.getBody(SearchRequest.Builder.class);
+ if (searchRequestBuilder == null) {
+ throw new IllegalArgumentException(
+ "Wrong body type. Only Map, String or SearchRequest.Builder is allowed as a type");
+ }
+ // is it a scroll request ?
+ boolean useScroll = message.getHeader(PARAM_SCROLL, configuration.isUseScroll(), Boolean.class);
+ if (useScroll) {
+ // As a scroll request is expected, for the sake of simplicity, the synchronous mode is preserved
+ int scrollKeepAliveMs
+ = message.getHeader(PARAM_SCROLL_KEEP_ALIVE_MS, configuration.getScrollKeepAliveMs(),
+ Integer.class);
+ ElasticsearchScrollRequestIterator<?> scrollRequestIterator = new ElasticsearchScrollRequestIterator<>(
+ searchRequestBuilder, new ElasticsearchClient(transport), scrollKeepAliveMs, exchange,
+ documentClass);
+ exchange.getIn().setBody(scrollRequestIterator);
+ cleanup(ctx);
+ callback.done(true);
+ return true;
+ } else {
+ onComplete(
+ new ElasticsearchAsyncClient(transport).search(searchRequestBuilder.build(), documentClass)
+ .thenApply(SearchResponse::hits),
+ ctx);
+ }
+ break;
+ }
+ case MultiSearch: {
+ processMultiSearchAsync(ctx, documentClass);
+ break;
+ }
+ case MultiGet: {
+ processMultiGetAsync(ctx, documentClass);
+ break;
+ }
+ case Ping: {
+ processPingAsync(ctx);
+ break;
+ }
+ default: {
+ throw new IllegalArgumentException(
+ ElasticsearchConstants.PARAM_OPERATION + " value '" + operation + "' is not supported");
}
- message.setBody(esClient.mget(mgetRequestBuilder.build(), documentClass).docs());
- break;
- }
- case Ping: {
- message.setBody(esClient.ping().value());
- break;
- }
- default: {
- throw new IllegalArgumentException(
- ElasticsearchConstants.PARAM_OPERATION + " value '" + operation + "' is not supported");
}
+ } catch (Exception e) {
+ exchange.setException(e);
+ callback.done(true);
+ return true;
}
+ return false;
+ }
+
+ /**
+ * Executes asynchronously a ping to the Elastic cluster.
+ */
+ private void processPingAsync(ActionContext ctx) {
+ onComplete(
+ ctx.getClient().ping()
+ .thenApply(BooleanResponse::value),
+ ctx);
+ }
+
+ /**
+ * Executes asynchronously a multi-get request.
+ */
+ private void processMultiGetAsync(ActionContext ctx, Class<?> documentClass) {
+ MgetRequest.Builder mgetRequestBuilder = ctx.getMessage().getBody(MgetRequest.Builder.class);
+ if (mgetRequestBuilder == null) {
+ throw new IllegalArgumentException("Wrong body type. Only MgetRequest.Builder is allowed as a type");
+ }
+ onComplete(
+ ctx.getClient().mget(mgetRequestBuilder.build(), documentClass)
+ .thenApply(MgetResponse::docs),
+ ctx);
+ }
- // If we set params via the configuration on this exchange, remove them
- // now. This preserves legacy behavior for this component and enables a
- // use case where one message can be sent to multiple elasticsearch
- // endpoints where the user is relying on the endpoint configuration
- // (index/type) rather than header values. If we do not clear this out
- // sending the same message (index request, for example) to multiple
- // elasticsearch endpoints would have the effect overriding any
- // subsequent endpoint index/type with the first endpoint index/type.
- if (configIndexName) {
- message.removeHeader(ElasticsearchConstants.PARAM_INDEX_NAME);
+ /**
+ * Executes asynchronously a multi-search request.
+ */
+ private void processMultiSearchAsync(ActionContext ctx, Class<?> documentClass) {
+ MsearchRequest.Builder msearchRequestBuilder = ctx.getMessage().getBody(MsearchRequest.Builder.class);
+ if (msearchRequestBuilder == null) {
+ throw new IllegalArgumentException("Wrong body type. Only MsearchRequest.Builder is allowed as a type");
}
+ onComplete(
+ ctx.getClient().msearch(msearchRequestBuilder.build(), documentClass)
+ .thenApply(MsearchResponse::responses),
+ ctx);
+ }
+
+ /**
+ * Checks asynchronously if a given index exists.
+ */
+ private void processExistsAsync(ActionContext ctx) {
+ ExistsRequest.Builder builder = new ExistsRequest.Builder();
+ builder.index(ctx.getMessage().getHeader(ElasticsearchConstants.PARAM_INDEX_NAME, String.class));
+ onComplete(
+ ctx.getClient().indices().exists(builder.build())
+ .thenApply(BooleanResponse::value),
+ ctx);
+ }
- if (configWaitForActiveShards) {
- message.removeHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS);
+ /**
+ * Deletes asynchronously an index.
+ */
+ private void processDeleteIndexAsync(ActionContext ctx) {
+ DeleteIndexRequest.Builder deleteIndexRequestBuilder = ctx.getMessage().getBody(DeleteIndexRequest.Builder.class);
+ if (deleteIndexRequestBuilder == null) {
+ throw new IllegalArgumentException(
+ "Wrong body type. Only String or DeleteIndexRequest.Builder is allowed as a type");
}
- if (configuration.isDisconnect()) {
- IOHelper.close(transport);
- IOHelper.close(client);
- client = null;
- if (configuration.isEnableSniffer()) {
- IOHelper.close(sniffer);
- sniffer = null;
- }
+ onComplete(
+ ctx.getClient().indices().delete(deleteIndexRequestBuilder.build())
+ .thenApply(DeleteIndexResponse::acknowledged),
+ ctx);
+ }
+
+ /**
+ * Deletes asynchronously a document.
+ */
+ private void processDeleteAsync(ActionContext ctx) {
+ DeleteRequest.Builder deleteRequestBuilder = ctx.getMessage().getBody(DeleteRequest.Builder.class);
+ if (deleteRequestBuilder == null) {
+ throw new IllegalArgumentException(
+ "Wrong body type. Only String or DeleteRequest.Builder is allowed as a type");
+ }
+ onComplete(
+ ctx.getClient().delete(deleteRequestBuilder.build())
+ .thenApply(DeleteResponse::result),
+ ctx);
+ }
+
+ /**
+ * Executes asynchronously bulk operations.
+ */
+ private void processBulkAsync(ActionContext ctx) {
+ BulkRequest.Builder bulkRequestBuilder = ctx.getMessage().getBody(BulkRequest.Builder.class);
+ if (bulkRequestBuilder == null) {
+ throw new IllegalArgumentException(
+ "Wrong body type. Only Iterable or BulkRequest.Builder is allowed as a type");
}
+ onComplete(
+ ctx.getClient().bulk(bulkRequestBuilder.build())
+ .thenApply(BulkResponse::items),
+ ctx);
+ }
+
+ /**
+ * Finds asynchronously a document by id.
+ */
+ private void processGetByIdAsync(ActionContext ctx, Class<?> documentClass) {
+ GetRequest.Builder getRequestBuilder = ctx.getMessage().getBody(GetRequest.Builder.class);
+ if (getRequestBuilder == null) {
+ throw new IllegalArgumentException(
+ "Wrong body type. Only String or GetRequest.Builder is allowed as a type");
+ }
+ onComplete(
+ ctx.getClient().get(getRequestBuilder.build(), documentClass),
+ ctx);
+ }
+
+ /**
+ * Updates asynchronously a document.
+ */
+ private void processUpdateAsync(ActionContext ctx, Class<?> documentClass) {
+ UpdateRequest.Builder updateRequestBuilder = ctx.getMessage().getBody(UpdateRequest.Builder.class);
+ onComplete(
+ ctx.getClient().update(updateRequestBuilder.build(), documentClass)
+ .thenApply(r -> ((UpdateResponse<?>) r).id()),
+ ctx);
+ }
+
+ /**
+ * Indexes asynchronously a document.
+ */
+ private void processIndexAsync(ActionContext ctx) {
+ IndexRequest.Builder<?> indexRequestBuilder = ctx.getMessage().getBody(IndexRequest.Builder.class);
+ onComplete(
+ ctx.getClient().index(indexRequestBuilder.build())
+ .thenApply(WriteResponseBase::id),
+ ctx);
+ }
+
+ /**
+ * Add actions to perform once the given future is complete.
+ *
+ * @param future the future to complete with specific actions.
+ * @param ctx the context of the asynchronous task.
+ * @param <T> the result type returned by the future.
+ */
+ private <T> void onComplete(CompletableFuture<T> future, ActionContext ctx) {
+ final Exchange exchange = ctx.getExchange();
+ future.thenAccept(r -> exchange.getIn().setBody(r))
+ .thenAccept(r -> cleanup(ctx))
+ .whenComplete(
+ (r, e) -> {
+ try {
+ if (e != null) {
+ exchange.setException(new CamelExchangeException(
+ "An error occurred while executing the action", exchange, e));
+ }
+ } finally {
+ ctx.getCallback().done(false);
+ }
+ });
+ }
+
+ /**
+ * The cleanup task to execute once everything is done.
+ */
+ private void cleanup(ActionContext ctx) {
+
+ try {
+ Message message = ctx.getMessage();
+
+ // If we set params via the configuration on this exchange, remove them
+ // now. This preserves legacy behavior for this component and enables a
+ // use case where one message can be sent to multiple elasticsearch
+ // endpoints where the user is relying on the endpoint configuration
+ // (index/type) rather than header values. If we do not clear this out
+ // sending the same message (index request, for example) to multiple
+ // elasticsearch endpoints would have the effect overriding any
+ // subsequent endpoint index/type with the first endpoint index/type.
+ if (ctx.isConfigIndexName()) {
+ message.removeHeader(ElasticsearchConstants.PARAM_INDEX_NAME);
+ }
+ if (ctx.isConfigWaitForActiveShards()) {
+ message.removeHeader(ElasticsearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS);
+ }
+ if (configuration.isDisconnect()) {
+ IOHelper.close(ctx.getTransport());
+ if (configuration.isEnableSniffer()) {
+ IOHelper.close(sniffer);
+ sniffer = null;
+ }
+ IOHelper.close(client);
+ client = null;
+ }
+ } catch (Exception e) {
+ LOG.warn("Could not execute the cleanup task", e);
+ }
}
@Override
@@ -310,12 +474,16 @@ public class ElasticsearchProducer extends DefaultProducer {
private void startClient() {
if (client == null) {
- LOG.info("Connecting to the ElasticSearch cluster: {}", configuration.getClusterName());
- if (configuration.getHostAddressesList() != null
- && !configuration.getHostAddressesList().isEmpty()) {
- client = createClient();
- } else {
- LOG.warn("Incorrect ip address and port parameters settings for ElasticSearch cluster");
+ synchronized (mutex) {
+ if (client == null) {
+ LOG.info("Connecting to the ElasticSearch cluster: {}", configuration.getClusterName());
+ if (configuration.getHostAddressesList() != null
+ && !configuration.getHostAddressesList().isEmpty()) {
+ client = createClient();
+ } else {
+ LOG.warn("Incorrect ip address and port parameters settings for ElasticSearch cluster");
+ }
+ }
}
}
}
@@ -388,4 +556,53 @@ public class ElasticsearchProducer extends DefaultProducer {
throw new RuntimeException(e);
}
}
+
+ /**
+ * An inner class providing all the information that an asynchronous action could need.
+ */
+ private static class ActionContext {
+
+ private final Exchange exchange;
+ private final AsyncCallback callback;
+ private final ElasticsearchTransport transport;
+ private final boolean configIndexName;
+ private final boolean configWaitForActiveShards;
+
+ ActionContext(Exchange exchange, AsyncCallback callback, ElasticsearchTransport transport, boolean configIndexName,
+ boolean configWaitForActiveShards) {
+ this.exchange = exchange;
+ this.callback = callback;
+ this.transport = transport;
+ this.configIndexName = configIndexName;
+ this.configWaitForActiveShards = configWaitForActiveShards;
+ }
+
+ ElasticsearchTransport getTransport() {
+ return transport;
+ }
+
+ ElasticsearchAsyncClient getClient() {
+ return new ElasticsearchAsyncClient(transport);
+ }
+
+ boolean isConfigIndexName() {
+ return configIndexName;
+ }
+
+ boolean isConfigWaitForActiveShards() {
+ return configWaitForActiveShards;
+ }
+
+ Exchange getExchange() {
+ return exchange;
+ }
+
+ AsyncCallback getCallback() {
+ return callback;
+ }
+
+ Message getMessage() {
+ return exchange.getIn();
+ }
+ }
}