You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by "davsclaus (via GitHub)" <gi...@apache.org> on 2023/12/08 10:44:48 UTC

Re: [PR] CAMEL-20166 : Create component Elasticsearch Rest Client [camel]

davsclaus commented on code in PR #12374:
URL: https://github.com/apache/camel/pull/12374#discussion_r1420255712


##########
components/camel-elasticsearch-rest-client/src/main/java/org/apache/camel/component/elasticsearch/rest/client/ElasticsearchRestClientProducer.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch.rest.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.support.ResourceHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseListener;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.sniff.Sniffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchRestClientProducer extends DefaultAsyncProducer {
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchRestClientProducer.class);
+    public static final String PUT = "PUT";
+    public static final String DELETE = "DELETE";
+    public static final String POST = "POST";
+    public static final String GET = "GET";
+
+    private ElasticsearchRestClientEndpoint endpoint;
+
+    public ElasticsearchRestClientProducer(ElasticsearchRestClientEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        // getting configuration from Endpoint
+        ElasticsearchRestClientOperation operation = this.endpoint.getOperation();

Review Comment:
   Better to validate this in component when creating endpoint, or in endpoint when creating proudcer, or in this class in doInit method



##########
components/camel-elasticsearch-rest-client/src/main/java/org/apache/camel/component/elasticsearch/rest/client/ElasticsearchRestClientEndpoint.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch.rest.client;
+
+import java.util.List;
+
+import org.apache.camel.Category;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.RestClient;
+
+/**
+ * ElasticsearchRestClient component which allows you to interface with Elasticsearch or OpenSearch using the Java Low
+ * level Rest Client
+ * <p>
+ */
+@UriEndpoint(firstVersion = "4.3.0", scheme = "elasticsearch-rest-client",
+             title = "Elasticsearch Low level Rest Client",
+             syntax = "elasticsearch-rest-client:clusterName", producerOnly = true,
+             category = { Category.SEARCH })
+public class ElasticsearchRestClientEndpoint extends DefaultEndpoint {
+    @UriPath
+    @Metadata(required = true)
+    private String clusterName;
+    @UriParam
+    ElasticsearchRestClientOperation operation;
+
+    @UriParam(label = "advanced")

Review Comment:
   autowired = true, label = advanced



##########
components/camel-elasticsearch-rest-client/src/main/java/org/apache/camel/component/elasticsearch/rest/client/ElasticsearchRestClientProducer.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch.rest.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.support.ResourceHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseListener;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.sniff.Sniffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchRestClientProducer extends DefaultAsyncProducer {
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchRestClientProducer.class);
+    public static final String PUT = "PUT";
+    public static final String DELETE = "DELETE";
+    public static final String POST = "POST";
+    public static final String GET = "GET";
+
+    private ElasticsearchRestClientEndpoint endpoint;
+
+    public ElasticsearchRestClientProducer(ElasticsearchRestClientEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        // getting configuration from Endpoint
+        ElasticsearchRestClientOperation operation = this.endpoint.getOperation();
+        if (operation == null) {
+            LOG.error("Operation value is mandatory");
+            throw new IllegalArgumentException(
+                    "Operation value is mandatory");
+        }
+        String indexName = this.endpoint.getIndexName();
+        if (ObjectHelper.isEmpty(indexName)) {
+            indexName = exchange.getMessage().getHeader(ElasticSearchRestClientConstant.INDEX_NAME, String.class);
+            if (ObjectHelper.isEmpty(indexName)) {
+                LOG.error("Index Name is mandatory");
+                throw new IllegalArgumentException(
+                        "Index Name is mandatory");
+            }
+        }
+
+        // get or create the Rest Client
+        RestClient restClient = getorCreateRestClient();
+
+        try {
+            Request request = generateRequest(exchange, operation, indexName);
+
+            performRequest(exchange, callback, operation, restClient, request);
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Generate REST Request depending on content of Exchange
+     *
+     * @param  exchange
+     * @param  operation
+     * @param  indexName
+     * @return
+     */
+    private Request generateRequest(Exchange exchange, ElasticsearchRestClientOperation operation, String indexName) {
+        return switch (operation) {
+            case CREATE_INDEX -> createIndexRequest(indexName, exchange);
+            case DELETE_INDEX -> deleteIndexRequest(indexName);
+            case INDEX_OR_UPDATE -> indexRequest(indexName, exchange);
+            case GET_BY_ID -> getById(indexName, exchange);
+            case SEARCH -> search(indexName, exchange);
+            case DELETE -> delete(indexName, exchange);
+            default -> null;
+        };
+    }
+
+    /**
+     * Async request to Elasticsearch or equivalent Server
+     *
+     * @param exchange
+     * @param callback
+     * @param operation
+     * @param restClient
+     * @param request
+     */
+    private void performRequest(
+            Exchange exchange, AsyncCallback callback, ElasticsearchRestClientOperation operation, RestClient restClient,
+            Request request) {
+        restClient.performRequestAsync(
+                request,
+                new ResponseListener() {
+                    @Override
+                    public void onSuccess(Response response) {
+                        try {
+                            // Get response
+                            String responseBody = EntityUtils.toString(response.getEntity());
+                            // Create a Json Object from the response
+                            JsonObject jsonObject = convertHttpEntityToJsonObject(responseBody);
+                            populateExchange(jsonObject);
+                        } catch (IOException e) {
+                            exchange.setException(e);
+                        }
+
+                        callback.done(true);

Review Comment:
   should be done(false)



##########
components/camel-elasticsearch-rest-client/src/main/java/org/apache/camel/component/elasticsearch/rest/client/ElasticsearchRestClientProducer.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch.rest.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.support.ResourceHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseListener;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.sniff.Sniffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchRestClientProducer extends DefaultAsyncProducer {
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchRestClientProducer.class);
+    public static final String PUT = "PUT";
+    public static final String DELETE = "DELETE";
+    public static final String POST = "POST";
+    public static final String GET = "GET";
+
+    private ElasticsearchRestClientEndpoint endpoint;
+
+    public ElasticsearchRestClientProducer(ElasticsearchRestClientEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        // getting configuration from Endpoint
+        ElasticsearchRestClientOperation operation = this.endpoint.getOperation();
+        if (operation == null) {
+            LOG.error("Operation value is mandatory");

Review Comment:
   Do not log and throw exception, just throw excpetion.



##########
components/camel-elasticsearch-rest-client/src/main/java/org/apache/camel/component/elasticsearch/rest/client/ElasticsearchRestClientProducer.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch.rest.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.support.ResourceHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseListener;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.sniff.Sniffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchRestClientProducer extends DefaultAsyncProducer {
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchRestClientProducer.class);
+    public static final String PUT = "PUT";
+    public static final String DELETE = "DELETE";
+    public static final String POST = "POST";
+    public static final String GET = "GET";
+
+    private ElasticsearchRestClientEndpoint endpoint;
+
+    public ElasticsearchRestClientProducer(ElasticsearchRestClientEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        // getting configuration from Endpoint
+        ElasticsearchRestClientOperation operation = this.endpoint.getOperation();
+        if (operation == null) {
+            LOG.error("Operation value is mandatory");
+            throw new IllegalArgumentException(
+                    "Operation value is mandatory");
+        }
+        String indexName = this.endpoint.getIndexName();
+        if (ObjectHelper.isEmpty(indexName)) {
+            indexName = exchange.getMessage().getHeader(ElasticSearchRestClientConstant.INDEX_NAME, String.class);
+            if (ObjectHelper.isEmpty(indexName)) {
+                LOG.error("Index Name is mandatory");
+                throw new IllegalArgumentException(
+                        "Index Name is mandatory");
+            }
+        }
+
+        // get or create the Rest Client
+        RestClient restClient = getorCreateRestClient();
+
+        try {
+            Request request = generateRequest(exchange, operation, indexName);
+
+            performRequest(exchange, callback, operation, restClient, request);
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Generate REST Request depending on content of Exchange
+     *
+     * @param  exchange
+     * @param  operation
+     * @param  indexName
+     * @return
+     */
+    private Request generateRequest(Exchange exchange, ElasticsearchRestClientOperation operation, String indexName) {
+        return switch (operation) {
+            case CREATE_INDEX -> createIndexRequest(indexName, exchange);
+            case DELETE_INDEX -> deleteIndexRequest(indexName);
+            case INDEX_OR_UPDATE -> indexRequest(indexName, exchange);
+            case GET_BY_ID -> getById(indexName, exchange);
+            case SEARCH -> search(indexName, exchange);
+            case DELETE -> delete(indexName, exchange);
+            default -> null;
+        };
+    }
+
+    /**
+     * Async request to Elasticsearch or equivalent Server
+     *
+     * @param exchange
+     * @param callback
+     * @param operation
+     * @param restClient
+     * @param request
+     */
+    private void performRequest(
+            Exchange exchange, AsyncCallback callback, ElasticsearchRestClientOperation operation, RestClient restClient,
+            Request request) {
+        restClient.performRequestAsync(
+                request,
+                new ResponseListener() {
+                    @Override
+                    public void onSuccess(Response response) {
+                        try {
+                            // Get response
+                            String responseBody = EntityUtils.toString(response.getEntity());
+                            // Create a Json Object from the response
+                            JsonObject jsonObject = convertHttpEntityToJsonObject(responseBody);
+                            populateExchange(jsonObject);
+                        } catch (IOException e) {

Review Comment:
   Better to catch Exception in general



##########
components/camel-elasticsearch-rest-client/src/main/java/org/apache/camel/component/elasticsearch/rest/client/ElasticsearchRestClientEndpoint.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch.rest.client;
+
+import java.util.List;
+
+import org.apache.camel.Category;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.RestClient;
+
+/**
+ * ElasticsearchRestClient component which allows you to interface with Elasticsearch or OpenSearch using the Java Low
+ * level Rest Client
+ * <p>
+ */
+@UriEndpoint(firstVersion = "4.3.0", scheme = "elasticsearch-rest-client",
+             title = "Elasticsearch Low level Rest Client",
+             syntax = "elasticsearch-rest-client:clusterName", producerOnly = true,
+             category = { Category.SEARCH })
+public class ElasticsearchRestClientEndpoint extends DefaultEndpoint {
+    @UriPath
+    @Metadata(required = true)
+    private String clusterName;
+    @UriParam
+    ElasticsearchRestClientOperation operation;
+
+    @UriParam(label = "advanced")
+    RestClient restClient;
+
+    @UriParam
+    String indexName;
+
+    @UriParam
+    List<HttpHost> hostAddressesList;

Review Comment:
   Its more tooling friendly to have this as a string with multiple hosts separated by comma



##########
components/camel-elasticsearch-rest-client/src/main/java/org/apache/camel/component/elasticsearch/rest/client/ElasticsearchRestClientEndpoint.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch.rest.client;
+
+import java.util.List;
+
+import org.apache.camel.Category;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.RestClient;
+
+/**
+ * ElasticsearchRestClient component which allows you to interface with Elasticsearch or OpenSearch using the Java Low
+ * level Rest Client
+ * <p>
+ */
+@UriEndpoint(firstVersion = "4.3.0", scheme = "elasticsearch-rest-client",
+             title = "Elasticsearch Low level Rest Client",
+             syntax = "elasticsearch-rest-client:clusterName", producerOnly = true,
+             category = { Category.SEARCH })
+public class ElasticsearchRestClientEndpoint extends DefaultEndpoint {
+    @UriPath
+    @Metadata(required = true)
+    private String clusterName;
+    @UriParam
+    ElasticsearchRestClientOperation operation;
+
+    @UriParam(label = "advanced")
+    RestClient restClient;
+
+    @UriParam
+    String indexName;
+
+    @UriParam
+    List<HttpHost> hostAddressesList;
+
+    @UriParam(defaultValue = "30000")
+    private int connectionTimeout;
+
+    @UriParam(defaultValue = "30000")
+    private int socketTimeout;
+
+    @UriParam(label = "security")
+    private String user;
+    @UriParam(label = "security")

Review Comment:
   secret = true for username and password



##########
components/camel-elasticsearch-rest-client/src/main/java/org/apache/camel/component/elasticsearch/rest/client/ElasticsearchRestClientEndpoint.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch.rest.client;
+
+import java.util.List;
+
+import org.apache.camel.Category;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.RestClient;
+
+/**
+ * ElasticsearchRestClient component which allows you to interface with Elasticsearch or OpenSearch using the Java Low
+ * level Rest Client
+ * <p>
+ */
+@UriEndpoint(firstVersion = "4.3.0", scheme = "elasticsearch-rest-client",
+             title = "Elasticsearch Low level Rest Client",
+             syntax = "elasticsearch-rest-client:clusterName", producerOnly = true,
+             category = { Category.SEARCH })
+public class ElasticsearchRestClientEndpoint extends DefaultEndpoint {
+    @UriPath
+    @Metadata(required = true)
+    private String clusterName;
+    @UriParam
+    ElasticsearchRestClientOperation operation;
+
+    @UriParam(label = "advanced")
+    RestClient restClient;
+
+    @UriParam
+    String indexName;
+
+    @UriParam
+    List<HttpHost> hostAddressesList;
+
+    @UriParam(defaultValue = "30000")
+    private int connectionTimeout;
+
+    @UriParam(defaultValue = "30000")
+    private int socketTimeout;
+
+    @UriParam(label = "security")
+    private String user;
+    @UriParam(label = "security")
+    private String password;
+    @UriParam(label = "security")
+    @Metadata(supportFileReference = true)
+    private String certificatePath;
+
+    @UriParam(label = "advanced")
+    private boolean enableSniffer;
+
+    @UriParam(label = "advanced", defaultValue = "60000")
+    private int snifferInterval;

Review Comment:
   the int is 0 by default, so maybe it should also be 60000



##########
components/camel-elasticsearch-rest-client/src/main/java/org/apache/camel/component/elasticsearch/rest/client/ElasticsearchRestClientProducer.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch.rest.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.support.ResourceHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseListener;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.sniff.Sniffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchRestClientProducer extends DefaultAsyncProducer {
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchRestClientProducer.class);
+    public static final String PUT = "PUT";
+    public static final String DELETE = "DELETE";
+    public static final String POST = "POST";
+    public static final String GET = "GET";
+
+    private ElasticsearchRestClientEndpoint endpoint;
+
+    public ElasticsearchRestClientProducer(ElasticsearchRestClientEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        // getting configuration from Endpoint
+        ElasticsearchRestClientOperation operation = this.endpoint.getOperation();
+        if (operation == null) {
+            LOG.error("Operation value is mandatory");
+            throw new IllegalArgumentException(
+                    "Operation value is mandatory");
+        }
+        String indexName = this.endpoint.getIndexName();
+        if (ObjectHelper.isEmpty(indexName)) {
+            indexName = exchange.getMessage().getHeader(ElasticSearchRestClientConstant.INDEX_NAME, String.class);
+            if (ObjectHelper.isEmpty(indexName)) {
+                LOG.error("Index Name is mandatory");
+                throw new IllegalArgumentException(
+                        "Index Name is mandatory");
+            }
+        }
+
+        // get or create the Rest Client
+        RestClient restClient = getorCreateRestClient();

Review Comment:
   better to create client in doStart



##########
components/camel-elasticsearch-rest-client/src/main/java/org/apache/camel/component/elasticsearch/rest/client/ElasticsearchRestClientProducer.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch.rest.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.support.ResourceHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseListener;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.sniff.Sniffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchRestClientProducer extends DefaultAsyncProducer {
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchRestClientProducer.class);
+    public static final String PUT = "PUT";
+    public static final String DELETE = "DELETE";
+    public static final String POST = "POST";
+    public static final String GET = "GET";
+
+    private ElasticsearchRestClientEndpoint endpoint;
+
+    public ElasticsearchRestClientProducer(ElasticsearchRestClientEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        // getting configuration from Endpoint
+        ElasticsearchRestClientOperation operation = this.endpoint.getOperation();
+        if (operation == null) {
+            LOG.error("Operation value is mandatory");
+            throw new IllegalArgumentException(
+                    "Operation value is mandatory");
+        }
+        String indexName = this.endpoint.getIndexName();
+        if (ObjectHelper.isEmpty(indexName)) {
+            indexName = exchange.getMessage().getHeader(ElasticSearchRestClientConstant.INDEX_NAME, String.class);
+            if (ObjectHelper.isEmpty(indexName)) {
+                LOG.error("Index Name is mandatory");
+                throw new IllegalArgumentException(
+                        "Index Name is mandatory");
+            }
+        }
+
+        // get or create the Rest Client
+        RestClient restClient = getorCreateRestClient();
+
+        try {
+            Request request = generateRequest(exchange, operation, indexName);
+
+            performRequest(exchange, callback, operation, restClient, request);
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Generate REST Request depending on content of Exchange
+     *
+     * @param  exchange
+     * @param  operation
+     * @param  indexName
+     * @return
+     */
+    private Request generateRequest(Exchange exchange, ElasticsearchRestClientOperation operation, String indexName) {
+        return switch (operation) {
+            case CREATE_INDEX -> createIndexRequest(indexName, exchange);
+            case DELETE_INDEX -> deleteIndexRequest(indexName);
+            case INDEX_OR_UPDATE -> indexRequest(indexName, exchange);
+            case GET_BY_ID -> getById(indexName, exchange);
+            case SEARCH -> search(indexName, exchange);
+            case DELETE -> delete(indexName, exchange);
+            default -> null;
+        };
+    }
+
+    /**
+     * Async request to Elasticsearch or equivalent Server
+     *
+     * @param exchange
+     * @param callback
+     * @param operation
+     * @param restClient
+     * @param request
+     */
+    private void performRequest(
+            Exchange exchange, AsyncCallback callback, ElasticsearchRestClientOperation operation, RestClient restClient,
+            Request request) {
+        restClient.performRequestAsync(
+                request,
+                new ResponseListener() {
+                    @Override
+                    public void onSuccess(Response response) {
+                        try {
+                            // Get response
+                            String responseBody = EntityUtils.toString(response.getEntity());
+                            // Create a Json Object from the response
+                            JsonObject jsonObject = convertHttpEntityToJsonObject(responseBody);
+                            populateExchange(jsonObject);
+                        } catch (IOException e) {
+                            exchange.setException(e);
+                        }
+
+                        callback.done(true);
+                    }
+
+                    private JsonObject convertHttpEntityToJsonObject(String httpResponse) throws IOException {
+                        // Jackson ObjectMapper
+                        ObjectMapper objectMapper = new ObjectMapper();
+
+                        // Convert JSON content to Map<String, Object>
+                        Map<String, Object> map = objectMapper.readValue(httpResponse, new TypeReference<>() {
+                        });
+                        // convert to JsonObject
+                        return new JsonObject(map);
+                    }
+
+                    /**
+                     * Generate response Body of the Exchange, depending on operation Type
+                     *
+                     * @param doc
+                     */
+                    private void populateExchange(JsonObject doc) {
+                        switch (operation) {
+                            case INDEX_OR_UPDATE -> exchange.getMessage().setBody(extractID(doc));
+                            case CREATE_INDEX, DELETE_INDEX -> exchange.getMessage().setBody(extractAck(doc));
+                            case DELETE -> exchange.getMessage().setBody(extractDeleted(doc));
+                            case GET_BY_ID -> exchange.getMessage().setBody(extractDocument(doc));
+                            case SEARCH -> exchange.getMessage().setBody(extractSearch(doc));
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        exchange.setException(e);
+                        callback.done(true);

Review Comment:
   done false



##########
components/camel-elasticsearch-rest-client/src/main/java/org/apache/camel/component/elasticsearch/rest/client/ElasticsearchRestClientEndpoint.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch.rest.client;
+
+import java.util.List;
+
+import org.apache.camel.Category;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.RestClient;
+
+/**
+ * ElasticsearchRestClient component which allows you to interface with Elasticsearch or OpenSearch using the Java Low
+ * level Rest Client
+ * <p>
+ */
+@UriEndpoint(firstVersion = "4.3.0", scheme = "elasticsearch-rest-client",
+             title = "Elasticsearch Low level Rest Client",
+             syntax = "elasticsearch-rest-client:clusterName", producerOnly = true,
+             category = { Category.SEARCH })
+public class ElasticsearchRestClientEndpoint extends DefaultEndpoint {
+    @UriPath
+    @Metadata(required = true)
+    private String clusterName;
+    @UriParam
+    ElasticsearchRestClientOperation operation;
+
+    @UriParam(label = "advanced")
+    RestClient restClient;
+
+    @UriParam
+    String indexName;
+
+    @UriParam
+    List<HttpHost> hostAddressesList;
+
+    @UriParam(defaultValue = "30000")
+    private int connectionTimeout;
+
+    @UriParam(defaultValue = "30000")
+    private int socketTimeout;
+
+    @UriParam(label = "security")
+    private String user;
+    @UriParam(label = "security")
+    private String password;
+    @UriParam(label = "security")
+    @Metadata(supportFileReference = true)
+    private String certificatePath;
+
+    @UriParam(label = "advanced")
+    private boolean enableSniffer;
+
+    @UriParam(label = "advanced", defaultValue = "60000")
+    private int snifferInterval;
+    @UriParam(label = "advanced", defaultValue = "60000")
+    private int sniffAfterFailureDelay;

Review Comment:
   the int is 0 by default, so maybe it should also be 60000



##########
components/camel-elasticsearch-rest-client/src/main/java/org/apache/camel/component/elasticsearch/rest/client/ElasticsearchRestClientProducer.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.elasticsearch.rest.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.support.ResourceHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseListener;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.sniff.Sniffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchRestClientProducer extends DefaultAsyncProducer {
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchRestClientProducer.class);
+    public static final String PUT = "PUT";
+    public static final String DELETE = "DELETE";
+    public static final String POST = "POST";
+    public static final String GET = "GET";
+
+    private ElasticsearchRestClientEndpoint endpoint;
+
+    public ElasticsearchRestClientProducer(ElasticsearchRestClientEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        // getting configuration from Endpoint
+        ElasticsearchRestClientOperation operation = this.endpoint.getOperation();
+        if (operation == null) {
+            LOG.error("Operation value is mandatory");
+            throw new IllegalArgumentException(

Review Comment:
   however its wrong to throw exception from the async method you use in Camel.
   Instead you need to set the exception on exchange, and call calldone.done(true) and return true.
   
   So somtimes its better to make a 
   
   doProcess that can throw eception and call this from processing in a try .. .. catch
   
   process
      try 
         return doProcess
      catch
         exchange set exception
          callback done true
          return true
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org