You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/02/28 13:51:11 UTC
[camel] 01/11: CAMEL-13165 - Camel-AWS: Create an AWS MSK component
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 80c410097a3f79844d9c9b678b4ca54494dcb2c6
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Feb 28 12:20:20 2019 +0100
CAMEL-13165 - Camel-AWS: Create an AWS MSK component
---
components/camel-aws-msk/bin/pom.xml | 81 +++++++++
.../bin/src/main/docs/aws-kms-component.adoc | 182 +++++++++++++++++++++
.../camel/component/aws/kms/MSKComponent.class | Bin 0 -> 6580 bytes
.../aws/kms/MSKComponentVerifierExtension.class | Bin 0 -> 5686 bytes
.../camel/component/aws/kms/MSKConfiguration.class | Bin 0 -> 5398 bytes
.../camel/component/aws/kms/MSKConstants.class | Bin 0 -> 799 bytes
.../camel/component/aws/kms/MSKEndpoint.class | Bin 0 -> 8731 bytes
.../camel/component/aws/kms/MSKOperations.class | Bin 0 -> 1146 bytes
.../camel/component/aws/kms/MSKProducer.class | Bin 0 -> 5348 bytes
.../component/aws/kms/AmazonMKSClientMock.class | Bin 0 -> 775 bytes
.../component/aws/kms/KMSProducerSpringTest.class | Bin 0 -> 9622 bytes
.../camel/component/aws/kms/KMSProducerTest.class | Bin 0 -> 9526 bytes
.../aws/kms/MSKComponentConfigurationTest.class | Bin 0 -> 2491 bytes
.../kms/MSKComponentVerifierExtensionTest.class | Bin 0 -> 3883 bytes
.../bin/src/test/resources/log4j2.properties | 28 ++++
.../aws/kms/KMSComponentSpringTest-context.xml | 60 +++++++
components/camel-aws-msk/pom.xml | 81 +++++++++
.../src/main/docs/aws-kms-component.adoc | 182 +++++++++++++++++++++
.../camel/component/aws/msk/MSKComponent.java | 121 ++++++++++++++
.../aws/msk/MSKComponentVerifierExtension.java | 89 ++++++++++
.../camel/component/aws/msk/MSKConfiguration.java | 137 ++++++++++++++++
.../camel/component/aws/msk/MSKConstants.java | 30 ++++
.../camel/component/aws/msk/MSKEndpoint.java | 121 ++++++++++++++
.../camel/component/aws/msk/MSKOperations.java | 24 +++
.../camel/component/aws/msk/MSKProducer.java | 170 +++++++++++++++++++
.../component/aws/msk/AmazonMSKClientMock.java | 65 ++++++++
.../aws/msk/MSKComponentConfigurationTest.java | 53 ++++++
.../aws/msk/MSKComponentVerifierExtensionTest.java | 74 +++++++++
.../component/aws/msk/MSKProducerSpringTest.java | 104 ++++++++++++
.../camel/component/aws/msk/MSKProducerTest.java | 129 +++++++++++++++
.../src/test/resources/log4j2.properties | 28 ++++
.../aws/msk/MSKComponentSpringTest-context.xml | 47 ++++++
platforms/spring-boot/components-starter/pom.xml | 1 +
33 files changed, 1807 insertions(+)
diff --git a/components/camel-aws-msk/bin/pom.xml b/components/camel-aws-msk/bin/pom.xml
new file mode 100644
index 0000000..c29ca1d
--- /dev/null
+++ b/components/camel-aws-msk/bin/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>components</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>camel-aws-msk</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Camel :: AWS MSK</name>
+ <description>A Camel Amazon MSK Web Service Component</description>
+
+ <properties>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-support</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-kafka</artifactId> <!-- AWS SDK client module for Amazon MSK (Managed Streaming for Kafka) -->
+ <version>${aws-java-sdk-version}</version>
+ </dependency>
+
+ <!-- Test-scoped dependencies: JUnit, Camel Spring test support, Log4j 2 logging and Mockito -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-spring</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/components/camel-aws-msk/bin/src/main/docs/aws-kms-component.adoc b/components/camel-aws-msk/bin/src/main/docs/aws-kms-component.adoc
new file mode 100644
index 0000000..77f8a26
--- /dev/null
+++ b/components/camel-aws-msk/bin/src/main/docs/aws-kms-component.adoc
@@ -0,0 +1,182 @@
+[[aws-kms-component]]
+== AWS KMS Component
+
+*Available as of Camel version 2.21*
+
+The KMS component supports listing, creating, describing, enabling, disabling and scheduling the deletion of keys in
+https://aws.amazon.com/it/kms/[AWS KMS].
+
+Prerequisites
+
+You must have a valid Amazon Web Services developer account, and be
+signed up to use Amazon KMS. More information is available at
+https://aws.amazon.com/it/kms/[Amazon KMS].
+
+### URI Format
+
+[source,java]
+-------------------------
+aws-kms://label[?options]
+-------------------------
+
+You can append query options to the URI in the following format,
+?options=value&option2=value&...
+
+### URI Options
+
+
+// component options: START
+The AWS KMS component supports 5 options, which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *configuration* (advanced) | The AWS KMS default configuration | | KMSConfiguration
+| *accessKey* (producer) | Amazon AWS Access Key | | String
+| *secretKey* (producer) | Amazon AWS Secret Key | | String
+| *region* (producer) | The region in which KMS client needs to work | | String
+| *resolveProperty Placeholders* (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean
+|===
+// component options: END
+
+
+
+
+// endpoint options: START
+The AWS KMS endpoint is configured using URI syntax:
+
+----
+aws-kms:label
+----
+
+with the following path and query parameters:
+
+==== Path Parameters (1 parameters):
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *label* | *Required* Logical name | | String
+|===
+
+
+==== Query Parameters (8 parameters):
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *accessKey* (producer) | Amazon AWS Access Key | | String
+| *kmsClient* (producer) | To use a existing configured AWS KMS as client | | AWSKMS
+| *operation* (producer) | *Required* The operation to perform | | KMSOperations
+| *proxyHost* (producer) | To define a proxy host when instantiating the KMS client | | String
+| *proxyPort* (producer) | To define a proxy port when instantiating the KMS client | | Integer
+| *region* (producer) | The region in which KMS client needs to work | | String
+| *secretKey* (producer) | Amazon AWS Secret Key | | String
+| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
+|===
+// endpoint options: END
+// spring-boot-auto-configure options: START
+=== Spring Boot Auto-Configuration
+
+When using Spring Boot make sure to use the following Maven dependency to have support for auto configuration:
+
+[source,xml]
+----
+<dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-aws-kms-starter</artifactId>
+ <version>x.x.x</version>
+ <!-- use the same version as your Camel core version -->
+</dependency>
+----
+
+
+The component supports 12 options, which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *camel.component.aws-kms.access-key* | Amazon AWS Access Key | | String
+| *camel.component.aws-kms.configuration.access-key* | Amazon AWS Access Key | | String
+| *camel.component.aws-kms.configuration.kms-client* | To use a existing configured AWS KMS as client | | AWSKMS
+| *camel.component.aws-kms.configuration.operation* | The operation to perform | | KMSOperations
+| *camel.component.aws-kms.configuration.proxy-host* | To define a proxy host when instantiating the KMS client | | String
+| *camel.component.aws-kms.configuration.proxy-port* | To define a proxy port when instantiating the KMS client | | Integer
+| *camel.component.aws-kms.configuration.region* | The region in which KMS client needs to work | | String
+| *camel.component.aws-kms.configuration.secret-key* | Amazon AWS Secret Key | | String
+| *camel.component.aws-kms.enabled* | Whether to enable auto configuration of the aws-kms component. This is enabled by default. | | Boolean
+| *camel.component.aws-kms.region* | The region in which KMS client needs to work | | String
+| *camel.component.aws-kms.resolve-property-placeholders* | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | Boolean
+| *camel.component.aws-kms.secret-key* | Amazon AWS Secret Key | | String
+|===
+// spring-boot-auto-configure options: END
+
+
+
+
+Required KMS component options
+
+You have to provide the amazonKmsClient in the
+Registry or your accessKey and secretKey to access
+the https://aws.amazon.com/it/kms/[Amazon KMS] service.
+
+### Usage
+
+#### Message headers evaluated by the KMS producer
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=======================================================================
+|Header |Type |Description
+
+|`CamelAwsKMSLimit` |`Integer` |The limit number of keys to return while performing a listKeys operation
+
+|`CamelAwsKMSOperation` |`String` |The operation we want to perform
+
+|`CamelAwsKMSDescription` |`String` |A key description to use while performing a createKey operation
+
+|`CamelAwsKMSKeyId` |`String` |The key Id
+|=======================================================================
+
+#### KMS Producer operations
+
+Camel-AWS KMS component provides the following operations on the producer side:
+
+- listKeys
+- createKey
+- disableKey
+- scheduleKeyDeletion
+- describeKey
+- enableKey
+
+Dependencies
+
+Maven users will need to add the following dependency to their pom.xml.
+
+*pom.xml*
+
+[source,xml]
+---------------------------------------
+<dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-aws-kms</artifactId>
+ <version>${camel-version}</version>
+</dependency>
+---------------------------------------
+
+where `${camel-version}` must be replaced by the actual version of Camel
+(2.16 or higher).
+
+### See Also
+
+* Configuring Camel
+* Component
+* Endpoint
+* Getting Started
+
+* AWS Component
diff --git a/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKComponent.class b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKComponent.class
new file mode 100644
index 0000000..c7d4963
Binary files /dev/null and b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKComponent.class differ
diff --git a/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKComponentVerifierExtension.class b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKComponentVerifierExtension.class
new file mode 100644
index 0000000..144ae2e
Binary files /dev/null and b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKComponentVerifierExtension.class differ
diff --git a/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKConfiguration.class b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKConfiguration.class
new file mode 100644
index 0000000..dbf1478
Binary files /dev/null and b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKConfiguration.class differ
diff --git a/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKConstants.class b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKConstants.class
new file mode 100644
index 0000000..fb77aaf
Binary files /dev/null and b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKConstants.class differ
diff --git a/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKEndpoint.class b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKEndpoint.class
new file mode 100644
index 0000000..7fd6129
Binary files /dev/null and b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKEndpoint.class differ
diff --git a/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKOperations.class b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKOperations.class
new file mode 100644
index 0000000..16ec708
Binary files /dev/null and b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKOperations.class differ
diff --git a/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKProducer.class b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKProducer.class
new file mode 100644
index 0000000..6c8b42e
Binary files /dev/null and b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKProducer.class differ
diff --git a/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/AmazonMKSClientMock.class b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/AmazonMKSClientMock.class
new file mode 100644
index 0000000..2a5517c
Binary files /dev/null and b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/AmazonMKSClientMock.class differ
diff --git a/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/KMSProducerSpringTest.class b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/KMSProducerSpringTest.class
new file mode 100644
index 0000000..5a6a3fe
Binary files /dev/null and b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/KMSProducerSpringTest.class differ
diff --git a/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/KMSProducerTest.class b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/KMSProducerTest.class
new file mode 100644
index 0000000..50605ab
Binary files /dev/null and b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/KMSProducerTest.class differ
diff --git a/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/MSKComponentConfigurationTest.class b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/MSKComponentConfigurationTest.class
new file mode 100644
index 0000000..0e6e223
Binary files /dev/null and b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/MSKComponentConfigurationTest.class differ
diff --git a/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/MSKComponentVerifierExtensionTest.class b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/MSKComponentVerifierExtensionTest.class
new file mode 100644
index 0000000..e161b81
Binary files /dev/null and b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/MSKComponentVerifierExtensionTest.class differ
diff --git a/components/camel-aws-msk/bin/src/test/resources/log4j2.properties b/components/camel-aws-msk/bin/src/test/resources/log4j2.properties
new file mode 100644
index 0000000..986f470
--- /dev/null
+++ b/components/camel-aws-msk/bin/src/test/resources/log4j2.properties
@@ -0,0 +1,28 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+appender.file.type = File
+appender.file.name = file
+appender.file.fileName = target/camel-aws-kms-test.log
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+appender.out.type = Console
+appender.out.name = out
+appender.out.layout.type = PatternLayout
+appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+rootLogger.level = INFO
+rootLogger.appenderRef.file.ref = file
diff --git a/components/camel-aws-msk/bin/src/test/resources/org/apache/camel/component/aws/kms/KMSComponentSpringTest-context.xml b/components/camel-aws-msk/bin/src/test/resources/org/apache/camel/component/aws/kms/KMSComponentSpringTest-context.xml
new file mode 100644
index 0000000..42eccfd
--- /dev/null
+++ b/components/camel-aws-msk/bin/src/test/resources/org/apache/camel/component/aws/kms/KMSComponentSpringTest-context.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+ <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <!-- one route per producer operation; each sends to the aws-kms endpoint backed by the amazonKmsClient bean, then to mock:result. '&' in endpoint URIs must be written as '&amp;' inside XML attributes -->
+ <route>
+ <from uri="direct:listKeys"/>
+ <to uri="aws-kms://Test?kmsClient=#amazonKmsClient&amp;operation=listKeys"/>
+ <to uri="mock:result"/>
+ </route>
+ <route>
+ <from uri="direct:createKey"/>
+ <to uri="aws-kms://Test?kmsClient=#amazonKmsClient&amp;operation=createKey"/>
+ <to uri="mock:result"/>
+ </route>
+ <route>
+ <from uri="direct:disableKey"/>
+ <to uri="aws-kms://Test?kmsClient=#amazonKmsClient&amp;operation=disableKey"/>
+ <to uri="mock:result"/>
+ </route>
+ <route>
+ <from uri="direct:enableKey"/>
+ <to uri="aws-kms://Test?kmsClient=#amazonKmsClient&amp;operation=enableKey"/>
+ <to uri="mock:result"/>
+ </route>
+ <route>
+ <from uri="direct:scheduleDelete"/>
+ <to uri="aws-kms://Test?kmsClient=#amazonKmsClient&amp;operation=scheduleKeyDeletion"/>
+ <to uri="mock:result"/>
+ </route>
+ <route>
+ <from uri="direct:describeKey"/>
+ <to uri="aws-kms://Test?kmsClient=#amazonKmsClient&amp;operation=describeKey"/>
+ <to uri="mock:result"/>
+ </route>
+ </camelContext>
+
+ <bean id="amazonKmsClient" class="org.apache.camel.component.aws.kms.AmazonKMSClientMock"/> <!-- mock client referenced by the kmsClient=#amazonKmsClient URI option above -->
+</beans>
\ No newline at end of file
diff --git a/components/camel-aws-msk/pom.xml b/components/camel-aws-msk/pom.xml
new file mode 100644
index 0000000..c29ca1d
--- /dev/null
+++ b/components/camel-aws-msk/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>components</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>camel-aws-msk</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Camel :: AWS MSK</name>
+ <description>A Camel Amazon MSK Web Service Component</description>
+
+ <properties>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-support</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-kafka</artifactId> <!-- AWS SDK client module for Amazon MSK (Managed Streaming for Kafka) -->
+ <version>${aws-java-sdk-version}</version>
+ </dependency>
+
+ <!-- Test-scoped dependencies: JUnit, Camel Spring test support, Log4j 2 logging and Mockito -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-spring</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/components/camel-aws-msk/src/main/docs/aws-kms-component.adoc b/components/camel-aws-msk/src/main/docs/aws-kms-component.adoc
new file mode 100644
index 0000000..77f8a26
--- /dev/null
+++ b/components/camel-aws-msk/src/main/docs/aws-kms-component.adoc
@@ -0,0 +1,182 @@
+[[aws-kms-component]]
+== AWS KMS Component
+
+*Available as of Camel version 2.21*
+
+The KMS component supports listing, creating, describing, enabling, disabling and scheduling the deletion of keys in
+https://aws.amazon.com/it/kms/[AWS KMS].
+
+Prerequisites
+
+You must have a valid Amazon Web Services developer account, and be
+signed up to use Amazon KMS. More information is available at
+https://aws.amazon.com/it/kms/[Amazon KMS].
+
+### URI Format
+
+[source,java]
+-------------------------
+aws-kms://label[?options]
+-------------------------
+
+You can append query options to the URI in the following format,
+?options=value&option2=value&...
+
+### URI Options
+
+
+// component options: START
+The AWS KMS component supports 5 options, which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *configuration* (advanced) | The AWS KMS default configuration | | KMSConfiguration
+| *accessKey* (producer) | Amazon AWS Access Key | | String
+| *secretKey* (producer) | Amazon AWS Secret Key | | String
+| *region* (producer) | The region in which KMS client needs to work | | String
+| *resolveProperty Placeholders* (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean
+|===
+// component options: END
+
+
+
+
+// endpoint options: START
+The AWS KMS endpoint is configured using URI syntax:
+
+----
+aws-kms:label
+----
+
+with the following path and query parameters:
+
+==== Path Parameters (1 parameters):
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *label* | *Required* Logical name | | String
+|===
+
+
+==== Query Parameters (8 parameters):
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *accessKey* (producer) | Amazon AWS Access Key | | String
+| *kmsClient* (producer) | To use a existing configured AWS KMS as client | | AWSKMS
+| *operation* (producer) | *Required* The operation to perform | | KMSOperations
+| *proxyHost* (producer) | To define a proxy host when instantiating the KMS client | | String
+| *proxyPort* (producer) | To define a proxy port when instantiating the KMS client | | Integer
+| *region* (producer) | The region in which KMS client needs to work | | String
+| *secretKey* (producer) | Amazon AWS Secret Key | | String
+| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
+|===
+// endpoint options: END
+// spring-boot-auto-configure options: START
+=== Spring Boot Auto-Configuration
+
+When using Spring Boot make sure to use the following Maven dependency to have support for auto configuration:
+
+[source,xml]
+----
+<dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-aws-kms-starter</artifactId>
+ <version>x.x.x</version>
+ <!-- use the same version as your Camel core version -->
+</dependency>
+----
+
+
+The component supports 12 options, which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *camel.component.aws-kms.access-key* | Amazon AWS Access Key | | String
+| *camel.component.aws-kms.configuration.access-key* | Amazon AWS Access Key | | String
+| *camel.component.aws-kms.configuration.kms-client* | To use a existing configured AWS KMS as client | | AWSKMS
+| *camel.component.aws-kms.configuration.operation* | The operation to perform | | KMSOperations
+| *camel.component.aws-kms.configuration.proxy-host* | To define a proxy host when instantiating the KMS client | | String
+| *camel.component.aws-kms.configuration.proxy-port* | To define a proxy port when instantiating the KMS client | | Integer
+| *camel.component.aws-kms.configuration.region* | The region in which KMS client needs to work | | String
+| *camel.component.aws-kms.configuration.secret-key* | Amazon AWS Secret Key | | String
+| *camel.component.aws-kms.enabled* | Whether to enable auto configuration of the aws-kms component. This is enabled by default. | | Boolean
+| *camel.component.aws-kms.region* | The region in which KMS client needs to work | | String
+| *camel.component.aws-kms.resolve-property-placeholders* | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | Boolean
+| *camel.component.aws-kms.secret-key* | Amazon AWS Secret Key | | String
+|===
+// spring-boot-auto-configure options: END
+
+
+
+
+Required KMS component options
+
+You have to provide the amazonKmsClient in the
+Registry or your accessKey and secretKey to access
+the https://aws.amazon.com/it/kms/[Amazon KMS] service.
+
+### Usage
+
+#### Message headers evaluated by the KMS producer
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=======================================================================
+|Header |Type |Description
+
+|`CamelAwsKMSLimit` |`Integer` |The limit number of keys to return while performing a listKeys operation
+
+|`CamelAwsKMSOperation` |`String` |The operation we want to perform
+
+|`CamelAwsKMSDescription` |`String` |A key description to use while performing a createKey operation
+
+|`CamelAwsKMSKeyId` |`String` |The key Id
+|=======================================================================
+
+#### KMS Producer operations
+
+Camel-AWS KMS component provides the following operations on the producer side:
+
+- listKeys
+- createKey
+- disableKey
+- scheduleKeyDeletion
+- describeKey
+- enableKey
+
+Dependencies
+
+Maven users will need to add the following dependency to their pom.xml.
+
+*pom.xml*
+
+[source,xml]
+---------------------------------------
+<dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-aws-kms</artifactId>
+ <version>${camel-version}</version>
+</dependency>
+---------------------------------------
+
+where `${camel-version}` must be replaced by the actual version of Camel
+(2.16 or higher).
+
+### See Also
+
+* Configuring Camel
+* Component
+* Endpoint
+* Getting Started
+
+* AWS Component
diff --git a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKComponent.java b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKComponent.java
new file mode 100644
index 0000000..5b6086a
--- /dev/null
+++ b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKComponent.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.Component;
+import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Camel component for working with Amazon MSK (Managed Streaming for Apache Kafka).
+ * <p>
+ * The component holds a default {@link MSKConfiguration}. Every endpoint is
+ * created from a copy of that configuration, overridden by the parameters
+ * found on the endpoint URI.
+ */
+@Component("aws-msk")
+public class MSKComponent extends DefaultComponent {
+
+    // These three fields are never assigned in this class: the accessors below
+    // delegate to the shared configuration instead.
+    @Metadata
+    private String accessKey;
+    @Metadata
+    private String secretKey;
+    @Metadata
+    private String region;
+    @Metadata(label = "advanced")
+    private MSKConfiguration configuration;
+
+    public MSKComponent() {
+        this(null);
+    }
+
+    public MSKComponent(CamelContext context) {
+        super(context);
+
+        this.configuration = new MSKConfiguration();
+        registerExtension(new MSKComponentVerifierExtension());
+    }
+
+    /**
+     * Creates an {@link MSKEndpoint} from a copy of the component configuration
+     * with the URI parameters applied on top.
+     *
+     * @param uri        the full endpoint URI
+     * @param remaining  the URI part after the scheme (not used here)
+     * @param parameters the URI query parameters, bound onto the endpoint configuration
+     * @return the configured endpoint
+     * @throws IllegalArgumentException if neither an MSK client nor both
+     *                                  accessKey and secretKey are available
+     */
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        MSKConfiguration endpointConfiguration = this.configuration.copy();
+        setProperties(endpointConfiguration, parameters);
+
+        // Fall back to the component-level values when the endpoint ends up with
+        // an empty one. The fallback must fill the endpoint's own copy and must
+        // never write back into the configuration shared by all endpoints.
+        if (ObjectHelper.isEmpty(endpointConfiguration.getAccessKey())) {
+            endpointConfiguration.setAccessKey(getAccessKey());
+        }
+        if (ObjectHelper.isEmpty(endpointConfiguration.getSecretKey())) {
+            endpointConfiguration.setSecretKey(getSecretKey());
+        }
+        if (ObjectHelper.isEmpty(endpointConfiguration.getRegion())) {
+            endpointConfiguration.setRegion(getRegion());
+        }
+        if (endpointConfiguration.getMskClient() == null
+            && (endpointConfiguration.getAccessKey() == null || endpointConfiguration.getSecretKey() == null)) {
+            throw new IllegalArgumentException("Amazon msk client or accessKey and secretKey must be specified");
+        }
+
+        return new MSKEndpoint(uri, this, endpointConfiguration);
+    }
+
+    public MSKConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * The AWS MSK default configuration
+     */
+    public void setConfiguration(MSKConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    public String getAccessKey() {
+        return configuration.getAccessKey();
+    }
+
+    /**
+     * Amazon AWS Access Key
+     */
+    public void setAccessKey(String accessKey) {
+        configuration.setAccessKey(accessKey);
+    }
+
+    public String getSecretKey() {
+        return configuration.getSecretKey();
+    }
+
+    /**
+     * Amazon AWS Secret Key
+     */
+    public void setSecretKey(String secretKey) {
+        configuration.setSecretKey(secretKey);
+    }
+
+    public String getRegion() {
+        return configuration.getRegion();
+    }
+
+    /**
+     * The region in which MSK client needs to work
+     */
+    public void setRegion(String region) {
+        configuration.setRegion(region);
+    }
+
+}
diff --git a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKComponentVerifierExtension.java b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKComponentVerifierExtension.java
new file mode 100644
index 0000000..bf1fa3b
--- /dev/null
+++ b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKComponentVerifierExtension.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+import java.util.Map;
+
+import com.amazonaws.SdkClientException;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kafka.AWSKafka;
+import com.amazonaws.services.kafka.AWSKafkaClientBuilder;
+import com.amazonaws.services.kafka.model.ListClustersRequest;
+
+import org.apache.camel.component.extension.verifier.DefaultComponentVerifierExtension;
+import org.apache.camel.component.extension.verifier.ResultBuilder;
+import org.apache.camel.component.extension.verifier.ResultErrorBuilder;
+import org.apache.camel.component.extension.verifier.ResultErrorHelper;
+
+public class MSKComponentVerifierExtension extends DefaultComponentVerifierExtension {
+
+ public MSKComponentVerifierExtension() {
+ this("aws-msk");
+ }
+
+ public MSKComponentVerifierExtension(String scheme) {
+ super(scheme);
+ }
+
+ // *********************************
+ // Parameters validation
+ // *********************************
+
+ @Override
+ protected Result verifyParameters(Map<String, Object> parameters) {
+
+ ResultBuilder builder = ResultBuilder.withStatusAndScope(Result.Status.OK, Scope.PARAMETERS).error(ResultErrorHelper.requiresOption("accessKey", parameters))
+ .error(ResultErrorHelper.requiresOption("secretKey", parameters)).error(ResultErrorHelper.requiresOption("region", parameters));
+
+ // Validate using the catalog
+
+ super.verifyParametersAgainstCatalog(builder, parameters);
+
+ return builder.build();
+ }
+
+ // *********************************
+ // Connectivity validation
+ // *********************************
+
+ @Override
+ protected Result verifyConnectivity(Map<String, Object> parameters) {
+ ResultBuilder builder = ResultBuilder.withStatusAndScope(Result.Status.OK, Scope.CONNECTIVITY);
+
+ try {
+ MSKConfiguration configuration = setProperties(new MSKConfiguration(), parameters);
+ AWSCredentials credentials = new BasicAWSCredentials(configuration.getAccessKey(), configuration.getSecretKey());
+ AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);
+ ListClustersRequest request = new ListClustersRequest();
+ AWSKafka client = AWSKafkaClientBuilder.standard().withCredentials(credentialsProvider).withRegion(Regions.valueOf(configuration.getRegion())).build();
+ client.listClusters(request);
+ } catch (SdkClientException e) {
+ ResultErrorBuilder errorBuilder = ResultErrorBuilder.withCodeAndDescription(VerificationError.StandardCode.AUTHENTICATION, e.getMessage())
+ .detail("aws_mks_exception_message", e.getMessage()).detail(VerificationError.ExceptionAttribute.EXCEPTION_CLASS, e.getClass().getName())
+ .detail(VerificationError.ExceptionAttribute.EXCEPTION_INSTANCE, e);
+
+ builder.error(errorBuilder.build());
+ } catch (Exception e) {
+ builder.error(ResultErrorBuilder.withException(e).build());
+ }
+ return builder.build();
+ }
+}
diff --git a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKConfiguration.java b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKConfiguration.java
new file mode 100644
index 0000000..93e86bd
--- /dev/null
+++ b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKConfiguration.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+import org.apache.camel.spi.UriPath;
+
+import com.amazonaws.services.kafka.AWSKafka;
+
+/**
+ * Endpoint options for the aws-msk component: credentials, region, optional
+ * proxy settings, the operation to run and an optional pre-built client.
+ * NOTE(review): the label path option has no getter/setter in this class.
+ */
+@UriParams
+public class MSKConfiguration implements Cloneable {
+
+ @UriPath(description = "Logical name")
+ @Metadata(required = true)
+ private String label;
+ @UriParam(label = "producer")
+ private AWSKafka mskClient;
+ @UriParam(label = "producer", secret = true)
+ private String accessKey;
+ @UriParam(label = "producer", secret = true)
+ private String secretKey;
+ @UriParam(label = "producer")
+ @Metadata(required = true)
+ private MSKOperations operation;
+ @UriParam(label = "producer")
+ private String proxyHost;
+ @UriParam(label = "producer")
+ private Integer proxyPort;
+ @UriParam
+ private String region;
+
+ public AWSKafka getMskClient() {
+ return mskClient;
+ }
+
+ /**
+ * To use an existing, already configured AWS MSK client
+ */
+ public void setMskClient(AWSKafka mskClient) {
+ this.mskClient = mskClient;
+ }
+
+ public String getAccessKey() {
+ return accessKey;
+ }
+
+ /**
+ * Amazon AWS Access Key
+ */
+ public void setAccessKey(String accessKey) {
+ this.accessKey = accessKey;
+ }
+
+ public String getSecretKey() {
+ return secretKey;
+ }
+
+ /**
+ * Amazon AWS Secret Key
+ */
+ public void setSecretKey(String secretKey) {
+ this.secretKey = secretKey;
+ }
+
+ public MSKOperations getOperation() {
+ return operation;
+ }
+
+ /**
+ * The operation to perform
+ */
+ public void setOperation(MSKOperations operation) {
+ this.operation = operation;
+ }
+
+ public String getProxyHost() {
+ return proxyHost;
+ }
+
+ /**
+ * To define a proxy host when instantiating the MSK client
+ */
+ public void setProxyHost(String proxyHost) {
+ this.proxyHost = proxyHost;
+ }
+
+ public Integer getProxyPort() {
+ return proxyPort;
+ }
+
+ /**
+ * To define a proxy port when instantiating the MSK client
+ */
+ public void setProxyPort(Integer proxyPort) {
+ this.proxyPort = proxyPort;
+ }
+
+ public String getRegion() {
+ return region;
+ }
+
+ /**
+ * The region in which MSK client needs to work
+ */
+ public void setRegion(String region) {
+ this.region = region;
+ }
+
+ // *************************************************
+ // Copy support: shallow clone, so a configured mskClient is shared with the copy
+ // *************************************************
+
+ public MSKConfiguration copy() {
+ try {
+ return (MSKConfiguration)super.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeCamelException(e);
+ }
+ }
+}
diff --git a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKConstants.java b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKConstants.java
new file mode 100644
index 0000000..5f2c1e5
--- /dev/null
+++ b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKConstants.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+/**
+ * Constants used in Camel AWS MSK module.
+ * <p>
+ * Each constant is the name of a message header read by MSKProducer.
+ */
+public interface MSKConstants {
+ /** Operation to perform; when absent the producer uses the endpoint's configured operation. */
+ String OPERATION = "CamelAwsMSKOperation";
+ /** Optional cluster name filter applied by the listClusters operation. */
+ String CLUSTERS_FILTER = "CamelAwsMSKClusterFilter";
+ /** Cluster name; required by the createCluster operation. */
+ String CLUSTER_NAME = "CamelAwsMSKClusterName";
+ /** Cluster ARN; required by the deleteCluster operation. */
+ String CLUSTER_ARN = "CamelAwsMSKClusterArn";
+ /** Kafka version; required by the createCluster operation. */
+ String CLUSTER_KAFKA_VERSION = "CamelAwsMSKClusterKafkaVersion";
+ /** Number of broker nodes (read as Integer); required by the createCluster operation. */
+ String BROKER_NODES_NUMBER = "CamelAwsMSKBrokerNodesNumber";
+ /** BrokerNodeGroupInfo instance; required by the createCluster operation. */
+ String BROKER_NODES_GROUP_INFO = "CamelAwsMSKBrokerNodesGroupInfo";
+}
diff --git a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKEndpoint.java b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKEndpoint.java
new file mode 100644
index 0000000..f49ab3f
--- /dev/null
+++ b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKEndpoint.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.kafka.AWSKafka;
+import com.amazonaws.services.kafka.AWSKafkaClientBuilder;
+
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.support.ScheduledPollEndpoint;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * The aws-msk is used for managing Amazon MSK
+ */
+@UriEndpoint(firstVersion = "3.0.0", scheme = "aws-msk", title = "AWS MSK", syntax = "aws-msk:label", producerOnly = true, label = "cloud,management")
+public class MSKEndpoint extends ScheduledPollEndpoint {
+
+    // Client actually used by the producer: either the one supplied through
+    // the configuration or one created (and owned) by this endpoint.
+    private AWSKafka mskClient;
+
+    @UriParam
+    private MSKConfiguration configuration;
+
+    public MSKEndpoint(String uri, Component component, MSKConfiguration configuration) {
+        super(uri, component);
+        this.configuration = configuration;
+    }
+
+    /**
+     * Always fails: this endpoint is producer only.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        throw new UnsupportedOperationException("You cannot receive messages from this endpoint");
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new MSKProducer(this);
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    @Override
+    public void doStart() throws Exception {
+        super.doStart();
+
+        mskClient = configuration.getMskClient() != null ? configuration.getMskClient() : createMSKClient();
+    }
+
+    @Override
+    public void doStop() throws Exception {
+        // Only shut down a client this endpoint created itself; a client passed
+        // in through the configuration is left for its owner to manage.
+        if (ObjectHelper.isEmpty(configuration.getMskClient())) {
+            if (mskClient != null) {
+                mskClient.shutdown();
+            }
+        }
+        super.doStop();
+    }
+
+    public MSKConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public AWSKafka getMskClient() {
+        return mskClient;
+    }
+
+    /**
+     * Builds an MSK client from the endpoint configuration.
+     * <p>
+     * Proxy settings are applied when both proxyHost and proxyPort are set,
+     * whether or not explicit credentials are configured. Static credentials
+     * are applied when both accessKey and secretKey are set; otherwise the
+     * builder's default credentials handling is left in place. The region is
+     * applied when it is not empty.
+     *
+     * @return a newly built client
+     */
+    AWSKafka createMSKClient() {
+        AWSKafkaClientBuilder clientBuilder = AWSKafkaClientBuilder.standard();
+        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
+            ClientConfiguration clientConfiguration = new ClientConfiguration();
+            clientConfiguration.setProxyHost(configuration.getProxyHost());
+            clientConfiguration.setProxyPort(configuration.getProxyPort());
+            clientBuilder = clientBuilder.withClientConfiguration(clientConfiguration);
+        }
+        if (configuration.getAccessKey() != null && configuration.getSecretKey() != null) {
+            AWSCredentials credentials = new BasicAWSCredentials(configuration.getAccessKey(), configuration.getSecretKey());
+            AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);
+            clientBuilder = clientBuilder.withCredentials(credentialsProvider);
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
+            clientBuilder = clientBuilder.withRegion(configuration.getRegion());
+        }
+        return clientBuilder.build();
+    }
+}
diff --git a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKOperations.java b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKOperations.java
new file mode 100644
index 0000000..a06b4e7
--- /dev/null
+++ b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKOperations.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+/**
+ * Operations the MSK producer can perform; MSKProducer dispatches on these
+ * values in its process method.
+ */
+public enum MSKOperations {
+
+ listClusters,
+ createCluster,
+ deleteCluster
+}
diff --git a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKProducer.java b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKProducer.java
new file mode 100644
index 0000000..b59b9e1
--- /dev/null
+++ b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKProducer.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.kafka.AWSKafka;
+import com.amazonaws.services.kafka.model.BrokerNodeGroupInfo;
+import com.amazonaws.services.kafka.model.CreateClusterRequest;
+import com.amazonaws.services.kafka.model.CreateClusterResult;
+import com.amazonaws.services.kafka.model.DeleteClusterRequest;
+import com.amazonaws.services.kafka.model.DeleteClusterResult;
+import com.amazonaws.services.kafka.model.ListClustersRequest;
+import com.amazonaws.services.kafka.model.ListClustersResult;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+
+/**
+ * A Producer which sends messages to the Amazon MSK Service
+ * <a href="http://aws.amazon.com/msk/">AWS MSK</a>
+ */
+public class MSKProducer extends DefaultProducer {
+
+    // Lazily built and cached by toString().
+    private transient String mskProducerToString;
+
+    public MSKProducer(Endpoint endpoint) {
+        super(endpoint);
+    }
+
+    /**
+     * Runs the operation selected for this exchange against the endpoint's
+     * MSK client and stores the SDK result as the response body.
+     *
+     * @param exchange the exchange carrying the operation headers
+     * @throws IllegalArgumentException if no operation is specified or a
+     *                                  header required by the operation is missing
+     */
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        switch (determineOperation(exchange)) {
+        case listClusters:
+            listClusters(getEndpoint().getMskClient(), exchange);
+            break;
+        case createCluster:
+            createCluster(getEndpoint().getMskClient(), exchange);
+            break;
+        case deleteCluster:
+            deleteCluster(getEndpoint().getMskClient(), exchange);
+            break;
+        default:
+            throw new IllegalArgumentException("Unsupported operation");
+        }
+    }
+
+    /**
+     * Resolves the operation from the {@link MSKConstants#OPERATION} header,
+     * falling back to the endpoint configuration.
+     */
+    private MSKOperations determineOperation(Exchange exchange) {
+        MSKOperations operation = exchange.getIn().getHeader(MSKConstants.OPERATION, MSKOperations.class);
+        if (operation == null) {
+            operation = getConfiguration().getOperation();
+        }
+        if (operation == null) {
+            // Fail with a clear message rather than a bare NullPointerException
+            // from the switch statement in process().
+            throw new IllegalArgumentException("Operation must be specified");
+        }
+        return operation;
+    }
+
+    protected MSKConfiguration getConfiguration() {
+        return getEndpoint().getConfiguration();
+    }
+
+    @Override
+    public String toString() {
+        if (mskProducerToString == null) {
+            mskProducerToString = "MSKProducer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
+        }
+        return mskProducerToString;
+    }
+
+    @Override
+    public MSKEndpoint getEndpoint() {
+        return (MSKEndpoint)super.getEndpoint();
+    }
+
+    /**
+     * Lists clusters, optionally filtered by the
+     * {@link MSKConstants#CLUSTERS_FILTER} header.
+     */
+    private void listClusters(AWSKafka mskClient, Exchange exchange) {
+        ListClustersRequest request = new ListClustersRequest();
+        if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSKConstants.CLUSTERS_FILTER))) {
+            String filter = exchange.getIn().getHeader(MSKConstants.CLUSTERS_FILTER, String.class);
+            request.withClusterNameFilter(filter);
+        }
+        ListClustersResult result;
+        try {
+            result = mskClient.listClusters(request);
+        } catch (AmazonServiceException ase) {
+            log.trace("List Clusters command returned the error code {}", ase.getErrorCode());
+            throw ase;
+        }
+        Message message = getMessageForResponse(exchange);
+        message.setBody(result);
+    }
+
+    /**
+     * Creates a cluster. Cluster name, Kafka version, number of broker nodes
+     * and broker node group info must all be supplied as headers.
+     */
+    private void createCluster(AWSKafka mskClient, Exchange exchange) {
+        // Headers are validated in this fixed order, so the first missing one
+        // determines the error message.
+        String name = getRequiredHeader(exchange, MSKConstants.CLUSTER_NAME, String.class, "Cluster Name must be specified");
+        String version = getRequiredHeader(exchange, MSKConstants.CLUSTER_KAFKA_VERSION, String.class, "Kafka Version must be specified");
+        Integer nodesNumber = getRequiredHeader(exchange, MSKConstants.BROKER_NODES_NUMBER, Integer.class, "Number of broker nodes must be specified");
+        BrokerNodeGroupInfo brokerNodesGroupInfo = getRequiredHeader(exchange, MSKConstants.BROKER_NODES_GROUP_INFO, BrokerNodeGroupInfo.class,
+                                                                     "BrokerNodeGroupInfo must be specified");
+
+        CreateClusterRequest request = new CreateClusterRequest();
+        request.withClusterName(name);
+        request.withKafkaVersion(version);
+        request.withNumberOfBrokerNodes(nodesNumber);
+        request.withBrokerNodeGroupInfo(brokerNodesGroupInfo);
+
+        CreateClusterResult result;
+        try {
+            result = mskClient.createCluster(request);
+        } catch (AmazonServiceException ase) {
+            log.trace("Create Cluster command returned the error code {}", ase.getErrorCode());
+            throw ase;
+        }
+        Message message = getMessageForResponse(exchange);
+        message.setBody(result);
+    }
+
+    /**
+     * Deletes the cluster identified by the {@link MSKConstants#CLUSTER_ARN} header.
+     */
+    private void deleteCluster(AWSKafka mskClient, Exchange exchange) {
+        String arn = getRequiredHeader(exchange, MSKConstants.CLUSTER_ARN, String.class, "Cluster ARN must be specified");
+        DeleteClusterRequest request = new DeleteClusterRequest();
+        request.withClusterArn(arn);
+
+        DeleteClusterResult result;
+        try {
+            result = mskClient.deleteCluster(request);
+        } catch (AmazonServiceException ase) {
+            log.trace("Delete Cluster command returned the error code {}", ase.getErrorCode());
+            throw ase;
+        }
+        Message message = getMessageForResponse(exchange);
+        message.setBody(result);
+    }
+
+    /**
+     * Reads a mandatory header, converted to the requested type.
+     *
+     * @throws IllegalArgumentException with {@code errorMessage} when the header is empty
+     */
+    private static <T> T getRequiredHeader(Exchange exchange, String name, Class<T> type, String errorMessage) {
+        if (ObjectHelper.isEmpty(exchange.getIn().getHeader(name))) {
+            throw new IllegalArgumentException(errorMessage);
+        }
+        return exchange.getIn().getHeader(name, type);
+    }
+
+    /**
+     * Returns the message that should carry the response: the OUT message
+     * (pre-populated from IN) for out-capable exchanges, otherwise IN itself.
+     */
+    public static Message getMessageForResponse(final Exchange exchange) {
+        if (exchange.getPattern().isOutCapable()) {
+            Message out = exchange.getOut();
+            out.copyFrom(exchange.getIn());
+            return out;
+        }
+        return exchange.getIn();
+    }
+}
\ No newline at end of file
diff --git a/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/AmazonMSKClientMock.java b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/AmazonMSKClientMock.java
new file mode 100644
index 0000000..b70c979
--- /dev/null
+++ b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/AmazonMSKClientMock.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.amazonaws.services.kafka.AbstractAWSKafka;
+import com.amazonaws.services.kafka.model.ClusterInfo;
+import com.amazonaws.services.kafka.model.ClusterState;
+import com.amazonaws.services.kafka.model.CreateClusterRequest;
+import com.amazonaws.services.kafka.model.CreateClusterResult;
+import com.amazonaws.services.kafka.model.DeleteClusterRequest;
+import com.amazonaws.services.kafka.model.DeleteClusterResult;
+import com.amazonaws.services.kafka.model.ListClustersRequest;
+import com.amazonaws.services.kafka.model.ListClustersResult;
+
+
+/**
+ * In-memory stand-in for the MSK client used by the tests: every supported
+ * call returns a small canned result and never touches the network.
+ */
+public class AmazonMSKClientMock extends AbstractAWSKafka {
+
+    public AmazonMSKClientMock() {
+        super();
+    }
+
+    /**
+     * Always reports a single cluster named "test-kafka", whatever the request.
+     */
+    @Override
+    public ListClustersResult listClusters(ListClustersRequest request) {
+        List<ClusterInfo> clusters = new ArrayList<>();
+        clusters.add(new ClusterInfo().withClusterName("test-kafka"));
+        return new ListClustersResult().withClusterInfoList(clusters);
+    }
+
+    /**
+     * Echoes the requested cluster name back in the CREATING state.
+     */
+    @Override
+    public CreateClusterResult createCluster(CreateClusterRequest request) {
+        return new CreateClusterResult()
+            .withClusterName(request.getClusterName())
+            .withState(ClusterState.CREATING.name());
+    }
+
+    /**
+     * Echoes the requested cluster ARN back in the DELETING state.
+     */
+    @Override
+    public DeleteClusterResult deleteCluster(DeleteClusterRequest request) {
+        return new DeleteClusterResult()
+            .withClusterArn(request.getClusterArn())
+            .withState(ClusterState.DELETING.name());
+    }
+}
diff --git a/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKComponentConfigurationTest.java b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKComponentConfigurationTest.java
new file mode 100644
index 0000000..ec81a5e
--- /dev/null
+++ b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKComponentConfigurationTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+import com.amazonaws.regions.Regions;
+
+import org.apache.camel.component.aws.msk.MSKComponent;
+import org.apache.camel.component.aws.msk.MSKEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Checks how component-level and endpoint-level credentials/region options
+ * end up in the endpoint configuration.
+ */
+public class MSKComponentConfigurationTest extends CamelTestSupport {
+
+    /**
+     * Options set on the component only must be inherited by the endpoint.
+     */
+    @Test
+    public void createEndpointWithComponentElements() throws Exception {
+        MSKComponent component = new MSKComponent(context);
+        component.setAccessKey("XXX");
+        component.setSecretKey("YYY");
+        MSKEndpoint endpoint = (MSKEndpoint)component.createEndpoint("aws-msk://label");
+
+        assertEquals("XXX", endpoint.getConfiguration().getAccessKey());
+        assertEquals("YYY", endpoint.getConfiguration().getSecretKey());
+    }
+
+    /**
+     * Options given on the endpoint URI must win over the component ones.
+     */
+    @Test
+    public void createEndpointWithComponentAndEndpointElements() throws Exception {
+        MSKComponent component = new MSKComponent(context);
+        component.setAccessKey("XXX");
+        component.setSecretKey("YYY");
+        component.setRegion(Regions.US_WEST_1.toString());
+        // The query string needs a literal "&region=" separator, otherwise the
+        // region text is swallowed into the secretKey value.
+        MSKEndpoint endpoint = (MSKEndpoint)component.createEndpoint("aws-msk://label?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1");
+
+        assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey());
+        assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
+        assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion());
+    }
+
+}
diff --git a/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKComponentVerifierExtensionTest.java b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKComponentVerifierExtensionTest.java
new file mode 100644
index 0000000..cf57117
--- /dev/null
+++ b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKComponentVerifierExtensionTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Component;
+import org.apache.camel.component.aws.msk.MSKOperations;
+import org.apache.camel.component.extension.ComponentVerifierExtension;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MSKComponentVerifierExtensionTest extends CamelTestSupport {
+
+ // *************************************************
+ // Tests (parameters)
+ // *************************************************
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ @Test
+ public void testParameters() throws Exception {
+ Component component = context().getComponent("aws-msk");
+
+ ComponentVerifierExtension verifier = component.getExtension(ComponentVerifierExtension.class).orElseThrow(IllegalStateException::new);
+
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("secretKey", "l");
+ parameters.put("accessKey", "k");
+ parameters.put("region", "l");
+ parameters.put("label", "test");
+ parameters.put("operation", MSKOperations.listClusters);
+
+ ComponentVerifierExtension.Result result = verifier.verify(ComponentVerifierExtension.Scope.PARAMETERS, parameters);
+
+ Assert.assertEquals(ComponentVerifierExtension.Result.Status.OK, result.getStatus());
+ }
+
+ @Test
+ public void testConnectivity() throws Exception {
+ Component component = context().getComponent("aws-msk");
+ ComponentVerifierExtension verifier = component.getExtension(ComponentVerifierExtension.class).orElseThrow(IllegalStateException::new);
+
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put("secretKey", "l");
+ parameters.put("accessKey", "k");
+ parameters.put("region", "US_EAST_1");
+ parameters.put("label", "test");
+ parameters.put("operation", MSKOperations.listClusters);
+
+ ComponentVerifierExtension.Result result = verifier.verify(ComponentVerifierExtension.Scope.CONNECTIVITY, parameters);
+
+ Assert.assertEquals(ComponentVerifierExtension.Result.Status.ERROR, result.getStatus());
+ }
+
+}
diff --git a/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKProducerSpringTest.java b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKProducerSpringTest.java
new file mode 100644
index 0000000..50a26f2
--- /dev/null
+++ b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKProducerSpringTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.aws.msk.MSKConstants;
+import org.apache.camel.component.aws.msk.MSKOperations;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.spring.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+import com.amazonaws.services.kafka.model.BrokerNodeGroupInfo;
+import com.amazonaws.services.kafka.model.ClusterState;
+import com.amazonaws.services.kafka.model.CreateClusterResult;
+import com.amazonaws.services.kafka.model.DeleteClusterResult;
+import com.amazonaws.services.kafka.model.ListClustersResult;
+
+public class MSKProducerSpringTest extends CamelSpringTestSupport {
+
+ @EndpointInject(uri = "mock:result")
+ private MockEndpoint mock;
+
+ @Test
+ public void kmsListKeysTest() throws Exception {
+
+ mock.expectedMessageCount(1);
+ Exchange exchange = template.request("direct:listClusters", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(MSKConstants.OPERATION, MSKOperations.listClusters);
+ }
+ });
+
+ assertMockEndpointsSatisfied();
+
+ ListClustersResult resultGet = (ListClustersResult) exchange.getIn().getBody();
+ assertEquals(1, resultGet.getClusterInfoList().size());
+ assertEquals("test-kafka", resultGet.getClusterInfoList().get(0).getClusterName());
+ }
+
+ @Test
+ public void mskCreateClusterTest() throws Exception {
+
+ mock.expectedMessageCount(1);
+ Exchange exchange = template.request("direct:createCluster", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(MSKConstants.OPERATION, MSKOperations.createCluster);
+ exchange.getIn().setHeader(MSKConstants.CLUSTER_NAME, "test-kafka");
+ exchange.getIn().setHeader(MSKConstants.CLUSTER_KAFKA_VERSION, "2.1.1");
+ exchange.getIn().setHeader(MSKConstants.BROKER_NODES_NUMBER, 2);
+ BrokerNodeGroupInfo groupInfo = new BrokerNodeGroupInfo();
+ exchange.getIn().setHeader(MSKConstants.BROKER_NODES_GROUP_INFO, groupInfo);
+ }
+ });
+
+ assertMockEndpointsSatisfied();
+
+ CreateClusterResult resultGet = (CreateClusterResult) exchange.getIn().getBody();
+ assertEquals("test-kafka", resultGet.getClusterName());
+ assertEquals(ClusterState.CREATING.name(), resultGet.getState());
+ }
+
+ @Test
+ public void mskDeleteClusterTest() throws Exception {
+
+ mock.expectedMessageCount(1);
+ Exchange exchange = template.request("direct:deleteCluster", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(MSKConstants.OPERATION, MSKOperations.deleteCluster);
+ exchange.getIn().setHeader(MSKConstants.CLUSTER_ARN, "test-kafka");
+ }
+ });
+
+ assertMockEndpointsSatisfied();
+
+ DeleteClusterResult resultGet = (DeleteClusterResult) exchange.getIn().getBody();
+ assertEquals("test-kafka", resultGet.getClusterArn());
+ assertEquals(ClusterState.DELETING.name(), resultGet.getState());
+ }
+
+ @Override
+ protected ClassPathXmlApplicationContext createApplicationContext() {
+ return new ClassPathXmlApplicationContext("org/apache/camel/component/aws/msk/MSKComponentSpringTest-context.xml");
+ }
+}
\ No newline at end of file
diff --git a/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKProducerTest.java b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKProducerTest.java
new file mode 100644
index 0000000..23d3ada
--- /dev/null
+++ b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKProducerTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.aws.msk.MSKConstants;
+import org.apache.camel.component.aws.msk.MSKOperations;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import com.amazonaws.services.kafka.model.BrokerNodeGroupInfo;
+import com.amazonaws.services.kafka.model.ClusterState;
+import com.amazonaws.services.kafka.model.CreateClusterResult;
+import com.amazonaws.services.kafka.model.DeleteClusterResult;
+import com.amazonaws.services.kafka.model.ListClustersResult;
+
+public class MSKProducerTest extends CamelTestSupport {
+
+ @EndpointInject(uri = "mock:result")
+ private MockEndpoint mock;
+
+ @Test
+ public void kmsListClustersTest() throws Exception {
+
+ mock.expectedMessageCount(1);
+ Exchange exchange = template.request("direct:listClusters", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(MSKConstants.OPERATION, MSKOperations.listClusters);
+ }
+ });
+
+ assertMockEndpointsSatisfied();
+
+ ListClustersResult resultGet = (ListClustersResult) exchange.getIn().getBody();
+ assertEquals(1, resultGet.getClusterInfoList().size());
+ assertEquals("test-kafka", resultGet.getClusterInfoList().get(0).getClusterName());
+ }
+
+ @Test
+ public void mskCreateClusterTest() throws Exception {
+
+ mock.expectedMessageCount(1);
+ Exchange exchange = template.request("direct:createCluster", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(MSKConstants.OPERATION, MSKOperations.createCluster);
+ exchange.getIn().setHeader(MSKConstants.CLUSTER_NAME, "test-kafka");
+ exchange.getIn().setHeader(MSKConstants.CLUSTER_KAFKA_VERSION, "2.1.1");
+ exchange.getIn().setHeader(MSKConstants.BROKER_NODES_NUMBER, 2);
+ BrokerNodeGroupInfo groupInfo = new BrokerNodeGroupInfo();
+ exchange.getIn().setHeader(MSKConstants.BROKER_NODES_GROUP_INFO, groupInfo);
+ }
+ });
+
+ assertMockEndpointsSatisfied();
+
+ CreateClusterResult resultGet = (CreateClusterResult) exchange.getIn().getBody();
+ assertEquals("test-kafka", resultGet.getClusterName());
+ assertEquals(ClusterState.CREATING.name(), resultGet.getState());
+ }
+
+ @Test
+ public void mskDeleteClusterTest() throws Exception {
+
+ mock.expectedMessageCount(1);
+ Exchange exchange = template.request("direct:deleteCluster", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader(MSKConstants.OPERATION, MSKOperations.deleteCluster);
+ exchange.getIn().setHeader(MSKConstants.CLUSTER_ARN, "test-kafka");
+ }
+ });
+
+ assertMockEndpointsSatisfied();
+
+ DeleteClusterResult resultGet = (DeleteClusterResult) exchange.getIn().getBody();
+ assertEquals("test-kafka", resultGet.getClusterArn());
+ assertEquals(ClusterState.DELETING.name(), resultGet.getState());
+ }
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry registry = super.createRegistry();
+
+ AmazonMSKClientMock clientMock = new AmazonMSKClientMock();
+
+ registry.bind("amazonMskClient", clientMock);
+
+ return registry;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:listClusters")
+ .to("aws-msk://test?mskClient=#amazonMskClient&operation=listClusters")
+ .to("mock:result");
+ from("direct:createCluster")
+ .to("aws-msk://test?mskClient=#amazonMskClient&operation=createCluster")
+ .to("mock:result");
+ from("direct:deleteCluster")
+ .to("aws-msk://test?mskClient=#amazonMskClient&operation=deleteCluster")
+ .to("mock:result");
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/components/camel-aws-msk/src/test/resources/log4j2.properties b/components/camel-aws-msk/src/test/resources/log4j2.properties
new file mode 100644
index 0000000..986f470
--- /dev/null
+++ b/components/camel-aws-msk/src/test/resources/log4j2.properties
@@ -0,0 +1,28 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+appender.file.type = File
+appender.file.name = file
+appender.file.fileName = target/camel-aws-kms-test.log
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+appender.out.type = Console
+appender.out.name = out
+appender.out.layout.type = PatternLayout
+appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+rootLogger.level = INFO
+rootLogger.appenderRef.file.ref = file
diff --git a/components/camel-aws-msk/src/test/resources/org/apache/camel/component/aws/msk/MSKComponentSpringTest-context.xml b/components/camel-aws-msk/src/test/resources/org/apache/camel/component/aws/msk/MSKComponentSpringTest-context.xml
new file mode 100644
index 0000000..af87975
--- /dev/null
+++ b/components/camel-aws-msk/src/test/resources/org/apache/camel/component/aws/msk/MSKComponentSpringTest-context.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+ <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+ <route>
+ <from uri="direct:listClusters"/>
+ <to uri="aws-msk://Test?mskClient=#amazonMskClient&amp;operation=listClusters"/>
+ <to uri="mock:result"/>
+ </route>
+
+ <route>
+ <from uri="direct:createCluster"/>
+ <to uri="aws-msk://Test?mskClient=#amazonMskClient&amp;operation=createCluster"/>
+ <to uri="mock:result"/>
+ </route>
+
+ <route>
+ <from uri="direct:deleteCluster"/>
+ <to uri="aws-msk://Test?mskClient=#amazonMskClient&amp;operation=deleteCluster"/>
+ <to uri="mock:result"/>
+ </route>
+ </camelContext>
+
+ <bean id="amazonMskClient" class="org.apache.camel.component.aws.msk.AmazonMSKClientMock"/>
+</beans>
\ No newline at end of file
diff --git a/platforms/spring-boot/components-starter/pom.xml b/platforms/spring-boot/components-starter/pom.xml
index 2bc538c..a1dde11 100644
--- a/platforms/spring-boot/components-starter/pom.xml
+++ b/platforms/spring-boot/components-starter/pom.xml
@@ -124,6 +124,7 @@
<module>camel-aws-kms-starter</module>
<module>camel-aws-lambda-starter</module>
<module>camel-aws-mq-starter</module>
+ <module>camel-aws-msk-starter</module>
<module>camel-aws-s3-starter</module>
<module>camel-aws-sdb-starter</module>
<module>camel-aws-ses-starter</module>