You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2023/07/03 08:10:01 UTC
[camel] 01/03: [CAMEL-18837] OpenSearch component
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 36c6c60d9d933aa5da245545165584903329e5f3
Author: Adriano Machado <ad...@redhat.com>
AuthorDate: Tue Jun 27 13:28:46 2023 -0400
[CAMEL-18837] OpenSearch component
---
bom/camel-bom/pom.xml | 5 +
camel-dependencies/pom.xml | 4 +
catalog/camel-allcomponents/pom.xml | 5 +
.../org/apache/camel/catalog/components.properties | 1 +
.../camel/catalog/components/opensearch.json | 72 ++
.../apache/camel/catalog/schemas/camel-spring.xsd | 2 +-
components/camel-opensearch/pom.xml | 121 +++
.../opensearch/OpensearchComponentConfigurer.java | 117 +++
.../opensearch/OpensearchEndpointConfigurer.java | 154 ++++
.../opensearch/OpensearchEndpointUriFactory.java | 87 ++
.../OpensearchActionRequestConverterLoader.java | 66 ++
.../services/org/apache/camel/TypeConverterLoader | 2 +
.../services/org/apache/camel/component.properties | 7 +
.../services/org/apache/camel/component/opensearch | 2 +
.../apache/camel/configurer/opensearch-component | 2 +
.../apache/camel/configurer/opensearch-endpoint | 2 +
.../apache/camel/urifactory/opensearch-endpoint | 2 +
.../camel/component/opensearch/opensearch.json | 72 ++
.../src/main/docs/opensearch-component.adoc | 278 +++++++
.../component/opensearch/OpensearchComponent.java | 237 ++++++
.../OpensearchComponentVerifierExtension.java | 89 ++
.../opensearch/OpensearchConfiguration.java | 316 +++++++
.../component/opensearch/OpensearchConstants.java | 55 ++
.../component/opensearch/OpensearchEndpoint.java | 65 ++
.../component/opensearch/OpensearchOperation.java | 64 ++
.../component/opensearch/OpensearchProducer.java | 553 +++++++++++++
.../OpensearchScrollRequestIterator.java | 148 ++++
.../BulkRequestAggregationStrategy.java | 50 ++
.../OpensearchActionRequestConverter.java | 313 +++++++
.../OpensearchComponentVerifierExtensionTest.java | 65 ++
.../opensearch/integration/OpensearchBulkIT.java | 254 ++++++
.../integration/OpensearchClusterIndexIT.java | 86 ++
.../OpensearchGetSearchDeleteExistsUpdateIT.java | 913 +++++++++++++++++++++
.../opensearch/integration/OpensearchIndexIT.java | 129 +++
.../opensearch/integration/OpensearchPingIT.java | 42 +
.../integration/OpensearchScrollSearchIT.java | 170 ++++
.../integration/OpensearchSizeLimitIT.java | 74 ++
.../integration/OpensearchTestSupport.java | 125 +++
.../src/test/resources/log4j2.properties | 30 +
components/pom.xml | 1 +
.../org/apache/camel/main/components.properties | 1 +
.../modules/ROOT/examples/json/opensearch.json | 1 +
docs/components/modules/ROOT/nav.adoc | 1 +
.../modules/ROOT/pages/opensearch-component.adoc | 1 +
.../component/ComponentsBuilderFactory.java | 13 +
.../dsl/OpensearchComponentBuilderFactory.java | 308 +++++++
.../src/generated/resources/metadata.json | 22 +
.../builder/endpoint/EndpointBuilderFactory.java | 1 +
.../camel/builder/endpoint/EndpointBuilders.java | 1 +
.../builder/endpoint/StaticEndpointBuilders.java | 43 +
.../dsl/OpensearchEndpointBuilderFactory.java | 834 +++++++++++++++++++
.../camel-component-known-dependencies.properties | 1 +
parent/pom.xml | 9 +
.../common/services/SimpleTestServiceBuilder.java | 3 +-
test-infra/camel-test-infra-opensearch/pom.xml | 54 ++
.../src/main/resources/META-INF/MANIFEST.MF | 0
.../opensearch/common/OpenSearchProperties.java | 34 +
.../services/OpenSearchLocalContainerService.java | 119 +++
.../opensearch/services/OpenSearchService.java | 35 +
.../services/OpenSearchServiceFactory.java | 84 ++
.../services/RemoteOpenSearchService.java | 65 ++
test-infra/pom.xml | 1 +
62 files changed, 6408 insertions(+), 3 deletions(-)
diff --git a/bom/camel-bom/pom.xml b/bom/camel-bom/pom.xml
index d82e094b9ce..84519375140 100644
--- a/bom/camel-bom/pom.xml
+++ b/bom/camel-bom/pom.xml
@@ -1467,6 +1467,11 @@
<artifactId>camel-openapi-java</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-opensearch</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-openstack</artifactId>
diff --git a/camel-dependencies/pom.xml b/camel-dependencies/pom.xml
index 49200ea0ad1..c70a1074e37 100644
--- a/camel-dependencies/pom.xml
+++ b/camel-dependencies/pom.xml
@@ -356,6 +356,10 @@
<olingo2-version>2.0.11</olingo2-version>
<olingo4-version>4.8.0</olingo4-version>
<openjpa-version>3.2.2</openjpa-version>
+ <opensearch-java-client-version>2.5.0</opensearch-java-client-version>
+ <opensearch-rest-client-version>2.8.0</opensearch-rest-client-version>
+ <opensearch-testcontainers-version>2.0.0</opensearch-testcontainers-version>
+ <opensearch-version>2.8.0</opensearch-version>
<openstack4j-version>3.10</openstack4j-version>
<opentelemetry-alpha-version>${opentelemetry-version}-alpha</opentelemetry-alpha-version>
<opentelemetry-version>1.26.0</opentelemetry-version>
diff --git a/catalog/camel-allcomponents/pom.xml b/catalog/camel-allcomponents/pom.xml
index e1cca1a01ee..8a76cb7cd7f 100644
--- a/catalog/camel-allcomponents/pom.xml
+++ b/catalog/camel-allcomponents/pom.xml
@@ -1253,6 +1253,11 @@
<artifactId>camel-openapi-java</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-opensearch</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-openstack</artifactId>
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components.properties b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components.properties
index b9d1583d995..34bf2e3c9fe 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components.properties
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components.properties
@@ -225,6 +225,7 @@ nitrite
oaipmh
olingo2
olingo4
+opensearch
openshift-build-configs
openshift-builds
openshift-deploymentconfigs
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/opensearch.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/opensearch.json
new file mode 100644
index 00000000000..a93b4cf5e11
--- /dev/null
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/opensearch.json
@@ -0,0 +1,72 @@
+{
+ "component": {
+ "kind": "component",
+ "name": "opensearch",
+ "title": "OpenSearch",
+ "description": "Send requests to OpenSearch via Java Client API.",
+ "deprecated": false,
+ "firstVersion": "4.0.0",
+ "label": "search,monitoring",
+ "javaType": "org.apache.camel.component.opensearch.OpensearchComponent",
+ "supportLevel": "Preview",
+ "groupId": "org.apache.camel",
+ "artifactId": "camel-opensearch",
+ "version": "4.0.0-SNAPSHOT",
+ "scheme": "opensearch",
+ "extendsScheme": "",
+ "syntax": "opensearch:clusterName",
+ "async": false,
+ "api": false,
+ "consumerOnly": false,
+ "producerOnly": true,
+ "lenientProperties": false
+ },
+ "componentProperties": {
+ "connectionTimeout": { "index": 0, "kind": "property", "displayName": "Connection Timeout", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 30000, "description": "The time in ms to wait before connection will time out." },
+ "hostAddresses": { "index": 1, "kind": "property", "displayName": "Host Addresses", "group": "producer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Comma separated list with ip:port formatted remote transport addresses to use. The ip and port options must be left blank for hostAddresses to be considered instead." },
+ "lazyStartProducer": { "index": 2, "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail [...]
+ "maxRetryTimeout": { "index": 3, "kind": "property", "displayName": "Max Retry Timeout", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 30000, "description": "The time in ms before retry" },
+ "socketTimeout": { "index": 4, "kind": "property", "displayName": "Socket Timeout", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 30000, "description": "The timeout in ms to wait before the socket will time out." },
+ "autowiredEnabled": { "index": 5, "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching t [...]
+ "client": { "index": 6, "kind": "property", "displayName": "Client", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.opensearch.client.RestClient", "deprecated": false, "autowired": true, "secret": false, "description": "To use an existing configured OpenSearch client, instead of creating a client per endpoint. This allows customizing the client with specific settings." },
+ "enableSniffer": { "index": 7, "kind": "property", "displayName": "Enable Sniffer", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enable automatically discover nodes from a running OpenSearch cluster. If this option is used in conjunction with Spring Boot then it's managed by the Spring Boot configuration (see: Disable Sniffer in Sp [...]
+ "sniffAfterFailureDelay": { "index": 8, "kind": "property", "displayName": "Sniff After Failure Delay", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 60000, "description": "The delay of a sniff execution scheduled after a failure (in milliseconds)" },
+ "snifferInterval": { "index": 9, "kind": "property", "displayName": "Sniffer Interval", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 300000, "description": "The interval between consecutive ordinary sniff executions in milliseconds. Will be honoured when sniffOnFailure is disabled or when there are no failures between consecutive sniff executions" },
+ "enableSSL": { "index": 10, "kind": "property", "displayName": "Enable SSL", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enable SSL" },
+ "password": { "index": 11, "kind": "property", "displayName": "Password", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Password for authentication" },
+ "user": { "index": 12, "kind": "property", "displayName": "User", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "User name for basic authentication" }
+ },
+ "headers": {
+ "operation": { "index": 0, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "org.apache.camel.component.opensearch.OpensearchOperation", "enum": [ "Index", "Update", "Bulk", "GetById", "MultiGet", "MultiSearch", "Delete", "DeleteIndex", "Search", "Exists", "Ping" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The operation to perform", "constantName": "org.apache.camel.component. [...]
+ "indexId": { "index": 1, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The id of the indexed document.", "constantName": "org.apache.camel.component.opensearch.OpensearchConstants#PARAM_INDEX_ID" },
+ "indexName": { "index": 2, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The name of the index to act against", "constantName": "org.apache.camel.component.opensearch.OpensearchConstants#PARAM_INDEX_NAME" },
+ "documentClass": { "index": 3, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Class", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": "ObjectNode", "description": "The fully qualified name of the class of the document to unmarshal", "constantName": "org.apache.camel.component.opensearch.OpensearchConstants#PARAM_DOCUMENT_CLASS" },
+ "waitForActiveShards": { "index": 4, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The index creation waits for the write consistency number of shards to be available", "constantName": "org.apache.camel.component.opensearch.OpensearchConstants#PARAM_WAIT_FOR_ACTIVE_SHARDS" },
+ "scrollKeepAliveMs": { "index": 5, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Time in ms during which OpenSearch will keep search context alive", "constantName": "org.apache.camel.component.opensearch.OpensearchConstants#PARAM_SCROLL_KEEP_ALIVE_MS" },
+ "useScroll": { "index": 6, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Set to true to enable scroll usage", "constantName": "org.apache.camel.component.opensearch.OpensearchConstants#PARAM_SCROLL" },
+ "size": { "index": 7, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The size of the response.", "constantName": "org.apache.camel.component.opensearch.OpensearchConstants#PARAM_SIZE" },
+ "from": { "index": 8, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The starting index of the response.", "constantName": "org.apache.camel.component.opensearch.OpensearchConstants#PARAM_FROM" }
+ },
+ "properties": {
+ "clusterName": { "index": 0, "kind": "path", "displayName": "Cluster Name", "group": "producer", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "Name of the cluster" },
+ "connectionTimeout": { "index": 1, "kind": "parameter", "displayName": "Connection Timeout", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 30000, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "The time in ms to wait before connection will time out." },
+ "disconnect": { "index": 2, "kind": "parameter", "displayName": "Disconnect", "group": "producer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "Disconnect after it finishes calling the producer" },
+ "from": { "index": 3, "kind": "parameter", "displayName": "From", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "Starting index of the response." },
+ "hostAddresses": { "index": 4, "kind": "parameter", "displayName": "Host Addresses", "group": "producer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "Comma separated list with ip:port formatted remote transport addresses to use." },
+ "indexName": { "index": 5, "kind": "parameter", "displayName": "Index Name", "group": "producer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "The name of the index to act against" },
+ "maxRetryTimeout": { "index": 6, "kind": "parameter", "displayName": "Max Retry Timeout", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 30000, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "The time in ms before retry" },
+ "operation": { "index": 7, "kind": "parameter", "displayName": "Operation", "group": "producer", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.opensearch.OpensearchOperation", "enum": [ "Index", "Update", "Bulk", "GetById", "MultiGet", "MultiSearch", "Delete", "DeleteIndex", "Search", "Exists", "Ping" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfigura [...]
+ "scrollKeepAliveMs": { "index": 8, "kind": "parameter", "displayName": "Scroll Keep Alive Ms", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 60000, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "Time in ms during which OpenSearch will keep search context alive" },
+ "size": { "index": 9, "kind": "parameter", "displayName": "Size", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "Size of the response." },
+ "socketTimeout": { "index": 10, "kind": "parameter", "displayName": "Socket Timeout", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 30000, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "The timeout in ms to wait before the socket will time out." },
+ "useScroll": { "index": 11, "kind": "parameter", "displayName": "Use Scroll", "group": "producer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "Enable scroll usage" },
+ "waitForActiveShards": { "index": 12, "kind": "parameter", "displayName": "Wait For Active Shards", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "Index creation waits for the write consistency number of shards to be available" },
+ "lazyStartProducer": { "index": 13, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...]
+ "documentClass": { "index": 14, "kind": "parameter", "displayName": "Document Class", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.Class<java.lang.Object>", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ObjectNode", "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "The class to use when deserializing the docu [...]
+ "enableSniffer": { "index": 15, "kind": "parameter", "displayName": "Enable Sniffer", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "Enable automatically discover nodes from a running OpenSearch cluster. If th [...]
+ "sniffAfterFailureDelay": { "index": 16, "kind": "parameter", "displayName": "Sniff After Failure Delay", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 60000, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "The delay of a sniff execution scheduled after a failure (in [...]
+ "snifferInterval": { "index": 17, "kind": "parameter", "displayName": "Sniffer Interval", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 300000, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "The interval between consecutive ordinary sniff executions in milliseconds. [...]
+ "certificatePath": { "index": 18, "kind": "parameter", "displayName": "Certificate Path", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "The certificate that can be used to access the ES Cluster. It can be loaded by default [...]
+ "enableSSL": { "index": 19, "kind": "parameter", "displayName": "Enable SSL", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "Enable SSL" }
+ }
+}
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
index 649a953edab..2bf707a8ddb 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
@@ -439,7 +439,7 @@ Enriches a message with data from a secondary resource
<xs:annotation>
<xs:documentation xml:lang="en">
<![CDATA[
-Camel error handling.
+Error handler settings
]]>
</xs:documentation>
</xs:annotation>
diff --git a/components/camel-opensearch/pom.xml b/components/camel-opensearch/pom.xml
new file mode 100644
index 00000000000..06901b275a2
--- /dev/null
+++ b/components/camel-opensearch/pom.xml
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>components</artifactId>
+ <version>4.0.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>camel-opensearch</artifactId>
+ <packaging>jar</packaging>
+ <name>Camel :: OpenSearch Java API Client</name>
+ <description>Camel OpenSearch Java API Client support</description>
+
+ <properties>
+ <!-- OpenSearch container is not available on these platforms -->
+ <skipITs.ppc64le>true</skipITs.ppc64le>
+ <skipITs.s390x>true</skipITs.s390x>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-support</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.opensearch.client</groupId>
+ <artifactId>opensearch-java</artifactId>
+ <version>${opensearch-java-client-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opensearch.client</groupId>
+ <artifactId>opensearch-rest-client</artifactId>
+ <version>${opensearch-rest-client-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opensearch.client</groupId>
+ <artifactId>opensearch-rest-client-sniffer</artifactId>
+ <version>${opensearch-rest-client-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <!-- for testing -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-junit5</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-core-catalog</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>${awaitility-version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- test infra -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-infra-opensearch</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <id>full</id>
+ <activation>
+ <property>
+ <name>!quickly</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <os.path.data>target/data</os.path.data>
+ </systemPropertyVariables>
+ <forkCount>1</forkCount>
+ <reuseForks>false</reuseForks>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
diff --git a/components/camel-opensearch/src/generated/java/org/apache/camel/component/opensearch/OpensearchComponentConfigurer.java b/components/camel-opensearch/src/generated/java/org/apache/camel/component/opensearch/OpensearchComponentConfigurer.java
new file mode 100644
index 00000000000..6036c1f4a64
--- /dev/null
+++ b/components/camel-opensearch/src/generated/java/org/apache/camel/component/opensearch/OpensearchComponentConfigurer.java
@@ -0,0 +1,117 @@
+/* Generated by camel build tools - do NOT edit this file! */
+package org.apache.camel.component.opensearch;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.ExtendedPropertyConfigurerGetter;
+import org.apache.camel.spi.PropertyConfigurerGetter;
+import org.apache.camel.spi.ConfigurerStrategy;
+import org.apache.camel.spi.GeneratedPropertyConfigurer;
+import org.apache.camel.util.CaseInsensitiveMap;
+import org.apache.camel.support.component.PropertyConfigurerSupport;
+
+/**
+ * Generated by camel build tools - do NOT edit this file! Maps option names onto the setters/getters of OpensearchComponent. NOTE(review): mirror any hand change in the generator, otherwise it is presumably lost on regeneration.
+ */
+@SuppressWarnings("unchecked")
+public class OpensearchComponentConfigurer extends PropertyConfigurerSupport implements GeneratedPropertyConfigurer, PropertyConfigurerGetter {
+
+ @Override // applies value to the named option via its setter; returns false for an unrecognised name
+ public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) {
+ OpensearchComponent target = (OpensearchComponent) obj;
+ switch (ignoreCase ? name.toLowerCase(java.util.Locale.ROOT) : name) { // Locale.ROOT: the lower-casing must not follow the JVM default locale (Turkish maps 'I' to a dotless i, so "snifferInterval" would miss "snifferinterval")
+ case "autowiredenabled":
+ case "autowiredEnabled": target.setAutowiredEnabled(property(camelContext, boolean.class, value)); return true;
+ case "client": target.setClient(property(camelContext, org.opensearch.client.RestClient.class, value)); return true;
+ case "connectiontimeout":
+ case "connectionTimeout": target.setConnectionTimeout(property(camelContext, int.class, value)); return true;
+ case "enablessl":
+ case "enableSSL": target.setEnableSSL(property(camelContext, boolean.class, value)); return true;
+ case "enablesniffer":
+ case "enableSniffer": target.setEnableSniffer(property(camelContext, boolean.class, value)); return true;
+ case "hostaddresses":
+ case "hostAddresses": target.setHostAddresses(property(camelContext, java.lang.String.class, value)); return true;
+ case "lazystartproducer":
+ case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
+ case "maxretrytimeout":
+ case "maxRetryTimeout": target.setMaxRetryTimeout(property(camelContext, int.class, value)); return true;
+ case "password": target.setPassword(property(camelContext, java.lang.String.class, value)); return true;
+ case "sniffafterfailuredelay":
+ case "sniffAfterFailureDelay": target.setSniffAfterFailureDelay(property(camelContext, int.class, value)); return true;
+ case "snifferinterval":
+ case "snifferInterval": target.setSnifferInterval(property(camelContext, int.class, value)); return true;
+ case "sockettimeout":
+ case "socketTimeout": target.setSocketTimeout(property(camelContext, int.class, value)); return true;
+ case "user": target.setUser(property(camelContext, java.lang.String.class, value)); return true;
+ default: return false;
+ }
+ }
+
+ @Override // only the "client" option is reported as autowired
+ public String[] getAutowiredNames() {
+ return new String[]{"client"};
+ }
+
+ @Override // Java type of the named option, or null for an unrecognised name
+ public Class<?> getOptionType(String name, boolean ignoreCase) {
+ switch (ignoreCase ? name.toLowerCase(java.util.Locale.ROOT) : name) { // locale-neutral lower-casing, same reason as in configure()
+ case "autowiredenabled":
+ case "autowiredEnabled": return boolean.class;
+ case "client": return org.opensearch.client.RestClient.class;
+ case "connectiontimeout":
+ case "connectionTimeout": return int.class;
+ case "enablessl":
+ case "enableSSL": return boolean.class;
+ case "enablesniffer":
+ case "enableSniffer": return boolean.class;
+ case "hostaddresses":
+ case "hostAddresses": return java.lang.String.class;
+ case "lazystartproducer":
+ case "lazyStartProducer": return boolean.class;
+ case "maxretrytimeout":
+ case "maxRetryTimeout": return int.class;
+ case "password": return java.lang.String.class;
+ case "sniffafterfailuredelay":
+ case "sniffAfterFailureDelay": return int.class;
+ case "snifferinterval":
+ case "snifferInterval": return int.class;
+ case "sockettimeout":
+ case "socketTimeout": return int.class;
+ case "user": return java.lang.String.class;
+ default: return null;
+ }
+ }
+
+ @Override // current value of the named option read through its getter, or null for an unrecognised name
+ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
+ OpensearchComponent target = (OpensearchComponent) obj;
+ switch (ignoreCase ? name.toLowerCase(java.util.Locale.ROOT) : name) { // locale-neutral lower-casing, same reason as in configure()
+ case "autowiredenabled":
+ case "autowiredEnabled": return target.isAutowiredEnabled();
+ case "client": return target.getClient();
+ case "connectiontimeout":
+ case "connectionTimeout": return target.getConnectionTimeout();
+ case "enablessl":
+ case "enableSSL": return target.isEnableSSL();
+ case "enablesniffer":
+ case "enableSniffer": return target.isEnableSniffer();
+ case "hostaddresses":
+ case "hostAddresses": return target.getHostAddresses();
+ case "lazystartproducer":
+ case "lazyStartProducer": return target.isLazyStartProducer();
+ case "maxretrytimeout":
+ case "maxRetryTimeout": return target.getMaxRetryTimeout();
+ case "password": return target.getPassword();
+ case "sniffafterfailuredelay":
+ case "sniffAfterFailureDelay": return target.getSniffAfterFailureDelay();
+ case "snifferinterval":
+ case "snifferInterval": return target.getSnifferInterval();
+ case "sockettimeout":
+ case "socketTimeout": return target.getSocketTimeout();
+ case "user": return target.getUser();
+ default: return null;
+ }
+ }
+}
+
diff --git a/components/camel-opensearch/src/generated/java/org/apache/camel/component/opensearch/OpensearchEndpointConfigurer.java b/components/camel-opensearch/src/generated/java/org/apache/camel/component/opensearch/OpensearchEndpointConfigurer.java
new file mode 100644
index 00000000000..977ddd07f2d
--- /dev/null
+++ b/components/camel-opensearch/src/generated/java/org/apache/camel/component/opensearch/OpensearchEndpointConfigurer.java
@@ -0,0 +1,154 @@
+/* Generated by camel build tools - do NOT edit this file! */
+package org.apache.camel.component.opensearch;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.ExtendedPropertyConfigurerGetter;
+import org.apache.camel.spi.PropertyConfigurerGetter;
+import org.apache.camel.spi.ConfigurerStrategy;
+import org.apache.camel.spi.GeneratedPropertyConfigurer;
+import org.apache.camel.util.CaseInsensitiveMap;
+import org.apache.camel.support.component.PropertyConfigurerSupport;
+
+/**
+ * Generated by camel build tools - do NOT edit this file! Maps option names onto the setters/getters of OpensearchEndpoint and its configuration object. NOTE(review): mirror any hand change in the generator, otherwise it is presumably lost on regeneration.
+ */
+@SuppressWarnings("unchecked")
+public class OpensearchEndpointConfigurer extends PropertyConfigurerSupport implements GeneratedPropertyConfigurer, PropertyConfigurerGetter {
+
+ @Override // applies value to the named option (on the endpoint's configuration, or the endpoint itself for lazyStartProducer); returns false for an unrecognised name
+ public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) {
+ OpensearchEndpoint target = (OpensearchEndpoint) obj;
+ switch (ignoreCase ? name.toLowerCase(java.util.Locale.ROOT) : name) { // Locale.ROOT: the lower-casing must not follow the JVM default locale (Turkish maps 'I' to a dotless i, so "snifferInterval" would miss "snifferinterval")
+ case "certificatepath":
+ case "certificatePath": target.getConfiguration().setCertificatePath(property(camelContext, java.lang.String.class, value)); return true;
+ case "connectiontimeout":
+ case "connectionTimeout": target.getConfiguration().setConnectionTimeout(property(camelContext, int.class, value)); return true;
+ case "disconnect": target.getConfiguration().setDisconnect(property(camelContext, boolean.class, value)); return true;
+ case "documentclass":
+ case "documentClass": target.getConfiguration().setDocumentClass(property(camelContext, java.lang.Class.class, value)); return true;
+ case "enablessl":
+ case "enableSSL": target.getConfiguration().setEnableSSL(property(camelContext, boolean.class, value)); return true;
+ case "enablesniffer":
+ case "enableSniffer": target.getConfiguration().setEnableSniffer(property(camelContext, boolean.class, value)); return true;
+ case "from": target.getConfiguration().setFrom(property(camelContext, java.lang.Integer.class, value)); return true;
+ case "hostaddresses":
+ case "hostAddresses": target.getConfiguration().setHostAddresses(property(camelContext, java.lang.String.class, value)); return true;
+ case "indexname":
+ case "indexName": target.getConfiguration().setIndexName(property(camelContext, java.lang.String.class, value)); return true;
+ case "lazystartproducer":
+ case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
+ case "maxretrytimeout":
+ case "maxRetryTimeout": target.getConfiguration().setMaxRetryTimeout(property(camelContext, int.class, value)); return true;
+ case "operation": target.getConfiguration().setOperation(property(camelContext, org.apache.camel.component.opensearch.OpensearchOperation.class, value)); return true;
+ case "scrollkeepalivems":
+ case "scrollKeepAliveMs": target.getConfiguration().setScrollKeepAliveMs(property(camelContext, int.class, value)); return true;
+ case "size": target.getConfiguration().setSize(property(camelContext, java.lang.Integer.class, value)); return true;
+ case "sniffafterfailuredelay":
+ case "sniffAfterFailureDelay": target.getConfiguration().setSniffAfterFailureDelay(property(camelContext, int.class, value)); return true;
+ case "snifferinterval":
+ case "snifferInterval": target.getConfiguration().setSnifferInterval(property(camelContext, int.class, value)); return true;
+ case "sockettimeout":
+ case "socketTimeout": target.getConfiguration().setSocketTimeout(property(camelContext, int.class, value)); return true;
+ case "usescroll":
+ case "useScroll": target.getConfiguration().setUseScroll(property(camelContext, boolean.class, value)); return true;
+ case "waitforactiveshards":
+ case "waitForActiveShards": target.getConfiguration().setWaitForActiveShards(property(camelContext, int.class, value)); return true;
+ default: return false;
+ }
+ }
+
+ @Override // Java type of the named option, or null for an unrecognised name
+ public Class<?> getOptionType(String name, boolean ignoreCase) {
+ switch (ignoreCase ? name.toLowerCase(java.util.Locale.ROOT) : name) { // locale-neutral lower-casing, same reason as in configure()
+ case "certificatepath":
+ case "certificatePath": return java.lang.String.class;
+ case "connectiontimeout":
+ case "connectionTimeout": return int.class;
+ case "disconnect": return boolean.class;
+ case "documentclass":
+ case "documentClass": return java.lang.Class.class;
+ case "enablessl":
+ case "enableSSL": return boolean.class;
+ case "enablesniffer":
+ case "enableSniffer": return boolean.class;
+ case "from": return java.lang.Integer.class;
+ case "hostaddresses":
+ case "hostAddresses": return java.lang.String.class;
+ case "indexname":
+ case "indexName": return java.lang.String.class;
+ case "lazystartproducer":
+ case "lazyStartProducer": return boolean.class;
+ case "maxretrytimeout":
+ case "maxRetryTimeout": return int.class;
+ case "operation": return org.apache.camel.component.opensearch.OpensearchOperation.class;
+ case "scrollkeepalivems":
+ case "scrollKeepAliveMs": return int.class;
+ case "size": return java.lang.Integer.class;
+ case "sniffafterfailuredelay":
+ case "sniffAfterFailureDelay": return int.class;
+ case "snifferinterval":
+ case "snifferInterval": return int.class;
+ case "sockettimeout":
+ case "socketTimeout": return int.class;
+ case "usescroll":
+ case "useScroll": return boolean.class;
+ case "waitforactiveshards":
+ case "waitForActiveShards": return int.class;
+ default: return null;
+ }
+ }
+
+ @Override // current value of the named option read through its getter, or null for an unrecognised name
+ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
+ OpensearchEndpoint target = (OpensearchEndpoint) obj;
+ switch (ignoreCase ? name.toLowerCase(java.util.Locale.ROOT) : name) { // locale-neutral lower-casing, same reason as in configure()
+ case "certificatepath":
+ case "certificatePath": return target.getConfiguration().getCertificatePath();
+ case "connectiontimeout":
+ case "connectionTimeout": return target.getConfiguration().getConnectionTimeout();
+ case "disconnect": return target.getConfiguration().isDisconnect();
+ case "documentclass":
+ case "documentClass": return target.getConfiguration().getDocumentClass();
+ case "enablessl":
+ case "enableSSL": return target.getConfiguration().isEnableSSL();
+ case "enablesniffer":
+ case "enableSniffer": return target.getConfiguration().isEnableSniffer();
+ case "from": return target.getConfiguration().getFrom();
+ case "hostaddresses":
+ case "hostAddresses": return target.getConfiguration().getHostAddresses();
+ case "indexname":
+ case "indexName": return target.getConfiguration().getIndexName();
+ case "lazystartproducer":
+ case "lazyStartProducer": return target.isLazyStartProducer();
+ case "maxretrytimeout":
+ case "maxRetryTimeout": return target.getConfiguration().getMaxRetryTimeout();
+ case "operation": return target.getConfiguration().getOperation();
+ case "scrollkeepalivems":
+ case "scrollKeepAliveMs": return target.getConfiguration().getScrollKeepAliveMs();
+ case "size": return target.getConfiguration().getSize();
+ case "sniffafterfailuredelay":
+ case "sniffAfterFailureDelay": return target.getConfiguration().getSniffAfterFailureDelay();
+ case "snifferinterval":
+ case "snifferInterval": return target.getConfiguration().getSnifferInterval();
+ case "sockettimeout":
+ case "socketTimeout": return target.getConfiguration().getSocketTimeout();
+ case "usescroll":
+ case "useScroll": return target.getConfiguration().isUseScroll();
+ case "waitforactiveshards":
+ case "waitForActiveShards": return target.getConfiguration().getWaitForActiveShards();
+ default: return null;
+ }
+ }
+
+ @Override // reports Object as the value type for documentClass; every other name yields null
+ public Object getCollectionValueType(Object target, String name, boolean ignoreCase) {
+ switch (ignoreCase ? name.toLowerCase(java.util.Locale.ROOT) : name) { // locale-neutral lower-casing, same reason as in configure()
+ case "documentclass":
+ case "documentClass": return java.lang.Object.class;
+ default: return null;
+ }
+ }
+}
+
diff --git a/components/camel-opensearch/src/generated/java/org/apache/camel/component/opensearch/OpensearchEndpointUriFactory.java b/components/camel-opensearch/src/generated/java/org/apache/camel/component/opensearch/OpensearchEndpointUriFactory.java
new file mode 100644
index 00000000000..587f2677d83
--- /dev/null
+++ b/components/camel-opensearch/src/generated/java/org/apache/camel/component/opensearch/OpensearchEndpointUriFactory.java
@@ -0,0 +1,87 @@
+/* Generated by camel build tools - do NOT edit this file! */
+package org.apache.camel.component.opensearch;
+
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.camel.spi.EndpointUriFactory;
+
+/**
+ * Generated by camel build tools - do NOT edit this file!
+ * Describes and assembles endpoint URIs that follow the syntax {@code opensearch:clusterName}.
+ */
+public class OpensearchEndpointUriFactory extends org.apache.camel.support.component.EndpointUriFactorySupport implements EndpointUriFactory {
+
+ private static final String BASE = ":clusterName";
+
+ private static final Set<String> PROPERTY_NAMES;
+ private static final Set<String> SECRET_PROPERTY_NAMES;
+ private static final Set<String> MULTI_VALUE_PREFIXES;
+ static {
+ Set<String> names = new HashSet<>(20);
+ Collections.addAll(names,
+ "certificatePath",
+ "clusterName",
+ "connectionTimeout",
+ "disconnect",
+ "documentClass",
+ "enableSSL",
+ "enableSniffer",
+ "from",
+ "hostAddresses",
+ "indexName",
+ "lazyStartProducer",
+ "maxRetryTimeout",
+ "operation",
+ "scrollKeepAliveMs",
+ "size",
+ "sniffAfterFailureDelay",
+ "snifferInterval",
+ "socketTimeout",
+ "useScroll",
+ "waitForActiveShards");
+ PROPERTY_NAMES = Collections.unmodifiableSet(names);
+ SECRET_PROPERTY_NAMES = Collections.emptySet();
+ MULTI_VALUE_PREFIXES = Collections.emptySet();
+ }
+
+ @Override // this factory answers for the "opensearch" scheme only
+ public boolean isEnabled(String scheme) {
+ return "opensearch".equals(scheme);
+ }
+
+ @Override // fills the clusterName path parameter first, then appends the query parameters
+ public String buildUri(String scheme, Map<String, Object> properties, boolean encode) throws URISyntaxException {
+ final String template = scheme + BASE;
+ // the helpers receive a private copy, never the caller's own map
+ final Map<String, Object> remaining = new HashMap<>(properties);
+
+ final String withPath = buildPathParameter(template, template, "clusterName", null, true, remaining);
+ return buildQueryParameters(withPath, remaining, encode);
+ }
+
+ @Override // unmodifiable view of every option name listed in the static initialiser
+ public Set<String> propertyNames() {
+ return PROPERTY_NAMES;
+ }
+
+ @Override // always the empty set: no endpoint option is registered as secret here
+ public Set<String> secretPropertyNames() {
+ return SECRET_PROPERTY_NAMES;
+ }
+
+ @Override // always the empty set: no multi-value prefixes are registered here
+ public Set<String> multiValuePrefixes() {
+ return MULTI_VALUE_PREFIXES;
+ }
+
+ @Override // lenient properties are switched off for this endpoint
+ public boolean isLenientProperties() {
+ return false;
+ }
+}
+
diff --git a/components/camel-opensearch/src/generated/java/org/apache/camel/component/opensearch/converter/OpensearchActionRequestConverterLoader.java b/components/camel-opensearch/src/generated/java/org/apache/camel/component/opensearch/converter/OpensearchActionRequestConverterLoader.java
new file mode 100644
index 00000000000..aa0cb7e6d8c
--- /dev/null
+++ b/components/camel-opensearch/src/generated/java/org/apache/camel/component/opensearch/converter/OpensearchActionRequestConverterLoader.java
@@ -0,0 +1,66 @@
+/* Generated by camel build tools - do NOT edit this file! */
+package org.apache.camel.component.opensearch.converter;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.DeferredContextBinding;
+import org.apache.camel.Exchange;
+import org.apache.camel.TypeConversionException;
+import org.apache.camel.TypeConverterLoaderException;
+import org.apache.camel.spi.TypeConverterLoader;
+import org.apache.camel.spi.TypeConverterRegistry;
+import org.apache.camel.support.SimpleTypeConverter;
+import org.apache.camel.support.TypeConverterSupport;
+import org.apache.camel.util.DoubleMap;
+
+/**
+ * Generated by camel build tools - do NOT edit this file! Registers the static OpensearchActionRequestConverter methods as converters from Object to the OpenSearch request builder types.
+ */
+@SuppressWarnings("unchecked")
+@DeferredContextBinding
+public final class OpensearchActionRequestConverterLoader implements TypeConverterLoader, CamelContextAware {
+
+ private CamelContext camelContext;
+
+ public OpensearchActionRequestConverterLoader() {
+ }
+
+ @Override // stores the context; nothing else in this class reads it
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ @Override // hands back whatever setCamelContext stored (null until then)
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override // entry point: adds all eight converters to the given registry
+ public void load(TypeConverterRegistry registry) throws TypeConverterLoaderException {
+ registerConverters(registry);
+ }
+
+ private void registerConverters(TypeConverterRegistry registry) {
+ bind(registry, org.opensearch.client.opensearch.core.BulkRequest.Builder.class, Object.class, false,
+ (targetType, exchange, body) -> OpensearchActionRequestConverter.toBulkRequestBuilder(body, exchange));
+ bind(registry, org.opensearch.client.opensearch.core.DeleteRequest.Builder.class, Object.class, false,
+ (targetType, exchange, body) -> OpensearchActionRequestConverter.toDeleteRequestBuilder(body, exchange));
+ bind(registry, org.opensearch.client.opensearch.core.GetRequest.Builder.class, Object.class, false,
+ (targetType, exchange, body) -> OpensearchActionRequestConverter.toGetRequestBuilder(body, exchange));
+ bind(registry, org.opensearch.client.opensearch.core.IndexRequest.Builder.class, Object.class, false,
+ (targetType, exchange, body) -> OpensearchActionRequestConverter.toIndexRequestBuilder(body, exchange));
+ bind(registry, org.opensearch.client.opensearch.core.MgetRequest.Builder.class, Object.class, false,
+ (targetType, exchange, body) -> OpensearchActionRequestConverter.toMgetRequestBuilder(body, exchange));
+ bind(registry, org.opensearch.client.opensearch.core.SearchRequest.Builder.class, Object.class, false,
+ (targetType, exchange, body) -> OpensearchActionRequestConverter.toSearchRequestBuilder(body, exchange));
+ bind(registry, org.opensearch.client.opensearch.core.UpdateRequest.Builder.class, Object.class, false,
+ (targetType, exchange, body) -> OpensearchActionRequestConverter.toUpdateRequestBuilder(body, exchange));
+ bind(registry, org.opensearch.client.opensearch.indices.DeleteIndexRequest.Builder.class, Object.class, false,
+ (targetType, exchange, body) -> OpensearchActionRequestConverter.toDeleteIndexRequestBuilder(body, exchange));
+ }
+
+ private static void bind(TypeConverterRegistry registry, Class<?> target, Class<?> source, boolean nullAllowed, SimpleTypeConverter.ConversionMethod conversion) {
+ registry.addTypeConverter(target, source, new SimpleTypeConverter(nullAllowed, conversion));
+ }
+
+}
diff --git a/components/camel-opensearch/src/generated/resources/META-INF/services/org/apache/camel/TypeConverterLoader b/components/camel-opensearch/src/generated/resources/META-INF/services/org/apache/camel/TypeConverterLoader
new file mode 100644
index 00000000000..299922b60c8
--- /dev/null
+++ b/components/camel-opensearch/src/generated/resources/META-INF/services/org/apache/camel/TypeConverterLoader
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+org.apache.camel.component.opensearch.converter.OpensearchActionRequestConverterLoader
diff --git a/components/camel-opensearch/src/generated/resources/META-INF/services/org/apache/camel/component.properties b/components/camel-opensearch/src/generated/resources/META-INF/services/org/apache/camel/component.properties
new file mode 100644
index 00000000000..386c74638d0
--- /dev/null
+++ b/components/camel-opensearch/src/generated/resources/META-INF/services/org/apache/camel/component.properties
@@ -0,0 +1,7 @@
+# Generated by camel build tools - do NOT edit this file!
+components=opensearch
+groupId=org.apache.camel
+artifactId=camel-opensearch
+version=4.0.0-SNAPSHOT
+projectName=Camel :: OpenSearch Java API Client
+projectDescription=Camel OpenSearch Java API Client support
diff --git a/components/camel-opensearch/src/generated/resources/META-INF/services/org/apache/camel/component/opensearch b/components/camel-opensearch/src/generated/resources/META-INF/services/org/apache/camel/component/opensearch
new file mode 100644
index 00000000000..1af3208c78f
--- /dev/null
+++ b/components/camel-opensearch/src/generated/resources/META-INF/services/org/apache/camel/component/opensearch
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.opensearch.OpensearchComponent
diff --git a/components/camel-opensearch/src/generated/resources/META-INF/services/org/apache/camel/configurer/opensearch-component b/components/camel-opensearch/src/generated/resources/META-INF/services/org/apache/camel/configurer/opensearch-component
new file mode 100644
index 00000000000..71e1abd733f
--- /dev/null
+++ b/components/camel-opensearch/src/generated/resources/META-INF/services/org/apache/camel/configurer/opensearch-component
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.opensearch.OpensearchComponentConfigurer
diff --git a/components/camel-opensearch/src/generated/resources/META-INF/services/org/apache/camel/configurer/opensearch-endpoint b/components/camel-opensearch/src/generated/resources/META-INF/services/org/apache/camel/configurer/opensearch-endpoint
new file mode 100644
index 00000000000..ee76775b276
--- /dev/null
+++ b/components/camel-opensearch/src/generated/resources/META-INF/services/org/apache/camel/configurer/opensearch-endpoint
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.opensearch.OpensearchEndpointConfigurer
diff --git a/components/camel-opensearch/src/generated/resources/META-INF/services/org/apache/camel/urifactory/opensearch-endpoint b/components/camel-opensearch/src/generated/resources/META-INF/services/org/apache/camel/urifactory/opensearch-endpoint
new file mode 100644
index 00000000000..f8936c0655c
--- /dev/null
+++ b/components/camel-opensearch/src/generated/resources/META-INF/services/org/apache/camel/urifactory/opensearch-endpoint
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.opensearch.OpensearchEndpointUriFactory
diff --git a/components/camel-opensearch/src/generated/resources/org/apache/camel/component/opensearch/opensearch.json b/components/camel-opensearch/src/generated/resources/org/apache/camel/component/opensearch/opensearch.json
new file mode 100644
index 00000000000..a93b4cf5e11
--- /dev/null
+++ b/components/camel-opensearch/src/generated/resources/org/apache/camel/component/opensearch/opensearch.json
@@ -0,0 +1,72 @@
+{
+ "component": {
+ "kind": "component",
+ "name": "opensearch",
+ "title": "OpenSearch",
+ "description": "Send requests to OpenSearch via Java Client API.",
+ "deprecated": false,
+ "firstVersion": "4.0.0",
+ "label": "search,monitoring",
+ "javaType": "org.apache.camel.component.opensearch.OpensearchComponent",
+ "supportLevel": "Preview",
+ "groupId": "org.apache.camel",
+ "artifactId": "camel-opensearch",
+ "version": "4.0.0-SNAPSHOT",
+ "scheme": "opensearch",
+ "extendsScheme": "",
+ "syntax": "opensearch:clusterName",
+ "async": false,
+ "api": false,
+ "consumerOnly": false,
+ "producerOnly": true,
+ "lenientProperties": false
+ },
+ "componentProperties": {
+ "connectionTimeout": { "index": 0, "kind": "property", "displayName": "Connection Timeout", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 30000, "description": "The time in ms to wait before connection will time out." },
+ "hostAddresses": { "index": 1, "kind": "property", "displayName": "Host Addresses", "group": "producer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Comma separated list with ip:port formatted remote transport addresses to use. The ip and port options must be left blank for hostAddresses to be considered instead." },
+ "lazyStartProducer": { "index": 2, "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail [...]
+ "maxRetryTimeout": { "index": 3, "kind": "property", "displayName": "Max Retry Timeout", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 30000, "description": "The time in ms before retry" },
+ "socketTimeout": { "index": 4, "kind": "property", "displayName": "Socket Timeout", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 30000, "description": "The timeout in ms to wait before the socket will time out." },
+ "autowiredEnabled": { "index": 5, "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching t [...]
+ "client": { "index": 6, "kind": "property", "displayName": "Client", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.opensearch.client.RestClient", "deprecated": false, "autowired": true, "secret": false, "description": "To use an existing configured OpenSearch client, instead of creating a client per endpoint. This allows to customize the client with specific settings." },
+ "enableSniffer": { "index": 7, "kind": "property", "displayName": "Enable Sniffer", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enable automatically discover nodes from a running OpenSearch cluster. If this option is used in conjunction with Spring Boot then it's managed by the Spring Boot configuration (see: Disable Sniffer in Sp [...]
+ "sniffAfterFailureDelay": { "index": 8, "kind": "property", "displayName": "Sniff After Failure Delay", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 60000, "description": "The delay of a sniff execution scheduled after a failure (in milliseconds)" },
+ "snifferInterval": { "index": 9, "kind": "property", "displayName": "Sniffer Interval", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 300000, "description": "The interval between consecutive ordinary sniff executions in milliseconds. Will be honoured when sniffOnFailure is disabled or when there are no failures between consecutive sniff executions" },
+ "enableSSL": { "index": 10, "kind": "property", "displayName": "Enable SSL", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enable SSL" },
+ "password": { "index": 11, "kind": "property", "displayName": "Password", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Password for authentication" },
+ "user": { "index": 12, "kind": "property", "displayName": "User", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "User name for basic authentication" }
+ },
+ "headers": {
+ "operation": { "index": 0, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "org.apache.camel.component.opensearch.OpensearchOperation", "enum": [ "Index", "Update", "Bulk", "GetById", "MultiGet", "MultiSearch", "Delete", "DeleteIndex", "Search", "Exists", "Ping" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The operation to perform", "constantName": "org.apache.camel.component. [...]
+ "indexId": { "index": 1, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The id of the indexed document.", "constantName": "org.apache.camel.component.opensearch.OpensearchConstants#PARAM_INDEX_ID" },
+ "indexName": { "index": 2, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The name of the index to act against", "constantName": "org.apache.camel.component.opensearch.OpensearchConstants#PARAM_INDEX_NAME" },
+ "documentClass": { "index": 3, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Class", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": "ObjectNode", "description": "The full qualified name of the class of the document to unmarshall", "constantName": "org.apache.camel.component.opensearch.OpensearchConstants#PARAM_DOCUMENT_CLASS" },
+ "waitForActiveShards": { "index": 4, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The index creation waits for the write consistency number of shards to be available", "constantName": "org.apache.camel.component.opensearch.OpensearchConstants#PARAM_WAIT_FOR_ACTIVE_SHARDS" },
+ "scrollKeepAliveMs": { "index": 5, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Time in ms during which OpenSearch will keep search context alive", "constantName": "org.apache.camel.component.opensearch.OpensearchConstants#PARAM_SCROLL_KEEP_ALIVE_MS" },
+ "useScroll": { "index": 6, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Set to true to enable scroll usage", "constantName": "org.apache.camel.component.opensearch.OpensearchConstants#PARAM_SCROLL" },
+ "size": { "index": 7, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The size of the response.", "constantName": "org.apache.camel.component.opensearch.OpensearchConstants#PARAM_SIZE" },
+ "from": { "index": 8, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The starting index of the response.", "constantName": "org.apache.camel.component.opensearch.OpensearchConstants#PARAM_FROM" }
+ },
+ "properties": {
+ "clusterName": { "index": 0, "kind": "path", "displayName": "Cluster Name", "group": "producer", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "Name of the cluster" },
+ "connectionTimeout": { "index": 1, "kind": "parameter", "displayName": "Connection Timeout", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 30000, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "The time in ms to wait before connection will timeout." },
+ "disconnect": { "index": 2, "kind": "parameter", "displayName": "Disconnect", "group": "producer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "Disconnect after it finish calling the producer" },
+ "from": { "index": 3, "kind": "parameter", "displayName": "From", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "Starting index of the response." },
+ "hostAddresses": { "index": 4, "kind": "parameter", "displayName": "Host Addresses", "group": "producer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "Comma separated list with ip:port formatted remote transport addresses to use." },
+ "indexName": { "index": 5, "kind": "parameter", "displayName": "Index Name", "group": "producer", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "The name of the index to act against" },
+ "maxRetryTimeout": { "index": 6, "kind": "parameter", "displayName": "Max Retry Timeout", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 30000, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "The time in ms before retry" },
+ "operation": { "index": 7, "kind": "parameter", "displayName": "Operation", "group": "producer", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.opensearch.OpensearchOperation", "enum": [ "Index", "Update", "Bulk", "GetById", "MultiGet", "MultiSearch", "Delete", "DeleteIndex", "Search", "Exists", "Ping" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfigura [...]
+ "scrollKeepAliveMs": { "index": 8, "kind": "parameter", "displayName": "Scroll Keep Alive Ms", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 60000, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "Time in ms during which OpenSearch will keep search context alive" },
+ "size": { "index": 9, "kind": "parameter", "displayName": "Size", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "Size of the response." },
+ "socketTimeout": { "index": 10, "kind": "parameter", "displayName": "Socket Timeout", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 30000, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "The timeout in ms to wait before the socket will timeout." },
+ "useScroll": { "index": 11, "kind": "parameter", "displayName": "Use Scroll", "group": "producer", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "Enable scroll usage" },
+ "waitForActiveShards": { "index": 12, "kind": "parameter", "displayName": "Wait For Active Shards", "group": "producer", "label": "", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "Index creation waits for the write consistency number of shards to be available" },
+ "lazyStartProducer": { "index": 13, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...]
+ "documentClass": { "index": 14, "kind": "parameter", "displayName": "Document Class", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.Class<java.lang.Object>", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ObjectNode", "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "The class to use when deserializing the docu [...]
+ "enableSniffer": { "index": 15, "kind": "parameter", "displayName": "Enable Sniffer", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "Enable automatically discover nodes from a running OpenSearch cluster. If th [...]
+ "sniffAfterFailureDelay": { "index": 16, "kind": "parameter", "displayName": "Sniff After Failure Delay", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 60000, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "The delay of a sniff execution scheduled after a failure (in [...]
+ "snifferInterval": { "index": 17, "kind": "parameter", "displayName": "Sniffer Interval", "group": "advanced", "label": "advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 300000, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "The interval between consecutive ordinary sniff executions in milliseconds. [...]
+ "certificatePath": { "index": 18, "kind": "parameter", "displayName": "Certificate Path", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "The certificate that can be used to access the ES Cluster. It can be loaded by default [...]
+ "enableSSL": { "index": 19, "kind": "parameter", "displayName": "Enable SSL", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.opensearch.OpensearchConfiguration", "configurationField": "configuration", "description": "Enable SSL" }
+ }
+}
diff --git a/components/camel-opensearch/src/main/docs/opensearch-component.adoc b/components/camel-opensearch/src/main/docs/opensearch-component.adoc
new file mode 100644
index 00000000000..682a66ecfdb
--- /dev/null
+++ b/components/camel-opensearch/src/main/docs/opensearch-component.adoc
@@ -0,0 +1,278 @@
+= OpenSearch Component
+:doctitle: OpenSearch
+:shortname: opensearch
+:artifactid: camel-opensearch
+:description: Send requests to OpenSearch via Java Client API.
+:since: 4.0
+:supportlevel: Preview
+:tabs-sync-option:
+:component-header: Only producer is supported
+//Manually maintained attributes
+:camel-spring-boot-name: opensearch
+
+*Since Camel {since}*
+
+*{component-header}*
+
+The OpenSearch component allows you to interface with an
+https://opensearch.org/[OpenSearch] 2.8.x API using the Java API Client library.
+
+Maven users will need to add the following dependency to their `pom.xml`
+for this component:
+
+[source,xml]
+----
+<dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-opensearch</artifactId>
+ <version>x.x.x</version>
+ <!-- use the same version as your Camel core version -->
+</dependency>
+----
+
+== URI format
+
+----
+opensearch://clusterName[?options]
+----
+
+
+// component-configure options: START
+
+// component-configure options: END
+
+// component options: START
+include::partial$component-configure-options.adoc[]
+include::partial$component-endpoint-options.adoc[]
+// component options: END
+
+// endpoint options: START
+
+// endpoint options: END
+
+// component headers: START
+include::partial$component-endpoint-headers.adoc[]
+// component headers: END
+
+== Message Operations
+
+The following https://opensearch.org/[OpenSearch] operations are currently supported. Simply
+set an endpoint URI option or exchange header with a key of "operation"
+and a value set to one of the following. Some operations also require
+other parameters or the message body to be set.
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|===
+|operation |message body |description
+
+|Index |*Map*, *String*, *byte[]*, *Reader*, *InputStream* or *IndexRequest.Builder* content to index |Adds content to an index and returns the content's indexId in the body.
+You can set the name of the target index by setting the message header with the key "indexName".
+You can set the indexId by setting the message header with
+the key "indexId".
+
+|GetById |*String* or *GetRequest.Builder* index id of content to retrieve |Retrieves the document corresponding to the given index id and returns a GetResponse object in the body.
+You can set the name of the target index by setting the message header with the key "indexName".
+You can set the type of document by setting the message header with
+the key "documentClass".
+
+|Delete |*String* or *DeleteRequest.Builder* index id of content to delete |Deletes the document with the specified index id and returns a Result object in the body.
+You can set the name of the target index by setting the message header with the key "indexName".
+
+|DeleteIndex |*String* or *DeleteIndexRequest.Builder* index name of the index to delete |Deletes the specified indexName and returns a status code in the body.
+You can set the name of the target index by setting the message header with the key "indexName".
+
+|Bulk |*Iterable* or *BulkRequest.Builder* of any type that is already accepted (DeleteOperation.Builder for delete operation, UpdateOperation.Builder for update operation, CreateOperation.Builder for create operation, byte[], InputStream, String, Reader, Map or any document type for index operation) | Adds/Updates/Deletes content from/to an index and returns a List<BulkResponseItem> object in the body.
+You can set the name of the target index by setting the message header with the key "indexName".
+
+|Search |*Map*, *String* or *SearchRequest.Builder* |Searches the content using the query given as a Map or as a JSON String.
+You can set the name of the target index by setting the message header with the key "indexName".
+You can set the number of hits to return by setting the message header with the key "size".
+You can set the starting document offset by setting the message header with the key "from".
+
+|MultiSearch |*MsearchRequest.Builder* |Multiple search in one
+
+|MultiGet |*Iterable<String>* or *MgetRequest.Builder* the id of the document to retrieve |Multiple get in one
+
+You can set the name of the target index by setting the message header with the key "indexName".
+
+|Exists |None |Checks whether the index exists or not and returns a Boolean flag in the body.
+
+You must set the name of the target index by setting the message header with the key "indexName".
+
+|Update |*byte[]*, *InputStream*, *String*, *Reader*, *Map* or any document type content to update |Updates content to an index and returns the content's indexId in the body.
+You can set the name of the target index by setting the message header with the key "indexName".
+You can set the indexId by setting the message header with
+the key "indexId".
+
+|Ping |None |Pings the OpenSearch cluster and returns true if the ping succeeded, false otherwise
+
+|===
+
+== Configure the component and enable basic authentication
+To use the OpenSearch component it has to be configured with a minimum configuration.
+
+[source,java]
+----
+OpensearchComponent opensearchComponent = new OpensearchComponent();
+opensearchComponent.setHostAddresses("opensearch-host:9200");
+camelContext.addComponent("opensearch", opensearchComponent);
+----
+
+For basic authentication with OpenSearch, or when using a reverse HTTP proxy in front of the OpenSearch cluster, simply set up
+basic authentication and SSL on the component as in the example below:
+
+[source,java]
+----
+OpensearchComponent opensearchComponent = new OpensearchComponent();
+opensearchComponent.setHostAddresses("opensearch-host:9200");
+opensearchComponent.setUser("opensearchuser");
+opensearchComponent.setPassword("secure!!");
+opensearchComponent.setEnableSSL(true);
+
+camelContext.addComponent("opensearch", opensearchComponent);
+----
+
+== Index Example
+
+Below is a simple INDEX example
+
+[source,java]
+----
+from("direct:index")
+ .to("opensearch://opensearch?operation=Index&indexName=twitter");
+----
+
+[source,xml]
+----
+<route>
+ <from uri="direct:index"/>
+ <to uri="opensearch://opensearch?operation=Index&amp;indexName=twitter"/>
+</route>
+----
+
+*For this operation you'll need to specify an indexId header.*
+
+A client would simply need to pass a body message containing a Map to
+the route. The result body contains the indexId created.
+
+[source,java]
+----
+Map<String, String> map = new HashMap<String, String>();
+map.put("content", "test");
+String indexId = template.requestBody("direct:index", map, String.class);
+----
+
+== Search Example
+
+To search on specific field(s) and value(s), use the operation `Search`.
+Pass in the query as a JSON String or as a Map.
+
+[source,java]
+----
+from("direct:search")
+ .to("opensearch://opensearch?operation=Search&indexName=twitter");
+----
+
+[source,xml]
+----
+<route>
+ <from uri="direct:search"/>
+ <to uri="opensearch://opensearch?operation=Search&amp;indexName=twitter"/>
+</route>
+----
+
+[source,java]
+----
+String query = "{\"query\":{\"match\":{\"doc.content\":\"new release of ApacheCamel\"}}}";
+HitsMetadata<?> response = template.requestBody("direct:search", query, HitsMetadata.class);
+
+----
+
+Search on specific field(s) using Map.
+
+[source,java]
+----
+Map<String, Object> actualQuery = new HashMap<>();
+actualQuery.put("doc.content", "new release of ApacheCamel");
+
+Map<String, Object> match = new HashMap<>();
+match.put("match", actualQuery);
+
+Map<String, Object> query = new HashMap<>();
+query.put("query", match);
+HitsMetadata<?> response = template.requestBody("direct:search", query, HitsMetadata.class);
+
+----
+
+Search using OpenSearch scroll api in order to fetch all results.
+
+[source,java]
+----
+from("direct:search")
+ .to("opensearch://opensearch?operation=Search&indexName=twitter&useScroll=true&scrollKeepAliveMs=30000");
+----
+
+[source,xml]
+----
+<route>
+ <from uri="direct:search"/>
+ <to uri="opensearch://opensearch?operation=Search&amp;indexName=twitter&amp;useScroll=true&amp;scrollKeepAliveMs=30000"/>
+</route>
+----
+
+[source,java]
+----
+String query = "{\"query\":{\"match\":{\"doc.content\":\"new release of ApacheCamel\"}}}";
+try (OpensearchScrollRequestIterator response = template.requestBody("direct:search", query, OpensearchScrollRequestIterator.class)) {
+ // do something smart with results
+}
+----
+
+xref:eips:split-eip.adoc[Split EIP] can also be used.
+
+[source,java]
+----
+from("direct:search")
+ .to("opensearch://opensearch?operation=Search&indexName=twitter&useScroll=true&scrollKeepAliveMs=30000")
+ .split()
+ .body()
+ .streaming()
+ .to("mock:output")
+ .end();
+----
+
+== MultiSearch Example
+
+To run several searches on specific field(s) and value(s) in a single request, use the operation `MultiSearch`.
+Pass in an `MsearchRequest.Builder` instance.
+
+[source,java]
+----
+from("direct:multiSearch")
+ .to("opensearch://opensearch?operation=MultiSearch");
+----
+
+[source,xml]
+----
+<route>
+ <from uri="direct:multiSearch"/>
+ <to uri="opensearch://opensearch?operation=MultiSearch"/>
+</route>
+----
+
+MultiSearch on specific field(s)
+
+[source,java]
+----
+MsearchRequest.Builder builder = new MsearchRequest.Builder().index("twitter").searches(
+ new RequestItem.Builder().header(new MultisearchHeader.Builder().build())
+ .body(new MultisearchBody.Builder().query(b -> b.matchAll(x -> x)).build()).build(),
+ new RequestItem.Builder().header(new MultisearchHeader.Builder().build())
+ .body(new MultisearchBody.Builder().query(b -> b.matchAll(x -> x)).build()).build());
+List<MultiSearchResponseItem<?>> response = template.requestBody("direct:multiSearch", builder, List.class);
+----
+
+== Document type
+
+For all the search operations, it is possible to indicate the type of document to retrieve in order to get the result already unmarshalled with the expected type.
+
+The document type can be set using the header "documentClass" or via the uri parameter of the same name.
diff --git a/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchComponent.java b/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchComponent.java
new file mode 100644
index 00000000000..2d84999455a
--- /dev/null
+++ b/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchComponent.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.opensearch;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.Component;
+import org.apache.camel.support.DefaultComponent;
+import org.apache.http.HttpHost;
+import org.opensearch.client.RestClient;
+
+/**
+ * Represents the component that manages {@link OpensearchEndpoint}.
+ * <p>
+ * The connection settings held by this component are copied into a fresh {@link OpensearchConfiguration} for every
+ * endpoint it creates, before the endpoint URI parameters are applied.
+ */
+@Component("opensearch")
+public class OpensearchComponent extends DefaultComponent {
+
+    @Metadata(label = "advanced", autowired = true)
+    private RestClient client;
+    @Metadata
+    private String hostAddresses;
+    @Metadata(defaultValue = "" + OpensearchConstants.DEFAULT_SOCKET_TIMEOUT)
+    private int socketTimeout = OpensearchConstants.DEFAULT_SOCKET_TIMEOUT;
+    @Metadata(defaultValue = "" + OpensearchConstants.MAX_RETRY_TIMEOUT)
+    private int maxRetryTimeout = OpensearchConstants.MAX_RETRY_TIMEOUT;
+    @Metadata(defaultValue = "" + OpensearchConstants.DEFAULT_CONNECTION_TIMEOUT)
+    private int connectionTimeout = OpensearchConstants.DEFAULT_CONNECTION_TIMEOUT;
+    @Metadata(label = "security", secret = true)
+    private String user;
+    @Metadata(label = "security", secret = true)
+    private String password;
+    @Metadata(label = "security")
+    private boolean enableSSL;
+    @Metadata(label = "advanced")
+    private boolean enableSniffer;
+    @Metadata(label = "advanced", defaultValue = "" + OpensearchConstants.DEFAULT_SNIFFER_INTERVAL)
+    private int snifferInterval = OpensearchConstants.DEFAULT_SNIFFER_INTERVAL;
+    @Metadata(label = "advanced", defaultValue = "" + OpensearchConstants.DEFAULT_AFTER_FAILURE_DELAY)
+    private int sniffAfterFailureDelay = OpensearchConstants.DEFAULT_AFTER_FAILURE_DELAY;
+
+    public OpensearchComponent() {
+        this(null);
+    }
+
+    public OpensearchComponent(CamelContext context) {
+        super(context);
+        registerExtension(new OpensearchComponentVerifierExtension());
+    }
+
+    /**
+     * Creates an {@link OpensearchEndpoint} whose configuration starts from this component's settings.
+     *
+     * @param  uri        the full endpoint URI
+     * @param  remaining  the URI path, used as the cluster name
+     * @param  parameters the URI query parameters, applied to the endpoint via {@code setProperties}
+     * @return            the configured endpoint
+     */
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        OpensearchConfiguration config = new OpensearchConfiguration();
+        config.setHostAddresses(this.getHostAddresses());
+        config.setSocketTimeout(this.getSocketTimeout());
+        config.setMaxRetryTimeout(this.getMaxRetryTimeout());
+        config.setConnectionTimeout(this.getConnectionTimeout());
+        config.setUser(this.getUser());
+        config.setEnableSSL(this.isEnableSSL());
+        config.setPassword(this.getPassword());
+        config.setEnableSniffer(this.isEnableSniffer());
+        config.setSnifferInterval(this.getSnifferInterval());
+        config.setSniffAfterFailureDelay(this.getSniffAfterFailureDelay());
+        config.setClusterName(remaining);
+
+        Endpoint endpoint = new OpensearchEndpoint(uri, this, config, client);
+        setProperties(endpoint, parameters);
+        // Parse the host list only after the URI parameters were applied, so the values read back from the
+        // configuration (hostAddresses, enableSSL) are the final ones.
+        config.setHostAddressesList(parseHostAddresses(config.getHostAddresses(), config));
+
+        return endpoint;
+    }
+
+    /**
+     * Converts a comma separated list of {@code host[:port]} entries into {@link HttpHost} instances.
+     * <p>
+     * Whitespace around each entry, host name and port is ignored, so {@code "host1:9200, host2:9200"} is accepted.
+     * Entries without a port use {@link OpensearchConstants#DEFAULT_PORT}. The scheme is HTTPS when SSL is enabled
+     * on the given configuration and HTTP otherwise.
+     *
+     * @param  ipsString                the comma separated host list, may be {@code null} or empty
+     * @param  config                   the configuration that decides the scheme
+     * @return                          the parsed hosts, or {@code null} when {@code ipsString} is {@code null} or
+     *                                  empty
+     * @throws IllegalArgumentException if an entry has no host name
+     * @throws NumberFormatException    if the port of an entry is not a valid integer
+     */
+    private List<HttpHost> parseHostAddresses(String ipsString, OpensearchConfiguration config) {
+        if (ipsString == null || ipsString.isEmpty()) {
+            return null;
+        }
+        String scheme = config.isEnableSSL() ? "HTTPS" : "HTTP";
+        List<String> addressesStr = Arrays.asList(ipsString.split(","));
+        List<HttpHost> addressesTrAd = new ArrayList<>(addressesStr.size());
+        for (String address : addressesStr) {
+            String[] split = address.trim().split(":");
+            String hostname = split.length > 0 ? split[0].trim() : "";
+            if (hostname.isEmpty()) {
+                throw new IllegalArgumentException(
+                        "Invalid entry '" + address + "' in hostAddresses '" + ipsString + "': the host name is missing");
+            }
+            int port = split.length > 1 ? Integer.parseInt(split[1].trim()) : OpensearchConstants.DEFAULT_PORT;
+            addressesTrAd.add(new HttpHost(hostname, port, scheme));
+        }
+        return addressesTrAd;
+    }
+
+    public RestClient getClient() {
+        return client;
+    }
+
+    /**
+     * To use an existing configured OpenSearch client, instead of creating a client per endpoint. This allows to
+     * customize the client with specific settings.
+     */
+    public void setClient(RestClient client) {
+        this.client = client;
+    }
+
+    /**
+     * Comma separated list with ip:port formatted remote transport addresses to use. When the port is omitted the
+     * default port is used.
+     */
+    public String getHostAddresses() {
+        return hostAddresses;
+    }
+
+    public void setHostAddresses(String hostAddresses) {
+        this.hostAddresses = hostAddresses;
+    }
+
+    /**
+     * The timeout in ms to wait before the socket will time out.
+     */
+    public int getSocketTimeout() {
+        return socketTimeout;
+    }
+
+    public void setSocketTimeout(int socketTimeout) {
+        this.socketTimeout = socketTimeout;
+    }
+
+    /**
+     * The time in ms to wait before connection will time out.
+     */
+    public int getConnectionTimeout() {
+        return connectionTimeout;
+    }
+
+    public void setConnectionTimeout(int connectionTimeout) {
+        this.connectionTimeout = connectionTimeout;
+    }
+
+    /**
+     * Basic authenticate user
+     */
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    /**
+     * Password for authenticate
+     */
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public boolean isEnableSSL() {
+        return enableSSL;
+    }
+
+    /**
+     * Enable SSL
+     */
+    public void setEnableSSL(boolean enableSSL) {
+        this.enableSSL = enableSSL;
+    }
+
+    /**
+     * The time in ms before retry
+     */
+    public int getMaxRetryTimeout() {
+        return maxRetryTimeout;
+    }
+
+    public void setMaxRetryTimeout(int maxRetryTimeout) {
+        this.maxRetryTimeout = maxRetryTimeout;
+    }
+
+    public boolean isEnableSniffer() {
+        return enableSniffer;
+    }
+
+    /**
+     * Enable automatically discover nodes from a running OpenSearch cluster. If this option is used in conjunction with
+     * Spring Boot then it's managed by the Spring Boot configuration (see: Disable Sniffer in Spring Boot).
+     */
+    public void setEnableSniffer(boolean enableSniffer) {
+        this.enableSniffer = enableSniffer;
+    }
+
+    /**
+     * The interval between consecutive ordinary sniff executions in milliseconds. Will be honoured when sniffOnFailure
+     * is disabled or when there are no failures between consecutive sniff executions
+     */
+    public int getSnifferInterval() {
+        return snifferInterval;
+    }
+
+    public void setSnifferInterval(int snifferInterval) {
+        this.snifferInterval = snifferInterval;
+    }
+
+    /**
+     * The delay of a sniff execution scheduled after a failure (in milliseconds)
+     */
+    public int getSniffAfterFailureDelay() {
+        return sniffAfterFailureDelay;
+    }
+
+    public void setSniffAfterFailureDelay(int sniffAfterFailureDelay) {
+        this.sniffAfterFailureDelay = sniffAfterFailureDelay;
+    }
+
+}
diff --git a/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchComponentVerifierExtension.java b/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchComponentVerifierExtension.java
new file mode 100644
index 00000000000..c7f5a50a382
--- /dev/null
+++ b/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchComponentVerifierExtension.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.opensearch;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.camel.component.extension.verifier.DefaultComponentVerifierExtension;
+import org.apache.camel.component.extension.verifier.ResultBuilder;
+import org.apache.camel.component.extension.verifier.ResultErrorBuilder;
+import org.apache.camel.component.extension.verifier.ResultErrorHelper;
+import org.apache.http.HttpHost;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.json.jackson.JacksonJsonpMapper;
+import org.opensearch.client.opensearch.OpenSearchClient;
+import org.opensearch.client.transport.OpenSearchTransport;
+import org.opensearch.client.transport.rest_client.RestClientTransport;
+
+public class OpensearchComponentVerifierExtension extends DefaultComponentVerifierExtension {
+
+ public OpensearchComponentVerifierExtension() {
+ this("opensearch");
+ }
+
+ public OpensearchComponentVerifierExtension(String scheme) {
+ super(scheme);
+ }
+
+ // *********************************
+ // Parameters validation
+ // *********************************
+
+ @Override
+ protected Result verifyParameters(Map<String, Object> parameters) {
+
+ ResultBuilder builder = ResultBuilder.withStatusAndScope(Result.Status.OK, Scope.PARAMETERS)
+ .error(ResultErrorHelper.requiresOption("clusterName", parameters))
+ .error(ResultErrorHelper.requiresOption("hostAddresses", parameters));
+ // Validate using the catalog
+
+ super.verifyParametersAgainstCatalog(builder, parameters);
+
+ return builder.build();
+ }
+
+ // *********************************
+ // Connectivity validation
+ // *********************************
+
+ @Override
+ protected Result verifyConnectivity(Map<String, Object> parameters) {
+ ResultBuilder builder = ResultBuilder.withStatusAndScope(Result.Status.OK, Scope.CONNECTIVITY);
+
+ try {
+ OpensearchConfiguration configuration = setProperties(new OpensearchConfiguration(), parameters);
+ RestClientBuilder clientBuilder = RestClient.builder(configuration.getHostAddressesList().toArray(new HttpHost[0]));
+ try (OpenSearchTransport transport = new RestClientTransport(clientBuilder.build(), new JacksonJsonpMapper())) {
+ OpenSearchClient esClient = new OpenSearchClient(transport);
+ esClient.ping();
+ }
+ } catch (IOException e) {
+ ResultErrorBuilder errorBuilder
+ = ResultErrorBuilder.withCodeAndDescription(VerificationError.StandardCode.AUTHENTICATION, e.getMessage())
+ .detail("opensearch_exception_message", e.getMessage())
+ .detail(VerificationError.ExceptionAttribute.EXCEPTION_CLASS, e.getClass().getName())
+ .detail(VerificationError.ExceptionAttribute.EXCEPTION_INSTANCE, e);
+
+ builder.error(errorBuilder.build());
+ } catch (Exception e) {
+ builder.error(ResultErrorBuilder.withException(e).build());
+ }
+ return builder.build();
+ }
+}
diff --git a/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchConfiguration.java b/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchConfiguration.java
new file mode 100644
index 00000000000..cc3831ceb1b
--- /dev/null
+++ b/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchConfiguration.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.opensearch;
+
+import java.util.List;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+import org.apache.camel.spi.UriPath;
+import org.apache.http.HttpHost;
+
+/**
+ * Options of the OpenSearch endpoint.
+ * <p>
+ * The class is a plain bean: every option is a private field exposed through a getter and a setter, and the
+ * defaults of the numeric options come from {@link OpensearchConstants}.
+ */
+@UriParams
+public class OpensearchConfiguration {
+
+ // These three fields carry no @UriParam annotation in this class
+ private List<HttpHost> hostAddressesList;
+ private String user;
+ private String password;
+
+ @UriPath
+ @Metadata(required = true)
+ private String clusterName;
+ @UriParam
+ private OpensearchOperation operation;
+ @UriParam
+ private Integer size;
+ @UriParam
+ private Integer from;
+ @UriParam
+ private String indexName;
+ @UriParam(defaultValue = "" + OpensearchConstants.DEFAULT_FOR_WAIT_ACTIVE_SHARDS)
+ private int waitForActiveShards = OpensearchConstants.DEFAULT_FOR_WAIT_ACTIVE_SHARDS;
+ @UriParam
+ private String hostAddresses;
+ @UriParam(defaultValue = "" + OpensearchConstants.DEFAULT_SOCKET_TIMEOUT)
+ private int socketTimeout = OpensearchConstants.DEFAULT_SOCKET_TIMEOUT;
+ @UriParam(defaultValue = "" + OpensearchConstants.MAX_RETRY_TIMEOUT)
+ private int maxRetryTimeout = OpensearchConstants.MAX_RETRY_TIMEOUT;
+ @UriParam(defaultValue = "" + OpensearchConstants.DEFAULT_CONNECTION_TIMEOUT)
+ private int connectionTimeout = OpensearchConstants.DEFAULT_CONNECTION_TIMEOUT;
+ @UriParam
+ private boolean disconnect;
+ @UriParam(label = "security")
+ private boolean enableSSL;
+ @UriParam(label = "security")
+ private String certificatePath;
+ @UriParam
+ private boolean useScroll;
+ @UriParam(defaultValue = "" + OpensearchConstants.DEFAULT_SCROLL_KEEP_ALIVE_MS)
+ private int scrollKeepAliveMs = OpensearchConstants.DEFAULT_SCROLL_KEEP_ALIVE_MS;
+ @UriParam(label = "advanced")
+ private boolean enableSniffer;
+ @UriParam(label = "advanced", defaultValue = "" + OpensearchConstants.DEFAULT_SNIFFER_INTERVAL)
+ private int snifferInterval = OpensearchConstants.DEFAULT_SNIFFER_INTERVAL;
+ @UriParam(label = "advanced", defaultValue = "" + OpensearchConstants.DEFAULT_AFTER_FAILURE_DELAY)
+ private int sniffAfterFailureDelay = OpensearchConstants.DEFAULT_AFTER_FAILURE_DELAY;
+ @UriParam(label = "advanced", defaultValue = "ObjectNode")
+ private Class<?> documentClass = ObjectNode.class;
+
+ /**
+ * Starting index of the response.
+ */
+ public Integer getFrom() {
+ return from;
+ }
+
+ public void setFrom(Integer from) {
+ this.from = from;
+ }
+
+ /**
+ * Size of the response.
+ */
+ public Integer getSize() {
+ return size;
+ }
+
+ public void setSize(Integer size) {
+ this.size = size;
+ }
+
+ /**
+ * Name of the cluster.
+ */
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ /**
+ * The operation to perform.
+ */
+ public OpensearchOperation getOperation() {
+ return operation;
+ }
+
+ public void setOperation(OpensearchOperation operation) {
+ this.operation = operation;
+ }
+
+ /**
+ * The name of the index to act against.
+ */
+ public String getIndexName() {
+ return indexName;
+ }
+
+ public void setIndexName(String indexName) {
+ this.indexName = indexName;
+ }
+
+ /**
+ * Comma separated list with ip:port formatted remote transport addresses to use.
+ */
+ public String getHostAddresses() {
+ return hostAddresses;
+ }
+
+ public void setHostAddresses(String hostAddresses) {
+ this.hostAddresses = hostAddresses;
+ }
+
+ /**
+ * Index creation waits for the write consistency number of shards to be available.
+ */
+ public int getWaitForActiveShards() {
+ return waitForActiveShards;
+ }
+
+ public void setWaitForActiveShards(int waitForActiveShards) {
+ this.waitForActiveShards = waitForActiveShards;
+ }
+
+ /**
+ * The host addresses as {@link HttpHost} instances.
+ * <p>
+ * NOTE(review): nothing in this class derives the list from {@code hostAddresses}; presumably the component fills
+ * it when the endpoint is created - confirm against the caller.
+ */
+ public List<HttpHost> getHostAddressesList() {
+ return hostAddressesList;
+ }
+
+ public void setHostAddressesList(List<HttpHost> hostAddressesList) {
+ this.hostAddressesList = hostAddressesList;
+ }
+
+ /**
+ * The timeout in ms to wait before the socket will time out.
+ */
+ public int getSocketTimeout() {
+ return socketTimeout;
+ }
+
+ public void setSocketTimeout(int socketTimeout) {
+ this.socketTimeout = socketTimeout;
+ }
+
+ /**
+ * The time in ms to wait before the connection will time out.
+ */
+ public int getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ public void setConnectionTimeout(int connectionTimeout) {
+ this.connectionTimeout = connectionTimeout;
+ }
+
+ /**
+ * User name for basic authentication.
+ */
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ /**
+ * Password for basic authentication.
+ */
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ /**
+ * Enable SSL.
+ */
+ public boolean isEnableSSL() {
+ return enableSSL;
+ }
+
+ public void setEnableSSL(boolean enableSSL) {
+ this.enableSSL = enableSSL;
+ }
+
+ /**
+ * The certificate that can be used to access the OpenSearch cluster. It can be loaded by default from classpath,
+ * but you can prefix with classpath:, file:, or http: to load the resource from different systems.
+ */
+ public String getCertificatePath() {
+ return certificatePath;
+ }
+
+ public void setCertificatePath(String certificatePath) {
+ this.certificatePath = certificatePath;
+ }
+
+ /**
+ * The time in ms before retry.
+ */
+ public int getMaxRetryTimeout() {
+ return maxRetryTimeout;
+ }
+
+ public void setMaxRetryTimeout(int maxRetryTimeout) {
+ this.maxRetryTimeout = maxRetryTimeout;
+ }
+
+ /**
+ * Whether to disconnect once the producer has finished its call.
+ */
+ public boolean isDisconnect() {
+ return disconnect;
+ }
+
+ public void setDisconnect(boolean disconnect) {
+ this.disconnect = disconnect;
+ }
+
+ /**
+ * Enables the automatic discovery of nodes from a running OpenSearch cluster. If this option is used in
+ * conjunction with Spring Boot then it's managed by the Spring Boot configuration (see: Disable Sniffer in Spring
+ * Boot).
+ */
+ public boolean isEnableSniffer() {
+ return enableSniffer;
+ }
+
+ public void setEnableSniffer(boolean enableSniffer) {
+ this.enableSniffer = enableSniffer;
+ }
+
+ /**
+ * The interval between consecutive ordinary sniff executions in milliseconds. Will be honoured when sniffOnFailure
+ * is disabled or when there are no failures between consecutive sniff executions.
+ */
+ public int getSnifferInterval() {
+ return snifferInterval;
+ }
+
+ public void setSnifferInterval(int snifferInterval) {
+ this.snifferInterval = snifferInterval;
+ }
+
+ /**
+ * The delay of a sniff execution scheduled after a failure (in milliseconds).
+ */
+ public int getSniffAfterFailureDelay() {
+ return sniffAfterFailureDelay;
+ }
+
+ public void setSniffAfterFailureDelay(int sniffAfterFailureDelay) {
+ this.sniffAfterFailureDelay = sniffAfterFailureDelay;
+ }
+
+ /**
+ * Enable scroll usage.
+ */
+ public boolean isUseScroll() {
+ return useScroll;
+ }
+
+ public void setUseScroll(boolean useScroll) {
+ this.useScroll = useScroll;
+ }
+
+ /**
+ * Time in ms during which OpenSearch will keep search context alive.
+ */
+ public int getScrollKeepAliveMs() {
+ return scrollKeepAliveMs;
+ }
+
+ public void setScrollKeepAliveMs(int scrollKeepAliveMs) {
+ this.scrollKeepAliveMs = scrollKeepAliveMs;
+ }
+
+ /**
+ * The class to use when deserializing the documents.
+ */
+ public Class<?> getDocumentClass() {
+ return documentClass;
+ }
+
+ public void setDocumentClass(Class<?> documentClass) {
+ this.documentClass = documentClass;
+ }
+}
diff --git a/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchConstants.java b/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchConstants.java
new file mode 100644
index 00000000000..e16f776111c
--- /dev/null
+++ b/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchConstants.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.opensearch;
+
+import org.apache.camel.spi.Metadata;
+
+/**
+ * Names of the message headers understood by the OpenSearch component and default values of its options.
+ * <p>
+ * Each {@code PARAM_*} constant is the name of a header; its {@code @Metadata} annotation describes the expected
+ * content of that header.
+ */
+public interface OpensearchConstants {
+
+    @Metadata(description = "The operation to perform",
+              javaType = "org.apache.camel.component.opensearch.OpensearchOperation")
+    String PARAM_OPERATION = "operation";
+    @Metadata(description = "The id of the indexed document.", javaType = "String")
+    String PARAM_INDEX_ID = "indexId";
+    @Metadata(description = "The name of the index to act against", javaType = "String")
+    String PARAM_INDEX_NAME = "indexName";
+    @Metadata(description = "The full qualified name of the class of the document to unmarshall", javaType = "Class",
+              defaultValue = "ObjectNode")
+    String PARAM_DOCUMENT_CLASS = "documentClass";
+    @Metadata(description = "The index creation waits for the write consistency number of shards to be available",
+              javaType = "Integer")
+    String PARAM_WAIT_FOR_ACTIVE_SHARDS = "waitForActiveShards";
+    // The description used to be a copy of the one of PARAM_FROM; it now describes the scroll keep-alive
+    @Metadata(description = "Time in ms during which OpenSearch will keep search context alive", javaType = "Integer")
+    String PARAM_SCROLL_KEEP_ALIVE_MS = "scrollKeepAliveMs";
+    @Metadata(description = "Set to true to enable scroll usage", javaType = "Boolean")
+    String PARAM_SCROLL = "useScroll";
+    @Metadata(description = "The size of the response.", javaType = "Integer")
+    String PARAM_SIZE = "size";
+    @Metadata(description = "The starting index of the response.", javaType = "Integer")
+    String PARAM_FROM = "from";
+
+    String PROPERTY_SCROLL_OPENSEARCH_QUERY_COUNT = "CamelOpenSearchScrollQueryCount";
+
+    int DEFAULT_PORT = 9200;
+    int DEFAULT_FOR_WAIT_ACTIVE_SHARDS = 1; // Meaning only wait for the primary shard
+    int DEFAULT_SOCKET_TIMEOUT = 30000; // Meaning how long (in ms) to wait before the socket times out
+    int MAX_RETRY_TIMEOUT = 30000; // Meaning how long (in ms) to wait before retrying
+    int DEFAULT_CONNECTION_TIMEOUT = 30000; // Meaning how long (in ms) to wait when establishing a connection
+    int DEFAULT_SNIFFER_INTERVAL = 60000 * 5; // Meaning how often (in ms) it should search for OpenSearch nodes
+    int DEFAULT_AFTER_FAILURE_DELAY = 60000; // Meaning the delay (in ms) of the sniff execution scheduled after a failure
+    int DEFAULT_SCROLL_KEEP_ALIVE_MS = 60000; // Meaning how many milliseconds OpenSearch will keep the search context
+}
diff --git a/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchEndpoint.java b/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchEndpoint.java
new file mode 100644
index 00000000000..0b1821251dc
--- /dev/null
+++ b/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchEndpoint.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.opensearch;
+
+import org.apache.camel.Category;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.support.DefaultEndpoint;
+import org.opensearch.client.RestClient;
+
+/**
+ * Send requests to OpenSearch via Java Client API.
+ */
+@UriEndpoint(firstVersion = "4.0.0", scheme = "opensearch", title = "OpenSearch",
+             syntax = "opensearch:clusterName", producerOnly = true,
+             category = { Category.SEARCH, Category.MONITORING }, headersClass = OpensearchConstants.class)
+public class OpensearchEndpoint extends DefaultEndpoint {
+
+    // REST client handed over at construction time and exposed to the producer through getClient()
+    private final RestClient client;
+
+    @UriParam
+    private final OpensearchConfiguration configuration;
+
+    /**
+     * Creates an endpoint that keeps the given configuration and REST client as they are.
+     *
+     * @param uri       the URI of the endpoint
+     * @param component the component that creates the endpoint
+     * @param config    the options of the endpoint
+     * @param client    the REST client made available to the producer
+     */
+    public OpensearchEndpoint(String uri, OpensearchComponent component, OpensearchConfiguration config,
+                              RestClient client) {
+        super(uri, component);
+        this.client = client;
+        this.configuration = config;
+    }
+
+    /**
+     * @return the REST client given at construction time
+     */
+    public RestClient getClient() {
+        return client;
+    }
+
+    /**
+     * @return the options of this endpoint
+     */
+    public OpensearchConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * Creates a new producer bound to this endpoint and to its configuration.
+     */
+    @Override
+    public Producer createProducer() {
+        return new OpensearchProducer(this, configuration);
+    }
+
+    /**
+     * Always fails because the endpoint does not support consumers.
+     *
+     * @throws UnsupportedOperationException on every call
+     */
+    @Override
+    public Consumer createConsumer(Processor processor) {
+        final String endpointUri = getEndpointUri();
+        throw new UnsupportedOperationException("Cannot consume from an OpenSearch: " + endpointUri);
+    }
+}
diff --git a/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchOperation.java b/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchOperation.java
new file mode 100644
index 00000000000..adca03e01ce
--- /dev/null
+++ b/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchOperation.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.opensearch;
+
+/**
+ * The operations of the OpenSearch server that are implemented by the component:
+ * <ul>
+ * <li>Index - Indexes a document associated with a given index</li>
+ * <li>Update - Updates a document based on a script</li>
+ * <li>Bulk - Executes a bulk of index / create / update / delete operations</li>
+ * <li>GetById - Gets an indexed document from its id</li>
+ * <li>MultiGet - Gets multiple documents</li>
+ * <li>MultiSearch - Executes multiple searches across one or more indices with a query</li>
+ * <li>Delete - Deletes a document from the index based on the index and id</li>
+ * <li>DeleteIndex - Deletes an index based on the index name</li>
+ * <li>Search - Searches across one or more indices with a query</li>
+ * <li>Exists - Checks whether the index exists or not</li>
+ * <li>Ping - Pings the OpenSearch cluster</li>
+ * </ul>
+ */
+public enum OpensearchOperation {
+    Index("Index"),
+    Update("Update"),
+    Bulk("Bulk"),
+    GetById("GetById"),
+    MultiGet("MultiGet"),
+    MultiSearch("MultiSearch"),
+    Delete("Delete"),
+    DeleteIndex("DeleteIndex"),
+    Search("Search"),
+    Exists("Exists"),
+    Ping("Ping");
+
+    // Text returned by toString()
+    private final String label;
+
+    OpensearchOperation(final String label) {
+        this.label = label;
+    }
+
+    /**
+     * @return the text given to the constant at construction time
+     */
+    @Override
+    public String toString() {
+        return label;
+    }
+}
diff --git a/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchProducer.java b/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchProducer.java
new file mode 100644
index 00000000000..5ee5ebed4c6
--- /dev/null
+++ b/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchProducer.java
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.opensearch;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.KeyStore;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateFactory;
+import java.util.concurrent.CompletableFuture;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.support.ResourceHelper;
+import org.apache.camel.util.IOHelper;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.json.jackson.JacksonJsonpMapper;
+import org.opensearch.client.opensearch.OpenSearchAsyncClient;
+import org.opensearch.client.opensearch.OpenSearchClient;
+import org.opensearch.client.opensearch._types.WriteResponseBase;
+import org.opensearch.client.opensearch.core.BulkRequest;
+import org.opensearch.client.opensearch.core.BulkResponse;
+import org.opensearch.client.opensearch.core.DeleteRequest;
+import org.opensearch.client.opensearch.core.DeleteResponse;
+import org.opensearch.client.opensearch.core.GetRequest;
+import org.opensearch.client.opensearch.core.IndexRequest;
+import org.opensearch.client.opensearch.core.MgetRequest;
+import org.opensearch.client.opensearch.core.MgetResponse;
+import org.opensearch.client.opensearch.core.MsearchRequest;
+import org.opensearch.client.opensearch.core.MsearchResponse;
+import org.opensearch.client.opensearch.core.SearchRequest;
+import org.opensearch.client.opensearch.core.SearchResponse;
+import org.opensearch.client.opensearch.core.UpdateRequest;
+import org.opensearch.client.opensearch.core.UpdateResponse;
+import org.opensearch.client.opensearch.indices.DeleteIndexRequest;
+import org.opensearch.client.opensearch.indices.DeleteIndexResponse;
+import org.opensearch.client.opensearch.indices.ExistsRequest;
+import org.opensearch.client.sniff.Sniffer;
+import org.opensearch.client.sniff.SnifferBuilder;
+import org.opensearch.client.transport.OpenSearchTransport;
+import org.opensearch.client.transport.endpoints.BooleanResponse;
+import org.opensearch.client.transport.rest_client.RestClientTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.opensearch.OpensearchConstants.PARAM_SCROLL;
+import static org.apache.camel.component.opensearch.OpensearchConstants.PARAM_SCROLL_KEEP_ALIVE_MS;
+
+/**
+ * Represents an Opensearch producer.
+ */
+class OpensearchProducer extends DefaultAsyncProducer {
+
+ // Logger of the producer
+ private static final Logger LOG = LoggerFactory.getLogger(OpensearchProducer.class);
+
+ // Endpoint options, used as fallback when a message does not carry the corresponding header
+ protected final OpensearchConfiguration configuration;
+ // NOTE(review): not used in the visible part of the class; presumably guards the start of the client - confirm
+ private final Object mutex = new Object();
+ // REST client, initially the one of the endpoint; process() calls startClient() when it is null and the
+ // disconnect option is enabled
+ private volatile RestClient client;
+ // NOTE(review): not used in the visible part of the class; presumably only set when enableSniffer is on - confirm
+ private Sniffer sniffer;
+
+    /**
+     * Creates a producer bound to the given endpoint.
+     *
+     * @param endpoint      the endpoint the producer belongs to, also the source of the initial REST client
+     * @param configuration the options driving the behavior of the producer
+     */
+    public OpensearchProducer(OpensearchEndpoint endpoint, OpensearchConfiguration configuration) {
+        super(endpoint);
+        // Start with whatever client the endpoint holds
+        this.client = endpoint.getClient();
+        this.configuration = configuration;
+    }
+
+ private OpensearchOperation resolveOperation(Exchange exchange) {
+ // 1. Operation can be driven by either (in order of preference):
+ // a. If the body is an ActionRequest the operation is set by the type
+ // of request.
+ // b. If the body is not an ActionRequest, the operation is set by the
+ // header if it exists.
+ // c. If neither the operation can not be derived from the body or
+ // header, the configuration is used.
+ // In the event we can't discover the operation from a, b or c we throw
+ // an error.
+ Object request = exchange.getIn().getBody();
+ if (request != null) {
+ LOG.debug("Operation request body: {}", request);
+ }
+
+ if (request instanceof IndexRequest) {
+ return OpensearchOperation.Index;
+ } else if (request instanceof GetRequest) {
+ return OpensearchOperation.GetById;
+ } else if (request instanceof MgetRequest) {
+ return OpensearchOperation.MultiGet;
+ } else if (request instanceof UpdateRequest) {
+ return OpensearchOperation.Update;
+ } else if (request instanceof BulkRequest) {
+ return OpensearchOperation.Bulk;
+ } else if (request instanceof DeleteRequest) {
+ return OpensearchOperation.Delete;
+ } else if (request instanceof SearchRequest) {
+ return OpensearchOperation.Search;
+ } else if (request instanceof MsearchRequest) {
+ return OpensearchOperation.MultiSearch;
+ } else if (request instanceof DeleteIndexRequest) {
+ return OpensearchOperation.DeleteIndex;
+ }
+
+ OpensearchOperation operationConfig
+ = exchange.getIn().getHeader(OpensearchConstants.PARAM_OPERATION, OpensearchOperation.class);
+
+ LOG.debug("Operation obtained from header {}: {}", OpensearchConstants.PARAM_OPERATION, operationConfig);
+
+ if (operationConfig == null) {
+ operationConfig = configuration.getOperation();
+ }
+
+ LOG.debug("Operation obtained from config: {}", operationConfig);
+
+ if (operationConfig == null) {
+ throw new IllegalArgumentException(
+ OpensearchConstants.PARAM_OPERATION + " value is mandatory");
+ }
+ return operationConfig;
+ }
+
+ /**
+ * Resolves the operation of the exchange, fills the missing index name, size, from and waitForActiveShards headers
+ * from the endpoint configuration and dispatches to the handler of the operation.
+ *
+ * @param exchange the exchange to process
+ * @param callback the callback to notify once the processing is over
+ * @return {@code true} when the exchange has been completed synchronously (scroll search, or failure before
+ * the handler could be started), {@code false} when the handler has been started without failure
+ */
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ try {
+ // With the disconnect option there may be no client at this point, so one is started on demand
+ if (configuration.isDisconnect() && client == null) {
+ startClient();
+ }
+ // A mapper that tolerates empty beans is created for each exchange and handed to the transport
+ final ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+ OpenSearchTransport transport = new RestClientTransport(client, new JacksonJsonpMapper(mapper));
+ // 2. Index and type will be set by:
+ // a. If the incoming body is already an action request
+ // b. If the body is not an action request we will use headers if they
+ // are set.
+ // c. If the body is not an action request and the headers aren't set we
+ // will use the configuration.
+ // No error is thrown by the component in the event none of the above
+ // conditions are met. The java es client
+ // will throw.
+
+ Message message = exchange.getIn();
+ final OpensearchOperation operation = resolveOperation(exchange);
+
+ // Set the index/type headers on the exchange if necessary. This is used
+ // for type conversion.
+ // The flag records that the header comes from the configuration; it is handed to the ActionContext below
+ boolean configIndexName = false;
+ String indexName = message.getHeader(OpensearchConstants.PARAM_INDEX_NAME, String.class);
+ if (indexName == null) {
+ message.setHeader(OpensearchConstants.PARAM_INDEX_NAME, configuration.getIndexName());
+ configIndexName = true;
+ }
+
+ Integer size = message.getHeader(OpensearchConstants.PARAM_SIZE, Integer.class);
+ if (size == null) {
+ message.setHeader(OpensearchConstants.PARAM_SIZE, configuration.getSize());
+ }
+
+ Integer from = message.getHeader(OpensearchConstants.PARAM_FROM, Integer.class);
+ if (from == null) {
+ message.setHeader(OpensearchConstants.PARAM_FROM, configuration.getFrom());
+ }
+
+ // Same bookkeeping as for the index name
+ boolean configWaitForActiveShards = false;
+ Integer waitForActiveShards = message.getHeader(OpensearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, Integer.class);
+ if (waitForActiveShards == null) {
+ message.setHeader(OpensearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS, configuration.getWaitForActiveShards());
+ configWaitForActiveShards = true;
+ }
+
+ // The document class is only resolved locally, the header is not written back
+ Class<?> documentClass = message.getHeader(OpensearchConstants.PARAM_DOCUMENT_CLASS, Class.class);
+ if (documentClass == null) {
+ documentClass = configuration.getDocumentClass();
+ }
+
+ ActionContext ctx = new ActionContext(exchange, callback, transport, configIndexName, configWaitForActiveShards);
+
+ switch (operation) {
+ case Index -> processIndexAsync(ctx);
+ case Update -> processUpdateAsync(ctx, documentClass);
+ case GetById -> processGetByIdAsync(ctx, documentClass);
+ case Bulk -> processBulkAsync(ctx);
+ case Delete -> processDeleteAsync(ctx);
+ case DeleteIndex -> processDeleteIndexAsync(ctx);
+ case Exists -> processExistsAsync(ctx);
+ case Search -> {
+ // NOTE(review): the body is requested as SearchRequest.Builder; the Map and String mentioned in the
+ // message below presumably rely on type converters declared elsewhere - confirm
+ SearchRequest.Builder searchRequestBuilder = message.getBody(SearchRequest.Builder.class);
+ if (searchRequestBuilder == null) {
+ throw new IllegalArgumentException(
+ "Wrong body type. Only Map, String or SearchRequest.Builder is allowed as a type");
+ }
+ // is it a scroll request ?
+ boolean useScroll = message.getHeader(PARAM_SCROLL, configuration.isUseScroll(), Boolean.class);
+ if (useScroll) {
+ // As a scroll request is expected, for the sake of simplicity, the synchronous mode is preserved
+ int scrollKeepAliveMs
+ = message.getHeader(PARAM_SCROLL_KEEP_ALIVE_MS, configuration.getScrollKeepAliveMs(),
+ Integer.class);
+ OpensearchScrollRequestIterator<?> scrollRequestIterator = new OpensearchScrollRequestIterator<>(
+ searchRequestBuilder, new OpenSearchClient(transport), scrollKeepAliveMs, exchange,
+ documentClass);
+ // The body becomes the iterator and the exchange is completed synchronously
+ exchange.getIn().setBody(scrollRequestIterator);
+ cleanup(ctx);
+ callback.done(true);
+ return true;
+ } else {
+ onComplete(
+ ctx.getClient().search(searchRequestBuilder.build(), documentClass)
+ .thenApply(SearchResponse::hits),
+ ctx);
+ }
+ }
+ case MultiSearch -> processMultiSearchAsync(ctx, documentClass);
+ case MultiGet -> processMultiGetAsync(ctx, documentClass);
+ case Ping -> processPingAsync(ctx);
+ default -> throw new IllegalArgumentException(
+ OpensearchConstants.PARAM_OPERATION + " value '" + operation + "' is not supported");
+ }
+ } catch (Exception e) {
+ // Any failure raised before or while starting the handler completes the exchange synchronously
+ exchange.setException(e);
+ callback.done(true);
+ return true;
+ }
+ // The handler started without failure; completing the exchange is left to it
+ return false;
+ }
+
+ /**
+ * Executes asynchronously a ping to the OpenSearch cluster.
+ */
+ private void processPingAsync(ActionContext ctx) throws IOException {
+ onComplete(
+ ctx.getClient().ping()
+ .thenApply(BooleanResponse::value),
+ ctx);
+ }
+
+ /**
+ * Executes asynchronously a multi-get request.
+ */
+ private void processMultiGetAsync(ActionContext ctx, Class<?> documentClass) throws IOException {
+ MgetRequest.Builder mgetRequestBuilder = ctx.getMessage().getBody(MgetRequest.Builder.class);
+ if (mgetRequestBuilder == null) {
+ throw new IllegalArgumentException("Wrong body type. Only MgetRequest.Builder is allowed as a type");
+ }
+ onComplete(
+ ctx.getClient().mget(mgetRequestBuilder.build(), documentClass)
+ .thenApply(MgetResponse::docs),
+ ctx);
+ }
+
+ /**
+ * Executes asynchronously a multi-search request.
+ */
+ private void processMultiSearchAsync(ActionContext ctx, Class<?> documentClass) throws IOException {
+ MsearchRequest.Builder msearchRequestBuilder = ctx.getMessage().getBody(MsearchRequest.Builder.class);
+ if (msearchRequestBuilder == null) {
+ throw new IllegalArgumentException("Wrong body type. Only MsearchRequest.Builder is allowed as a type");
+ }
+ onComplete(
+ ctx.getClient().msearch(msearchRequestBuilder.build(), documentClass)
+ .thenApply(MsearchResponse::responses),
+ ctx);
+ }
+
+ /**
+ * Checks asynchronously if a given index exists.
+ */
+ private void processExistsAsync(ActionContext ctx) throws IOException {
+ ExistsRequest.Builder builder = new ExistsRequest.Builder();
+ builder.index(ctx.getMessage().getHeader(OpensearchConstants.PARAM_INDEX_NAME, String.class));
+ onComplete(
+ ctx.getClient().indices().exists(builder.build())
+ .thenApply(BooleanResponse::value),
+ ctx);
+ }
+
+ /**
+ * Deletes asynchronously an index.
+ */
+ private void processDeleteIndexAsync(ActionContext ctx) throws IOException {
+ DeleteIndexRequest.Builder deleteIndexRequestBuilder = ctx.getMessage().getBody(DeleteIndexRequest.Builder.class);
+ if (deleteIndexRequestBuilder == null) {
+ throw new IllegalArgumentException(
+ "Wrong body type. Only String or DeleteIndexRequest.Builder is allowed as a type");
+ }
+ onComplete(
+ ctx.getClient().indices().delete(deleteIndexRequestBuilder.build())
+ .thenApply(DeleteIndexResponse::acknowledged),
+ ctx);
+ }
+
+ /**
+ * Deletes asynchronously a document.
+ */
+ private void processDeleteAsync(ActionContext ctx) throws IOException {
+ DeleteRequest.Builder deleteRequestBuilder = ctx.getMessage().getBody(DeleteRequest.Builder.class);
+ if (deleteRequestBuilder == null) {
+ throw new IllegalArgumentException(
+ "Wrong body type. Only String or DeleteRequest.Builder is allowed as a type");
+ }
+ onComplete(
+ ctx.getClient().delete(deleteRequestBuilder.build())
+ .thenApply(DeleteResponse::result),
+ ctx);
+ }
+
+ /**
+ * Executes asynchronously bulk operations.
+ */
+ private void processBulkAsync(ActionContext ctx) throws IOException {
+ BulkRequest.Builder bulkRequestBuilder = ctx.getMessage().getBody(BulkRequest.Builder.class);
+ if (bulkRequestBuilder == null) {
+ throw new IllegalArgumentException(
+ "Wrong body type. Only Iterable or BulkRequest.Builder is allowed as a type");
+ }
+ onComplete(
+ ctx.getClient().bulk(bulkRequestBuilder.build())
+ .thenApply(BulkResponse::items),
+ ctx);
+ }
+
+ /**
+ * Finds asynchronously a document by id.
+ */
+ private void processGetByIdAsync(ActionContext ctx, Class<?> documentClass) throws IOException {
+ GetRequest.Builder getRequestBuilder = ctx.getMessage().getBody(GetRequest.Builder.class);
+ if (getRequestBuilder == null) {
+ throw new IllegalArgumentException(
+ "Wrong body type. Only String or GetRequest.Builder is allowed as a type");
+ }
+ onComplete(
+ ctx.getClient().get(getRequestBuilder.build(), documentClass),
+ ctx);
+ }
+
+    /**
+     * Updates asynchronously a document; the id found in the update response is handed over to {@code onComplete}.
+     *
+     * @param  ctx                      the context of the asynchronous task
+     * @param  documentClass            the class used to deserialize the document of the response
+     * @throws IllegalArgumentException if the body cannot be obtained as an {@code UpdateRequest.Builder}
+     */
+    private void processUpdateAsync(ActionContext ctx, Class<?> documentClass) throws IOException {
+        var updateRequestBuilder = ctx.getMessage().getBody(UpdateRequest.Builder.class);
+        // Fail with an explicit message, like the other operations do, instead of a bare NullPointerException
+        if (updateRequestBuilder == null) {
+            throw new IllegalArgumentException(
+                    "Wrong body type. Only UpdateRequest.Builder is allowed as a type");
+        }
+        onComplete(
+                ctx.getClient().update(updateRequestBuilder.build(), documentClass)
+                        .thenApply(r -> ((UpdateResponse<?>) r).id()),
+                ctx);
+    }
+
+    /**
+     * Indexes a document asynchronously and propagates the id of the indexed document as the new message body.
+     *
+     * @param  ctx                      the context of the asynchronous task.
+     * @throws IllegalArgumentException if the message body cannot be converted to an {@code IndexRequest.Builder},
+     *                                  for instance because the body is {@code null}.
+     */
+    private void processIndexAsync(ActionContext ctx) throws IOException {
+        IndexRequest.Builder<?> indexRequestBuilder = ctx.getMessage().getBody(IndexRequest.Builder.class);
+        // Fail with an explicit message, like the sibling operations, instead of a bare NullPointerException.
+        if (indexRequestBuilder == null) {
+            throw new IllegalArgumentException(
+                    "Wrong body type. Only a document or IndexRequest.Builder is allowed as a type");
+        }
+        onComplete(
+                ctx.getClient().index(indexRequestBuilder.build())
+                        .thenApply(WriteResponseBase::id),
+                ctx);
+    }
+
+    /**
+     * Add actions to perform once the given future is complete.
+     * <p>
+     * On success the result of the future becomes the body of the IN message. On failure the cause is wrapped into a
+     * {@link CamelExchangeException} and set on the exchange. In both cases the cleanup task is executed before the
+     * callback is notified, so that the headers injected from the configuration and the resources opened for the
+     * request are released even when the request fails.
+     *
+     * @param future the future to complete with specific actions.
+     * @param ctx    the context of the asynchronous task.
+     * @param <T>    the result type returned by the future.
+     */
+    private <T> void onComplete(CompletableFuture<T> future, ActionContext ctx) {
+        final Exchange exchange = ctx.exchange();
+        future.thenAccept(r -> exchange.getIn().setBody(r))
+                .whenComplete(
+                        (r, e) -> {
+                            try {
+                                if (e != null) {
+                                    exchange.setException(new CamelExchangeException(
+                                            "An error occurred while executing the action", exchange, e));
+                                }
+                            } finally {
+                                try {
+                                    // Must not be chained with thenAccept: that stage is skipped on failure,
+                                    // which would leave the transport open and the headers in place.
+                                    cleanup(ctx);
+                                } finally {
+                                    // Always signal the end of the asynchronous processing.
+                                    ctx.callback().done(false);
+                                }
+                            }
+                        });
+    }
+
+ /**
+ * The cleanup task to execute once everything is done.
+ */
+ private void cleanup(ActionContext ctx) {
+
+ try {
+ Message message = ctx.getMessage();
+
+ // If we set params via the configuration on this exchange, remove them
+ // now. This preserves legacy behavior for this component and enables a
+ // use case where one message can be sent to multiple OpenSearch
+ // endpoints where the user is relying on the endpoint configuration
+ // (index/type) rather than header values. If we do not clear this out
+ // sending the same message (index request, for example) to multiple
+ // OpenSearch endpoints would have the effect overriding any
+ // subsequent endpoint index/type with the first endpoint index/type.
+ if (ctx.configIndexName()) {
+ message.removeHeader(OpensearchConstants.PARAM_INDEX_NAME);
+ }
+
+ if (ctx.configWaitForActiveShards()) {
+ message.removeHeader(OpensearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS);
+ }
+ if (configuration.isDisconnect()) {
+ IOHelper.close(ctx.transport());
+ if (configuration.isEnableSniffer()) {
+ IOHelper.close(sniffer);
+ sniffer = null;
+ }
+ IOHelper.close(client);
+ client = null;
+ }
+ } catch (Exception e) {
+ LOG.warn("Could not execute the cleanup task", e);
+ }
+ }
+
+    /**
+     * Starts the producer and connects the client right away, unless the {@code disconnect} option is enabled.
+     */
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        if (configuration.isDisconnect()) {
+            // No client is created at startup in disconnect mode.
+            return;
+        }
+        startClient();
+    }
+
+ private void startClient() {
+ if (client == null) {
+ synchronized (mutex) {
+ if (client == null) {
+ LOG.info("Connecting to the OpenSearch cluster: {}", configuration.getClusterName());
+ if (configuration.getHostAddressesList() != null
+ && !configuration.getHostAddressesList().isEmpty()) {
+ client = createClient();
+ } else {
+ LOG.warn("Incorrect ip address and port parameters settings for OpenSearch cluster");
+ }
+ }
+ }
+ }
+ }
+
+    /**
+     * Builds the low level REST client from the configuration: host addresses, timeouts, optional basic
+     * authentication and optional SSL context based on the configured certificate. When the sniffer is enabled, it
+     * is created on top of the new client and stored in the {@code sniffer} field.
+     *
+     * @return the new REST client.
+     */
+    private RestClient createClient() {
+        final RestClientBuilder builder = RestClient.builder(configuration.getHostAddressesList().toArray(new HttpHost[0]));
+
+        builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
+                .setConnectTimeout(configuration.getConnectionTimeout()).setSocketTimeout(configuration.getSocketTimeout()));
+
+        final CredentialsProvider credentialsProvider;
+        if (configuration.getUser() != null && configuration.getPassword() != null) {
+            credentialsProvider = new BasicCredentialsProvider();
+            credentialsProvider.setCredentials(AuthScope.ANY,
+                    new UsernamePasswordCredentials(configuration.getUser(), configuration.getPassword()));
+        } else {
+            credentialsProvider = null;
+        }
+        final boolean customCa = configuration.getCertificatePath() != null;
+
+        // The certificate must be honored even without credentials, so the callback is registered as soon as one
+        // of the two settings is present.
+        if (credentialsProvider != null || customCa) {
+            builder.setHttpClientConfigCallback(httpClientBuilder -> {
+                if (credentialsProvider != null) {
+                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+                }
+                if (customCa) {
+                    httpClientBuilder.setSSLContext(createSslContextFromCa());
+                }
+                return httpClientBuilder;
+            });
+        }
+        final RestClient restClient = builder.build();
+        if (configuration.isEnableSniffer()) {
+            SnifferBuilder snifferBuilder = Sniffer.builder(restClient);
+            snifferBuilder.setSniffIntervalMillis(configuration.getSnifferInterval());
+            snifferBuilder.setSniffAfterFailureDelayMillis(configuration.getSniffAfterFailureDelay());
+            sniffer = snifferBuilder.build();
+        }
+        return restClient;
+    }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (client != null) {
+ LOG.info("Disconnecting from OpenSearch cluster: {}", configuration.getClusterName());
+ client.close();
+ if (sniffer != null) {
+ sniffer.close();
+ }
+ }
+ super.doStop();
+ }
+
+ /**
+ * Gives the REST client currently held by this producer.
+ *
+ * @return the current client, or {@code null} if no client has been created yet or if it has been released by the
+ *         cleanup task.
+ */
+ public RestClient getClient() {
+ return client;
+ }
+
+    /**
+     * An SSL context based on the self-signed CA, so that using this SSL Context allows to connect to the OpenSearch
+     * service. The certificate is loaded from the configured certificate path and registered as the only entry of an
+     * in-memory trust store.
+     *
+     * @return                  a customized SSL Context
+     * @throws RuntimeException if the certificate cannot be loaded or if the SSL context cannot be initialized.
+     */
+    private SSLContext createSslContextFromCa() {
+        // try-with-resources: the stream of the certificate must be closed once the certificate has been read.
+        try (InputStream certificateStream
+                = ResourceHelper.resolveMandatoryResourceAsInputStream(getEndpoint().getCamelContext(),
+                        configuration.getCertificatePath())) {
+            CertificateFactory factory = CertificateFactory.getInstance("X.509");
+            Certificate trustedCa = factory.generateCertificate(certificateStream);
+            KeyStore trustStore = KeyStore.getInstance("pkcs12");
+            // Initializes an empty trust store.
+            trustStore.load(null, null);
+            trustStore.setCertificateEntry("ca", trustedCa);
+
+            final SSLContext sslContext = SSLContext.getInstance("TLSv1.3");
+            TrustManagerFactory trustManagerFactory
+                    = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+            trustManagerFactory.init(trustStore);
+            sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
+            return sslContext;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+ /**
+ * A record providing all the information that an asynchronous action could need.
+ *
+ * @param exchange                  the exchange being processed; it receives the result or the exception of the
+ *                                  action.
+ * @param callback                  the callback notified once the action is complete.
+ * @param transport                 the transport from which the asynchronous client is created; it is closed by the
+ *                                  cleanup task when the disconnect option is enabled.
+ * @param configIndexName           when {@code true}, the cleanup task removes the index name header from the
+ *                                  message.
+ * @param configWaitForActiveShards when {@code true}, the cleanup task removes the "wait for active shards" header
+ *                                  from the message.
+ */
+ private record ActionContext(Exchange exchange, AsyncCallback callback, OpenSearchTransport transport,
+ boolean configIndexName, boolean configWaitForActiveShards) {
+
+ // Creates a new asynchronous client on top of the transport each time it is called.
+ OpenSearchAsyncClient getClient() {
+ return new OpenSearchAsyncClient(transport);
+ }
+
+ // Shortcut to the IN message of the exchange.
+ Message getMessage() {
+ return exchange.getIn();
+ }
+ }
+}
diff --git a/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchScrollRequestIterator.java b/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchScrollRequestIterator.java
new file mode 100644
index 00000000000..70e19c9c088
--- /dev/null
+++ b/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/OpensearchScrollRequestIterator.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.opensearch;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.opensearch.client.opensearch.OpenSearchClient;
+import org.opensearch.client.opensearch._types.Time;
+import org.opensearch.client.opensearch.core.ClearScrollRequest;
+import org.opensearch.client.opensearch.core.ScrollRequest;
+import org.opensearch.client.opensearch.core.ScrollResponse;
+import org.opensearch.client.opensearch.core.SearchRequest;
+import org.opensearch.client.opensearch.core.SearchResponse;
+import org.opensearch.client.opensearch.core.search.Hit;
+
+/**
+ * An {@link Iterator} over the hits of a scroll search. The first search request is executed when the iterator is
+ * created, the following scroll requests are executed on demand by {@link #hasNext()} once the hits of the current
+ * page are exhausted. Closing the iterator clears the scroll and stores the number of executed requests into the
+ * exchange property {@link OpensearchConstants#PROPERTY_SCROLL_OPENSEARCH_QUERY_COUNT}.
+ *
+ * @param <TDocument> the type into which the source of the hits is deserialized.
+ */
+public class OpensearchScrollRequestIterator<TDocument> implements Iterator<Hit<TDocument>>, Closeable {
+
+    private final SearchRequest searchRequest;
+    private final OpenSearchClient esClient;
+    private final Class<TDocument> documentClass;
+    private Iterator<? extends Hit<TDocument>> currentSearchHits;
+    private final int scrollKeepAliveMs;
+    private final Exchange exchange;
+    private String scrollId;
+    private boolean closed;
+    private int requestCount;
+
+    /**
+     * Creates the iterator and executes the first search request.
+     *
+     * @param  searchRequestBuilder  the builder of the search request, the scroll option is added to it.
+     * @param  esClient              the client used to execute the requests.
+     * @param  scrollKeepAliveMs     the time in milliseconds during which the scroll is kept alive between requests.
+     * @param  exchange              the exchange receiving the request count when the iterator is closed.
+     * @param  documentClass         the class into which the source of the hits is deserialized.
+     * @throws IllegalStateException if the first search request fails with an {@link IOException}.
+     */
+    public OpensearchScrollRequestIterator(SearchRequest.Builder searchRequestBuilder, OpenSearchClient esClient,
+                                           int scrollKeepAliveMs, Exchange exchange, Class<TDocument> documentClass) {
+        // add scroll option on the first query
+        this.searchRequest = searchRequestBuilder
+                .scroll(keepAlive(scrollKeepAliveMs))
+                .build();
+        this.esClient = esClient;
+        this.scrollKeepAliveMs = scrollKeepAliveMs;
+        this.exchange = exchange;
+        this.closed = false;
+        this.requestCount = 0;
+        this.documentClass = documentClass;
+
+        setFirstCurrentSearchHits();
+    }
+
+    /**
+     * Indicates whether more hits are available, executing the next scroll request if the current page is exhausted.
+     *
+     * @return {@code false} if the iterator is closed or if the next page has no hits, {@code true} otherwise.
+     */
+    @Override
+    public boolean hasNext() {
+        if (closed) {
+            return false;
+        }
+
+        boolean hasNext = currentSearchHits.hasNext();
+        if (!hasNext) {
+            updateCurrentSearchHits();
+
+            hasNext = currentSearchHits.hasNext();
+        }
+
+        return hasNext;
+    }
+
+    /**
+     * Gives the next hit of the current page.
+     *
+     * @return the next hit, or {@code null} if the iterator is closed.
+     */
+    @Override
+    public Hit<TDocument> next() {
+        return closed ? null : currentSearchHits.next();
+    }
+
+    /**
+     * Execute next OpenSearch scroll request and update the current scroll result.
+     */
+    private void updateCurrentSearchHits() {
+        ScrollResponse<TDocument> scrollResponse = scrollSearch();
+        this.currentSearchHits = scrollResponse.hits().hits().iterator();
+        // The scroll id may change between requests and only the most recent one must be used.
+        String latestScrollId = scrollResponse.scrollId();
+        if (latestScrollId != null) {
+            this.scrollId = latestScrollId;
+        }
+    }
+
+    private void setFirstCurrentSearchHits() {
+        SearchResponse<TDocument> searchResponse = firstSearch();
+        this.currentSearchHits = searchResponse.hits().hits().iterator();
+        this.scrollId = searchResponse.scrollId();
+    }
+
+    private SearchResponse<TDocument> firstSearch() {
+        SearchResponse<TDocument> searchResponse;
+        try {
+            searchResponse = esClient.search(searchRequest, documentClass);
+            requestCount++;
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        return searchResponse;
+    }
+
+    private ScrollResponse<TDocument> scrollSearch() {
+        ScrollResponse<TDocument> scrollResponse;
+        try {
+            ScrollRequest searchScrollRequest = new ScrollRequest.Builder()
+                    .scroll(keepAlive(scrollKeepAliveMs))
+                    .scrollId(scrollId)
+                    .build();
+
+            scrollResponse = esClient.scroll(searchScrollRequest, documentClass);
+            requestCount++;
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+        return scrollResponse;
+    }
+
+    /**
+     * Converts the keep alive duration into the time expression expected by the scroll option.
+     */
+    private static Time keepAlive(int keepAliveMs) {
+        return Time.of(b -> b.time(String.format("%sms", keepAliveMs)));
+    }
+
+    /**
+     * Clears the scroll and publishes the number of executed requests as an exchange property. Calling this method
+     * on an already closed iterator has no effect.
+     *
+     * @throws IllegalStateException if the scroll cannot be cleared because of an {@link IOException}.
+     */
+    @Override
+    public void close() {
+        if (!closed) {
+            try {
+                // Without a scroll id there is nothing to clear, and List.of(null) would fail.
+                if (scrollId != null) {
+                    ClearScrollRequest clearScrollRequest = new ClearScrollRequest.Builder()
+                            .scrollId(List.of(scrollId))
+                            .build();
+
+                    esClient.clearScroll(clearScrollRequest);
+                }
+                closed = true;
+                exchange.setProperty(OpensearchConstants.PROPERTY_SCROLL_OPENSEARCH_QUERY_COUNT, requestCount);
+            } catch (IOException e) {
+                throw new IllegalStateException(e);
+            }
+        }
+    }
+
+    /**
+     * @return the number of search and scroll requests executed so far.
+     */
+    public int getRequestCount() {
+        return requestCount;
+    }
+
+    /**
+     * @return {@code true} if the iterator has been closed, {@code false} otherwise.
+     */
+    public boolean isClosed() {
+        return closed;
+    }
+}
diff --git a/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/aggregation/BulkRequestAggregationStrategy.java b/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/aggregation/BulkRequestAggregationStrategy.java
new file mode 100644
index 00000000000..40e109e8631
--- /dev/null
+++ b/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/aggregation/BulkRequestAggregationStrategy.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.opensearch.aggregation;
+
+import java.util.List;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadRuntimeException;
+import org.opensearch.client.opensearch.core.BulkRequest;
+import org.opensearch.client.opensearch.core.bulk.BulkOperation;
+
+/**
+ * Aggregates the {@link BulkOperation}s of the incoming exchanges into a single {@link BulkRequest}.
+ * <p>
+ * The body of each incoming exchange must be an array of {@link BulkOperation}. The aggregated exchange holds a
+ * {@link BulkRequest} made of the operations aggregated so far followed by the operations of the incoming exchange.
+ */
+public class BulkRequestAggregationStrategy implements AggregationStrategy {
+
+    /**
+     * Appends the operations of the new exchange to the bulk request aggregated so far.
+     *
+     * @param  oldExchange                    the exchange holding the bulk request aggregated so far, {@code null}
+     *                                        for the first exchange.
+     * @param  newExchange                    the incoming exchange whose body is an array of {@link BulkOperation}.
+     * @return                                the new exchange with the aggregated {@link BulkRequest} as body.
+     * @throws InvalidPayloadRuntimeException if the body of the new exchange is not an array of
+     *                                        {@link BulkOperation} or if the body of the old exchange is not a
+     *                                        {@link BulkRequest}.
+     */
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        // Don't use getBody(Class<T>) here as we don't want to coerce the body type using a type converter.
+        Object objBody = newExchange.getIn().getBody();
+        if (!(objBody instanceof BulkOperation[])) {
+            throw new InvalidPayloadRuntimeException(newExchange, BulkOperation[].class);
+        }
+
+        BulkOperation[] newBody = (BulkOperation[]) objBody;
+        BulkRequest.Builder builder = new BulkRequest.Builder();
+        if (oldExchange != null) {
+            BulkRequest request = oldExchange.getIn().getBody(BulkRequest.class);
+            if (request == null) {
+                throw new InvalidPayloadRuntimeException(oldExchange, BulkRequest.class);
+            }
+            // Previous operations go first to keep the order in which the exchanges have been received.
+            builder.operations(request.operations());
+        }
+        builder.operations(List.of(newBody));
+        newExchange.getIn().setBody(builder.build());
+        // The aggregated request is held by the new exchange, so this is the one to return. Returning the old
+        // exchange would give null for the first exchange and would drop the new operations for the others.
+        return newExchange;
+    }
+}
diff --git a/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/converter/OpensearchActionRequestConverter.java b/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/converter/OpensearchActionRequestConverter.java
new file mode 100644
index 00000000000..a5dececfbf0
--- /dev/null
+++ b/components/camel-opensearch/src/main/java/org/apache/camel/component/opensearch/converter/OpensearchActionRequestConverter.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.opensearch.converter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.io.StringReader;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import org.apache.camel.Converter;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.opensearch.OpensearchConstants;
+import org.apache.camel.util.ObjectHelper;
+import org.opensearch.client.json.JsonData;
+import org.opensearch.client.json.jackson.JacksonJsonpMapper;
+import org.opensearch.client.opensearch._types.WaitForActiveShards;
+import org.opensearch.client.opensearch._types.query_dsl.Query;
+import org.opensearch.client.opensearch.core.BulkRequest;
+import org.opensearch.client.opensearch.core.DeleteRequest;
+import org.opensearch.client.opensearch.core.GetRequest;
+import org.opensearch.client.opensearch.core.IndexRequest;
+import org.opensearch.client.opensearch.core.MgetRequest;
+import org.opensearch.client.opensearch.core.SearchRequest;
+import org.opensearch.client.opensearch.core.UpdateRequest;
+import org.opensearch.client.opensearch.core.bulk.BulkOperation;
+import org.opensearch.client.opensearch.core.bulk.BulkOperationVariant;
+import org.opensearch.client.opensearch.core.bulk.CreateOperation;
+import org.opensearch.client.opensearch.core.bulk.DeleteOperation;
+import org.opensearch.client.opensearch.core.bulk.IndexOperation;
+import org.opensearch.client.opensearch.core.bulk.UpdateOperation;
+import org.opensearch.client.opensearch.indices.DeleteIndexRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Type converters turning message bodies into the request builders of the OpenSearch client. The index name, the
+ * document id, the number of active shards to wait for and the pagination settings are read from the headers of the
+ * IN message of the exchange.
+ */
+@Converter(generateLoader = true)
+public final class OpensearchActionRequestConverter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OpensearchActionRequestConverter.class);
+
+    private static final String OPENSEARCH_QUERY_DSL_PREFIX = "query";
+    private static final String OPENSEARCH_UPDATE_DOC_PREFIX = "doc";
+
+    private OpensearchActionRequestConverter() {
+    }
+
+    /**
+     * Creates the builder of an index operation of a bulk request from the given document. A document that is
+     * already an {@code IndexOperation.Builder} is returned as is.
+     */
+    private static IndexOperation.Builder<?> createIndexOperationBuilder(Object document, Exchange exchange)
+            throws IOException {
+        if (document instanceof IndexOperation.Builder) {
+            return (IndexOperation.Builder<?>) document;
+        }
+        JacksonJsonpMapper mapper = createMapper();
+        IndexOperation.Builder<Object> builder = new IndexOperation.Builder<>();
+        builder.document(toIndexableDocument(document, mapper));
+        return builder
+                .index(exchange.getIn().getHeader(OpensearchConstants.PARAM_INDEX_NAME, String.class));
+    }
+
+    /**
+     * Converts a document into the builder of an index request. A raw JSON content (byte array, input stream,
+     * string or reader) is parsed, any other object is used as document as is. When the given object is already an
+     * {@code IndexRequest.Builder}, only its id is set from the header.
+     */
+    @Converter
+    public static IndexRequest.Builder<?> toIndexRequestBuilder(Object document, Exchange exchange) throws IOException {
+        if (document instanceof IndexRequest.Builder<?> builder) {
+            return builder.id(exchange.getIn().getHeader(OpensearchConstants.PARAM_INDEX_ID, String.class));
+        }
+        JacksonJsonpMapper mapper = createMapper();
+        IndexRequest.Builder<Object> builder = new IndexRequest.Builder<>();
+        builder.document(toIndexableDocument(document, mapper));
+        return builder
+                .waitForActiveShards(
+                        new WaitForActiveShards.Builder()
+                                .count(exchange.getIn().getHeader(OpensearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS,
+                                        Integer.class))
+                                .build())
+                .id(exchange.getIn().getHeader(OpensearchConstants.PARAM_INDEX_ID, String.class))
+                .index(exchange.getIn().getHeader(OpensearchConstants.PARAM_INDEX_NAME, String.class));
+    }
+
+    /**
+     * Converts a partial document into the builder of an update request. A raw JSON content (byte array, input
+     * stream, string or reader) and a map are first converted into a JSON tree; if the tree has a "doc" field, the
+     * content of this field is used as partial document. When the given object is already an
+     * {@code UpdateRequest.Builder}, only its id is set from the header.
+     */
+    @Converter
+    public static UpdateRequest.Builder<?, ?> toUpdateRequestBuilder(Object document, Exchange exchange) throws IOException {
+        if (document instanceof UpdateRequest.Builder<?, ?> builder) {
+            return builder.id(exchange.getIn().getHeader(OpensearchConstants.PARAM_INDEX_ID, String.class));
+        }
+        JacksonJsonpMapper mapper = createMapper();
+        UpdateRequest.Builder<?, Object> builder = new UpdateRequest.Builder<>();
+        if (isRawJson(document)) {
+            document = JsonData.of(readTree(document, mapper), mapper).to(JsonNode.class);
+        } else if (document instanceof Map<?, ?> map) {
+            document = mapper.objectMapper().convertValue(map, JsonNode.class);
+        }
+
+        if (document instanceof JsonNode jsonNode) {
+            // Unwrap the partial document when it is provided under the "doc" field.
+            JsonNode parentJsonNode = jsonNode.get(OPENSEARCH_UPDATE_DOC_PREFIX);
+            if (parentJsonNode != null) {
+                document = parentJsonNode;
+            }
+            document = JsonData.of(document, mapper).toJson();
+        }
+
+        return builder
+                .doc(document)
+                .waitForActiveShards(
+                        new WaitForActiveShards.Builder()
+                                .count(exchange.getIn().getHeader(OpensearchConstants.PARAM_WAIT_FOR_ACTIVE_SHARDS,
+                                        Integer.class))
+                                .build())
+                .index(exchange.getIn().getHeader(OpensearchConstants.PARAM_INDEX_NAME, String.class))
+                .id(exchange.getIn().getHeader(OpensearchConstants.PARAM_INDEX_ID, String.class));
+    }
+
+    /**
+     * Converts a document id into the builder of a get request on the index given by the header.
+     *
+     * @return the builder, or {@code null} if the given object is neither a builder nor a {@code String}.
+     */
+    @Converter
+    public static GetRequest.Builder toGetRequestBuilder(Object document, Exchange exchange) {
+        if (document instanceof GetRequest.Builder) {
+            return (GetRequest.Builder) document;
+        }
+        if (document instanceof String) {
+            return new GetRequest.Builder()
+                    .index(exchange.getIn().getHeader(OpensearchConstants.PARAM_INDEX_NAME, String.class))
+                    .id((String) document);
+        }
+        return null;
+    }
+
+    /**
+     * Converts a document id into the builder of a delete request on the index given by the header.
+     *
+     * @return the builder, or {@code null} if the given object is neither a builder nor a {@code String}.
+     */
+    @Converter
+    public static DeleteRequest.Builder toDeleteRequestBuilder(Object document, Exchange exchange) {
+        if (document instanceof DeleteRequest.Builder) {
+            return (DeleteRequest.Builder) document;
+        }
+        if (document instanceof String) {
+            return new DeleteRequest.Builder()
+                    .index(exchange.getIn().getHeader(OpensearchConstants.PARAM_INDEX_NAME, String.class))
+                    .id((String) document);
+        }
+        return null;
+    }
+
+    /**
+     * Converts the body into the builder of a delete index request. For a {@code String} body, the name of the
+     * index to delete is read from the header, the content of the body itself is not used.
+     *
+     * @return the builder, or {@code null} if the given object is neither a builder nor a {@code String}.
+     */
+    @Converter
+    public static DeleteIndexRequest.Builder toDeleteIndexRequestBuilder(Object document, Exchange exchange) {
+        if (document instanceof DeleteIndexRequest.Builder) {
+            return (DeleteIndexRequest.Builder) document;
+        }
+        if (document instanceof String) {
+            return new DeleteIndexRequest.Builder()
+                    .index(exchange.getIn().getHeader(OpensearchConstants.PARAM_INDEX_NAME, String.class));
+        }
+        return null;
+    }
+
+    /**
+     * Converts document ids into the builder of a multi get request on the index given by the header.
+     *
+     * @return the builder, or {@code null} if the given object is neither a builder nor an {@code Iterable} made of
+     *         {@code String} only.
+     */
+    @Converter
+    public static MgetRequest.Builder toMgetRequestBuilder(Object documents, Exchange exchange) {
+        if (documents instanceof MgetRequest.Builder) {
+            return (MgetRequest.Builder) documents;
+        }
+        if (documents instanceof Iterable<?> documentIterable) {
+            MgetRequest.Builder builder = new MgetRequest.Builder();
+            builder.index(exchange.getIn().getHeader(OpensearchConstants.PARAM_INDEX_NAME, String.class));
+            for (Object document : documentIterable) {
+                if (document instanceof String) {
+                    builder.ids((String) document);
+                } else {
+                    LOG.warn(
+                            "Cannot convert document id of type {} into a String",
+                            document == null ? "null" : document.getClass().getName());
+                    return null;
+                }
+            }
+            return builder;
+        }
+        return null;
+    }
+
+    /**
+     * Converts a query, given as a map or as a JSON string, into the builder of a search request. The index name,
+     * the size and the offset are read from the headers.
+     *
+     * @return the builder, or {@code null} if the given object is not a builder, a map or a {@code String}.
+     */
+    @Converter
+    public static SearchRequest.Builder toSearchRequestBuilder(Object queryObject, Exchange exchange) throws IOException {
+        String indexName = exchange.getIn().getHeader(OpensearchConstants.PARAM_INDEX_NAME, String.class);
+
+        if (queryObject instanceof SearchRequest.Builder) {
+            SearchRequest.Builder builder = (SearchRequest.Builder) queryObject;
+            if (builder.build().index().isEmpty()) {
+                builder.index(indexName);
+            }
+            return builder;
+        }
+        SearchRequest.Builder builder = new SearchRequest.Builder();
+        JacksonJsonpMapper mapper = createMapper();
+
+        Integer size = exchange.getIn().getHeader(OpensearchConstants.PARAM_SIZE, Integer.class);
+        Integer from = exchange.getIn().getHeader(OpensearchConstants.PARAM_FROM, Integer.class);
+        // Only set up the indexName if the message header has the setting
+        if (ObjectHelper.isNotEmpty(indexName)) {
+            builder.index(indexName);
+        }
+
+        if (queryObject instanceof Map<?, ?> mapQuery) {
+            // Remove 'query' prefix from the query object for backward
+            // compatibility with Elasticsearch
+            if (mapQuery.containsKey(OPENSEARCH_QUERY_DSL_PREFIX)) {
+                mapQuery = (Map<?, ?>) mapQuery.get(OPENSEARCH_QUERY_DSL_PREFIX);
+            }
+            queryObject = mapQuery;
+        } else if (queryObject instanceof String queryString) {
+            // The query is parsed only once: the content of the 'query' field is used when present, the whole
+            // content otherwise.
+            JsonNode jsonTextObject = mapper.objectMapper().readValue(queryString, JsonNode.class);
+            JsonNode parentJsonNode = jsonTextObject.get(OPENSEARCH_QUERY_DSL_PREFIX);
+            JsonNode queryNode = parentJsonNode != null ? parentJsonNode : jsonTextObject;
+            queryObject = JsonData.of(queryNode, mapper).toJson();
+        } else {
+            // Cannot convert the queryObject into SearchRequest
+            LOG.warn(
+                    "Cannot convert queryObject of type {} into SearchRequest object",
+                    queryObject == null ? "null" : queryObject.getClass().getName());
+            return null;
+        }
+        if (size != null) {
+            builder.size(size);
+        }
+        if (from != null) {
+            builder.from(from);
+        }
+
+        builder.query(JsonData.of(queryObject, mapper).to(Query.class));
+
+        return builder;
+    }
+
+    /**
+     * Converts documents and/or bulk operations into the builder of a bulk request on the index given by the header.
+     * Each element that is not already a bulk operation, or the builder of a delete, update or create operation, is
+     * indexed as a document.
+     *
+     * @return the builder, or {@code null} if the given object is neither a builder nor an {@code Iterable}.
+     */
+    @Converter
+    public static BulkRequest.Builder toBulkRequestBuilder(Object documents, Exchange exchange) throws IOException {
+        if (documents instanceof BulkRequest.Builder) {
+            return (BulkRequest.Builder) documents;
+        }
+        // Iterate over the Iterable itself: casting to List would fail for any other kind of Iterable.
+        if (documents instanceof Iterable<?> documentIterable) {
+            BulkRequest.Builder builder = new BulkRequest.Builder();
+            builder.index(exchange.getIn().getHeader(OpensearchConstants.PARAM_INDEX_NAME, String.class));
+            for (Object document : documentIterable) {
+                if (document instanceof BulkOperationVariant) {
+                    builder.operations(((BulkOperationVariant) document)._toBulkOperation());
+                } else if (document instanceof DeleteOperation.Builder) {
+                    builder.operations(
+                            new BulkOperation.Builder().delete(((DeleteOperation.Builder) document).build()).build());
+                } else if (document instanceof UpdateOperation.Builder) {
+                    builder.operations(
+                            new BulkOperation.Builder().update(((UpdateOperation.Builder<?>) document).build()).build());
+                } else if (document instanceof CreateOperation.Builder) {
+                    builder.operations(
+                            new BulkOperation.Builder().create(((CreateOperation.Builder<?>) document).build()).build());
+                } else {
+                    builder.operations(
+                            new BulkOperation.Builder().index(createIndexOperationBuilder(document, exchange).build()).build());
+                }
+            }
+
+            return builder;
+        }
+
+        return null;
+    }
+
+    /**
+     * Indicates whether the given document is a raw JSON content that needs to be parsed.
+     */
+    private static boolean isRawJson(Object document) {
+        return document instanceof byte[] || document instanceof InputStream
+                || document instanceof String || document instanceof Reader;
+    }
+
+    /**
+     * Parses a raw JSON content, for which {@link #isRawJson(Object)} is {@code true}, into a JSON tree.
+     */
+    private static JsonNode readTree(Object rawJson, JacksonJsonpMapper mapper) throws IOException {
+        if (rawJson instanceof byte[] byteArray) {
+            return mapper.objectMapper().reader().readTree(byteArray);
+        }
+        if (rawJson instanceof InputStream inputStream) {
+            return mapper.objectMapper().reader().readTree(inputStream);
+        }
+        if (rawJson instanceof String string) {
+            return mapper.objectMapper().reader().readTree(new StringReader(string));
+        }
+        return mapper.objectMapper().reader().readTree((Reader) rawJson);
+    }
+
+    /**
+     * Gives the document to index: the parsed JSON for a raw JSON content, the given object itself otherwise.
+     */
+    private static Object toIndexableDocument(Object document, JacksonJsonpMapper mapper) throws IOException {
+        if (isRawJson(document)) {
+            return JsonData.of(readTree(document, mapper), mapper).toJson();
+        }
+        return document;
+    }
+
+    /**
+     * Creates a mapper with a non indented output that ignores the {@code null} values on serialization.
+     */
+    private static JacksonJsonpMapper createMapper() {
+        ObjectMapper objectMapper = new ObjectMapper()
+                .configure(SerializationFeature.INDENT_OUTPUT, false)
+                .setSerializationInclusion(JsonInclude.Include.NON_NULL);
+
+        return new JacksonJsonpMapper(objectMapper);
+    }
+}
diff --git a/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/OpensearchComponentVerifierExtensionTest.java b/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/OpensearchComponentVerifierExtensionTest.java
new file mode 100644
index 00000000000..550ecccaa0f
--- /dev/null
+++ b/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/OpensearchComponentVerifierExtensionTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.opensearch;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Component;
+import org.apache.camel.component.extension.ComponentVerifierExtension;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests the verifier extension of the component: the validation of the parameters and the connectivity check
+ * against an address that is expected to be unreachable.
+ */
+class OpensearchComponentVerifierExtensionTest extends CamelTestSupport {
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    @Test
+    void testParameters() {
+        Map<String, Object> options = new HashMap<>();
+        options.put("hostAddresses", "http://localhost:9000");
+        options.put("clusterName", "es-test");
+
+        ComponentVerifierExtension.Result outcome = verify(ComponentVerifierExtension.Scope.PARAMETERS, options);
+
+        assertEquals(ComponentVerifierExtension.Result.Status.OK, outcome.getStatus());
+    }
+
+    @Test
+    void testConnectivity() {
+        Map<String, Object> options = new HashMap<>();
+        options.put("hostAddresses", "http://localhost:9000");
+
+        ComponentVerifierExtension.Result outcome = verify(ComponentVerifierExtension.Scope.CONNECTIVITY, options);
+
+        // The connectivity check is expected to fail against this address.
+        assertEquals(ComponentVerifierExtension.Result.Status.ERROR, outcome.getStatus());
+    }
+
+    /**
+     * Runs the verifier extension of the opensearch component for the given scope and options.
+     */
+    private ComponentVerifierExtension.Result verify(ComponentVerifierExtension.Scope scope, Map<String, Object> options) {
+        Component component = context().getComponent("opensearch");
+        ComponentVerifierExtension verifier
+                = component.getExtension(ComponentVerifierExtension.class).orElseThrow(IllegalStateException::new);
+        return verifier.verify(scope, options);
+    }
+
+}
diff --git a/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchBulkIT.java b/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchBulkIT.java
new file mode 100644
index 00000000000..cb3bf9e98a8
--- /dev/null
+++ b/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchBulkIT.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.opensearch.integration;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.Reader;
+import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Test;
+import org.opensearch.client.opensearch.core.BulkRequest;
+import org.opensearch.client.opensearch.core.GetResponse;
+import org.opensearch.client.opensearch.core.bulk.BulkOperation;
+import org.opensearch.client.opensearch.core.bulk.BulkResponseItem;
+import org.opensearch.client.opensearch.core.bulk.CreateOperation;
+import org.opensearch.client.opensearch.core.bulk.DeleteOperation;
+import org.opensearch.client.opensearch.core.bulk.IndexOperation;
+import org.opensearch.client.opensearch.core.bulk.UpdateOperation;
+
+import static org.apache.camel.test.junit5.TestSupport.assertCollectionSize;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Integration tests that send different body types to the Bulk operation of the "twitter" index. */
+class OpensearchBulkIT extends OpensearchTestSupport {
+ // Bulk-indexes two map documents and expects one result entry per document.
+ @Test
+ void testBulkWithMap() {
+ List<Map<String, String>> payload = new ArrayList<>();
+ payload.add(createIndexedData("1"));
+ payload.add(createIndexedData("2"));
+
+ List<?> ids = template.requestBody("direct:bulk", payload, List.class);
+
+ assertNotNull(ids, "indexIds should be set");
+ final int expectedCount = payload.size();
+ assertCollectionSize("Indexed documents should match the size of documents", ids, expectedCount);
+ }
+
+ // Bulk-indexes two documents supplied as JSON strings.
+ @Test
+ void testBulkWithString() {
+ String first = "{\"testBulkWithString1\": \"some-value\"}";
+ String second = "{\"testBulkWithString2\": \"some-value\"}";
+ List<?> ids = template.requestBody("direct:bulk", List.of(first, second), List.class);
+ assertNotNull(ids, "indexIds should be set");
+ assertCollectionSize("Indexed documents should match the size of documents", ids, 2);
+ }
+
+ // Bulk-indexes two documents supplied as UTF-8 encoded byte arrays.
+ @Test
+ void testBulkWithBytes() {
+ byte[] first = "{\"testBulkWithBytes1\": \"some-value\"}".getBytes(StandardCharsets.UTF_8);
+ byte[] second = "{\"testBulkWithBytes2\": \"some-value\"}".getBytes(StandardCharsets.UTF_8);
+ List<byte[]> payload = List.of(first, second);
+ List<?> ids = template.requestBody("direct:bulk", payload, List.class);
+ assertNotNull(ids, "indexIds should be set");
+ assertCollectionSize("Indexed documents should match the size of documents", ids, payload.size());
+ }
+
+ // Bulk-indexes two documents supplied as character readers.
+ @Test
+ void testBulkWithReader() {
+ Reader first = new StringReader("{\"testBulkWithReader1\": \"some-value\"}");
+ Reader second = new StringReader("{\"testBulkWithReader2\": \"some-value\"}");
+ List<Reader> payload = List.of(first, second);
+ List<?> ids = template.requestBody("direct:bulk", payload, List.class);
+ assertNotNull(ids, "indexIds should be set");
+ assertCollectionSize("Indexed documents should match the size of documents", ids, payload.size());
+ }
+
+ // Bulk-indexes two documents supplied as input streams over UTF-8 bytes.
+ @Test
+ void testBulkWithInputStream() {
+ byte[] firstJson = "{\"testBulkWithInputStream1\": \"some-value\"}".getBytes(StandardCharsets.UTF_8);
+ byte[] secondJson = "{\"testBulkWithInputStream2\": \"some-value\"}".getBytes(StandardCharsets.UTF_8);
+ List<InputStream> payload
+ = List.of(new ByteArrayInputStream(firstJson), new ByteArrayInputStream(secondJson));
+
+ List<?> ids = template.requestBody("direct:bulk", payload, List.class);
+ assertNotNull(ids, "indexIds should be set");
+ assertCollectionSize("Indexed documents should match the size of documents", ids, payload.size());
+ }
+
+ // Sends a one-element list of maps to the bulk route and expects exactly one result entry.
+ @Test
+ void testBulkListRequestBody() {
+ // given: a single document whose values carry the test prefix
+ String prefix = createPrefix();
+ Map<String, String> fields = new HashMap<>();
+ fields.put("id", prefix + "baz");
+ fields.put("content", prefix + "hello");
+ List<Map<String, String>> payload = new ArrayList<>();
+ payload.add(fields);
+ // when
+ List<?> ids = template.requestBody("direct:bulk", payload, List.class);
+
+ // then
+ assertThat(ids, notNullValue());
+ assertThat(ids.size(), equalTo(1));
+ }
+
+ // Sends a BulkRequest.Builder holding one index operation and checks the single response item.
+ @Test
+ void testBulkRequestBody() {
+ // given
+ String prefix = createPrefix();
+ IndexOperation<?> indexOperation = new IndexOperation.Builder<>()
+ .index(prefix + "foo").id(prefix + "baz")
+ .document(Map.of(prefix + "content", prefix + "hello"))
+ .build();
+ BulkRequest.Builder bulk = new BulkRequest.Builder();
+ bulk.operations(new BulkOperation.Builder().index(indexOperation).build());
+
+ // when
+ @SuppressWarnings("unchecked")
+ List<BulkResponseItem> items = template.requestBody("direct:bulk", bulk, List.class);
+
+ // then: exactly one item, without error, carrying the requested id
+ assertThat(items, notNullValue());
+ assertThat(items.size(), equalTo(1));
+ assertThat(items.get(0).error(), nullValue());
+ assertThat(items.get(0).id(), equalTo(prefix + "baz"));
+ }
+
+ // Sends a BulkRequest.Builder with one index operation and checks the id of the first response item.
+ @Test
+ void bulkRequestBody() {
+ // given
+ String prefix = createPrefix();
+ IndexOperation<?> indexOperation = new IndexOperation.Builder<>()
+ .index(prefix + "foo").id(prefix + "baz")
+ .document(Map.of(prefix + "content", prefix + "hello"))
+ .build();
+ BulkRequest.Builder bulk = new BulkRequest.Builder();
+ bulk.operations(new BulkOperation.Builder().index(indexOperation).build());
+ // when
+ @SuppressWarnings("unchecked")
+ List<BulkResponseItem> items = template.requestBody("direct:bulk", bulk, List.class);
+
+ // then
+ assertThat(items, notNullValue());
+ assertEquals(prefix + "baz", items.get(0).id());
+ }
+
+ // Indexes a document, deletes it through a bulk DeleteOperation.Builder, then expects GetById to return no source.
+ @Test
+ void bulkDeleteOperation() {
+ // given: a document that exists in the "twitter" index
+ String documentId = template.requestBody("direct:index", createIndexedData(), String.class);
+ assertNotNull(documentId, "indexId should be set");
+ DeleteOperation.Builder deletion = new DeleteOperation.Builder().index("twitter").id(documentId);
+
+ // when
+ @SuppressWarnings("unchecked")
+ List<BulkResponseItem> items = template.requestBody("direct:bulk", List.of(deletion), List.class);
+
+ // then: the bulk item refers to the deleted id and a subsequent lookup has no source
+ assertThat(items, notNullValue());
+ assertEquals(documentId, items.get(0).id());
+ GetResponse<?> lookup = template.requestBody("direct:get", documentId, GetResponse.class);
+ assertNotNull(lookup, "response should not be null");
+ assertNull(lookup.source(), "response source should be null");
+ }
+
+ // Creates a document through a bulk CreateOperation.Builder and reads it back by the id of the first response item.
+ @Test
+ void bulkCreateOperation() {
+ // given
+ String prefix = createPrefix();
+ Map<String, String> fields = Map.of(prefix + "content", prefix + "hello");
+ CreateOperation.Builder<?> creation = new CreateOperation.Builder<>().index("twitter").document(fields);
+ // when
+ @SuppressWarnings("unchecked")
+ List<BulkResponseItem> items = template.requestBody("direct:bulk", List.of(creation), List.class);
+
+ // then
+ assertThat(items, notNullValue());
+ GetResponse<?> lookup = template.requestBody("direct:get", items.get(0).id(), GetResponse.class);
+ assertNotNull(lookup, "response should not be null");
+ assertNotNull(lookup.source(), "response source should not be null");
+ }
+
+ // Indexes a document, applies a bulk UpdateOperation adding a prefixed key, then verifies the key through GetById.
+ @Test
+ void bulkUpdateOperation() {
+ // given: an existing document in the "twitter" index
+ String documentId = template.requestBody("direct:index", createIndexedData(), String.class);
+ assertNotNull(documentId, "indexId should be set");
+
+ String addedKey = String.format("%skey2", createPrefix());
+ String addedValue = String.format("%svalue2", createPrefix());
+ UpdateOperation<?> update = new UpdateOperation.Builder<>()
+ .index("twitter").id(documentId)
+ .document(Map.of(addedKey, addedValue))
+ .build();
+
+ // when
+ @SuppressWarnings("unchecked")
+ List<BulkResponseItem> items = template.requestBody("direct:bulk", List.of(update), List.class);
+
+ // then: the stored source is an ObjectNode exposing the added key and value
+ assertThat(items, notNullValue());
+ GetResponse<?> lookup = template.requestBody("direct:get", documentId, GetResponse.class);
+ assertNotNull(lookup, "response should not be null");
+ assertNotNull(lookup.source(), "response source should not be null");
+ ObjectNode source = assertInstanceOf(ObjectNode.class, lookup.source(), "response source should be a ObjectNode");
+ assertTrue(source.has(createPrefix() + "key2"));
+ assertEquals(createPrefix() + "value2", source.get(createPrefix() + "key2").asText());
+ }
+
+ /**
+ * Declares the Index, GetById and Bulk routes, each targeting the "twitter" index.
+ */
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:index").to("opensearch://opensearch?operation=Index&indexName=twitter");
+ from("direct:get").to("opensearch://opensearch?operation=GetById&indexName=twitter");
+ from("direct:bulk").to("opensearch://opensearch?operation=Bulk&indexName=twitter");
+ }
+ };
+ }
+}
diff --git a/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchClusterIndexIT.java b/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchClusterIndexIT.java
new file mode 100644
index 00000000000..e2ad8dbbfe6
--- /dev/null
+++ b/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchClusterIndexIT.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.opensearch.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.opensearch.OpensearchConstants;
+import org.apache.camel.component.opensearch.OpensearchOperation;
+import org.apache.http.impl.client.BasicResponseHandler;
+import org.junit.jupiter.api.Test;
+import org.opensearch.client.Request;
+import org.opensearch.client.opensearch.core.GetRequest;
+
+import static org.apache.camel.test.junit5.TestSupport.assertStringContains;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Index-operation tests whose operation, index name and document id are supplied as message headers. */
+class OpensearchClusterIndexIT extends OpensearchTestSupport {
+ // Indexes the same document twice under id "1" and checks it can be fetched from "twitter".
+ @Test
+ void indexWithIpAndPort() throws Exception {
+ Map<String, String> document = createIndexedData();
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(OpensearchConstants.PARAM_OPERATION, OpensearchOperation.Index);
+ headers.put(OpensearchConstants.PARAM_INDEX_NAME, "twitter");
+ headers.put(OpensearchConstants.PARAM_INDEX_ID, "1");
+ // Send the identical request twice; each call must yield an id.
+ for (int attempt = 0; attempt < 2; attempt++) {
+ String id = template.requestBodyAndHeaders("direct:indexWithIpAndPort", document, headers, String.class);
+ assertNotNull(id, "indexId should be set");
+ }
+ GetRequest lookup = new GetRequest.Builder().index("twitter").id("1").build();
+ assertTrue(client.get(lookup, ObjectNode.class).found(), "Index id 1 must exists");
+ }
+
+ // Indexes a document under id "4" in "facebook" via the sniffer-enabled route, then checks cluster health.
+ @Test
+ void indexWithSnifferEnable() throws Exception {
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(OpensearchConstants.PARAM_OPERATION, OpensearchOperation.Index);
+ headers.put(OpensearchConstants.PARAM_INDEX_NAME, "facebook");
+ headers.put(OpensearchConstants.PARAM_INDEX_ID, "4");
+
+ String id = template.requestBodyAndHeaders("direct:indexWithSniffer", createIndexedData(), headers, String.class);
+ assertNotNull(id, "indexId should be set");
+ GetRequest lookup = new GetRequest.Builder().index("facebook").id("4").build();
+ assertTrue(client.get(lookup, ObjectNode.class).found(), "Index id 4 must exists");
+
+ // The health response is expected to report a single data node.
+ Request healthRequest = new Request("GET", "/_cluster/health?pretty");
+ String health = new BasicResponseHandler().handleEntity(restClient.performRequest(healthRequest).getEntity());
+ assertStringContains(health, "\"number_of_data_nodes\" : 1");
+ }
+
+ // Both endpoints use operation=Index and indexName=twitter; the second additionally sets enableSniffer=true.
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ String plainUri = "opensearch://" + clusterName + "?operation=Index&indexName=twitter";
+ String snifferUri = "opensearch://" + clusterName + "?operation=Index&indexName=twitter&enableSniffer=true";
+ from("direct:indexWithIpAndPort").to(plainUri);
+ from("direct:indexWithSniffer").to(snifferUri);
+ }
+ };
+ }
+}
diff --git a/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchGetSearchDeleteExistsUpdateIT.java b/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchGetSearchDeleteExistsUpdateIT.java
new file mode 100644
index 00000000000..a5e63a99349
--- /dev/null
+++ b/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchGetSearchDeleteExistsUpdateIT.java
@@ -0,0 +1,913 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.opensearch.integration;
+
+import java.io.ByteArrayInputStream;
+import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.opensearch.OpensearchConstants;
+import org.apache.camel.component.opensearch.OpensearchOperation;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+import org.opensearch.client.opensearch._types.FieldValue;
+import org.opensearch.client.opensearch._types.Result;
+import org.opensearch.client.opensearch._types.query_dsl.MatchQuery;
+import org.opensearch.client.opensearch._types.query_dsl.Query;
+import org.opensearch.client.opensearch.core.DeleteRequest;
+import org.opensearch.client.opensearch.core.GetRequest;
+import org.opensearch.client.opensearch.core.GetResponse;
+import org.opensearch.client.opensearch.core.IndexRequest;
+import org.opensearch.client.opensearch.core.MsearchRequest;
+import org.opensearch.client.opensearch.core.SearchRequest;
+import org.opensearch.client.opensearch.core.mget.MultiGetResponseItem;
+import org.opensearch.client.opensearch.core.msearch.MultiSearchResponseItem;
+import org.opensearch.client.opensearch.core.msearch.MultisearchBody;
+import org.opensearch.client.opensearch.core.msearch.MultisearchHeader;
+import org.opensearch.client.opensearch.core.msearch.RequestItem;
+import org.opensearch.client.opensearch.core.search.HitsMetadata;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class OpensearchGetSearchDeleteExistsUpdateIT extends OpensearchTestSupport {
+
+ // Indexes a map document and checks that the document read back holds the first entry returned by the map.
+ @Test
+ void testIndexWithMap() {
+ Map<String, String> document = createIndexedData();
+ String documentId = template.requestBody("direct:index", document, String.class);
+ assertNotNull(documentId, "indexId should be set");
+
+ // read it back and compare one entry of the map with the stored field
+ GetResponse<?> fetched = template.requestBody("direct:get", documentId, GetResponse.class);
+ assertNotNull(fetched, "response should not be null");
+ assertNotNull(fetched.source(), "response source should not be null");
+ ObjectNode source = assertInstanceOf(ObjectNode.class, fetched.source(), "response source should be a ObjectNode");
+ Map.Entry<String, String> entry = document.entrySet().iterator().next();
+ assertTrue(source.has(entry.getKey()));
+ assertEquals(entry.getValue(), source.get(entry.getKey()).asText());
+ }
+
+ // Indexes a document given as a JSON string and verifies its field via the direct:get route.
+ @Test
+ void testIndexWithString() {
+ String json = "{\"testIndexWithString\": \"some-value\"}";
+ String documentId = template.requestBody("direct:index", json, String.class);
+ assertNotNull(documentId, "indexId should be set");
+
+ GetResponse<?> fetched = template.requestBody("direct:get", documentId, GetResponse.class);
+ assertNotNull(fetched, "response should not be null");
+ assertNotNull(fetched.source(), "response source should not be null");
+ ObjectNode source = assertInstanceOf(ObjectNode.class, fetched.source(), "response source should be a ObjectNode");
+ assertTrue(source.has("testIndexWithString"));
+ assertEquals("some-value", source.get("testIndexWithString").asText());
+ }
+
+ // Indexes a document given as a Reader and verifies its field via the direct:get route.
+ @Test
+ void testIndexWithReader() {
+ StringReader body = new StringReader("{\"testIndexWithReader\": \"some-value\"}");
+ String documentId = template.requestBody("direct:index", body, String.class);
+ assertNotNull(documentId, "indexId should be set");
+
+ // fetch the stored document and inspect its source
+ GetResponse<?> fetched = template.requestBody("direct:get", documentId, GetResponse.class);
+ assertNotNull(fetched, "response should not be null");
+ assertNotNull(fetched.source(), "response source should not be null");
+ ObjectNode source = assertInstanceOf(ObjectNode.class, fetched.source(), "response source should be a ObjectNode");
+ assertTrue(source.has("testIndexWithReader"));
+ assertEquals("some-value", source.get("testIndexWithReader").asText());
+ }
+
+ // Indexes a document given as UTF-8 bytes and verifies its field via the direct:get route.
+ @Test
+ void testIndexWithBytes() {
+ byte[] body = "{\"testIndexWithBytes\": \"some-value\"}".getBytes(StandardCharsets.UTF_8);
+ String documentId = template.requestBody("direct:index", body, String.class);
+ assertNotNull(documentId, "indexId should be set");
+
+ // fetch the stored document and inspect its source
+ GetResponse<?> fetched = template.requestBody("direct:get", documentId, GetResponse.class);
+ assertNotNull(fetched, "response should not be null");
+ assertNotNull(fetched.source(), "response source should not be null");
+ ObjectNode source = assertInstanceOf(ObjectNode.class, fetched.source(), "response source should be a ObjectNode");
+ assertTrue(source.has("testIndexWithBytes"));
+ assertEquals("some-value", source.get("testIndexWithBytes").asText());
+ }
+
+ // Indexes a document given as an InputStream over UTF-8 bytes and verifies its field via the direct:get route.
+ @Test
+ void testIndexWithInputStream() {
+ byte[] json = "{\"testIndexWithInputStream\": \"some-value\"}".getBytes(StandardCharsets.UTF_8);
+ ByteArrayInputStream body = new ByteArrayInputStream(json);
+ String documentId = template.requestBody("direct:index", body, String.class);
+ assertNotNull(documentId, "indexId should be set");
+
+ // fetch the stored document and inspect its source
+ GetResponse<?> fetched = template.requestBody("direct:get", documentId, GetResponse.class);
+ assertNotNull(fetched, "response should not be null");
+ assertNotNull(fetched.source(), "response source should not be null");
+ ObjectNode source = assertInstanceOf(ObjectNode.class, fetched.source(), "response source should be a ObjectNode");
+ assertTrue(source.has("testIndexWithInputStream"));
+ assertEquals("some-value", source.get("testIndexWithInputStream").asText());
+ }
+
+ // Indexes a Product via direct:index-product and expects an equal but distinct Product when read back.
+ @Test
+ void testIndexWithDocumentType() {
+ Product expected = new Product();
+ expected.setId("book-world-records-2021");
+ expected.setStockAvailable(1);
+ expected.setPrice(100);
+ expected.setDescription("The book of the year!");
+ expected.setName("Guinness book of records 2021");
+
+ String documentId = template.requestBody("direct:index-product", expected, String.class);
+ assertNotNull(documentId, "indexId should be set");
+
+ // With the PARAM_DOCUMENT_CLASS header set to Product.class the source is expected to be a Product.
+ GetResponse<?> fetched = template.requestBodyAndHeader(
+ "direct:get", documentId, OpensearchConstants.PARAM_DOCUMENT_CLASS, Product.class, GetResponse.class);
+ assertNotNull(fetched, "response should not be null");
+ assertNotNull(fetched.source(), "response source should not be null");
+ Product actual = assertInstanceOf(Product.class, fetched.source(), "response source should be a Product");
+ // equal by value, yet not the very same instance
+ assertNotSame(expected, actual);
+ assertEquals(expected, actual);
+ }
+
+ // Indexes a map document and expects direct:get to return its source as an ObjectNode.
+ @Test
+ void testGetWithString() {
+ String documentId = template.requestBody("direct:index", createIndexedData(), String.class);
+ assertNotNull(documentId, "indexId should be set");
+
+ // no document class header is passed on this request
+ GetResponse<?> fetched = template.requestBody("direct:get", documentId, GetResponse.class);
+ assertNotNull(fetched, "response should not be null");
+ Object source = fetched.source();
+ assertNotNull(source, "response source should not be null");
+ assertInstanceOf(ObjectNode.class, source);
+ }
+
+ // Indexes a Product via direct:index and expects direct:get to return an equal Product when asked for that class.
+ @Test
+ void testGetWithDocumentType() {
+ Product expected = new Product();
+ expected.setId("book-world-records-1890");
+ expected.setStockAvailable(0);
+ expected.setPrice(200);
+ expected.setDescription("The book of the year!");
+ expected.setName("Guinness book of records 1890");
+
+ String documentId = template.requestBody("direct:index", expected, String.class);
+ assertNotNull(documentId, "indexId should be set");
+
+ // request the source as a Product through the PARAM_DOCUMENT_CLASS header
+ GetResponse<?> fetched = template.requestBodyAndHeader(
+ "direct:get", documentId, OpensearchConstants.PARAM_DOCUMENT_CLASS, Product.class, GetResponse.class);
+ assertNotNull(fetched, "response should not be null");
+ Object source = fetched.source();
+ assertNotNull(source, "response source should not be null");
+ Product actual = assertInstanceOf(Product.class, source);
+ assertEquals(expected, actual);
+ }
+
+ // Indexes a map document and fetches it through direct:multiget, expecting one ObjectNode result.
+ @Test
+ void testMGetWithString() {
+ String documentId = template.requestBody("direct:index", createIndexedData(), String.class);
+ assertNotNull(documentId, "indexId should be set");
+
+ @SuppressWarnings("unchecked")
+ List<MultiGetResponseItem<?>> items = template.requestBody("direct:multiget", List.of(documentId), List.class);
+ assertNotNull(items, "response should not be null");
+ assertEquals(1, items.size(), "response should contain one result");
+ MultiGetResponseItem<?> only = items.get(0);
+ assertTrue(only.isResult());
+ Object source = only.result().source();
+ assertNotNull(source, "response source should not be null");
+ assertInstanceOf(ObjectNode.class, source);
+ }
+
+ // Indexes a Product and fetches it through direct:multiget, expecting one result whose source equals the Product.
+ @Test
+ void testMGetWithDocumentType() {
+ Product expected = new Product();
+ expected.setId("book-world-records-1890");
+ expected.setStockAvailable(0);
+ expected.setPrice(200);
+ expected.setDescription("The book of the year!");
+ expected.setName("Guinness book of records 1890");
+
+ String documentId = template.requestBody("direct:index", expected, String.class);
+ assertNotNull(documentId, "indexId should be set");
+
+ // ask for Product sources through the PARAM_DOCUMENT_CLASS header
+ @SuppressWarnings("unchecked")
+ List<MultiGetResponseItem<?>> items = template.requestBodyAndHeader(
+ "direct:multiget", List.of(documentId), OpensearchConstants.PARAM_DOCUMENT_CLASS, Product.class, List.class);
+ assertNotNull(items, "response should not be null");
+ assertEquals(1, items.size(), "response should contain one result");
+ MultiGetResponseItem<?> only = items.get(0);
+ assertTrue(only.isResult());
+ Object source = only.result().source();
+ assertNotNull(source, "response source should not be null");
+ assertEquals(expected, assertInstanceOf(Product.class, source));
+ }
+
+ // Indexes a document, deletes it via direct:delete and expects a later direct:get to return no source.
+ @Test
+ void testDeleteWithString() {
+ Map<String, String> document = createIndexedData();
+ String documentId = template.requestBody("direct:index", document, String.class);
+ assertNotNull(documentId, "indexId should be set");
+
+ // before the delete: the document has a source
+ GetResponse<?> beforeDelete = template.requestBody("direct:get", documentId, GetResponse.class);
+ assertNotNull(beforeDelete, "response should not be null");
+ assertNotNull(beforeDelete.source(), "response source should not be null");
+
+ // delete by id
+ Result deletion = template.requestBody("direct:delete", documentId, Result.class);
+ assertNotNull(deletion, "response should not be null");
+
+ // after the delete: a response is still returned, but without a source
+ GetResponse<?> afterDelete = template.requestBody("direct:get", documentId, GetResponse.class);
+ assertNotNull(afterDelete, "response should not be null");
+ assertNull(afterDelete.source(), "response source should be null");
+ }
+
+ // Bulk-indexes two nested documents, then runs a map-based match query: first without a hit, then with one.
+ @Test
+ void testSearchWithMapQuery() {
+ // index {"doc": {...}} twice via direct:start, selecting the Bulk operation and index through headers
+ Map<String, String> fooDoc = Map.of("testSearchWithMapQuery1", "foo");
+ Map<String, String> barDoc = Map.of("testSearchWithMapQuery2", "bar");
+ Map<String, Object> headers = Map.of(
+ OpensearchConstants.PARAM_OPERATION, OpensearchOperation.Bulk,
+ OpensearchConstants.PARAM_INDEX_NAME, "twitter");
+ List<Map<String, Map<String, String>>> payload = List.of(Map.of("doc", fooDoc), Map.of("doc", barDoc));
+ template.requestBodyAndHeaders("direct:start", payload, headers, String.class);
+
+ // No match: {"query": {"match": {"doc.testSearchWithMapQuery1": "bar"}}}
+ Map<String, Object> matchFields = new HashMap<>();
+ matchFields.put("doc.testSearchWithMapQuery1", "bar");
+ Map<String, Object> matchClause = new HashMap<>();
+ matchClause.put("match", matchFields);
+ Map<String, Object> searchBody = new HashMap<>();
+ searchBody.put("query", matchClause);
+ HitsMetadata<?> noHits = template.requestBody("direct:search", searchBody, HitsMetadata.class);
+ assertNotNull(noHits, "response should not be null");
+ assertNotNull(noHits.total());
+ assertEquals(0, noHits.total().value(), "response hits should be == 0");
+
+ // Match: the nested map is changed in place, so searchBody now looks for "foo".
+ matchFields.put("doc.testSearchWithMapQuery1", "foo");
+ // the result may see stale data so use Awaitility
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ HitsMetadata<?> hits = template.requestBody("direct:search", searchBody, HitsMetadata.class);
+ assertNotNull(hits, "response should not be null");
+ assertNotNull(hits.total());
+ assertEquals(1, hits.total().value(), "response hits should be == 1");
+ assertEquals(1, hits.hits().size(), "response hits should be == 1");
+ ObjectNode source = assertInstanceOf(ObjectNode.class, hits.hits().get(0).source());
+ assertTrue(source.has("doc"));
+ JsonNode doc = source.get("doc");
+ assertTrue(doc.has("testSearchWithMapQuery1"));
+ assertEquals("foo", doc.get("testSearchWithMapQuery1").asText());
+ });
+ }
+
+ // Bulk-indexes two nested documents, then searches with JSON string queries: one without a hit, one with a hit.
+ @Test
+ void testSearchWithStringQuery() {
+ // index {"doc": {...}} twice via direct:start, selecting the Bulk operation and index through headers
+ Map<String, String> fooDoc = Map.of("testSearchWithStringQuery1", "foo");
+ Map<String, String> barDoc = Map.of("testSearchWithStringQuery2", "bar");
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(OpensearchConstants.PARAM_OPERATION, OpensearchOperation.Bulk);
+ headers.put(OpensearchConstants.PARAM_INDEX_NAME, "twitter");
+ List<Map<String, Map<String, String>>> payload = List.of(Map.of("doc", fooDoc), Map.of("doc", barDoc));
+ template.requestBodyAndHeaders("direct:start", payload, headers, String.class);
+
+ // No match
+ String missQuery = """
+ {
+ "query" : { "match" : { "doc.testSearchWithStringQuery1" : "bar" }}
+ }
+ """;
+
+ HitsMetadata<?> noHits = template.requestBody("direct:search", missQuery, HitsMetadata.class);
+ assertNotNull(noHits, "response should not be null");
+ assertNotNull(noHits.total());
+ assertEquals(0, noHits.total().value(), "response hits should be == 0");
+
+ // Match
+ String hitQuery = """
+ {
+ "query" : { "match" : { "doc.testSearchWithStringQuery1" : "foo" }}
+ }
+ """;
+ // the result may see stale data so use Awaitility
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ HitsMetadata<?> hits = template.requestBody("direct:search", hitQuery, HitsMetadata.class);
+ assertNotNull(hits, "response should not be null");
+ assertNotNull(hits.total());
+ assertEquals(1, hits.total().value(), "response hits should be == 1");
+ assertEquals(1, hits.hits().size(), "response hits should be == 1");
+ ObjectNode source = assertInstanceOf(ObjectNode.class, hits.hits().get(0).source());
+ assertTrue(source.has("doc"));
+ JsonNode doc = source.get("doc");
+ assertTrue(doc.has("testSearchWithStringQuery1"));
+ assertEquals("foo", doc.get("testSearchWithStringQuery1").asText());
+ });
+ }
+
+ // Bulk-indexes two nested documents, then searches with SearchRequest.Builder bodies: one miss, one hit.
+ @Test
+ void testSearchWithBuilder() {
+ // index both documents through direct:start, selecting the Bulk operation and index through headers
+ Map<String, String> fooDoc = Map.of("testSearchWithBuilder1", "foo");
+ Map<String, String> barDoc = Map.of("testSearchWithBuilder2", "bar");
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(OpensearchConstants.PARAM_OPERATION, OpensearchOperation.Bulk);
+ headers.put(OpensearchConstants.PARAM_INDEX_NAME, "twitter");
+ List<Map<String, Map<String, String>>> payload = List.of(Map.of("doc", fooDoc), Map.of("doc", barDoc));
+ template.requestBodyAndHeaders("direct:start", payload, headers, String.class);
+
+ // No match
+ MatchQuery missMatch
+ = new MatchQuery.Builder().field("doc.testSearchWithBuilder1").query(FieldValue.of("bar")).build();
+ SearchRequest.Builder missRequest
+ = new SearchRequest.Builder().query(new Query.Builder().match(missMatch).build());
+ HitsMetadata<?> noHits = template.requestBody("direct:search", missRequest, HitsMetadata.class);
+ assertNotNull(noHits, "response should not be null");
+ assertNotNull(noHits.total());
+ assertEquals(0, noHits.total().value(), "response hits should be == 0");
+
+ // Match
+ // the result may see stale data so use Awaitility
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ // a new request builder is created on every attempt
+ MatchQuery hitMatch
+ = new MatchQuery.Builder().field("doc.testSearchWithBuilder1").query(FieldValue.of("foo")).build();
+ SearchRequest.Builder hitRequest
+ = new SearchRequest.Builder().query(new Query.Builder().match(hitMatch).build());
+
+ HitsMetadata<?> hits = template.requestBody("direct:search", hitRequest, HitsMetadata.class);
+ assertNotNull(hits, "response should not be null");
+ assertNotNull(hits.total());
+ assertEquals(1, hits.total().value(), "response hits should be == 1");
+ assertEquals(1, hits.hits().size(), "response hits should be == 1");
+ ObjectNode source = assertInstanceOf(ObjectNode.class, hits.hits().get(0).source());
+ assertTrue(source.has("doc"));
+ JsonNode doc = source.get("doc");
+ assertTrue(doc.has("testSearchWithBuilder1"));
+ assertEquals("foo", doc.get("testSearchWithBuilder1").asText());
+ });
+ }
+
+ @Test
+ void testSearchWithDocumentType() {
+ // first, bulk-index two Product documents into "twitter"; only product1's id contains "2020"
+ Product product1 = new Product();
+ product1.setId("book-world-records-2020");
+ product1.setStockAvailable(1);
+ product1.setPrice(100);
+ product1.setDescription("The book of the year!");
+ product1.setName("Guinness book of records 2020");
+
+ Product product2 = new Product();
+ product2.setId("book-world-records-2010");
+ product2.setStockAvailable(200);
+ product2.setPrice(80);
+ product2.setDescription("The book of the year!");
+ product2.setName("Guinness book of records 2010");
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(OpensearchConstants.PARAM_OPERATION, OpensearchOperation.Bulk);
+ headers.put(OpensearchConstants.PARAM_INDEX_NAME, "twitter");
+ template.requestBodyAndHeaders("direct:start", List.of(product1, product2), headers, String.class);
+
+ // No match: query the real top-level "id" field (as the positive case below does) for a value no product has
+ SearchRequest.Builder builder = new SearchRequest.Builder()
+ .query(new Query.Builder().match(new MatchQuery.Builder().field("id").query(FieldValue.of("bar")).build())
+ .build());
+ HitsMetadata<?> response = template.requestBodyAndHeader(
+ "direct:search", builder, OpensearchConstants.PARAM_DOCUMENT_CLASS, Product.class, HitsMetadata.class);
+ assertNotNull(response, "response should not be null");
+ assertNotNull(response.total());
+ assertEquals(0, response.total().value(), "response hits should be == 0");
+
+ // Match: the PARAM_DOCUMENT_CLASS header asks for hit sources to be returned as Product instances
+ // the freshly indexed data may not be searchable yet, so retry with Awaitility for up to 10 seconds
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ SearchRequest.Builder b = new SearchRequest.Builder()
+ .query(new Query.Builder().match(new MatchQuery.Builder().field("id").query(FieldValue.of("2020")).build())
+ .build());
+
+ HitsMetadata<?> resp = template.requestBodyAndHeader(
+ "direct:search", b, OpensearchConstants.PARAM_DOCUMENT_CLASS, Product.class, HitsMetadata.class);
+ assertNotNull(resp, "response should not be null");
+ assertNotNull(resp.total());
+ assertEquals(1, resp.total().value(), "response hits should be == 1");
+ assertEquals(1, resp.hits().size(), "response hits should be == 1");
+ Object result = resp.hits().get(0).source();
+ assertInstanceOf(Product.class, result);
+ Product p = (Product) result;
+ assertEquals(product1, p);
+ });
+ }
+
+ @Test
+ void testMultiSearch() {
+ // first, index one document through "direct:index" (index "twitter") so the searches have something to find
+ Map<String, String> map = createIndexedData();
+ String indexId = template.requestBody("direct:index", map, String.class);
+ assertNotNull(indexId, "indexId should be set");
+
+ // the freshly indexed data may not be searchable yet, so retry with Awaitility for up to 10 seconds
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ // send two identical match-all searches against "twitter" in a single multi-search request
+ MsearchRequest.Builder builder = new MsearchRequest.Builder().index("twitter").searches(
+ new RequestItem.Builder().header(new MultisearchHeader.Builder().build())
+ .body(new MultisearchBody.Builder().query(b -> b.matchAll(x -> x)).build()).build(),
+ new RequestItem.Builder().header(new MultisearchHeader.Builder().build())
+ .body(new MultisearchBody.Builder().query(b -> b.matchAll(x -> x)).build()).build());
+ @SuppressWarnings("unchecked")
+ List<MultiSearchResponseItem<?>> response = template.requestBody("direct:multiSearch", builder, List.class);
+ assertNotNull(response, "response should not be null");
+ assertEquals(2, response.size(), "response should be == 2");
+ assertInstanceOf(MultiSearchResponseItem.class, response.get(0));
+ assertTrue(response.get(0).isResult());
+ assertNotNull(response.get(0).result());
+ assertTrue(response.get(0).result().hits().total().value() > 0);
+ assertInstanceOf(MultiSearchResponseItem.class, response.get(1));
+ assertTrue(response.get(1).isResult());
+ assertNotNull(response.get(1).result());
+ assertTrue(response.get(1).result().hits().total().value() > 0);
+ });
+ }
+
+ @Test
+ void testMultiSearchWithDocumentType() {
+ // first, index one Product into the dedicated "multi-search" index (header overrides the route's indexName)
+ Product product = new Product();
+ product.setId("book-world-records-2022");
+ product.setStockAvailable(1);
+ product.setPrice(100);
+ product.setDescription("The book of the year!");
+ product.setName("Guinness book of records 2022");
+ String indexId = template.requestBodyAndHeader("direct:index", product, OpensearchConstants.PARAM_INDEX_NAME,
+ "multi-search", String.class);
+ assertNotNull(indexId, "indexId should be set");
+
+ // the freshly indexed data may not be searchable yet, so retry with Awaitility for up to 10 seconds
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ // send two identical match-all searches against "multi-search", passing Product as the document class
+ MsearchRequest.Builder builder = new MsearchRequest.Builder().index("multi-search").searches(
+ new RequestItem.Builder().header(new MultisearchHeader.Builder().build())
+ .body(new MultisearchBody.Builder().query(b -> b.matchAll(x -> x)).build()).build(),
+ new RequestItem.Builder().header(new MultisearchHeader.Builder().build())
+ .body(new MultisearchBody.Builder().query(b -> b.matchAll(x -> x)).build()).build());
+ @SuppressWarnings("unchecked")
+ List<MultiSearchResponseItem<?>> response = template.requestBodyAndHeaders(
+ "direct:multiSearch", builder,
+ Map.of(
+ OpensearchConstants.PARAM_INDEX_NAME, "multi-search",
+ OpensearchConstants.PARAM_DOCUMENT_CLASS, Product.class),
+ List.class);
+ assertNotNull(response, "response should not be null");
+ assertEquals(2, response.size(), "response should be == 2");
+ assertInstanceOf(MultiSearchResponseItem.class, response.get(0));
+ assertTrue(response.get(0).isResult());
+ assertNotNull(response.get(0).result());
+ assertTrue(response.get(0).result().hits().total().value() > 0);
+ assertInstanceOf(MultiSearchResponseItem.class, response.get(1));
+ assertTrue(response.get(1).isResult());
+ assertNotNull(response.get(1).result());
+ assertTrue(response.get(1).result().hits().total().value() > 0);
+ });
+ }
+
+ @Test
+ void testUpdateWithMap() {
+ Map<String, String> map = createIndexedData();
+ String indexId = template.requestBody("direct:index", map, String.class);
+ assertNotNull(indexId, "indexId should be set");
+ // update the document just indexed: the new key/value pair is sent wrapped under a "doc" entry
+ Map<String, String> newMap = new HashMap<>();
+ newMap.put(createPrefix() + "key2", createPrefix() + "value2");
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(OpensearchConstants.PARAM_INDEX_ID, indexId);
+ indexId = template.requestBodyAndHeaders("direct:update", Map.of("doc", newMap), headers, String.class);
+ assertNotNull(indexId, "indexId should be set");
+
+ // now, fetch the document by id and verify that the updated key is present with the new value
+ GetResponse<?> response = template.requestBody("direct:get", indexId, GetResponse.class);
+ assertNotNull(response, "response should not be null");
+ assertNotNull(response.source(), "response source should not be null");
+ assertInstanceOf(ObjectNode.class, response.source(), "response source should be a ObjectNode");
+ assertTrue(((ObjectNode) response.source()).has(createPrefix() + "key2"));
+ assertEquals(createPrefix() + "value2", ((ObjectNode) response.source()).get(createPrefix() + "key2").asText());
+ }
+
+ @Test
+ void testGetWithHeaders() {
+ // first, index a document, choosing the operation and the index through headers on "direct:start"
+ Map<String, String> map = createIndexedData();
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(OpensearchConstants.PARAM_OPERATION, OpensearchOperation.Index);
+ headers.put(OpensearchConstants.PARAM_INDEX_NAME, "twitter");
+
+ String indexId = template.requestBodyAndHeaders("direct:start", map, headers, String.class);
+
+ // now, reuse the headers with the operation switched to GetById and fetch the document by its id
+ headers.put(OpensearchConstants.PARAM_OPERATION, OpensearchOperation.GetById);
+ GetResponse<?> response = template.requestBodyAndHeaders("direct:start", indexId, headers, GetResponse.class);
+ assertNotNull(response, "response should not be null");
+ assertNotNull(response.source(), "response source should not be null");
+ }
+
+ @Test
+ void testExistsWithHeaders() {
+ // first, index a document so that the "twitter" index is guaranteed to exist
+ Map<String, String> map = createIndexedData();
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(OpensearchConstants.PARAM_OPERATION, OpensearchOperation.Index);
+ headers.put(OpensearchConstants.PARAM_INDEX_NAME, "twitter");
+
+ template.requestBodyAndHeaders("direct:start", map, headers, String.class);
+
+ // now, verify that the Exists operation reports the index as present
+ headers.put(OpensearchConstants.PARAM_OPERATION, OpensearchOperation.Exists);
+ headers.put(OpensearchConstants.PARAM_INDEX_NAME, "twitter");
+ Boolean exists = template.requestBodyAndHeaders("direct:exists", "", headers, Boolean.class);
+ assertNotNull(exists, "response should not be null");
+ assertTrue(exists, "Index should exist");
+ }
+
+ @Test
+ void testNotExistsWithHeaders() { // "twitter-tweet" is presumably never created by this suite, so Exists must be false
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(OpensearchConstants.PARAM_OPERATION, OpensearchOperation.Exists);
+ headers.put(OpensearchConstants.PARAM_INDEX_NAME, "twitter-tweet");
+ Boolean exists = template.requestBodyAndHeaders("direct:exists", "", headers, Boolean.class);
+ assertNotNull(exists, "response should not be null");
+ assertFalse(exists, "Index should not exist");
+ }
+
+ @Test
+ void testDeleteWithHeaders() {
+ // first, index a document, choosing the operation and the index through headers on "direct:start"
+ Map<String, String> map = createIndexedData();
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(OpensearchConstants.PARAM_OPERATION, OpensearchOperation.Index);
+ headers.put(OpensearchConstants.PARAM_INDEX_NAME, "twitter");
+
+ String indexId = template.requestBodyAndHeaders("direct:start", map, headers, String.class);
+
+ // now, verify that the document can be fetched before it is deleted
+ headers.put(OpensearchConstants.PARAM_OPERATION, OpensearchOperation.GetById);
+ GetResponse<?> response = template.requestBodyAndHeaders("direct:start", indexId, headers, GetResponse.class);
+ assertNotNull(response, "response should not be null");
+ assertNotNull(response.source(), "response source should not be null");
+
+ // now, perform the Delete and check that it reports the document as deleted
+ headers.put(OpensearchConstants.PARAM_OPERATION, OpensearchOperation.Delete);
+ Result deleteResponse
+ = template.requestBodyAndHeaders("direct:start", indexId, headers, Result.class);
+ assertEquals(Result.Deleted, deleteResponse, "delete result should be Deleted");
+
+ // now, verify GET fails to find the indexed value
+ headers.put(OpensearchConstants.PARAM_OPERATION, OpensearchOperation.GetById);
+ response = template.requestBodyAndHeaders("direct:start", indexId, headers, GetResponse.class);
+ assertNotNull(response, "response should not be null");
+ assertNull(response.source(), "response source should be null");
+ }
+
+ @Test
+ void testUpdateWithIDInHeader() {
+ Map<String, String> map = createIndexedData();
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(OpensearchConstants.PARAM_OPERATION, OpensearchOperation.Index);
+ headers.put(OpensearchConstants.PARAM_INDEX_NAME, "twitter");
+ headers.put(OpensearchConstants.PARAM_INDEX_ID, "123");
+
+ String indexId = template.requestBodyAndHeaders("direct:start", map, headers, String.class);
+ assertNotNull(indexId, "indexId should be set");
+ assertEquals("123", indexId, "indexId should be equals to the provided id");
+ // reuse the same headers (index name and id "123") with the operation switched to Update
+ headers.put(OpensearchConstants.PARAM_OPERATION, OpensearchOperation.Update);
+
+ indexId = template.requestBodyAndHeaders("direct:start", Map.of("doc", map), headers, String.class);
+ assertNotNull(indexId, "indexId should be set");
+ assertEquals("123", indexId, "indexId should be equals to the provided id");
+ }
+
+ @Test
+ void testGetRequestBody() {
+ String prefix = createPrefix();
+
+ // given: a GetRequest builder that targets the prefixed index; the document id is added further down
+ GetRequest.Builder builder = new GetRequest.Builder().index(prefix + "foo");
+
+ // when: a document is indexed from an IndexRequest builder body, then fetched using the GetRequest builder body
+ String documentId = template.requestBody("direct:index",
+ new IndexRequest.Builder<>()
+ .index(prefix + "foo")
+ .id(prefix + "testId")
+ .document(Map.of(prefix + "content", prefix + "hello")),
+ String.class);
+ GetResponse<?> response = template.requestBody("direct:get",
+ builder.id(documentId), GetResponse.class);
+
+ // then: the fetched source holds the content that was indexed
+ assertThat(response, notNullValue());
+
+ assertThat(response.source(), notNullValue());
+ ObjectNode node = (ObjectNode) response.source();
+ assertThat(node.has(prefix + "content"), equalTo(true));
+ assertThat(node.get(prefix + "content").asText(), equalTo(prefix + "hello"));
+ }
+
+ @Test
+ void testDeleteWithBuilder() {
+ String prefix = createPrefix();
+
+ // given: a document indexed into the prefixed index and confirmed to be retrievable
+ String documentId = template.requestBody("direct:index",
+ new IndexRequest.Builder<>()
+ .index(prefix + "foo")
+ .id(prefix + "testId")
+ .document(Map.of(prefix + "content", prefix + "hello")),
+ String.class);
+
+ GetResponse<?> getResponse = template.requestBodyAndHeader(
+ "direct:get", documentId, OpensearchConstants.PARAM_INDEX_NAME, prefix + "foo", GetResponse.class);
+ assertNotNull(getResponse, "response should not be null");
+ assertNotNull(getResponse.source(), "response source should not be null");
+
+ // when: the document is deleted using a DeleteRequest builder as the message body
+ Result response
+ = template.requestBody("direct:delete", new DeleteRequest.Builder().index(prefix + "foo").id(documentId),
+ Result.class);
+
+ // then: the delete is reported as Deleted and a follow-up get returns no source
+ assertThat(response, equalTo(Result.Deleted));
+ getResponse = template.requestBodyAndHeader(
+ "direct:get", documentId, OpensearchConstants.PARAM_INDEX_NAME, prefix + "foo", GetResponse.class);
+ assertNotNull(getResponse, "response should not be null");
+ assertNull(getResponse.source(), "response source should be null");
+ }
+
+ @Test
+ void testUpdateWithString() {
+ Map<String, String> map = createIndexedData();
+ String indexId = template.requestBody("direct:index", map, String.class);
+ assertNotNull(indexId, "indexId should be set");
+ String key = map.keySet().iterator().next();
+ Object body = String.format("{ \"doc\": {\"%s\" : \"testUpdateWithString-updated\"}}", key);
+ // send the update JSON as a String body; the id header selects the document to update
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(OpensearchConstants.PARAM_INDEX_ID, indexId);
+ indexId = template.requestBodyAndHeaders("direct:update", body, headers, String.class);
+ assertNotNull(indexId, "indexId should be set");
+ // fetch the document again and check that the existing key now carries the updated value
+ GetResponse<?> response = template.requestBody("direct:get", indexId, GetResponse.class);
+ assertThat(response.source(), notNullValue());
+ ObjectNode node = (ObjectNode) response.source();
+ assertThat(node.has(key), equalTo(true));
+ assertThat(node.get(key).asText(), equalTo("testUpdateWithString-updated"));
+ }
+
+ @Test
+ void testUpdateWithReader() {
+ Map<String, String> map = createIndexedData();
+ String indexId = template.requestBody("direct:index", map, String.class);
+ assertNotNull(indexId, "indexId should be set");
+ String key = map.keySet().iterator().next();
+ Object body = new StringReader(String.format("{ \"doc\": {\"%s\" : \"testUpdateWithReader-updated\"}}", key));
+ // send the update JSON as a Reader body; the id header selects the document to update
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(OpensearchConstants.PARAM_INDEX_ID, indexId);
+ indexId = template.requestBodyAndHeaders("direct:update", body, headers, String.class);
+ assertNotNull(indexId, "indexId should be set");
+ // fetch the document again and check that the existing key now carries the updated value
+ GetResponse<?> response = template.requestBody("direct:get", indexId, GetResponse.class);
+ assertThat(response.source(), notNullValue());
+ ObjectNode node = (ObjectNode) response.source();
+ assertThat(node.has(key), equalTo(true));
+ assertThat(node.get(key).asText(), equalTo("testUpdateWithReader-updated"));
+ }
+
+ @Test
+ void testUpdateWithBytes() {
+ Map<String, String> map = createIndexedData();
+ String indexId = template.requestBody("direct:index", map, String.class);
+ assertNotNull(indexId, "indexId should be set");
+ String key = map.keySet().iterator().next();
+ Object body
+ = String.format("{ \"doc\": {\"%s\" : \"testUpdateWithBytes-updated\"}}", key).getBytes(StandardCharsets.UTF_8);
+ // send the update JSON as a UTF-8 byte[] body; the id header selects the document to update
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(OpensearchConstants.PARAM_INDEX_ID, indexId);
+ indexId = template.requestBodyAndHeaders("direct:update", body, headers, String.class);
+ assertNotNull(indexId, "indexId should be set");
+ // fetch the document again and check that the existing key now carries the updated value
+ GetResponse<?> response = template.requestBody("direct:get", indexId, GetResponse.class);
+ assertThat(response.source(), notNullValue());
+ ObjectNode node = (ObjectNode) response.source();
+ assertThat(node.has(key), equalTo(true));
+ assertThat(node.get(key).asText(), equalTo("testUpdateWithBytes-updated"));
+ }
+
+ @Test
+ void testUpdateWithInputStream() {
+ Map<String, String> map = createIndexedData();
+ String indexId = template.requestBody("direct:index", map, String.class);
+ assertNotNull(indexId, "indexId should be set");
+ String key = map.keySet().iterator().next();
+ Object body = new ByteArrayInputStream(
+ String.format("{ \"doc\": {\"%s\" : \"testUpdateWithInputStream-updated\"}}", key)
+ .getBytes(StandardCharsets.UTF_8));
+ // send the update JSON as an InputStream body; the id header selects the document to update
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(OpensearchConstants.PARAM_INDEX_ID, indexId);
+ indexId = template.requestBodyAndHeaders("direct:update", body, headers, String.class);
+ assertNotNull(indexId, "indexId should be set");
+ // fetch the document again and check that the existing key now carries the updated value
+ GetResponse<?> response = template.requestBody("direct:get", indexId, GetResponse.class);
+ assertThat(response.source(), notNullValue());
+ ObjectNode node = (ObjectNode) response.source();
+ assertThat(node.has(key), equalTo(true));
+ assertThat(node.get(key).asText(), equalTo("testUpdateWithInputStream-updated"));
+ }
+
+ @Test
+ void testUpdateWithDocumentType() {
+ Product product = new Product();
+ product.setId("book-world-records-2010");
+ product.setStockAvailable(200);
+ product.setPrice(80);
+ product.setDescription("The book of the year!");
+ product.setName("Guinness book of records 2010");
+
+ String indexId = template.requestBody("direct:index", product, String.class);
+ assertNotNull(indexId, "indexId should be set");
+ // id and description stay null here; presumably @JsonInclude(NON_NULL) keeps them out of the update payload
+ Product productUpdate = new Product();
+ productUpdate.setStockAvailable(250);
+ productUpdate.setPrice(82);
+ productUpdate.setName("Guinness book of records 2010 2nd edition");
+
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(OpensearchConstants.PARAM_INDEX_ID, indexId);
+ headers.put(OpensearchConstants.PARAM_DOCUMENT_CLASS, Product.class);
+ indexId = template.requestBodyAndHeaders("direct:update", productUpdate, headers, String.class);
+ assertNotNull(indexId, "indexId should be set");
+ // read the document back as a Product: updated fields changed, id and description kept their indexed values
+ GetResponse<?> response = template.requestBodyAndHeader(
+ "direct:get", indexId, OpensearchConstants.PARAM_DOCUMENT_CLASS, Product.class, GetResponse.class);
+ assertThat(response.source(), notNullValue());
+ Product actual = (Product) response.source();
+ assertThat(actual.getId(), equalTo("book-world-records-2010"));
+ assertThat(actual.getStockAvailable(), equalTo(250));
+ assertThat(actual.getPrice(), equalTo(82d));
+ assertThat(actual.getDescription(), equalTo("The book of the year!"));
+ assertThat(actual.getName(), equalTo("Guinness book of records 2010 2nd edition"));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() { // one direct: route per operation exercised by the tests
+ from("direct:start") // tests set the actual operation and the index name through message headers
+ .to("opensearch://opensearch?operation=Index");
+ from("direct:index")
+ .to("opensearch://opensearch?operation=Index&indexName=twitter");
+ from("direct:index-product")
+ .toF("opensearch://opensearch?operation=Index&indexName=twitter&documentClass=%s",
+ Product.class.getName());
+ from("direct:get")
+ .to("opensearch://opensearch?operation=GetById&indexName=twitter");
+ from("direct:multiget")
+ .to("opensearch://opensearch?operation=MultiGet&indexName=twitter");
+ from("direct:delete")
+ .to("opensearch://opensearch?operation=Delete&indexName=twitter");
+ from("direct:search")
+ .to("opensearch://opensearch?operation=Search&indexName=twitter");
+ from("direct:search-1") // same as direct:search, but without an index name on the endpoint
+ .to("opensearch://opensearch?operation=Search");
+ from("direct:multiSearch") // no indexName on the endpoint: the tests name the index in the request builder
+ .to("opensearch://opensearch?operation=MultiSearch");
+ from("direct:update")
+ .to("opensearch://opensearch?operation=Update&indexName=twitter");
+ from("direct:exists") // no indexName on the endpoint: the tests pass it in the PARAM_INDEX_NAME header
+ .to("opensearch://opensearch?operation=Exists");
+ }
+ };
+ }
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public static class Product {
+ // Mutable bean passed as the document class in the typed tests; NON_NULL leaves null properties out of its JSON.
+ private String id;
+ private String name;
+ private String description;
+ private double price;
+ private int stockAvailable;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ public double getPrice() {
+ return price;
+ }
+
+ public void setPrice(double price) {
+ this.price = price;
+ }
+
+ public int getStockAvailable() {
+ return stockAvailable;
+ }
+
+ public void setStockAvailable(int stockAvailable) {
+ this.stockAvailable = stockAvailable;
+ }
+ // Value equality over all five fields, so a product read back from a search can be compared with assertEquals.
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Product product = (Product) o;
+ return Double.compare(product.price, price) == 0 && stockAvailable == product.stockAvailable
+ && Objects.equals(id, product.id) && Objects.equals(name, product.name)
+ && Objects.equals(description, product.description);
+ }
+ // Hashes the same five fields that equals compares.
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, name, description, price, stockAvailable);
+ }
+ }
+}
diff --git a/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchIndexIT.java b/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchIndexIT.java
new file mode 100644
index 00000000000..d5174a84b59
--- /dev/null
+++ b/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchIndexIT.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.opensearch.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.opensearch.OpensearchConstants;
+import org.apache.camel.component.opensearch.OpensearchOperation;
+import org.junit.jupiter.api.Test;
+import org.opensearch.client.opensearch.indices.DeleteIndexRequest;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class OpensearchIndexIT extends OpensearchTestSupport {
+ // Integration tests for the Index, Exists and DeleteIndex operations, using the routes defined at the bottom.
+ @Test
+ void testIndex() {
+ Map<String, String> map = createIndexedData();
+ String indexId = template.requestBody("direct:index", map, String.class);
+ assertNotNull(indexId, "indexId should be set");
+ }
+
+ @Test
+ void testIndexDeleteWithBuilder() {
+ Map<String, String> map = createIndexedData();
+ String indexId = template.requestBody("direct:index", map, String.class);
+ assertNotNull(indexId, "indexId should be set");
+ // null body: the index checked is presumably the indexName=twitter configured on the "direct:exists" route
+ boolean exists = template.requestBody("direct:exists", null, Boolean.class);
+ assertTrue(exists, "index should be present");
+
+ DeleteIndexRequest.Builder builder = new DeleteIndexRequest.Builder().index("twitter");
+ Boolean status = template.requestBody("direct:deleteIndex", builder, Boolean.class);
+ assertEquals(true, status, "deleteIndex should return true");
+
+ exists = template.requestBody("direct:exists", null, Boolean.class);
+ assertFalse(exists, "index should be absent");
+ }
+
+ @Test
+ void testIndexDeleteWithString() {
+ Map<String, String> map = createIndexedData();
+ String indexId = template.requestBody("direct:index", map, String.class);
+ assertNotNull(indexId, "indexId should be set");
+
+ boolean exists = template.requestBody("direct:exists", null, Boolean.class);
+ assertTrue(exists, "index should be present");
+ // same scenario as above, but the index to delete is given as a plain String body instead of a builder
+ Boolean status = template.requestBody("direct:deleteIndex", "twitter", Boolean.class);
+ assertEquals(true, status, "deleteIndex should return true");
+
+ exists = template.requestBody("direct:exists", null, Boolean.class);
+ assertFalse(exists, "index should be absent");
+ }
+
+ @Test
+ void testIndexWithHeaders() {
+ Map<String, String> map = createIndexedData();
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(OpensearchConstants.PARAM_OPERATION, OpensearchOperation.Index);
+ headers.put(OpensearchConstants.PARAM_INDEX_NAME, "twitter");
+
+ String indexId = template.requestBodyAndHeaders("direct:start", map, headers, String.class);
+ assertNotNull(indexId, "indexId should be set");
+ }
+
+ @Test
+ void testIndexWithIDInHeader() {
+ Map<String, String> map = createIndexedData();
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(OpensearchConstants.PARAM_OPERATION, OpensearchOperation.Index);
+ headers.put(OpensearchConstants.PARAM_INDEX_NAME, "twitter");
+ headers.put(OpensearchConstants.PARAM_INDEX_ID, "123");
+
+ String indexId = template.requestBodyAndHeaders("direct:start", map, headers, String.class);
+ assertNotNull(indexId, "indexId should be set");
+ assertEquals("123", indexId, "indexId should be equals to the provided id");
+ }
+
+ @Test
+ void testExists() {
+ boolean exists = template.requestBodyAndHeader(
+ "direct:exists", null, OpensearchConstants.PARAM_INDEX_NAME, "test_exists", Boolean.class);
+ assertFalse(exists, "index should be absent");
+ // indexing a document into "test_exists" is what makes the second Exists check below succeed
+ Map<String, String> map = createIndexedData();
+ template.sendBodyAndHeader("direct:index", map, OpensearchConstants.PARAM_INDEX_NAME, "test_exists");
+
+ exists = template.requestBodyAndHeader(
+ "direct:exists", null, OpensearchConstants.PARAM_INDEX_NAME, "test_exists", Boolean.class);
+ assertTrue(exists, "index should be present");
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:start") // no operation on the endpoint: tests supply it in the PARAM_OPERATION header
+ .to("opensearch://opensearch");
+ from("direct:index")
+ .to("opensearch://opensearch?operation=Index&indexName=twitter");
+ from("direct:exists")
+ .to("opensearch://opensearch?operation=Exists&indexName=twitter");
+ from("direct:deleteIndex")
+ .to("opensearch://opensearch?operation=DeleteIndex&indexName=twitter");
+ }
+ };
+ }
+}
diff --git a/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchPingIT.java b/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchPingIT.java
new file mode 100644
index 00000000000..23e0fb0c96a
--- /dev/null
+++ b/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchPingIT.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.opensearch.integration;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class OpensearchPingIT extends OpensearchTestSupport {
+ // Checks that a request routed to the Ping operation comes back as true.
+ @Test
+ void testPing() {
+ boolean pingResult = template.requestBody("direct:ping", "test", Boolean.class);
+ assertTrue(pingResult, "ping should succeed");
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:ping")
+ .to("opensearch://opensearch?operation=Ping");
+ }
+ };
+ }
+}
diff --git a/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchScrollSearchIT.java b/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchScrollSearchIT.java
new file mode 100644
index 00000000000..585290db046
--- /dev/null
+++ b/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchScrollSearchIT.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.opensearch.integration;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.AggregationStrategies;
+import org.apache.camel.builder.ExchangeBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.opensearch.OpensearchScrollRequestIterator;
+import org.junit.jupiter.api.Test;
+import org.opensearch.client.Request;
+import org.opensearch.client.Response;
+import org.opensearch.client.opensearch._types.query_dsl.MatchAllQuery;
+import org.opensearch.client.opensearch._types.query_dsl.Query;
+import org.opensearch.client.opensearch.core.SearchRequest;
+import org.opensearch.client.opensearch.core.search.Hit;
+
+import static org.apache.camel.component.opensearch.OpensearchConstants.PARAM_SCROLL;
+import static org.apache.camel.component.opensearch.OpensearchConstants.PARAM_SCROLL_KEEP_ALIVE_MS;
+import static org.apache.camel.component.opensearch.OpensearchConstants.PROPERTY_SCROLL_OPENSEARCH_QUERY_COUNT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class OpensearchScrollSearchIT extends OpensearchTestSupport {
+
+ private static final String TWITTER_OPENSEARCH_INDEX_NAME = "scroll-search";
+ private static final String SPLIT_TWITTER_OPENSEARCH_INDEX_NAME = "split-" + TWITTER_OPENSEARCH_INDEX_NAME;
+
+ @Test
+ void testScrollSearch() throws IOException {
+ // add some documents
+ for (int i = 0; i < 10; i++) {
+ Map<String, String> map = createIndexedData();
+ String indexId = template.requestBody("direct:scroll-index", map, String.class);
+ assertNotNull(indexId, "indexId should be set");
+ }
+
+ // perform a refresh
+ Response refreshResponse
+ = getClient().performRequest(new Request("post", "/" + TWITTER_OPENSEARCH_INDEX_NAME + "/_refresh"));
+ assertEquals(200, refreshResponse.getStatusLine().getStatusCode(), "Cannot perform a refresh");
+
+ SearchRequest.Builder req = getScrollSearchRequestBuilder(TWITTER_OPENSEARCH_INDEX_NAME);
+
+ Exchange exchange = ExchangeBuilder.anExchange(context)
+ .withHeader(PARAM_SCROLL_KEEP_ALIVE_MS, 50000)
+ .withHeader(PARAM_SCROLL, true)
+ .withBody(req)
+ .build();
+
+ exchange = template.send("direct:scroll-search", exchange);
+
+ try (OpensearchScrollRequestIterator<?> scrollRequestIterator
+ = exchange.getIn().getBody(OpensearchScrollRequestIterator.class)) {
+ assertNotNull(scrollRequestIterator, "response should not be null");
+
+ List<Hit<?>> result = new ArrayList<>();
+ scrollRequestIterator.forEachRemaining(result::add);
+
+ assertEquals(10, result.size(), "response hits should be == 10");
+ assertEquals(11, scrollRequestIterator.getRequestCount(), "11 request should have been send to OpenSearch");
+ }
+
+ OpensearchScrollRequestIterator<?> scrollRequestIterator
+ = exchange.getIn().getBody(OpensearchScrollRequestIterator.class);
+ assertTrue(scrollRequestIterator.isClosed(), "iterator should be closed");
+ assertEquals(11, (int) exchange.getProperty(PROPERTY_SCROLL_OPENSEARCH_QUERY_COUNT, Integer.class),
+ "11 request should have been send to OpenSearch");
+ }
+
+ @Test
+ void testScrollAndSplitSearch() throws IOException, InterruptedException {
+ // add some documents
+ for (int i = 0; i < 10; i++) {
+ Map<String, String> map = createIndexedData();
+ String indexId = template.requestBody("direct:scroll-n-split-index", map, String.class);
+ assertNotNull(indexId, "indexId should be set");
+ }
+
+ // perform a refresh
+ Response refreshResponse
+ = getClient().performRequest(new Request("post", "/" + SPLIT_TWITTER_OPENSEARCH_INDEX_NAME + "/_refresh"));
+ assertEquals(200, refreshResponse.getStatusLine().getStatusCode(), "Cannot perform a refresh");
+
+ MockEndpoint mock = getMockEndpoint("mock:output");
+ mock.expectedMessageCount(1);
+ mock.setResultWaitTime(8000);
+
+ SearchRequest.Builder req = getScrollSearchRequestBuilder(SPLIT_TWITTER_OPENSEARCH_INDEX_NAME);
+
+ Exchange exchange = ExchangeBuilder.anExchange(context).withBody(req).build();
+ exchange = template.send("direct:scroll-n-split-search", exchange);
+
+ // wait for aggregation
+ mock.assertIsSatisfied();
+ Iterator<Exchange> iterator = mock.getReceivedExchanges().iterator();
+ assertTrue(iterator.hasNext(), "response should contain 1 exchange");
+ Collection<?> aggregatedExchanges = iterator.next().getIn().getBody(Collection.class);
+
+ assertEquals(10, aggregatedExchanges.size(), "response hits should be == 10");
+
+ OpensearchScrollRequestIterator<?> scrollRequestIterator
+ = exchange.getIn().getBody(OpensearchScrollRequestIterator.class);
+ assertTrue(scrollRequestIterator.isClosed(), "iterator should be closed");
+ assertEquals(11, scrollRequestIterator.getRequestCount(), "11 request should have been send to Opensearch");
+ assertEquals(11, (int) exchange.getProperty(PROPERTY_SCROLL_OPENSEARCH_QUERY_COUNT, Integer.class),
+ "11 request should have been send to Opensearch");
+ }
+
+ private SearchRequest.Builder getScrollSearchRequestBuilder(String indexName) {
+ SearchRequest.Builder builder = new SearchRequest.Builder().index(indexName);
+ builder.size(1);
+ builder.query(new Query.Builder().matchAll(new MatchAllQuery.Builder().build()).build());
+ return builder;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:scroll-index")
+ .to("opensearch://opensearch?operation=Index&indexName=" + TWITTER_OPENSEARCH_INDEX_NAME);
+ from("direct:scroll-search")
+ .to("opensearch://opensearch?operation=Search&indexName=" + TWITTER_OPENSEARCH_INDEX_NAME);
+
+ from("direct:scroll-n-split-index")
+ .to("opensearch://opensearch?operation=Index&indexName=" + SPLIT_TWITTER_OPENSEARCH_INDEX_NAME);
+ from("direct:scroll-n-split-search")
+ .to("opensearch://opensearch?"
+ + "useScroll=true&scrollKeepAliveMs=50000&operation=Search&indexName="
+ + SPLIT_TWITTER_OPENSEARCH_INDEX_NAME)
+ .split()
+ .body()
+ .streaming()
+ .parallelProcessing()
+ .threads(12)
+ .aggregate(AggregationStrategies.groupedExchange())
+ .constant(true)
+ .completionSize(20)
+ .completionTimeout(2000)
+ .to("mock:output")
+ .end();
+ }
+ };
+ }
+}
diff --git a/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchSizeLimitIT.java b/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchSizeLimitIT.java
new file mode 100644
index 00000000000..cac6a8063a2
--- /dev/null
+++ b/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchSizeLimitIT.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.opensearch.integration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+import org.opensearch.client.opensearch.core.search.HitsMetadata;
+
+/**
+ * Integration test for the {@code size} and {@code from} options of the {@code Search} endpoint.
+ */
+class OpensearchSizeLimitIT extends OpensearchTestSupport {
+
+    /**
+     * Indexes four documents, then polls until a search limited to two hits returns two and a search starting at
+     * offset three returns one.
+     */
+    @Test
+    void testSize() {
+        // index four documents
+        for (String content : new String[] { "content", "content1", "content2", "content3" }) {
+            template.requestBody("direct:index", getContent(content), String.class);
+        }
+
+        String matchAllQuery = """
+                {
+                "query" : {
+                "match_all": {}
+                }
+                }
+                """;
+
+        // a search issued right after indexing may still see stale data, so poll instead of asserting once
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
+            HitsMetadata<?> limitedToTwo
+                    = template.requestBody("direct:searchWithSizeTwo", matchAllQuery, HitsMetadata.class);
+            HitsMetadata<?> startingAtThree
+                    = template.requestBody("direct:searchFrom3", matchAllQuery, HitsMetadata.class);
+            if (limitedToTwo.hits().size() != 2) {
+                return false;
+            }
+            return startingAtThree.hits().size() == 1;
+        });
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:index")
+                        .to("opensearch://opensearch?operation=Index&indexName=size-limit");
+                from("direct:searchWithSizeTwo")
+                        .to("opensearch://opensearch?operation=Search&indexName=size-limit&size=2");
+                from("direct:searchFrom3")
+                        .to("opensearch://opensearch?operation=Search&indexName=size-limit&from=3");
+            }
+        };
+    }
+
+    /**
+     * Wraps the given text in a single-entry document keyed by {@code "content"}.
+     */
+    private Map<String, String> getContent(String content) {
+        Map<String, String> document = new HashMap<>();
+        document.put("content", content);
+        return document;
+    }
+}
diff --git a/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchTestSupport.java b/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchTestSupport.java
new file mode 100644
index 00000000000..431217a51e6
--- /dev/null
+++ b/components/camel-opensearch/src/test/java/org/apache/camel/component/opensearch/integration/OpensearchTestSupport.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.opensearch.integration;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.opensearch.OpensearchComponent;
+import org.apache.camel.test.infra.opensearch.services.OpenSearchService;
+import org.apache.camel.test.infra.opensearch.services.OpenSearchServiceFactory;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.json.jackson.JacksonJsonpMapper;
+import org.opensearch.client.opensearch.OpenSearchClient;
+import org.opensearch.client.transport.rest_client.RestClientTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class of the OpenSearch integration tests. It builds a low-level {@link RestClient} and an
+ * {@link OpenSearchClient} against the test service and registers an {@link OpensearchComponent} pointing at the same
+ * service under the name {@code opensearch}.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class OpensearchTestSupport extends CamelTestSupport {
+
+    @RegisterExtension
+    protected static OpenSearchService service = OpenSearchServiceFactory.createSingletonService();
+
+    protected static String clusterName = "docker-cluster";
+    protected static RestClient restClient;
+    protected static OpenSearchClient client;
+    private static final Logger LOG = LoggerFactory.getLogger(OpensearchTestSupport.class);
+
+    /**
+     * Creates the REST clients used by the tests, authenticating with the credentials exposed by the test service.
+     */
+    @Override
+    protected void setupResources() throws Exception {
+        super.setupResources();
+        HttpHost host
+                = new HttpHost(service.getOpenSearchHost(), service.getPort(), "http");
+        final RestClientBuilder builder = RestClient.builder(host);
+        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+        credentialsProvider.setCredentials(AuthScope.ANY,
+                new UsernamePasswordCredentials(service.getUsername(), service.getPassword()));
+        builder.setHttpClientConfigCallback(
+                httpClientBuilder -> {
+                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+                    return httpClientBuilder;
+                });
+        restClient = builder.build();
+        client = new OpenSearchClient(new RestClientTransport(restClient, new JacksonJsonpMapper()));
+    }
+
+    /**
+     * Runs the parent clean-up and closes the REST client.
+     */
+    @Override
+    protected void cleanupResources() throws Exception {
+        try {
+            super.cleanupResources();
+        } finally {
+            // close the client even when the parent clean-up fails, otherwise its connections would leak
+            if (restClient != null) {
+                restClient.close();
+            }
+        }
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        final OpensearchComponent openSearchComponent = new OpensearchComponent();
+        openSearchComponent.setHostAddresses(String.format("%s:%d", service.getOpenSearchHost(), service.getPort()));
+        openSearchComponent.setUser(service.getUsername());
+        openSearchComponent.setPassword(service.getPassword());
+
+        CamelContext context = super.createCamelContext();
+        context.addComponent("opensearch", openSearchComponent);
+
+        return context;
+    }
+
+    /**
+     * Creates a single-entry document whose key and value are both prefixed with the lower-cased name of the current
+     * test method, so that documents produced by different tests do not collide.
+     *
+     * @param  additionalPrefixes optional extra prefixes, each appended after the test name and followed by {@code -}
+     * @return                    a mutable map holding the one {@code <prefix>key => <prefix>value} entry
+     */
+    Map<String, String> createIndexedData(String... additionalPrefixes) {
+        String prefix = createPrefix();
+
+        // take over any potential prefixes we may have been asked for
+        if (additionalPrefixes.length > 0) {
+            StringBuilder sb = new StringBuilder(prefix);
+            for (String additionalPrefix : additionalPrefixes) {
+                sb.append(additionalPrefix).append("-");
+            }
+            prefix = sb.toString();
+        }
+
+        String key = prefix + "key";
+        String value = prefix + "value";
+        LOG.info("Creating indexed data using the key/value pair {} => {}", key, value);
+
+        Map<String, String> map = new HashMap<>();
+        map.put(key, value);
+        return map;
+    }
+
+    /**
+     * Returns the lower-cased name of the current test method followed by {@code -}.
+     */
+    String createPrefix() {
+        // make use of the test method name to avoid collision; Locale.ROOT keeps the result independent of the
+        // default locale of the machine running the tests
+        return getCurrentTestName().toLowerCase(Locale.ROOT) + "-";
+    }
+
+    /**
+     * Returns the low-level REST client created in {@link #setupResources()}.
+     */
+    RestClient getClient() {
+        return restClient;
+    }
+}
diff --git a/components/camel-opensearch/src/test/resources/log4j2.properties b/components/camel-opensearch/src/test/resources/log4j2.properties
new file mode 100644
index 00000000000..b8b18be85eb
--- /dev/null
+++ b/components/camel-opensearch/src/test/resources/log4j2.properties
@@ -0,0 +1,30 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+# Console appender. It is configured here but not referenced by the root logger below.
+appender.stdout.type = Console
+appender.stdout.name = stdout
+appender.stdout.layout.type = PatternLayout
+appender.stdout.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender writing to target/camel-opensearch-test.log; append = true keeps existing content.
+appender.file.type = File
+appender.file.name = file
+appender.file.fileName = target/camel-opensearch-test.log
+appender.file.append = true
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# Root logger at INFO; only the file appender is attached.
+rootLogger.level = INFO
+rootLogger.appenderRef.file.ref = file
diff --git a/components/pom.xml b/components/pom.xml
index 77da6aea578..90818694d6c 100644
--- a/components/pom.xml
+++ b/components/pom.xml
@@ -289,6 +289,7 @@
<module>camel-stitch</module>
<module>camel-swift</module>
<module>camel-openapi-java</module>
+ <module>camel-opensearch</module>
<module>camel-optaplanner</module>
<module>camel-syslog</module>
<module>camel-tarfile</module>
diff --git a/core/camel-main/src/generated/resources/org/apache/camel/main/components.properties b/core/camel-main/src/generated/resources/org/apache/camel/main/components.properties
index b9d1583d995..34bf2e3c9fe 100644
--- a/core/camel-main/src/generated/resources/org/apache/camel/main/components.properties
+++ b/core/camel-main/src/generated/resources/org/apache/camel/main/components.properties
@@ -225,6 +225,7 @@ nitrite
oaipmh
olingo2
olingo4
+opensearch
openshift-build-configs
openshift-builds
openshift-deploymentconfigs
diff --git a/docs/components/modules/ROOT/examples/json/opensearch.json b/docs/components/modules/ROOT/examples/json/opensearch.json
new file mode 120000
index 00000000000..153639eb262
--- /dev/null
+++ b/docs/components/modules/ROOT/examples/json/opensearch.json
@@ -0,0 +1 @@
+../../../../../../components/camel-opensearch/src/generated/resources/org/apache/camel/component/opensearch/opensearch.json
\ No newline at end of file
diff --git a/docs/components/modules/ROOT/nav.adoc b/docs/components/modules/ROOT/nav.adoc
index e0c48b7b564..61971cae2c3 100644
--- a/docs/components/modules/ROOT/nav.adoc
+++ b/docs/components/modules/ROOT/nav.adoc
@@ -233,6 +233,7 @@
** xref:oaipmh-component.adoc[OAI-PMH]
** xref:olingo2-component.adoc[Olingo2]
** xref:olingo4-component.adoc[Olingo4]
+** xref:opensearch-component.adoc[OpenSearch]
** xref:openstack-summary.adoc[OpenStack]
*** xref:openstack-cinder-component.adoc[OpenStack Cinder]
*** xref:openstack-glance-component.adoc[OpenStack Glance]
diff --git a/docs/components/modules/ROOT/pages/opensearch-component.adoc b/docs/components/modules/ROOT/pages/opensearch-component.adoc
new file mode 120000
index 00000000000..df5609abb07
--- /dev/null
+++ b/docs/components/modules/ROOT/pages/opensearch-component.adoc
@@ -0,0 +1 @@
+../../../../../components/camel-opensearch/src/main/docs/opensearch-component.adoc
\ No newline at end of file
diff --git a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/ComponentsBuilderFactory.java b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/ComponentsBuilderFactory.java
index 7863ddf7e2d..9f77d5e81f6 100644
--- a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/ComponentsBuilderFactory.java
+++ b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/ComponentsBuilderFactory.java
@@ -3060,6 +3060,19 @@ public interface ComponentsBuilderFactory {
static org.apache.camel.builder.component.dsl.Olingo4ComponentBuilderFactory.Olingo4ComponentBuilder olingo4() {
return org.apache.camel.builder.component.dsl.Olingo4ComponentBuilderFactory.olingo4();
}
+ /**
+ * OpenSearch (camel-opensearch)
+ * Send requests to OpenSearch via Java Client API.
+ *
+ * Category: search,monitoring
+ * Since: 4.0
+ * Maven coordinates: org.apache.camel:camel-opensearch
+ *
+ * This method delegates to
+ * org.apache.camel.builder.component.dsl.OpensearchComponentBuilderFactory.opensearch().
+ *
+ * @return the dsl builder
+ */
+ static org.apache.camel.builder.component.dsl.OpensearchComponentBuilderFactory.OpensearchComponentBuilder opensearch() {
+ return org.apache.camel.builder.component.dsl.OpensearchComponentBuilderFactory.opensearch();
+ }
/**
* Openshift Build Config (camel-kubernetes)
* Perform operations on OpenShift Build Configs.
diff --git a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/OpensearchComponentBuilderFactory.java b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/OpensearchComponentBuilderFactory.java
new file mode 100644
index 00000000000..191e9b301ac
--- /dev/null
+++ b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/OpensearchComponentBuilderFactory.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder.component.dsl;
+
+import javax.annotation.processing.Generated;
+import org.apache.camel.Component;
+import org.apache.camel.builder.component.AbstractComponentBuilder;
+import org.apache.camel.builder.component.ComponentBuilder;
+import org.apache.camel.component.opensearch.OpensearchComponent;
+
+/**
+ * Send requests to OpenSearch via Java Client API.
+ *
+ * Generated by camel-package-maven-plugin - do not edit this file!
+ */
+@Generated("org.apache.camel.maven.packaging.ComponentDslMojo")
+public interface OpensearchComponentBuilderFactory {
+
+ /**
+ * OpenSearch (camel-opensearch)
+ * Send requests to OpenSearch via Java Client API.
+ *
+ * Category: search,monitoring
+ * Since: 4.0
+ * Maven coordinates: org.apache.camel:camel-opensearch
+ *
+ * @return the dsl builder
+ */
+ static OpensearchComponentBuilder opensearch() {
+ return new OpensearchComponentBuilderImpl();
+ }
+
+ /**
+ * Builder for the OpenSearch component.
+ */
+ interface OpensearchComponentBuilder
+ extends
+ ComponentBuilder<OpensearchComponent> {
+ /**
+ * The time in ms to wait before connection will time out.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 30000
+ * Group: producer
+ *
+ * @param connectionTimeout the value to set
+ * @return the dsl builder
+ */
+ default OpensearchComponentBuilder connectionTimeout(
+ int connectionTimeout) {
+ doSetProperty("connectionTimeout", connectionTimeout);
+ return this;
+ }
+ /**
+ * Comma separated list with ip:port formatted remote transport
+ * addresses to use. The ip and port options must be left blank for
+ * hostAddresses to be considered instead.
+ *
+ * The option is a: <code>java.lang.String</code> type.
+ *
+ * Group: producer
+ *
+ * @param hostAddresses the value to set
+ * @return the dsl builder
+ */
+ default OpensearchComponentBuilder hostAddresses(
+ java.lang.String hostAddresses) {
+ doSetProperty("hostAddresses", hostAddresses);
+ return this;
+ }
+ /**
+ * Whether the producer should be started lazy (on the first message).
+ * By starting lazy you can use this to allow CamelContext and routes to
+ * startup in situations where a producer may otherwise fail during
+ * starting and cause the route to fail being started. By deferring this
+ * startup to be lazy then the startup failure can be handled during
+ * routing messages via Camel's routing error handlers. Beware that when
+ * the first message is processed then creating and starting the
+ * producer may take a little time and prolong the total processing time
+ * of the processing.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: producer
+ *
+ * @param lazyStartProducer the value to set
+ * @return the dsl builder
+ */
+ default OpensearchComponentBuilder lazyStartProducer(
+ boolean lazyStartProducer) {
+ doSetProperty("lazyStartProducer", lazyStartProducer);
+ return this;
+ }
+ /**
+ * The time in ms before retry.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 30000
+ * Group: producer
+ *
+ * @param maxRetryTimeout the value to set
+ * @return the dsl builder
+ */
+ default OpensearchComponentBuilder maxRetryTimeout(int maxRetryTimeout) {
+ doSetProperty("maxRetryTimeout", maxRetryTimeout);
+ return this;
+ }
+ /**
+ * The timeout in ms to wait before the socket will time out.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 30000
+ * Group: producer
+ *
+ * @param socketTimeout the value to set
+ * @return the dsl builder
+ */
+ default OpensearchComponentBuilder socketTimeout(int socketTimeout) {
+ doSetProperty("socketTimeout", socketTimeout);
+ return this;
+ }
+ /**
+ * Whether autowiring is enabled. This is used for automatic autowiring
+ * options (the option must be marked as autowired) by looking up in the
+ * registry to find if there is a single instance of matching type,
+ * which then gets configured on the component. This can be used for
+ * automatic configuring JDBC data sources, JMS connection factories,
+ * AWS Clients, etc.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: true
+ * Group: advanced
+ *
+ * @param autowiredEnabled the value to set
+ * @return the dsl builder
+ */
+ default OpensearchComponentBuilder autowiredEnabled(
+ boolean autowiredEnabled) {
+ doSetProperty("autowiredEnabled", autowiredEnabled);
+ return this;
+ }
+ /**
+ * To use an existing configured OpenSearch client, instead of creating
+ * a client per endpoint. This allows to customize the client with
+ * specific settings.
+ *
+ * The option is a:
+ * <code>org.opensearch.client.RestClient</code> type.
+ *
+ * Group: advanced
+ *
+ * @param client the value to set
+ * @return the dsl builder
+ */
+ default OpensearchComponentBuilder client(
+ org.opensearch.client.RestClient client) {
+ doSetProperty("client", client);
+ return this;
+ }
+ /**
+ * Enable automatically discover nodes from a running OpenSearch
+ * cluster. If this option is used in conjunction with Spring Boot then
+ * it's managed by the Spring Boot configuration (see: Disable Sniffer
+ * in Spring Boot).
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: advanced
+ *
+ * @param enableSniffer the value to set
+ * @return the dsl builder
+ */
+ default OpensearchComponentBuilder enableSniffer(boolean enableSniffer) {
+ doSetProperty("enableSniffer", enableSniffer);
+ return this;
+ }
+ /**
+ * The delay of a sniff execution scheduled after a failure (in
+ * milliseconds).
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 60000
+ * Group: advanced
+ *
+ * @param sniffAfterFailureDelay the value to set
+ * @return the dsl builder
+ */
+ default OpensearchComponentBuilder sniffAfterFailureDelay(
+ int sniffAfterFailureDelay) {
+ doSetProperty("sniffAfterFailureDelay", sniffAfterFailureDelay);
+ return this;
+ }
+ /**
+ * The interval between consecutive ordinary sniff executions in
+ * milliseconds. Will be honoured when sniffOnFailure is disabled or
+ * when there are no failures between consecutive sniff executions.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 300000
+ * Group: advanced
+ *
+ * @param snifferInterval the value to set
+ * @return the dsl builder
+ */
+ default OpensearchComponentBuilder snifferInterval(int snifferInterval) {
+ doSetProperty("snifferInterval", snifferInterval);
+ return this;
+ }
+ /**
+ * Enable SSL.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: security
+ *
+ * @param enableSSL the value to set
+ * @return the dsl builder
+ */
+ default OpensearchComponentBuilder enableSSL(boolean enableSSL) {
+ doSetProperty("enableSSL", enableSSL);
+ return this;
+ }
+ /**
+ * Password for authenticate.
+ *
+ * The option is a: <code>java.lang.String</code> type.
+ *
+ * Group: security
+ *
+ * @param password the value to set
+ * @return the dsl builder
+ */
+ default OpensearchComponentBuilder password(java.lang.String password) {
+ doSetProperty("password", password);
+ return this;
+ }
+ /**
+ * Basic authenticate user.
+ *
+ * The option is a: <code>java.lang.String</code> type.
+ *
+ * Group: security
+ *
+ * @param user the value to set
+ * @return the dsl builder
+ */
+ default OpensearchComponentBuilder user(java.lang.String user) {
+ doSetProperty("user", user);
+ return this;
+ }
+ }
+
+ /**
+ * Implementation of the OpenSearch component builder returned by
+ * opensearch(). It creates the component instance and maps option names
+ * to the matching setters of OpensearchComponent.
+ */
+ class OpensearchComponentBuilderImpl
+ extends
+ AbstractComponentBuilder<OpensearchComponent>
+ implements
+ OpensearchComponentBuilder {
+ /**
+ * Creates a new, unconfigured OpensearchComponent.
+ *
+ * @return the new component instance
+ */
+ @Override
+ protected OpensearchComponent buildConcreteComponent() {
+ return new OpensearchComponent();
+ }
+ /**
+ * Applies a single named option to the component by calling the
+ * setter that matches the option name. The value is cast to the type
+ * the setter expects, so a value of another type fails with a
+ * ClassCastException.
+ *
+ * @param component the component to configure, cast to OpensearchComponent
+ * @param name the option name, one of the names used by the builder methods
+ * @param value the value to set
+ * @return true if the name matched a known option, false otherwise
+ */
+ @Override
+ protected boolean setPropertyOnComponent(
+ Component component,
+ String name,
+ Object value) {
+ switch (name) {
+ case "connectionTimeout": ((OpensearchComponent) component).setConnectionTimeout((int) value); return true;
+ case "hostAddresses": ((OpensearchComponent) component).setHostAddresses((java.lang.String) value); return true;
+ case "lazyStartProducer": ((OpensearchComponent) component).setLazyStartProducer((boolean) value); return true;
+ case "maxRetryTimeout": ((OpensearchComponent) component).setMaxRetryTimeout((int) value); return true;
+ case "socketTimeout": ((OpensearchComponent) component).setSocketTimeout((int) value); return true;
+ case "autowiredEnabled": ((OpensearchComponent) component).setAutowiredEnabled((boolean) value); return true;
+ case "client": ((OpensearchComponent) component).setClient((org.opensearch.client.RestClient) value); return true;
+ case "enableSniffer": ((OpensearchComponent) component).setEnableSniffer((boolean) value); return true;
+ case "sniffAfterFailureDelay": ((OpensearchComponent) component).setSniffAfterFailureDelay((int) value); return true;
+ case "snifferInterval": ((OpensearchComponent) component).setSnifferInterval((int) value); return true;
+ case "enableSSL": ((OpensearchComponent) component).setEnableSSL((boolean) value); return true;
+ case "password": ((OpensearchComponent) component).setPassword((java.lang.String) value); return true;
+ case "user": ((OpensearchComponent) component).setUser((java.lang.String) value); return true;
+ default: return false;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/dsl/camel-componentdsl/src/generated/resources/metadata.json b/dsl/camel-componentdsl/src/generated/resources/metadata.json
index 6fd3db32d5a..33cd1e81d9c 100644
--- a/dsl/camel-componentdsl/src/generated/resources/metadata.json
+++ b/dsl/camel-componentdsl/src/generated/resources/metadata.json
@@ -5034,6 +5034,28 @@
"producerOnly": false,
"lenientProperties": false
},
+ "OpensearchComponentBuilderFactory": {
+ "kind": "component",
+ "name": "opensearch",
+ "title": "OpenSearch",
+ "description": "Send requests to OpenSearch via Java Client API.",
+ "deprecated": false,
+ "firstVersion": "4.0.0",
+ "label": "search,monitoring",
+ "javaType": "org.apache.camel.component.opensearch.OpensearchComponent",
+ "supportLevel": "Preview",
+ "groupId": "org.apache.camel",
+ "artifactId": "camel-opensearch",
+ "version": "4.0.0-SNAPSHOT",
+ "scheme": "opensearch",
+ "extendsScheme": "",
+ "syntax": "opensearch:clusterName",
+ "async": false,
+ "api": false,
+ "consumerOnly": false,
+ "producerOnly": true,
+ "lenientProperties": false
+ },
"OpenshiftBuildConfigsComponentBuilderFactory": {
"kind": "component",
"name": "openshift-build-configs",
diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/EndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/EndpointBuilderFactory.java
index 9c9c1c60303..d876c99c7d1 100644
--- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/EndpointBuilderFactory.java
+++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/EndpointBuilderFactory.java
@@ -244,6 +244,7 @@ public interface EndpointBuilderFactory
org.apache.camel.builder.endpoint.dsl.OBSEndpointBuilderFactory.OBSBuilders,
org.apache.camel.builder.endpoint.dsl.Olingo2EndpointBuilderFactory.Olingo2Builders,
org.apache.camel.builder.endpoint.dsl.Olingo4EndpointBuilderFactory.Olingo4Builders,
+ org.apache.camel.builder.endpoint.dsl.OpensearchEndpointBuilderFactory.OpensearchBuilders,
org.apache.camel.builder.endpoint.dsl.OpenshiftBuildConfigsEndpointBuilderFactory.OpenshiftBuildConfigsBuilders,
org.apache.camel.builder.endpoint.dsl.OpenshiftBuildsEndpointBuilderFactory.OpenshiftBuildsBuilders,
org.apache.camel.builder.endpoint.dsl.OpenshiftDeploymentConfigsEndpointBuilderFactory.OpenshiftDeploymentConfigsBuilders,
diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/EndpointBuilders.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/EndpointBuilders.java
index 67f73172ea7..7ef7626a1dc 100644
--- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/EndpointBuilders.java
+++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/EndpointBuilders.java
@@ -241,6 +241,7 @@ public interface EndpointBuilders
org.apache.camel.builder.endpoint.dsl.OBSEndpointBuilderFactory,
org.apache.camel.builder.endpoint.dsl.Olingo2EndpointBuilderFactory,
org.apache.camel.builder.endpoint.dsl.Olingo4EndpointBuilderFactory,
+ org.apache.camel.builder.endpoint.dsl.OpensearchEndpointBuilderFactory,
org.apache.camel.builder.endpoint.dsl.OpenshiftBuildConfigsEndpointBuilderFactory,
org.apache.camel.builder.endpoint.dsl.OpenshiftBuildsEndpointBuilderFactory,
org.apache.camel.builder.endpoint.dsl.OpenshiftDeploymentConfigsEndpointBuilderFactory,
diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/StaticEndpointBuilders.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/StaticEndpointBuilders.java
index c8394c3e42f..1684baf2b5b 100644
--- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/StaticEndpointBuilders.java
+++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/StaticEndpointBuilders.java
@@ -10621,6 +10621,49 @@ public class StaticEndpointBuilders {
String path) {
return org.apache.camel.builder.endpoint.dsl.Olingo4EndpointBuilderFactory.endpointBuilder(componentName, path);
}
+ /**
+ * OpenSearch (camel-opensearch)
+ * Send requests to OpenSearch via Java Client API.
+ *
+ * Category: search,monitoring
+ * Since: 4.0
+ * Maven coordinates: org.apache.camel:camel-opensearch
+ *
+ * Syntax: <code>opensearch:clusterName</code>
+ *
+ * Path parameter: clusterName (required)
+ * Name of the cluster
+ *
+ * @param path clusterName
+ * @return the dsl builder
+ */
+ public static org.apache.camel.builder.endpoint.dsl.OpensearchEndpointBuilderFactory.OpensearchEndpointBuilder opensearch(
+ String path) {
+ return org.apache.camel.builder.endpoint.dsl.OpensearchEndpointBuilderFactory.endpointBuilder("opensearch", path);
+ }
+ /**
+ * OpenSearch (camel-opensearch)
+ * Send requests to OpenSearch via Java Client API.
+ *
+ * Category: search,monitoring
+ * Since: 4.0
+ * Maven coordinates: org.apache.camel:camel-opensearch
+ *
+ * Syntax: <code>opensearch:clusterName</code>
+ *
+ * Path parameter: clusterName (required)
+ * Name of the cluster
+ *
+ * @param componentName to use a custom component name for the endpoint
+ * instead of the default name
+ * @param path clusterName
+ * @return the dsl builder
+ */
+ public static org.apache.camel.builder.endpoint.dsl.OpensearchEndpointBuilderFactory.OpensearchEndpointBuilder opensearch(
+ String componentName,
+ String path) {
+ return org.apache.camel.builder.endpoint.dsl.OpensearchEndpointBuilderFactory.endpointBuilder(componentName, path);
+ }
/**
* Openshift Build Config (camel-kubernetes)
* Perform operations on OpenShift Build Configs.
diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/OpensearchEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/OpensearchEndpointBuilderFactory.java
new file mode 100644
index 00000000000..1688bc6ea63
--- /dev/null
+++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/OpensearchEndpointBuilderFactory.java
@@ -0,0 +1,834 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder.endpoint.dsl;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.function.*;
+import java.util.stream.*;
+import javax.annotation.processing.Generated;
+import org.apache.camel.builder.EndpointConsumerBuilder;
+import org.apache.camel.builder.EndpointProducerBuilder;
+import org.apache.camel.builder.endpoint.AbstractEndpointBuilder;
+
+/**
+ * Send requests to OpenSearch via Java Client API.
+ *
+ * Generated by camel build tools - do NOT edit this file!
+ */
+@Generated("org.apache.camel.maven.packaging.EndpointDslMojo")
+public interface OpensearchEndpointBuilderFactory {
+
+
+ /**
+ * Builder for endpoint for the OpenSearch component.
+ */
+ public interface OpensearchEndpointBuilder
+ extends
+ EndpointProducerBuilder {
+ default AdvancedOpensearchEndpointBuilder advanced() {
+ return (AdvancedOpensearchEndpointBuilder) this;
+ }
+ /**
+ * The time in ms to wait before connection will timeout.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 30000
+ * Group: producer
+ *
+ * @param connectionTimeout the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder connectionTimeout(
+ int connectionTimeout) {
+ doSetProperty("connectionTimeout", connectionTimeout);
+ return this;
+ }
+ /**
+ * The time in ms to wait before connection will timeout.
+ *
+ * The option will be converted to a <code>int</code> type.
+ *
+ * Default: 30000
+ * Group: producer
+ *
+ * @param connectionTimeout the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder connectionTimeout(
+ String connectionTimeout) {
+ doSetProperty("connectionTimeout", connectionTimeout);
+ return this;
+ }
+ /**
+ * Disconnect after it finishes calling the producer.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: producer
+ *
+ * @param disconnect the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder disconnect(boolean disconnect) {
+ doSetProperty("disconnect", disconnect);
+ return this;
+ }
+ /**
+ * Disconnect after it finishes calling the producer.
+ *
+ * The option will be converted to a <code>boolean</code>
+ * type.
+ *
+ * Default: false
+ * Group: producer
+ *
+ * @param disconnect the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder disconnect(String disconnect) {
+ doSetProperty("disconnect", disconnect);
+ return this;
+ }
+ /**
+ * Starting index of the response.
+ *
+ * The option is a: <code>java.lang.Integer</code> type.
+ *
+ * Group: producer
+ *
+ * @param from the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder from(Integer from) {
+ doSetProperty("from", from);
+ return this;
+ }
+ /**
+ * Starting index of the response.
+ *
+ * The option will be converted to a
+ * <code>java.lang.Integer</code> type.
+ *
+ * Group: producer
+ *
+ * @param from the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder from(String from) {
+ doSetProperty("from", from);
+ return this;
+ }
+ /**
+ * Comma separated list with ip:port formatted remote transport
+ * addresses to use.
+ *
+ * The option is a: <code>java.lang.String</code> type.
+ *
+ * Group: producer
+ *
+ * @param hostAddresses the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder hostAddresses(String hostAddresses) {
+ doSetProperty("hostAddresses", hostAddresses);
+ return this;
+ }
+ /**
+ * The name of the index to act against.
+ *
+ * The option is a: <code>java.lang.String</code> type.
+ *
+ * Group: producer
+ *
+ * @param indexName the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder indexName(String indexName) {
+ doSetProperty("indexName", indexName);
+ return this;
+ }
+ /**
+ * The time in ms before retry.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 30000
+ * Group: producer
+ *
+ * @param maxRetryTimeout the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder maxRetryTimeout(int maxRetryTimeout) {
+ doSetProperty("maxRetryTimeout", maxRetryTimeout);
+ return this;
+ }
+ /**
+ * The time in ms before retry.
+ *
+ * The option will be converted to a <code>int</code> type.
+ *
+ * Default: 30000
+ * Group: producer
+ *
+ * @param maxRetryTimeout the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder maxRetryTimeout(String maxRetryTimeout) {
+ doSetProperty("maxRetryTimeout", maxRetryTimeout);
+ return this;
+ }
+ /**
+ * What operation to perform.
+ *
+ * The option is a:
+ * <code>org.apache.camel.component.opensearch.OpensearchOperation</code> type.
+ *
+ * Group: producer
+ *
+ * @param operation the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder operation(
+ org.apache.camel.component.opensearch.OpensearchOperation operation) {
+ doSetProperty("operation", operation);
+ return this;
+ }
+ /**
+ * What operation to perform.
+ *
+ * The option will be converted to a
+ * <code>org.apache.camel.component.opensearch.OpensearchOperation</code> type.
+ *
+ * Group: producer
+ *
+ * @param operation the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder operation(String operation) {
+ doSetProperty("operation", operation);
+ return this;
+ }
+ /**
+ * Time in ms during which OpenSearch will keep search context alive.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 60000
+ * Group: producer
+ *
+ * @param scrollKeepAliveMs the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder scrollKeepAliveMs(
+ int scrollKeepAliveMs) {
+ doSetProperty("scrollKeepAliveMs", scrollKeepAliveMs);
+ return this;
+ }
+ /**
+ * Time in ms during which OpenSearch will keep search context alive.
+ *
+ * The option will be converted to a <code>int</code> type.
+ *
+ * Default: 60000
+ * Group: producer
+ *
+ * @param scrollKeepAliveMs the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder scrollKeepAliveMs(
+ String scrollKeepAliveMs) {
+ doSetProperty("scrollKeepAliveMs", scrollKeepAliveMs);
+ return this;
+ }
+ /**
+ * Size of the response.
+ *
+ * The option is a: <code>java.lang.Integer</code> type.
+ *
+ * Group: producer
+ *
+ * @param size the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder size(Integer size) {
+ doSetProperty("size", size);
+ return this;
+ }
+ /**
+ * Size of the response.
+ *
+ * The option will be converted to a
+ * <code>java.lang.Integer</code> type.
+ *
+ * Group: producer
+ *
+ * @param size the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder size(String size) {
+ doSetProperty("size", size);
+ return this;
+ }
+ /**
+ * The timeout in ms to wait before the socket will timeout.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 30000
+ * Group: producer
+ *
+ * @param socketTimeout the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder socketTimeout(int socketTimeout) {
+ doSetProperty("socketTimeout", socketTimeout);
+ return this;
+ }
+ /**
+ * The timeout in ms to wait before the socket will timeout.
+ *
+ * The option will be converted to a <code>int</code> type.
+ *
+ * Default: 30000
+ * Group: producer
+ *
+ * @param socketTimeout the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder socketTimeout(String socketTimeout) {
+ doSetProperty("socketTimeout", socketTimeout);
+ return this;
+ }
+ /**
+ * Enable scroll usage.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: producer
+ *
+ * @param useScroll the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder useScroll(boolean useScroll) {
+ doSetProperty("useScroll", useScroll);
+ return this;
+ }
+ /**
+ * Enable scroll usage.
+ *
+ * The option will be converted to a <code>boolean</code>
+ * type.
+ *
+ * Default: false
+ * Group: producer
+ *
+ * @param useScroll the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder useScroll(String useScroll) {
+ doSetProperty("useScroll", useScroll);
+ return this;
+ }
+ /**
+ * Index creation waits for the write consistency number of shards to be
+ * available.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 1
+ * Group: producer
+ *
+ * @param waitForActiveShards the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder waitForActiveShards(
+ int waitForActiveShards) {
+ doSetProperty("waitForActiveShards", waitForActiveShards);
+ return this;
+ }
+ /**
+ * Index creation waits for the write consistency number of shards to be
+ * available.
+ *
+ * The option will be converted to a <code>int</code> type.
+ *
+ * Default: 1
+ * Group: producer
+ *
+ * @param waitForActiveShards the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder waitForActiveShards(
+ String waitForActiveShards) {
+ doSetProperty("waitForActiveShards", waitForActiveShards);
+ return this;
+ }
+ /**
+ * The certificate that can be used to access the OpenSearch cluster. It can be
+ * loaded by default from classpath, but you can prefix with classpath:,
+ * file:, or http: to load the resource from different systems.
+ *
+ * The option is a: <code>java.lang.String</code> type.
+ *
+ * Group: security
+ *
+ * @param certificatePath the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder certificatePath(String certificatePath) {
+ doSetProperty("certificatePath", certificatePath);
+ return this;
+ }
+ /**
+ * Enable SSL.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: security
+ *
+ * @param enableSSL the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder enableSSL(boolean enableSSL) {
+ doSetProperty("enableSSL", enableSSL);
+ return this;
+ }
+ /**
+ * Enable SSL.
+ *
+ * The option will be converted to a <code>boolean</code>
+ * type.
+ *
+ * Default: false
+ * Group: security
+ *
+ * @param enableSSL the value to set
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder enableSSL(String enableSSL) {
+ doSetProperty("enableSSL", enableSSL);
+ return this;
+ }
+ }
+
+ /**
+ * Advanced builder for endpoint for the OpenSearch component.
+ */
+ public interface AdvancedOpensearchEndpointBuilder
+ extends
+ EndpointProducerBuilder {
+ default OpensearchEndpointBuilder basic() {
+ return (OpensearchEndpointBuilder) this;
+ }
+ /**
+ * Whether the producer should be started lazy (on the first message).
+ * By starting lazy you can use this to allow CamelContext and routes to
+ * startup in situations where a producer may otherwise fail during
+ * starting and cause the route to fail being started. By deferring this
+ * startup to be lazy then the startup failure can be handled during
+ * routing messages via Camel's routing error handlers. Beware that when
+ * the first message is processed then creating and starting the
+ * producer may take a little time and prolong the total processing time
+ * of the processing.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: producer (advanced)
+ *
+ * @param lazyStartProducer the value to set
+ * @return the dsl builder
+ */
+ default AdvancedOpensearchEndpointBuilder lazyStartProducer(
+ boolean lazyStartProducer) {
+ doSetProperty("lazyStartProducer", lazyStartProducer);
+ return this;
+ }
+ /**
+ * Whether the producer should be started lazy (on the first message).
+ * By starting lazy you can use this to allow CamelContext and routes to
+ * startup in situations where a producer may otherwise fail during
+ * starting and cause the route to fail being started. By deferring this
+ * startup to be lazy then the startup failure can be handled during
+ * routing messages via Camel's routing error handlers. Beware that when
+ * the first message is processed then creating and starting the
+ * producer may take a little time and prolong the total processing time
+ * of the processing.
+ *
+ * The option will be converted to a <code>boolean</code>
+ * type.
+ *
+ * Default: false
+ * Group: producer (advanced)
+ *
+ * @param lazyStartProducer the value to set
+ * @return the dsl builder
+ */
+ default AdvancedOpensearchEndpointBuilder lazyStartProducer(
+ String lazyStartProducer) {
+ doSetProperty("lazyStartProducer", lazyStartProducer);
+ return this;
+ }
+ /**
+ * The class to use when deserializing the documents.
+ *
+ * The option is a:
+ * <code>java.lang.Class&lt;java.lang.Object&gt;</code> type.
+ *
+ * Default: ObjectNode
+ * Group: advanced
+ *
+ * @param documentClass the value to set
+ * @return the dsl builder
+ */
+ default AdvancedOpensearchEndpointBuilder documentClass(
+ Class<java.lang.Object> documentClass) {
+ doSetProperty("documentClass", documentClass);
+ return this;
+ }
+ /**
+ * The class to use when deserializing the documents.
+ *
+ * The option will be converted to a
+ * <code>java.lang.Class&lt;java.lang.Object&gt;</code> type.
+ *
+ * Default: ObjectNode
+ * Group: advanced
+ *
+ * @param documentClass the value to set
+ * @return the dsl builder
+ */
+ default AdvancedOpensearchEndpointBuilder documentClass(
+ String documentClass) {
+ doSetProperty("documentClass", documentClass);
+ return this;
+ }
+ /**
+ * Enable automatic discovery of nodes from a running OpenSearch
+ * cluster. If this option is used in conjunction with Spring Boot then
+ * it's managed by the Spring Boot configuration (see: Disable Sniffer
+ * in Spring Boot).
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: advanced
+ *
+ * @param enableSniffer the value to set
+ * @return the dsl builder
+ */
+ default AdvancedOpensearchEndpointBuilder enableSniffer(
+ boolean enableSniffer) {
+ doSetProperty("enableSniffer", enableSniffer);
+ return this;
+ }
+ /**
+ * Enable automatic discovery of nodes from a running OpenSearch
+ * cluster. If this option is used in conjunction with Spring Boot then
+ * it's managed by the Spring Boot configuration (see: Disable Sniffer
+ * in Spring Boot).
+ *
+ * The option will be converted to a <code>boolean</code>
+ * type.
+ *
+ * Default: false
+ * Group: advanced
+ *
+ * @param enableSniffer the value to set
+ * @return the dsl builder
+ */
+ default AdvancedOpensearchEndpointBuilder enableSniffer(
+ String enableSniffer) {
+ doSetProperty("enableSniffer", enableSniffer);
+ return this;
+ }
+ /**
+ * The delay of a sniff execution scheduled after a failure (in
+ * milliseconds).
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 60000
+ * Group: advanced
+ *
+ * @param sniffAfterFailureDelay the value to set
+ * @return the dsl builder
+ */
+ default AdvancedOpensearchEndpointBuilder sniffAfterFailureDelay(
+ int sniffAfterFailureDelay) {
+ doSetProperty("sniffAfterFailureDelay", sniffAfterFailureDelay);
+ return this;
+ }
+ /**
+ * The delay of a sniff execution scheduled after a failure (in
+ * milliseconds).
+ *
+ * The option will be converted to a <code>int</code> type.
+ *
+ * Default: 60000
+ * Group: advanced
+ *
+ * @param sniffAfterFailureDelay the value to set
+ * @return the dsl builder
+ */
+ default AdvancedOpensearchEndpointBuilder sniffAfterFailureDelay(
+ String sniffAfterFailureDelay) {
+ doSetProperty("sniffAfterFailureDelay", sniffAfterFailureDelay);
+ return this;
+ }
+ /**
+ * The interval between consecutive ordinary sniff executions in
+ * milliseconds. Will be honoured when sniffOnFailure is disabled or
+ * when there are no failures between consecutive sniff executions.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 300000
+ * Group: advanced
+ *
+ * @param snifferInterval the value to set
+ * @return the dsl builder
+ */
+ default AdvancedOpensearchEndpointBuilder snifferInterval(
+ int snifferInterval) {
+ doSetProperty("snifferInterval", snifferInterval);
+ return this;
+ }
+ /**
+ * The interval between consecutive ordinary sniff executions in
+ * milliseconds. Will be honoured when sniffOnFailure is disabled or
+ * when there are no failures between consecutive sniff executions.
+ *
+ * The option will be converted to a <code>int</code> type.
+ *
+ * Default: 300000
+ * Group: advanced
+ *
+ * @param snifferInterval the value to set
+ * @return the dsl builder
+ */
+ default AdvancedOpensearchEndpointBuilder snifferInterval(
+ String snifferInterval) {
+ doSetProperty("snifferInterval", snifferInterval);
+ return this;
+ }
+ }
+
+ public interface OpensearchBuilders {
+ /**
+ * OpenSearch (camel-opensearch)
+ * Send requests to OpenSearch via Java Client API.
+ *
+ * Category: search,monitoring
+ * Since: 4.0
+ * Maven coordinates: org.apache.camel:camel-opensearch
+ *
+ * @return the dsl builder for the headers' name.
+ */
+ default OpensearchHeaderNameBuilder opensearch() {
+ return OpensearchHeaderNameBuilder.INSTANCE;
+ }
+ /**
+ * OpenSearch (camel-opensearch)
+ * Send requests to OpenSearch via Java Client API.
+ *
+ * Category: search,monitoring
+ * Since: 4.0
+ * Maven coordinates: org.apache.camel:camel-opensearch
+ *
+ * Syntax: <code>opensearch:clusterName</code>
+ *
+ * Path parameter: clusterName (required)
+ * Name of the cluster
+ *
+ * @param path clusterName
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder opensearch(String path) {
+ return OpensearchEndpointBuilderFactory.endpointBuilder("opensearch", path);
+ }
+ /**
+ * OpenSearch (camel-opensearch)
+ * Send requests to OpenSearch via Java Client API.
+ *
+ * Category: search,monitoring
+ * Since: 4.0
+ * Maven coordinates: org.apache.camel:camel-opensearch
+ *
+ * Syntax: <code>opensearch:clusterName</code>
+ *
+ * Path parameter: clusterName (required)
+ * Name of the cluster
+ *
+ * @param componentName to use a custom component name for the endpoint
+ * instead of the default name
+ * @param path clusterName
+ * @return the dsl builder
+ */
+ default OpensearchEndpointBuilder opensearch(
+ String componentName,
+ String path) {
+ return OpensearchEndpointBuilderFactory.endpointBuilder(componentName, path);
+ }
+ }
+
+ /**
+ * The builder of headers' name for the OpenSearch component.
+ */
+ public static class OpensearchHeaderNameBuilder {
+ /**
+ * The internal instance of the builder used to access to all the
+ * methods representing the name of headers.
+ */
+ private static final OpensearchHeaderNameBuilder INSTANCE = new OpensearchHeaderNameBuilder();
+
+ /**
+ * The operation to perform.
+ *
+ * The option is a: {@code
+ * org.apache.camel.component.opensearch.OpensearchOperation} type.
+ *
+ * Group: producer
+ *
+ * @return the name of the header {@code operation}.
+ */
+ public String operation() {
+ return "operation";
+ }
+
+ /**
+ * The id of the indexed document.
+ *
+ * The option is a: {@code String} type.
+ *
+ * Group: producer
+ *
+ * @return the name of the header {@code indexId}.
+ */
+ public String indexId() {
+ return "indexId";
+ }
+
+ /**
+ * The name of the index to act against.
+ *
+ * The option is a: {@code String} type.
+ *
+ * Group: producer
+ *
+ * @return the name of the header {@code indexName}.
+ */
+ public String indexName() {
+ return "indexName";
+ }
+
+ /**
+ * The fully qualified name of the class of the document to unmarshal.
+ *
+ * The option is a: {@code Class} type.
+ *
+ * Default: ObjectNode
+ * Group: producer
+ *
+ * @return the name of the header {@code documentClass}.
+ */
+ public String documentClass() {
+ return "documentClass";
+ }
+
+ /**
+ * The index creation waits for the write consistency number of shards
+ * to be available.
+ *
+ * The option is a: {@code Integer} type.
+ *
+ * Group: producer
+ *
+ * @return the name of the header {@code waitForActiveShards}.
+ */
+ public String waitForActiveShards() {
+ return "waitForActiveShards";
+ }
+
+ /**
+ * The time in ms during which OpenSearch will keep the search context alive.
+ *
+ * The option is a: {@code Integer} type.
+ *
+ * Group: producer
+ *
+ * @return the name of the header {@code scrollKeepAliveMs}.
+ */
+ public String scrollKeepAliveMs() {
+ return "scrollKeepAliveMs";
+ }
+
+ /**
+ * Set to true to enable scroll usage.
+ *
+ * The option is a: {@code Boolean} type.
+ *
+ * Group: producer
+ *
+ * @return the name of the header {@code useScroll}.
+ */
+ public String useScroll() {
+ return "useScroll";
+ }
+
+ /**
+ * The size of the response.
+ *
+ * The option is a: {@code Integer} type.
+ *
+ * Group: producer
+ *
+ * @return the name of the header {@code size}.
+ */
+ public String size() {
+ return "size";
+ }
+
+ /**
+ * The starting index of the response.
+ *
+ * The option is a: {@code Integer} type.
+ *
+ * Group: producer
+ *
+ * @return the name of the header {@code from}.
+ */
+ public String from() {
+ return "from";
+ }
+ }
+ static OpensearchEndpointBuilder endpointBuilder(
+ String componentName,
+ String path) {
+ class OpensearchEndpointBuilderImpl extends AbstractEndpointBuilder implements OpensearchEndpointBuilder, AdvancedOpensearchEndpointBuilder {
+ public OpensearchEndpointBuilderImpl(String path) {
+ super(componentName, path);
+ }
+ }
+ return new OpensearchEndpointBuilderImpl(path);
+ }
+}
\ No newline at end of file
diff --git a/dsl/camel-kamelet-main/src/generated/resources/camel-component-known-dependencies.properties b/dsl/camel-kamelet-main/src/generated/resources/camel-component-known-dependencies.properties
index 1fc81ed91e2..19b43d4f8c9 100644
--- a/dsl/camel-kamelet-main/src/generated/resources/camel-component-known-dependencies.properties
+++ b/dsl/camel-kamelet-main/src/generated/resources/camel-component-known-dependencies.properties
@@ -236,6 +236,7 @@ org.apache.camel.component.netty.http.NettyHttpComponent=camel:netty-http
org.apache.camel.component.nitrite.NitriteComponent=camel:nitrite
org.apache.camel.component.olingo2.Olingo2Component=camel:olingo2
org.apache.camel.component.olingo4.Olingo4Component=camel:olingo4
+org.apache.camel.component.opensearch.OpensearchComponent=camel:opensearch
org.apache.camel.component.openshift.build_configs.OpenshiftBuildConfigsComponent=camel:kubernetes
org.apache.camel.component.openshift.builds.OpenshiftBuildsComponent=camel:kubernetes
org.apache.camel.component.openshift.deploymentconfigs.OpenshiftDeploymentConfigsComponent=camel:kubernetes
diff --git a/parent/pom.xml b/parent/pom.xml
index 1ea4cfccb31..9c9bb31ea04 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -380,6 +380,10 @@
<ognl-version>3.3.4</ognl-version>
<openapi-generator>6.2.1</openapi-generator>
<openjpa-version>3.2.2</openjpa-version>
+ <opensearch-rest-client-version>2.8.0</opensearch-rest-client-version>
+ <opensearch-java-client-version>2.5.0</opensearch-java-client-version>
+ <opensearch-version>2.8.0</opensearch-version>
+ <opensearch-testcontainers-version>2.0.0</opensearch-testcontainers-version>
<openstack4j-version>3.10</openstack4j-version>
<opentelemetry-version>1.26.0</opentelemetry-version>
<opentelemetry-alpha-version>${opentelemetry-version}-alpha</opentelemetry-alpha-version>
@@ -1877,6 +1881,11 @@
<artifactId>camel-openapi-java</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-opensearch</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-openstack</artifactId>
diff --git a/test-infra/camel-test-infra-common/src/test/java/org/apache/camel/test/infra/common/services/SimpleTestServiceBuilder.java b/test-infra/camel-test-infra-common/src/test/java/org/apache/camel/test/infra/common/services/SimpleTestServiceBuilder.java
index a7fa2f3085e..284b7e8ac29 100644
--- a/test-infra/camel-test-infra-common/src/test/java/org/apache/camel/test/infra/common/services/SimpleTestServiceBuilder.java
+++ b/test-infra/camel-test-infra-common/src/test/java/org/apache/camel/test/infra/common/services/SimpleTestServiceBuilder.java
@@ -20,7 +20,6 @@ package org.apache.camel.test.infra.common.services;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
-import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +67,7 @@ public class SimpleTestServiceBuilder<T extends TestService> implements TestServ
Supplier<T> supplier = mappings.get(instanceType);
if (supplier == null) {
- String valid = mappings.keySet().stream().collect(Collectors.joining(", "));
+ String valid = String.join(", ", mappings.keySet());
LOG.error("Invalid instance type: {}. Must one of: {}", instanceType, valid);
throw new UnsupportedOperationException("Invalid instance type: " + instanceType);
diff --git a/test-infra/camel-test-infra-opensearch/pom.xml b/test-infra/camel-test-infra-opensearch/pom.xml
new file mode 100644
index 00000000000..78844531480
--- /dev/null
+++ b/test-infra/camel-test-infra-opensearch/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>camel-test-infra-parent</artifactId>
+ <groupId>org.apache.camel</groupId>
+ <relativePath>../camel-test-infra-parent/pom.xml</relativePath>
+ <version>4.0.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>camel-test-infra-opensearch</artifactId>
+ <name>Camel :: Test Infra :: opensearch</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-infra-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <version>${testcontainers-version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.opensearch</groupId>
+ <artifactId>opensearch-testcontainers</artifactId>
+ <version>${opensearch-testcontainers-version}</version>
+ </dependency>
+ </dependencies>
+
+
+</project>
diff --git a/test-infra/camel-test-infra-opensearch/src/main/resources/META-INF/MANIFEST.MF b/test-infra/camel-test-infra-opensearch/src/main/resources/META-INF/MANIFEST.MF
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/test-infra/camel-test-infra-opensearch/src/test/java/org/apache/camel/test/infra/opensearch/common/OpenSearchProperties.java b/test-infra/camel-test-infra-opensearch/src/test/java/org/apache/camel/test/infra/opensearch/common/OpenSearchProperties.java
new file mode 100644
index 00000000000..03bca15e2ad
--- /dev/null
+++ b/test-infra/camel-test-infra-opensearch/src/test/java/org/apache/camel/test/infra/opensearch/common/OpenSearchProperties.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.test.infra.opensearch.common;
+
+import org.apache.camel.test.infra.common.services.ContainerEnvironmentUtil;
+
+/**
+ * Names of the system properties used by the OpenSearch test infrastructure.
+ */
+public final class OpenSearchProperties {
+    /** System property holding the OpenSearch host. */
+    public static final String OPEN_SEARCH_HOST = "opensearch.host";
+
+    /** System property holding the OpenSearch port. */
+    public static final String OPEN_SEARCH_PORT = "opensearch.port";
+
+    /** System property holding the user name used to connect to OpenSearch. */
+    public static final String OPEN_SEARCH_USERNAME = "opensearch.username";
+
+    /** System property holding the password used to connect to OpenSearch. */
+    public static final String OPEN_SEARCH_PASSWORD = "opensearch.password";
+
+    /** System property holding the container image name to use for a local OpenSearch instance. */
+    public static final String OPEN_SEARCH_CONTAINER = "opensearch.container";
+
+    /**
+     * Container property name combined with {@link ContainerEnvironmentUtil#STARTUP_ATTEMPTS_PROPERTY}.
+     * NOTE(review): presumably controls the number of container startup attempts - confirm against
+     * ContainerEnvironmentUtil.
+     */
+    public static final String OPEN_SEARCH_CONTAINER_STARTUP =
+            OPEN_SEARCH_CONTAINER + ContainerEnvironmentUtil.STARTUP_ATTEMPTS_PROPERTY;
+
+    private OpenSearchProperties() {
+        // constants holder: not meant to be instantiated
+    }
+}
diff --git a/test-infra/camel-test-infra-opensearch/src/test/java/org/apache/camel/test/infra/opensearch/services/OpenSearchLocalContainerService.java b/test-infra/camel-test-infra-opensearch/src/test/java/org/apache/camel/test/infra/opensearch/services/OpenSearchLocalContainerService.java
new file mode 100644
index 00000000000..48c3bc56a49
--- /dev/null
+++ b/test-infra/camel-test-infra-opensearch/src/test/java/org/apache/camel/test/infra/opensearch/services/OpenSearchLocalContainerService.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.test.infra.opensearch.services;
+
+import java.time.Duration;
+
+import org.apache.camel.test.infra.common.services.ContainerEnvironmentUtil;
+import org.apache.camel.test.infra.common.services.ContainerService;
+import org.apache.camel.test.infra.opensearch.common.OpenSearchProperties;
+import org.opensearch.testcontainers.OpensearchContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+
+/**
+ * {@link OpenSearchService} backed by a local OpenSearch container.
+ */
+public class OpenSearchLocalContainerService implements OpenSearchService, ContainerService<OpensearchContainer> {
+    /** Image used when the {@link OpenSearchProperties#OPEN_SEARCH_CONTAINER} system property is not set. */
+    public static final String DEFAULT_OPEN_SEARCH_CONTAINER = "opensearchproject/opensearch:2.8.0";
+
+    private static final Logger LOG = LoggerFactory.getLogger(OpenSearchLocalContainerService.class);
+    private static final int OPEN_SEARCH_PORT = 9200;
+    private static final String USER_NAME = "admin";
+    private static final String PASSWORD = "admin";
+
+    private final OpensearchContainer container;
+
+    /**
+     * Creates the service using the image named by the {@link OpenSearchProperties#OPEN_SEARCH_CONTAINER}
+     * system property, falling back to {@link #DEFAULT_OPEN_SEARCH_CONTAINER}.
+     */
+    public OpenSearchLocalContainerService() {
+        this(System.getProperty(OpenSearchProperties.OPEN_SEARCH_CONTAINER, DEFAULT_OPEN_SEARCH_CONTAINER));
+    }
+
+    /**
+     * Creates the service with a container built from the given image.
+     *
+     * @param imageName the container image name
+     */
+    public OpenSearchLocalContainerService(String imageName) {
+        this.container = initContainer(imageName);
+    }
+
+    /**
+     * Creates the service around an already configured container.
+     *
+     * @param container the container to manage
+     */
+    public OpenSearchLocalContainerService(OpensearchContainer container) {
+        this.container = container;
+    }
+
+    /**
+     * Builds the container for the given image, configuring its wait strategy and log consumer.
+     *
+     * @param  imageName the container image name
+     * @return           the configured (not yet started) container
+     */
+    protected OpensearchContainer initContainer(String imageName) {
+        final OpensearchContainer created = new OpensearchContainer(imageName);
+
+        // Log line pattern used to detect that the node has started
+        final String startedLogPattern = ".*(\"message\":\\s?\"started[\\s?|\"].*|] started\n$)";
+        // Increase the timeout from 60 seconds to 90 seconds to ensure that it will be long enough
+        // on the build pipeline
+        final Duration startupTimeout = Duration.ofSeconds(90);
+
+        created.setWaitStrategy(
+                new LogMessageWaitStrategy()
+                        .withRegEx(startedLogPattern)
+                        .withStartupTimeout(startupTimeout));
+
+        // Forward the container output to this class' logger
+        created.withLogConsumer(new Slf4jLogConsumer(LOG));
+
+        return created;
+    }
+
+    /**
+     * @return the host port mapped to the container port 9200
+     */
+    @Override
+    public int getPort() {
+        return container.getMappedPort(OPEN_SEARCH_PORT);
+    }
+
+    /**
+     * @return the host reported by the container
+     */
+    @Override
+    public String getOpenSearchHost() {
+        return container.getHost();
+    }
+
+    /**
+     * @return the HTTP host address reported by the container
+     */
+    @Override
+    public String getHttpHostAddress() {
+        return container.getHttpHostAddress();
+    }
+
+    /**
+     * Publishes the host and port of the container as system properties.
+     */
+    @Override
+    public void registerProperties() {
+        final String host = getOpenSearchHost();
+        System.setProperty(OpenSearchProperties.OPEN_SEARCH_HOST, host);
+
+        final String port = String.valueOf(getPort());
+        System.setProperty(OpenSearchProperties.OPEN_SEARCH_PORT, port);
+    }
+
+    /**
+     * Configures the container startup, starts the container and registers the connection properties.
+     */
+    @Override
+    public void initialize() {
+        LOG.info("Trying to start the OpenSearch container");
+        ContainerEnvironmentUtil.configureContainerStartup(
+                container, OpenSearchProperties.OPEN_SEARCH_CONTAINER_STARTUP, 2);
+
+        container.start();
+
+        // The mapped port is read from the container, so properties are registered after start()
+        registerProperties();
+        LOG.info("OpenSearch instance running at {}", getHttpHostAddress());
+    }
+
+    /**
+     * Stops the container.
+     */
+    @Override
+    public void shutdown() {
+        LOG.info("Stopping the OpenSearch container");
+        container.stop();
+    }
+
+    /**
+     * @return the managed container
+     */
+    @Override
+    public OpensearchContainer getContainer() {
+        return container;
+    }
+
+    /**
+     * @return the fixed user name used by this service
+     */
+    @Override
+    public String getUsername() {
+        return USER_NAME;
+    }
+
+    /**
+     * @return the fixed password used by this service
+     */
+    @Override
+    public String getPassword() {
+        return PASSWORD;
+    }
+}
diff --git a/test-infra/camel-test-infra-opensearch/src/test/java/org/apache/camel/test/infra/opensearch/services/OpenSearchService.java b/test-infra/camel-test-infra-opensearch/src/test/java/org/apache/camel/test/infra/opensearch/services/OpenSearchService.java
new file mode 100644
index 00000000000..a3c4066c82b
--- /dev/null
+++ b/test-infra/camel-test-infra-opensearch/src/test/java/org/apache/camel/test/infra/opensearch/services/OpenSearchService.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.test.infra.opensearch.services;
+
+import org.apache.camel.test.infra.common.services.TestService;
+
+/**
+ * Test service giving access to the connection details of an OpenSearch instance.
+ */
+public interface OpenSearchService extends TestService {
+
+    /**
+     * @return the port of the OpenSearch instance
+     */
+    int getPort();
+
+    /**
+     * @return the host of the OpenSearch instance
+     */
+    String getOpenSearchHost();
+
+    /**
+     * Builds the address of the instance as {@code host:port}.
+     *
+     * @return the host and port joined by a colon
+     */
+    default String getHttpHostAddress() {
+        // Plain concatenation rather than String.format("%s:%d", ...): the %d conversion localizes
+        // digits according to the default locale, which could produce a port written with non-ASCII
+        // digits. Concatenation always yields ASCII digits and renders a null host as "null" as before.
+        return getOpenSearchHost() + ":" + getPort();
+    }
+
+    /**
+     * @return the user name to use when connecting to the instance
+     */
+    String getUsername();
+
+    /**
+     * @return the password to use when connecting to the instance
+     */
+    String getPassword();
+}
diff --git a/test-infra/camel-test-infra-opensearch/src/test/java/org/apache/camel/test/infra/opensearch/services/OpenSearchServiceFactory.java b/test-infra/camel-test-infra-opensearch/src/test/java/org/apache/camel/test/infra/opensearch/services/OpenSearchServiceFactory.java
new file mode 100644
index 00000000000..6ff2280c4c8
--- /dev/null
+++ b/test-infra/camel-test-infra-opensearch/src/test/java/org/apache/camel/test/infra/opensearch/services/OpenSearchServiceFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.test.infra.opensearch.services;
+
+import org.apache.camel.test.infra.common.services.SimpleTestServiceBuilder;
+import org.apache.camel.test.infra.common.services.SingletonService;
+
+/**
+ * Factory for {@link OpenSearchService} instances, either local (container based) or remote.
+ */
+public final class OpenSearchServiceFactory {
+
+    /**
+     * {@link OpenSearchService} that forwards every call to the service wrapped by {@link SingletonService}.
+     */
+    static class SingletonOpenSearchService extends SingletonService<OpenSearchService> implements OpenSearchService {
+        public SingletonOpenSearchService(OpenSearchService service, String name) {
+            super(service, name);
+        }
+
+        @Override
+        public int getPort() {
+            return getService().getPort();
+        }
+
+        // @Override was missing here although the method implements OpenSearchService#getOpenSearchHost,
+        // like all the sibling delegates.
+        @Override
+        public String getOpenSearchHost() {
+            return getService().getOpenSearchHost();
+        }
+
+        @Override
+        public String getHttpHostAddress() {
+            return getService().getHttpHostAddress();
+        }
+
+        @Override
+        public String getUsername() {
+            return getService().getUsername();
+        }
+
+        @Override
+        public String getPassword() {
+            return getService().getPassword();
+        }
+    }
+
+    private OpenSearchServiceFactory() {
+        // static factory: not meant to be instantiated
+    }
+
+    /**
+     * @return a new service builder named "opensearch", without any mapping registered
+     */
+    public static SimpleTestServiceBuilder<OpenSearchService> builder() {
+        return new SimpleTestServiceBuilder<>("opensearch");
+    }
+
+    /**
+     * Creates a service with the local mapping bound to {@link OpenSearchLocalContainerService} and the
+     * remote mapping bound to {@link RemoteOpenSearchService}.
+     *
+     * @return the service built from these mappings
+     */
+    public static OpenSearchService createService() {
+        return builder()
+                .addLocalMapping(OpenSearchLocalContainerService::new)
+                .addRemoteMapping(RemoteOpenSearchService::new)
+                .build();
+    }
+
+    /**
+     * Returns the shared service instance held by {@link SingletonServiceHolder}.
+     *
+     * @return the same instance on every call
+     */
+    public static OpenSearchService createSingletonService() {
+        return SingletonServiceHolder.INSTANCE;
+    }
+
+    /**
+     * Holds the shared service; the instance is built once, when this class is initialized.
+     */
+    private static class SingletonServiceHolder {
+        static final OpenSearchService INSTANCE;
+        static {
+            SimpleTestServiceBuilder<OpenSearchService> instance = builder();
+            // Only the local mapping is wrapped in SingletonOpenSearchService; the remote one is used as is
+            instance.addLocalMapping(
+                    () -> new SingletonOpenSearchService(new OpenSearchLocalContainerService(), "opensearch"))
+                    .addRemoteMapping(RemoteOpenSearchService::new);
+            INSTANCE = instance.build();
+        }
+    }
+}
diff --git a/test-infra/camel-test-infra-opensearch/src/test/java/org/apache/camel/test/infra/opensearch/services/RemoteOpenSearchService.java b/test-infra/camel-test-infra-opensearch/src/test/java/org/apache/camel/test/infra/opensearch/services/RemoteOpenSearchService.java
new file mode 100644
index 00000000000..1db5650e77c
--- /dev/null
+++ b/test-infra/camel-test-infra-opensearch/src/test/java/org/apache/camel/test/infra/opensearch/services/RemoteOpenSearchService.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.test.infra.opensearch.services;
+
+import org.apache.camel.test.infra.opensearch.common.OpenSearchProperties;
+
+/**
+ * {@link OpenSearchService} for an already running OpenSearch instance whose connection details are
+ * read from the system properties named in {@link OpenSearchProperties}.
+ */
+public class RemoteOpenSearchService implements OpenSearchService {
+    /** Port returned when the port system property is not set. */
+    private static final int DEFAULT_PORT = 9200;
+
+    /**
+     * @return the value of the {@link OpenSearchProperties#OPEN_SEARCH_PORT} system property parsed as an
+     *         integer, or 9200 when the property is not set
+     */
+    @Override
+    public int getPort() {
+        final String configuredPort = System.getProperty(OpenSearchProperties.OPEN_SEARCH_PORT);
+
+        return configuredPort == null ? DEFAULT_PORT : Integer.parseInt(configuredPort);
+    }
+
+    /**
+     * @return the value of the {@link OpenSearchProperties#OPEN_SEARCH_HOST} system property
+     */
+    @Override
+    public String getOpenSearchHost() {
+        return System.getProperty(OpenSearchProperties.OPEN_SEARCH_HOST);
+    }
+
+    /**
+     * Does nothing.
+     */
+    @Override
+    public void registerProperties() {
+        // NO-OP
+    }
+
+    /**
+     * Only delegates to {@link #registerProperties()}.
+     */
+    @Override
+    public void initialize() {
+        registerProperties();
+    }
+
+    /**
+     * Does nothing.
+     */
+    @Override
+    public void shutdown() {
+        // NO-OP
+    }
+
+    /**
+     * @return the value of the {@link OpenSearchProperties#OPEN_SEARCH_USERNAME} system property
+     */
+    @Override
+    public String getUsername() {
+        return System.getProperty(OpenSearchProperties.OPEN_SEARCH_USERNAME);
+    }
+
+    /**
+     * @return the value of the {@link OpenSearchProperties#OPEN_SEARCH_PASSWORD} system property
+     */
+    @Override
+    public String getPassword() {
+        return System.getProperty(OpenSearchProperties.OPEN_SEARCH_PASSWORD);
+    }
+}
diff --git a/test-infra/pom.xml b/test-infra/pom.xml
index e4f574441dc..5a0627502bd 100644
--- a/test-infra/pom.xml
+++ b/test-infra/pom.xml
@@ -74,5 +74,6 @@
<module>camel-test-infra-jetty</module>
<module>camel-test-infra-etcd3</module>
<module>camel-test-infra-core</module>
+ <module>camel-test-infra-opensearch</module>
</modules>
</project>