You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/02/28 13:51:11 UTC

[camel] 01/11: CAMEL-13165 - Camel-AWS: Create an AWS MSK component

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 80c410097a3f79844d9c9b678b4ca54494dcb2c6
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Feb 28 12:20:20 2019 +0100

    CAMEL-13165 - Camel-AWS: Create an AWS MSK component
---
 components/camel-aws-msk/bin/pom.xml               |  81 +++++++++
 .../bin/src/main/docs/aws-kms-component.adoc       | 182 +++++++++++++++++++++
 .../camel/component/aws/kms/MSKComponent.class     | Bin 0 -> 6580 bytes
 .../aws/kms/MSKComponentVerifierExtension.class    | Bin 0 -> 5686 bytes
 .../camel/component/aws/kms/MSKConfiguration.class | Bin 0 -> 5398 bytes
 .../camel/component/aws/kms/MSKConstants.class     | Bin 0 -> 799 bytes
 .../camel/component/aws/kms/MSKEndpoint.class      | Bin 0 -> 8731 bytes
 .../camel/component/aws/kms/MSKOperations.class    | Bin 0 -> 1146 bytes
 .../camel/component/aws/kms/MSKProducer.class      | Bin 0 -> 5348 bytes
 .../component/aws/kms/AmazonMKSClientMock.class    | Bin 0 -> 775 bytes
 .../component/aws/kms/KMSProducerSpringTest.class  | Bin 0 -> 9622 bytes
 .../camel/component/aws/kms/KMSProducerTest.class  | Bin 0 -> 9526 bytes
 .../aws/kms/MSKComponentConfigurationTest.class    | Bin 0 -> 2491 bytes
 .../kms/MSKComponentVerifierExtensionTest.class    | Bin 0 -> 3883 bytes
 .../bin/src/test/resources/log4j2.properties       |  28 ++++
 .../aws/kms/KMSComponentSpringTest-context.xml     |  60 +++++++
 components/camel-aws-msk/pom.xml                   |  81 +++++++++
 .../src/main/docs/aws-kms-component.adoc           | 182 +++++++++++++++++++++
 .../camel/component/aws/msk/MSKComponent.java      | 121 ++++++++++++++
 .../aws/msk/MSKComponentVerifierExtension.java     |  89 ++++++++++
 .../camel/component/aws/msk/MSKConfiguration.java  | 137 ++++++++++++++++
 .../camel/component/aws/msk/MSKConstants.java      |  30 ++++
 .../camel/component/aws/msk/MSKEndpoint.java       | 121 ++++++++++++++
 .../camel/component/aws/msk/MSKOperations.java     |  24 +++
 .../camel/component/aws/msk/MSKProducer.java       | 170 +++++++++++++++++++
 .../component/aws/msk/AmazonMSKClientMock.java     |  65 ++++++++
 .../aws/msk/MSKComponentConfigurationTest.java     |  53 ++++++
 .../aws/msk/MSKComponentVerifierExtensionTest.java |  74 +++++++++
 .../component/aws/msk/MSKProducerSpringTest.java   | 104 ++++++++++++
 .../camel/component/aws/msk/MSKProducerTest.java   | 129 +++++++++++++++
 .../src/test/resources/log4j2.properties           |  28 ++++
 .../aws/msk/MSKComponentSpringTest-context.xml     |  47 ++++++
 platforms/spring-boot/components-starter/pom.xml   |   1 +
 33 files changed, 1807 insertions(+)

diff --git a/components/camel-aws-msk/bin/pom.xml b/components/camel-aws-msk/bin/pom.xml
new file mode 100644
index 0000000..c29ca1d
--- /dev/null
+++ b/components/camel-aws-msk/bin/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.camel</groupId>
+        <artifactId>components</artifactId>
+        <version>3.0.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>camel-aws-msk</artifactId>
+    <packaging>jar</packaging>
+
+    <name>Camel :: AWS MSK</name>
+    <description>A Camel Amazon MSK Web Service Component</description>
+
+    <properties>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-support</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-kafka</artifactId>
+            <version>${aws-java-sdk-version}</version>
+        </dependency>
+
+        <!-- for testing -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test-spring</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/components/camel-aws-msk/bin/src/main/docs/aws-kms-component.adoc b/components/camel-aws-msk/bin/src/main/docs/aws-kms-component.adoc
new file mode 100644
index 0000000..77f8a26
--- /dev/null
+++ b/components/camel-aws-msk/bin/src/main/docs/aws-kms-component.adoc
@@ -0,0 +1,182 @@
+[[aws-kms-component]]
+== AWS KMS Component
+
+*Available as of Camel version 2.21*
+
+The KMS component supports create, run, start, stop and terminate
+https://aws.amazon.com/it/kms/[AWS KMS] instances.
+
+Prerequisites
+
+You must have a valid Amazon Web Services developer account, and be
+signed up to use Amazon KMS. More information are available at
+https://aws.amazon.com/it/mq/[Amazon KMS].
+
+### URI Format
+
+[source,java]
+-------------------------
+aws-kms://label[?options]
+-------------------------
+
+You can append query options to the URI in the following format,
+?options=value&option2=value&...
+
+### URI Options
+
+
+// component options: START
+The AWS KMS component supports 5 options, which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *configuration* (advanced) | The AWS KMS default configuration |  | KMSConfiguration
+| *accessKey* (producer) | Amazon AWS Access Key |  | String
+| *secretKey* (producer) | Amazon AWS Secret Key |  | String
+| *region* (producer) | The region in which KMS client needs to work |  | String
+| *resolveProperty Placeholders* (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean
+|===
+// component options: END
+
+
+
+
+// endpoint options: START
+The AWS KMS endpoint is configured using URI syntax:
+
+----
+aws-kms:label
+----
+
+with the following path and query parameters:
+
+==== Path Parameters (1 parameters):
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *label* | *Required* Logical name |  | String
+|===
+
+
+==== Query Parameters (8 parameters):
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *accessKey* (producer) | Amazon AWS Access Key |  | String
+| *kmsClient* (producer) | To use a existing configured AWS KMS as client |  | AWSKMS
+| *operation* (producer) | *Required* The operation to perform |  | KMSOperations
+| *proxyHost* (producer) | To define a proxy host when instantiating the KMS client |  | String
+| *proxyPort* (producer) | To define a proxy port when instantiating the KMS client |  | Integer
+| *region* (producer) | The region in which KMS client needs to work |  | String
+| *secretKey* (producer) | Amazon AWS Secret Key |  | String
+| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
+|===
+// endpoint options: END
+// spring-boot-auto-configure options: START
+=== Spring Boot Auto-Configuration
+
+When using Spring Boot make sure to use the following Maven dependency to have support for auto configuration:
+
+[source,xml]
+----
+<dependency>
+  <groupId>org.apache.camel</groupId>
+  <artifactId>camel-aws-kms-starter</artifactId>
+  <version>x.x.x</version>
+  <!-- use the same version as your Camel core version -->
+</dependency>
+----
+
+
+The component supports 12 options, which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *camel.component.aws-kms.access-key* | Amazon AWS Access Key |  | String
+| *camel.component.aws-kms.configuration.access-key* | Amazon AWS Access Key |  | String
+| *camel.component.aws-kms.configuration.kms-client* | To use a existing configured AWS KMS as client |  | AWSKMS
+| *camel.component.aws-kms.configuration.operation* | The operation to perform |  | KMSOperations
+| *camel.component.aws-kms.configuration.proxy-host* | To define a proxy host when instantiating the KMS client |  | String
+| *camel.component.aws-kms.configuration.proxy-port* | To define a proxy port when instantiating the KMS client |  | Integer
+| *camel.component.aws-kms.configuration.region* | The region in which KMS client needs to work |  | String
+| *camel.component.aws-kms.configuration.secret-key* | Amazon AWS Secret Key |  | String
+| *camel.component.aws-kms.enabled* | Whether to enable auto configuration of the aws-kms component. This is enabled by default. |  | Boolean
+| *camel.component.aws-kms.region* | The region in which KMS client needs to work |  | String
+| *camel.component.aws-kms.resolve-property-placeholders* | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | Boolean
+| *camel.component.aws-kms.secret-key* | Amazon AWS Secret Key |  | String
+|===
+// spring-boot-auto-configure options: END
+
+
+
+
+Required KMS component options
+
+You have to provide the amazonKmsClient in the
+Registry or your accessKey and secretKey to access
+the https://aws.amazon.com/it/kms/[Amazon KMS] service.
+
+### Usage
+
+#### Message headers evaluated by the MQ producer
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=======================================================================
+|Header |Type |Description
+
+|`CamelAwsKMSLimit` |`Integer` |The limit number of keys to return while performing a listKeys operation
+
+|`CamelAwsKMSOperation` |`String` |The operation we want to perform
+
+|`CamelAwsKMSDescription` |`String` |A key description to use while performing a createKey operation
+
+|`CamelAwsKMSKeyId` |`String` |The key Id 
+|=======================================================================
+
+#### KMS Producer operations
+
+Camel-AWS KMS component provides the following operation on the producer side:
+
+- listKeys
+- createKey
+- disableKey
+- scheduleKeyDeletion
+- describeKey
+- enableKey
+
+Dependencies
+
+Maven users will need to add the following dependency to their pom.xml.
+
+*pom.xml*
+
+[source,xml]
+---------------------------------------
+<dependency>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>camel-aws-kms</artifactId>
+    <version>${camel-version}</version>
+</dependency>
+---------------------------------------
+
+where `${camel-version`} must be replaced by the actual version of Camel
+(2.16 or higher).
+
+### See Also
+
+* Configuring Camel
+* Component
+* Endpoint
+* Getting Started
+
+* AWS Component
diff --git a/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKComponent.class b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKComponent.class
new file mode 100644
index 0000000..c7d4963
Binary files /dev/null and b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKComponent.class differ
diff --git a/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKComponentVerifierExtension.class b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKComponentVerifierExtension.class
new file mode 100644
index 0000000..144ae2e
Binary files /dev/null and b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKComponentVerifierExtension.class differ
diff --git a/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKConfiguration.class b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKConfiguration.class
new file mode 100644
index 0000000..dbf1478
Binary files /dev/null and b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKConfiguration.class differ
diff --git a/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKConstants.class b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKConstants.class
new file mode 100644
index 0000000..fb77aaf
Binary files /dev/null and b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKConstants.class differ
diff --git a/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKEndpoint.class b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKEndpoint.class
new file mode 100644
index 0000000..7fd6129
Binary files /dev/null and b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKEndpoint.class differ
diff --git a/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKOperations.class b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKOperations.class
new file mode 100644
index 0000000..16ec708
Binary files /dev/null and b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKOperations.class differ
diff --git a/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKProducer.class b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKProducer.class
new file mode 100644
index 0000000..6c8b42e
Binary files /dev/null and b/components/camel-aws-msk/bin/src/main/java/org/apache/camel/component/aws/kms/MSKProducer.class differ
diff --git a/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/AmazonMKSClientMock.class b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/AmazonMKSClientMock.class
new file mode 100644
index 0000000..2a5517c
Binary files /dev/null and b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/AmazonMKSClientMock.class differ
diff --git a/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/KMSProducerSpringTest.class b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/KMSProducerSpringTest.class
new file mode 100644
index 0000000..5a6a3fe
Binary files /dev/null and b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/KMSProducerSpringTest.class differ
diff --git a/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/KMSProducerTest.class b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/KMSProducerTest.class
new file mode 100644
index 0000000..50605ab
Binary files /dev/null and b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/KMSProducerTest.class differ
diff --git a/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/MSKComponentConfigurationTest.class b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/MSKComponentConfigurationTest.class
new file mode 100644
index 0000000..0e6e223
Binary files /dev/null and b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/MSKComponentConfigurationTest.class differ
diff --git a/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/MSKComponentVerifierExtensionTest.class b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/MSKComponentVerifierExtensionTest.class
new file mode 100644
index 0000000..e161b81
Binary files /dev/null and b/components/camel-aws-msk/bin/src/test/java/org/apache/camel/component/aws/kms/MSKComponentVerifierExtensionTest.class differ
diff --git a/components/camel-aws-msk/bin/src/test/resources/log4j2.properties b/components/camel-aws-msk/bin/src/test/resources/log4j2.properties
new file mode 100644
index 0000000..986f470
--- /dev/null
+++ b/components/camel-aws-msk/bin/src/test/resources/log4j2.properties
@@ -0,0 +1,28 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+appender.file.type = File
+appender.file.name = file
+appender.file.fileName = target/camel-aws-kms-test.log
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+appender.out.type = Console
+appender.out.name = out
+appender.out.layout.type = PatternLayout
+appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+rootLogger.level = INFO
+rootLogger.appenderRef.file.ref = file
diff --git a/components/camel-aws-msk/bin/src/test/resources/org/apache/camel/component/aws/kms/KMSComponentSpringTest-context.xml b/components/camel-aws-msk/bin/src/test/resources/org/apache/camel/component/aws/kms/KMSComponentSpringTest-context.xml
new file mode 100644
index 0000000..42eccfd
--- /dev/null
+++ b/components/camel-aws-msk/bin/src/test/resources/org/apache/camel/component/aws/kms/KMSComponentSpringTest-context.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+    http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+        <route>
+            <from uri="direct:listKeys"/>
+            <to uri="aws-kms://Test?kmsClient=#amazonKmsClient&amp;operation=listKeys"/>
+            <to uri="mock:result"/>
+        </route>
+        <route>
+            <from uri="direct:createKey"/>
+            <to uri="aws-kms://Test?kmsClient=#amazonKmsClient&amp;operation=createKey"/>
+            <to uri="mock:result"/>
+        </route>
+        <route>
+            <from uri="direct:disableKey"/>
+            <to uri="aws-kms://Test?kmsClient=#amazonKmsClient&amp;operation=disableKey"/>
+            <to uri="mock:result"/>
+        </route>
+        <route>
+            <from uri="direct:enableKey"/>
+            <to uri="aws-kms://Test?kmsClient=#amazonKmsClient&amp;operation=enableKey"/>
+            <to uri="mock:result"/>
+        </route>
+        <route>
+            <from uri="direct:scheduleDelete"/>
+            <to uri="aws-kms://Test?kmsClient=#amazonKmsClient&amp;operation=scheduleKeyDeletion"/>
+            <to uri="mock:result"/>
+        </route>
+        <route>
+            <from uri="direct:describeKey"/>
+            <to uri="aws-kms://Test?kmsClient=#amazonKmsClient&amp;operation=describeKey"/>
+            <to uri="mock:result"/>
+        </route>
+    </camelContext>
+
+    <bean id="amazonKmsClient" class="org.apache.camel.component.aws.kms.AmazonKMSClientMock"/>
+</beans>
\ No newline at end of file
diff --git a/components/camel-aws-msk/pom.xml b/components/camel-aws-msk/pom.xml
new file mode 100644
index 0000000..c29ca1d
--- /dev/null
+++ b/components/camel-aws-msk/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.camel</groupId>
+        <artifactId>components</artifactId>
+        <version>3.0.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>camel-aws-msk</artifactId>
+    <packaging>jar</packaging>
+
+    <name>Camel :: AWS MSK</name>
+    <description>A Camel Amazon MSK Web Service Component</description>
+
+    <properties>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-support</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-kafka</artifactId>
+            <version>${aws-java-sdk-version}</version>
+        </dependency>
+
+        <!-- for testing -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test-spring</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/components/camel-aws-msk/src/main/docs/aws-kms-component.adoc b/components/camel-aws-msk/src/main/docs/aws-kms-component.adoc
new file mode 100644
index 0000000..77f8a26
--- /dev/null
+++ b/components/camel-aws-msk/src/main/docs/aws-kms-component.adoc
@@ -0,0 +1,182 @@
+[[aws-kms-component]]
+== AWS KMS Component
+
+*Available as of Camel version 2.21*
+
+The KMS component supports create, run, start, stop and terminate
+https://aws.amazon.com/it/kms/[AWS KMS] instances.
+
+Prerequisites
+
+You must have a valid Amazon Web Services developer account, and be
+signed up to use Amazon KMS. More information are available at
+https://aws.amazon.com/it/mq/[Amazon KMS].
+
+### URI Format
+
+[source,java]
+-------------------------
+aws-kms://label[?options]
+-------------------------
+
+You can append query options to the URI in the following format,
+?options=value&option2=value&...
+
+### URI Options
+
+
+// component options: START
+The AWS KMS component supports 5 options, which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *configuration* (advanced) | The AWS KMS default configuration |  | KMSConfiguration
+| *accessKey* (producer) | Amazon AWS Access Key |  | String
+| *secretKey* (producer) | Amazon AWS Secret Key |  | String
+| *region* (producer) | The region in which KMS client needs to work |  | String
+| *resolveProperty Placeholders* (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean
+|===
+// component options: END
+
+
+
+
+// endpoint options: START
+The AWS KMS endpoint is configured using URI syntax:
+
+----
+aws-kms:label
+----
+
+with the following path and query parameters:
+
+==== Path Parameters (1 parameters):
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *label* | *Required* Logical name |  | String
+|===
+
+
+==== Query Parameters (8 parameters):
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *accessKey* (producer) | Amazon AWS Access Key |  | String
+| *kmsClient* (producer) | To use a existing configured AWS KMS as client |  | AWSKMS
+| *operation* (producer) | *Required* The operation to perform |  | KMSOperations
+| *proxyHost* (producer) | To define a proxy host when instantiating the KMS client |  | String
+| *proxyPort* (producer) | To define a proxy port when instantiating the KMS client |  | Integer
+| *region* (producer) | The region in which KMS client needs to work |  | String
+| *secretKey* (producer) | Amazon AWS Secret Key |  | String
+| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
+|===
+// endpoint options: END
+// spring-boot-auto-configure options: START
+=== Spring Boot Auto-Configuration
+
+When using Spring Boot make sure to use the following Maven dependency to have support for auto configuration:
+
+[source,xml]
+----
+<dependency>
+  <groupId>org.apache.camel</groupId>
+  <artifactId>camel-aws-kms-starter</artifactId>
+  <version>x.x.x</version>
+  <!-- use the same version as your Camel core version -->
+</dependency>
+----
+
+
+The component supports 12 options, which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *camel.component.aws-kms.access-key* | Amazon AWS Access Key |  | String
+| *camel.component.aws-kms.configuration.access-key* | Amazon AWS Access Key |  | String
+| *camel.component.aws-kms.configuration.kms-client* | To use a existing configured AWS KMS as client |  | AWSKMS
+| *camel.component.aws-kms.configuration.operation* | The operation to perform |  | KMSOperations
+| *camel.component.aws-kms.configuration.proxy-host* | To define a proxy host when instantiating the KMS client |  | String
+| *camel.component.aws-kms.configuration.proxy-port* | To define a proxy port when instantiating the KMS client |  | Integer
+| *camel.component.aws-kms.configuration.region* | The region in which KMS client needs to work |  | String
+| *camel.component.aws-kms.configuration.secret-key* | Amazon AWS Secret Key |  | String
+| *camel.component.aws-kms.enabled* | Whether to enable auto configuration of the aws-kms component. This is enabled by default. |  | Boolean
+| *camel.component.aws-kms.region* | The region in which KMS client needs to work |  | String
+| *camel.component.aws-kms.resolve-property-placeholders* | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | Boolean
+| *camel.component.aws-kms.secret-key* | Amazon AWS Secret Key |  | String
+|===
+// spring-boot-auto-configure options: END
+
+
+
+
+Required KMS component options
+
+You have to provide the amazonKmsClient in the
+Registry or your accessKey and secretKey to access
+the https://aws.amazon.com/it/kms/[Amazon KMS] service.
+
+### Usage
+
+#### Message headers evaluated by the MQ producer
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=======================================================================
+|Header |Type |Description
+
+|`CamelAwsKMSLimit` |`Integer` |The limit number of keys to return while performing a listKeys operation
+
+|`CamelAwsKMSOperation` |`String` |The operation we want to perform
+
+|`CamelAwsKMSDescription` |`String` |A key description to use while performing a createKey operation
+
+|`CamelAwsKMSKeyId` |`String` |The key Id 
+|=======================================================================
+
+#### KMS Producer operations
+
+Camel-AWS KMS component provides the following operation on the producer side:
+
+- listKeys
+- createKey
+- disableKey
+- scheduleKeyDeletion
+- describeKey
+- enableKey
+
+Dependencies
+
+Maven users will need to add the following dependency to their pom.xml.
+
+*pom.xml*
+
+[source,xml]
+---------------------------------------
+<dependency>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>camel-aws-kms</artifactId>
+    <version>${camel-version}</version>
+</dependency>
+---------------------------------------
+
+where `${camel-version`} must be replaced by the actual version of Camel
+(2.16 or higher).
+
+### See Also
+
+* Configuring Camel
+* Component
+* Endpoint
+* Getting Started
+
+* AWS Component
diff --git a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKComponent.java b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKComponent.java
new file mode 100644
index 0000000..5b6086a
--- /dev/null
+++ b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKComponent.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.Component;
+import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * For working with Amazon KMS.
+ */
+@Component("aws-msk")
+public class MSKComponent extends DefaultComponent {
+
+    @Metadata
+    private String accessKey;
+    @Metadata
+    private String secretKey;
+    @Metadata
+    private String region;
+    @Metadata(label = "advanced")    
+    private MSKConfiguration configuration;
+    
+    public MSKComponent() {
+        this(null);
+    }
+    
+    public MSKComponent(CamelContext context) {
+        super(context);
+        
+        this.configuration = new MSKConfiguration();
+        registerExtension(new MSKComponentVerifierExtension());
+    }
+
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        MSKConfiguration configuration = this.configuration.copy();
+        setProperties(configuration, parameters);
+
+        if (ObjectHelper.isEmpty(configuration.getAccessKey())) {
+            setAccessKey(accessKey);
+        }
+        if (ObjectHelper.isEmpty(configuration.getSecretKey())) {
+            setSecretKey(secretKey);
+        }
+        if (ObjectHelper.isEmpty(configuration.getRegion())) {
+            setRegion(region);
+        }
+        if (configuration.getMskClient() == null && (configuration.getAccessKey() == null || configuration.getSecretKey() == null)) {
+            throw new IllegalArgumentException("Amazon msk client or accessKey and secretKey must be specified");
+        }
+        
+        MSKEndpoint endpoint = new MSKEndpoint(uri, this, configuration);
+        return endpoint;
+    }
+    
+    public MSKConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * The AWS KMS default configuration
+     */
+    public void setConfiguration(MSKConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    public String getAccessKey() {
+        return configuration.getAccessKey();
+    }
+
+    /**
+     * Amazon AWS Access Key
+     */
+    public void setAccessKey(String accessKey) {
+        configuration.setAccessKey(accessKey);
+    }
+
+    public String getSecretKey() {
+        return configuration.getSecretKey();
+    }
+
+    /**
+     * Amazon AWS Secret Key
+     */
+    public void setSecretKey(String secretKey) {
+        configuration.setSecretKey(secretKey);
+    }
+    
+    public String getRegion() {
+        return configuration.getRegion();
+    }
+
+    /**
+     * The region in which KMS client needs to work
+     */
+    public void setRegion(String region) {
+        configuration.setRegion(region);
+    }
+
+}
diff --git a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKComponentVerifierExtension.java b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKComponentVerifierExtension.java
new file mode 100644
index 0000000..bf1fa3b
--- /dev/null
+++ b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKComponentVerifierExtension.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+import java.util.Map;
+
+import com.amazonaws.SdkClientException;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kafka.AWSKafka;
+import com.amazonaws.services.kafka.AWSKafkaClientBuilder;
+import com.amazonaws.services.kafka.model.ListClustersRequest;
+
+import org.apache.camel.component.extension.verifier.DefaultComponentVerifierExtension;
+import org.apache.camel.component.extension.verifier.ResultBuilder;
+import org.apache.camel.component.extension.verifier.ResultErrorBuilder;
+import org.apache.camel.component.extension.verifier.ResultErrorHelper;
+
+public class MSKComponentVerifierExtension extends DefaultComponentVerifierExtension {
+
+    public MSKComponentVerifierExtension() {
+        this("aws-msk");
+    }
+
+    public MSKComponentVerifierExtension(String scheme) {
+        super(scheme);
+    }
+
+    // *********************************
+    // Parameters validation
+    // *********************************
+
+    @Override
+    protected Result verifyParameters(Map<String, Object> parameters) {
+
+        ResultBuilder builder = ResultBuilder.withStatusAndScope(Result.Status.OK, Scope.PARAMETERS).error(ResultErrorHelper.requiresOption("accessKey", parameters))
+            .error(ResultErrorHelper.requiresOption("secretKey", parameters)).error(ResultErrorHelper.requiresOption("region", parameters));
+
+        // Validate using the catalog
+
+        super.verifyParametersAgainstCatalog(builder, parameters);
+
+        return builder.build();
+    }
+
+    // *********************************
+    // Connectivity validation
+    // *********************************
+
+    @Override
+    protected Result verifyConnectivity(Map<String, Object> parameters) {
+        ResultBuilder builder = ResultBuilder.withStatusAndScope(Result.Status.OK, Scope.CONNECTIVITY);
+
+        try {
+            MSKConfiguration configuration = setProperties(new MSKConfiguration(), parameters);
+            AWSCredentials credentials = new BasicAWSCredentials(configuration.getAccessKey(), configuration.getSecretKey());
+            AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);
+            ListClustersRequest request = new ListClustersRequest();
+            AWSKafka client = AWSKafkaClientBuilder.standard().withCredentials(credentialsProvider).withRegion(Regions.valueOf(configuration.getRegion())).build();
+            client.listClusters(request);
+        } catch (SdkClientException e) {
+            ResultErrorBuilder errorBuilder = ResultErrorBuilder.withCodeAndDescription(VerificationError.StandardCode.AUTHENTICATION, e.getMessage())
+                .detail("aws_mks_exception_message", e.getMessage()).detail(VerificationError.ExceptionAttribute.EXCEPTION_CLASS, e.getClass().getName())
+                .detail(VerificationError.ExceptionAttribute.EXCEPTION_INSTANCE, e);
+
+            builder.error(errorBuilder.build());
+        } catch (Exception e) {
+            builder.error(ResultErrorBuilder.withException(e).build());
+        }
+        return builder.build();
+    }
+}
diff --git a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKConfiguration.java b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKConfiguration.java
new file mode 100644
index 0000000..93e86bd
--- /dev/null
+++ b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKConfiguration.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+import org.apache.camel.spi.UriPath;
+
+import com.amazonaws.services.kafka.AWSKafka;
+
+@UriParams
+public class MSKConfiguration implements Cloneable {
+
+    @UriPath(description = "Logical name")
+    @Metadata(required = true)
+    private String label;
+    @UriParam(label = "producer")
+    private AWSKafka mskClient;
+    @UriParam(label = "producer", secret = true)
+    private String accessKey;
+    @UriParam(label = "producer", secret = true)
+    private String secretKey;
+    @UriParam(label = "producer")
+    @Metadata(required = true)
+    private MSKOperations operation;
+    @UriParam(label = "producer")
+    private String proxyHost;
+    @UriParam(label = "producer")
+    private Integer proxyPort;
+    @UriParam
+    private String region;
+
+    public AWSKafka getMskClient() {
+        return mskClient;
+    }
+
+    /**
+     * To use a existing configured AWS MSK as client
+     */
+    public void setMskClient(AWSKafka mskClient) {
+        this.mskClient = mskClient;
+    }
+
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    /**
+     * Amazon AWS Access Key
+     */
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    /**
+     * Amazon AWS Secret Key
+     */
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    public MSKOperations getOperation() {
+        return operation;
+    }
+
+    /**
+     * The operation to perform
+     */
+    public void setOperation(MSKOperations operation) {
+        this.operation = operation;
+    }
+
+    public String getProxyHost() {
+        return proxyHost;
+    }
+
+    /**
+     * To define a proxy host when instantiating the KMS client
+     */
+    public void setProxyHost(String proxyHost) {
+        this.proxyHost = proxyHost;
+    }
+
+    public Integer getProxyPort() {
+        return proxyPort;
+    }
+
+    /**
+     * To define a proxy port when instantiating the KMS client
+     */
+    public void setProxyPort(Integer proxyPort) {
+        this.proxyPort = proxyPort;
+    }
+
+    public String getRegion() {
+        return region;
+    }
+
+    /**
+     * The region in which KMS client needs to work
+     */
+    public void setRegion(String region) {
+        this.region = region;
+    }
+
+    // *************************************************
+    //
+    // *************************************************
+
+    public MSKConfiguration copy() {
+        try {
+            return (MSKConfiguration)super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeCamelException(e);
+        }
+    }
+}
diff --git a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKConstants.java b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKConstants.java
new file mode 100644
index 0000000..5f2c1e5
--- /dev/null
+++ b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKConstants.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+/**
+ * Constants used in Camel AWS MSK module
+ */
+public interface MSKConstants {
+    String OPERATION                         = "CamelAwsMSKOperation";
+    String CLUSTERS_FILTER                   = "CamelAwsMSKClusterFilter";
+    String CLUSTER_NAME                      = "CamelAwsMSKClusterName";
+    String CLUSTER_ARN                       = "CamelAwsMSKClusterArn";
+    String CLUSTER_KAFKA_VERSION             = "CamelAwsMSKClusterKafkaVersion";
+    String BROKER_NODES_NUMBER               = "CamelAwsMSKBrokerNodesNumber";
+    String BROKER_NODES_GROUP_INFO           = "CamelAwsMSKBrokerNodesGroupInfo";
+}
diff --git a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKEndpoint.java b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKEndpoint.java
new file mode 100644
index 0000000..f49ab3f
--- /dev/null
+++ b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKEndpoint.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.kafka.AWSKafka;
+import com.amazonaws.services.kafka.AWSKafkaClientBuilder;
+
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.support.ScheduledPollEndpoint;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * The aws-kms is used for managing Amazon KMS
+ */
+@UriEndpoint(firstVersion = "3.0.0", scheme = "aws-msk", title = "AWS MSK", syntax = "aws-msk:label", producerOnly = true, label = "cloud,management")
+public class MSKEndpoint extends ScheduledPollEndpoint {
+
+    private AWSKafka mskClient;
+
+    @UriParam
+    private MSKConfiguration configuration;
+
+    public MSKEndpoint(String uri, Component component, MSKConfiguration configuration) {
+        super(uri, component);
+        this.configuration = configuration;
+    }
+
+    public Consumer createConsumer(Processor processor) throws Exception {
+        throw new UnsupportedOperationException("You cannot receive messages from this endpoint");
+    }
+
+    public Producer createProducer() throws Exception {
+        return new MSKProducer(this);
+    }
+
+    public boolean isSingleton() {
+        return true;
+    }
+
+    @Override
+    public void doStart() throws Exception {
+        super.doStart();
+
+        mskClient = configuration.getMskClient() != null ? configuration.getMskClient() : createMSKClient();
+    }
+    
+    @Override
+    public void doStop() throws Exception {
+        if (ObjectHelper.isEmpty(configuration.getMskClient())) {
+            if (mskClient != null) {
+                mskClient.shutdown();
+            }
+        }
+        super.doStop();
+    }
+
+    public MSKConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public AWSKafka getMskClient() {
+        return mskClient;
+    }
+
+    AWSKafka createMSKClient() {
+        AWSKafka client = null;
+        ClientConfiguration clientConfiguration = null;
+        AWSKafkaClientBuilder clientBuilder = null;
+        boolean isClientConfigFound = false;
+        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
+            clientConfiguration = new ClientConfiguration();
+            clientConfiguration.setProxyHost(configuration.getProxyHost());
+            clientConfiguration.setProxyPort(configuration.getProxyPort());
+            isClientConfigFound = true;
+        }
+        if (configuration.getAccessKey() != null && configuration.getSecretKey() != null) {
+            AWSCredentials credentials = new BasicAWSCredentials(configuration.getAccessKey(), configuration.getSecretKey());
+            AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);
+            if (isClientConfigFound) {
+                clientBuilder = AWSKafkaClientBuilder.standard().withClientConfiguration(clientConfiguration).withCredentials(credentialsProvider);
+            } else {
+                clientBuilder = AWSKafkaClientBuilder.standard().withCredentials(credentialsProvider);
+            }
+        } else {
+            if (isClientConfigFound) {
+                clientBuilder = AWSKafkaClientBuilder.standard();
+            } else {
+                clientBuilder = AWSKafkaClientBuilder.standard().withClientConfiguration(clientConfiguration);
+            }
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
+            clientBuilder = clientBuilder.withRegion(configuration.getRegion());
+        }
+        client = clientBuilder.build();
+        return client;
+    }
+}
diff --git a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKOperations.java b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKOperations.java
new file mode 100644
index 0000000..a06b4e7
--- /dev/null
+++ b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKOperations.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+public enum MSKOperations {
+
+    listClusters,
+    createCluster,
+    deleteCluster
+}
diff --git a/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKProducer.java b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKProducer.java
new file mode 100644
index 0000000..b59b9e1
--- /dev/null
+++ b/components/camel-aws-msk/src/main/java/org/apache/camel/component/aws/msk/MSKProducer.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.kafka.AWSKafka;
+import com.amazonaws.services.kafka.model.BrokerNodeGroupInfo;
+import com.amazonaws.services.kafka.model.CreateClusterRequest;
+import com.amazonaws.services.kafka.model.CreateClusterResult;
+import com.amazonaws.services.kafka.model.DeleteClusterRequest;
+import com.amazonaws.services.kafka.model.DeleteClusterResult;
+import com.amazonaws.services.kafka.model.ListClustersRequest;
+import com.amazonaws.services.kafka.model.ListClustersResult;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+
+/**
+ * A Producer which sends messages to the Amazon MSK Service
+ * <a href="http://aws.amazon.com/msk/">AWS MSK</a>
+ */
+public class MSKProducer extends DefaultProducer {
+
+    private transient String mskProducerToString;
+
+    public MSKProducer(Endpoint endpoint) {
+        super(endpoint);
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        switch (determineOperation(exchange)) {
+        case listClusters:
+            listClusters(getEndpoint().getMskClient(), exchange);
+            break;
+        case createCluster:
+            createCluster(getEndpoint().getMskClient(), exchange);
+            break;
+        case deleteCluster:
+            deleteCluster(getEndpoint().getMskClient(), exchange);
+            break;
+        default:
+            throw new IllegalArgumentException("Unsupported operation");
+        }
+    }
+
+    private MSKOperations determineOperation(Exchange exchange) {
+        MSKOperations operation = exchange.getIn().getHeader(MSKConstants.OPERATION, MSKOperations.class);
+        if (operation == null) {
+            operation = getConfiguration().getOperation();
+        }
+        return operation;
+    }
+
+    protected MSKConfiguration getConfiguration() {
+        return getEndpoint().getConfiguration();
+    }
+
+    @Override
+    public String toString() {
+        if (mskProducerToString == null) {
+            mskProducerToString = "MSKProducer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
+        }
+        return mskProducerToString;
+    }
+
+    @Override
+    public MSKEndpoint getEndpoint() {
+        return (MSKEndpoint)super.getEndpoint();
+    }
+
+    private void listClusters(AWSKafka mskClient, Exchange exchange) {
+        ListClustersRequest request = new ListClustersRequest();
+        if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSKConstants.CLUSTERS_FILTER))) {
+            String filter = exchange.getIn().getHeader(MSKConstants.CLUSTERS_FILTER, String.class);
+            request.withClusterNameFilter(filter);
+        }
+        ListClustersResult result;
+        try {
+            result = mskClient.listClusters(request);
+        } catch (AmazonServiceException ase) {
+            log.trace("List Clusters command returned the error code {}", ase.getErrorCode());
+            throw ase;
+        }
+        Message message = getMessageForResponse(exchange);
+        message.setBody(result);
+    }
+    
+    private void createCluster(AWSKafka mskClient, Exchange exchange) {
+        CreateClusterRequest request = new CreateClusterRequest();
+        if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSKConstants.CLUSTER_NAME))) {
+            String name = exchange.getIn().getHeader(MSKConstants.CLUSTER_NAME, String.class);
+            request.withClusterName(name);
+        } else {
+            throw new IllegalArgumentException("Cluster Name must be specified");
+        }
+        if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSKConstants.CLUSTER_KAFKA_VERSION))) {
+            String version = exchange.getIn().getHeader(MSKConstants.CLUSTER_KAFKA_VERSION, String.class);
+            request.withKafkaVersion(version);
+        } else {
+            throw new IllegalArgumentException("Kafka Version must be specified");
+        }
+        if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSKConstants.BROKER_NODES_NUMBER))) {
+            Integer nodesNumber = exchange.getIn().getHeader(MSKConstants.BROKER_NODES_NUMBER, Integer.class);
+            request.withNumberOfBrokerNodes(nodesNumber);
+        } else {
+            throw new IllegalArgumentException("Kafka Version must be specified");
+        }
+        if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSKConstants.BROKER_NODES_GROUP_INFO))) {
+        	BrokerNodeGroupInfo brokerNodesGroupInfo = exchange.getIn().getHeader(MSKConstants.BROKER_NODES_GROUP_INFO, BrokerNodeGroupInfo.class);
+			request.withBrokerNodeGroupInfo(brokerNodesGroupInfo);
+        } else {
+            throw new IllegalArgumentException("BrokerNodeGroupInfo must be specified");
+        }
+        CreateClusterResult result;
+        try {
+            result = mskClient.createCluster(request);
+        } catch (AmazonServiceException ase) {
+            log.trace("Create Cluster command returned the error code {}", ase.getErrorCode());
+            throw ase;
+        }
+        Message message = getMessageForResponse(exchange);
+        message.setBody(result);
+    }
+    
+    private void deleteCluster(AWSKafka mskClient, Exchange exchange) {
+        DeleteClusterRequest request = new DeleteClusterRequest();
+        if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(MSKConstants.CLUSTER_ARN))) {
+            String arn = exchange.getIn().getHeader(MSKConstants.CLUSTER_ARN, String.class);
+            request.withClusterArn(arn);
+        } else {
+            throw new IllegalArgumentException("Cluster ARN must be specified");
+        }
+        DeleteClusterResult result;
+        try {
+            result = mskClient.deleteCluster(request);
+        } catch (AmazonServiceException ase) {
+            log.trace("Delete Cluster command returned the error code {}", ase.getErrorCode());
+            throw ase;
+        }
+        Message message = getMessageForResponse(exchange);
+        message.setBody(result);
+    }
+    
+    public static Message getMessageForResponse(final Exchange exchange) {
+        if (exchange.getPattern().isOutCapable()) {
+            Message out = exchange.getOut();
+            out.copyFrom(exchange.getIn());
+            return out;
+        }
+        return exchange.getIn();
+    }
+}
\ No newline at end of file
diff --git a/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/AmazonMSKClientMock.java b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/AmazonMSKClientMock.java
new file mode 100644
index 0000000..b70c979
--- /dev/null
+++ b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/AmazonMSKClientMock.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.amazonaws.services.kafka.AbstractAWSKafka;
+import com.amazonaws.services.kafka.model.ClusterInfo;
+import com.amazonaws.services.kafka.model.ClusterState;
+import com.amazonaws.services.kafka.model.CreateClusterRequest;
+import com.amazonaws.services.kafka.model.CreateClusterResult;
+import com.amazonaws.services.kafka.model.DeleteClusterRequest;
+import com.amazonaws.services.kafka.model.DeleteClusterResult;
+import com.amazonaws.services.kafka.model.ListClustersRequest;
+import com.amazonaws.services.kafka.model.ListClustersResult;
+
+
+public class AmazonMSKClientMock extends AbstractAWSKafka {
+
+    public AmazonMSKClientMock() {
+        super();
+    }
+
+    @Override
+    public ListClustersResult listClusters(ListClustersRequest request) {
+        ListClustersResult result = new ListClustersResult();
+        List<ClusterInfo> info = new ArrayList<ClusterInfo>();
+        ClusterInfo info1 = new ClusterInfo();
+        info1.setClusterName("test-kafka");
+        info.add(info1);
+        result.setClusterInfoList(info);
+        return result;
+    }
+    
+    @Override
+    public CreateClusterResult createCluster(CreateClusterRequest request) {
+        CreateClusterResult result = new CreateClusterResult();
+        result.setClusterName(request.getClusterName());
+        result.setState(ClusterState.CREATING.name());
+        return result;
+    }
+    
+    @Override
+    public DeleteClusterResult deleteCluster(DeleteClusterRequest request) {
+         DeleteClusterResult res = new DeleteClusterResult();
+         res.setClusterArn(request.getClusterArn());
+         res.setState(ClusterState.DELETING.name());
+         return res;
+    }
+}
diff --git a/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKComponentConfigurationTest.java b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKComponentConfigurationTest.java
new file mode 100644
index 0000000..ec81a5e
--- /dev/null
+++ b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKComponentConfigurationTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+import com.amazonaws.regions.Regions;
+
+import org.apache.camel.component.aws.msk.MSKComponent;
+import org.apache.camel.component.aws.msk.MSKEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class MSKComponentConfigurationTest extends CamelTestSupport {
+
+    
+    @Test
+    public void createEndpointWithComponentElements() throws Exception {
+        MSKComponent component = new MSKComponent(context);
+        component.setAccessKey("XXX");
+        component.setSecretKey("YYY");
+        MSKEndpoint endpoint = (MSKEndpoint)component.createEndpoint("aws-msk://label");
+        
+        assertEquals("XXX", endpoint.getConfiguration().getAccessKey());
+        assertEquals("YYY", endpoint.getConfiguration().getSecretKey());
+    }
+    
+    @Test
+    public void createEndpointWithComponentAndEndpointElements() throws Exception {
+        MSKComponent component = new MSKComponent(context);
+        component.setAccessKey("XXX");
+        component.setSecretKey("YYY");
+        component.setRegion(Regions.US_WEST_1.toString());
+        MSKEndpoint endpoint = (MSKEndpoint)component.createEndpoint("aws-msk://label?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1");
+        
+        assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey());
+        assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
+        assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion());
+    }
+    
+}
diff --git a/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKComponentVerifierExtensionTest.java b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKComponentVerifierExtensionTest.java
new file mode 100644
index 0000000..cf57117
--- /dev/null
+++ b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKComponentVerifierExtensionTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Component;
+import org.apache.camel.component.aws.msk.MSKOperations;
+import org.apache.camel.component.extension.ComponentVerifierExtension;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MSKComponentVerifierExtensionTest extends CamelTestSupport {
+
+    // *************************************************
+    // Tests (parameters)
+    // *************************************************
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    @Test
+    public void testParameters() throws Exception {
+        Component component = context().getComponent("aws-msk");
+
+        ComponentVerifierExtension verifier = component.getExtension(ComponentVerifierExtension.class).orElseThrow(IllegalStateException::new);
+
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("secretKey", "l");
+        parameters.put("accessKey", "k");
+        parameters.put("region", "l");
+        parameters.put("label", "test");
+        parameters.put("operation", MSKOperations.listClusters);
+
+        ComponentVerifierExtension.Result result = verifier.verify(ComponentVerifierExtension.Scope.PARAMETERS, parameters);
+
+        Assert.assertEquals(ComponentVerifierExtension.Result.Status.OK, result.getStatus());
+    }
+
+    @Test
+    public void testConnectivity() throws Exception {
+        Component component = context().getComponent("aws-msk");
+        ComponentVerifierExtension verifier = component.getExtension(ComponentVerifierExtension.class).orElseThrow(IllegalStateException::new);
+
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("secretKey", "l");
+        parameters.put("accessKey", "k");
+        parameters.put("region", "US_EAST_1");
+        parameters.put("label", "test");
+        parameters.put("operation", MSKOperations.listClusters);
+
+        ComponentVerifierExtension.Result result = verifier.verify(ComponentVerifierExtension.Scope.CONNECTIVITY, parameters);
+
+        Assert.assertEquals(ComponentVerifierExtension.Result.Status.ERROR, result.getStatus());
+    }
+
+}
diff --git a/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKProducerSpringTest.java b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKProducerSpringTest.java
new file mode 100644
index 0000000..50a26f2
--- /dev/null
+++ b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKProducerSpringTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.aws.msk.MSKConstants;
+import org.apache.camel.component.aws.msk.MSKOperations;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.spring.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+import com.amazonaws.services.kafka.model.BrokerNodeGroupInfo;
+import com.amazonaws.services.kafka.model.ClusterState;
+import com.amazonaws.services.kafka.model.CreateClusterResult;
+import com.amazonaws.services.kafka.model.DeleteClusterResult;
+import com.amazonaws.services.kafka.model.ListClustersResult;
+
+public class MSKProducerSpringTest extends CamelSpringTestSupport {
+    
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint mock;
+    
+    @Test
+    public void kmsListKeysTest() throws Exception {
+
+        mock.expectedMessageCount(1);
+        Exchange exchange = template.request("direct:listClusters", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(MSKConstants.OPERATION, MSKOperations.listClusters);
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+        
+        ListClustersResult resultGet = (ListClustersResult) exchange.getIn().getBody();
+        assertEquals(1, resultGet.getClusterInfoList().size());
+        assertEquals("test-kafka", resultGet.getClusterInfoList().get(0).getClusterName());
+    }
+    
+    @Test
+    public void mskCreateClusterTest() throws Exception {
+
+        mock.expectedMessageCount(1);
+        Exchange exchange = template.request("direct:createCluster", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(MSKConstants.OPERATION, MSKOperations.createCluster);
+                exchange.getIn().setHeader(MSKConstants.CLUSTER_NAME, "test-kafka");
+                exchange.getIn().setHeader(MSKConstants.CLUSTER_KAFKA_VERSION, "2.1.1");
+                exchange.getIn().setHeader(MSKConstants.BROKER_NODES_NUMBER, 2);
+                BrokerNodeGroupInfo groupInfo = new BrokerNodeGroupInfo();
+                exchange.getIn().setHeader(MSKConstants.BROKER_NODES_GROUP_INFO, groupInfo);
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+        
+        CreateClusterResult resultGet = (CreateClusterResult) exchange.getIn().getBody();
+        assertEquals("test-kafka", resultGet.getClusterName());
+        assertEquals(ClusterState.CREATING.name(), resultGet.getState());
+    }
+    
+    @Test
+    public void mskDeleteClusterTest() throws Exception {
+
+        mock.expectedMessageCount(1);
+        Exchange exchange = template.request("direct:deleteCluster", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(MSKConstants.OPERATION, MSKOperations.deleteCluster);
+                exchange.getIn().setHeader(MSKConstants.CLUSTER_ARN, "test-kafka");
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+        
+        DeleteClusterResult resultGet = (DeleteClusterResult) exchange.getIn().getBody();
+        assertEquals("test-kafka", resultGet.getClusterArn());
+        assertEquals(ClusterState.DELETING.name(), resultGet.getState());
+    }
+
+    @Override
+    protected ClassPathXmlApplicationContext createApplicationContext() {
+        return new ClassPathXmlApplicationContext("org/apache/camel/component/aws/msk/MSKComponentSpringTest-context.xml");
+    }
+}
\ No newline at end of file
diff --git a/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKProducerTest.java b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKProducerTest.java
new file mode 100644
index 0000000..23d3ada
--- /dev/null
+++ b/components/camel-aws-msk/src/test/java/org/apache/camel/component/aws/msk/MSKProducerTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.msk;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.aws.msk.MSKConstants;
+import org.apache.camel.component.aws.msk.MSKOperations;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import com.amazonaws.services.kafka.model.BrokerNodeGroupInfo;
+import com.amazonaws.services.kafka.model.ClusterState;
+import com.amazonaws.services.kafka.model.CreateClusterResult;
+import com.amazonaws.services.kafka.model.DeleteClusterResult;
+import com.amazonaws.services.kafka.model.ListClustersResult;
+
+public class MSKProducerTest extends CamelTestSupport {
+    
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint mock;
+    
+    @Test
+    public void kmsListClustersTest() throws Exception {
+
+        mock.expectedMessageCount(1);
+        Exchange exchange = template.request("direct:listClusters", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(MSKConstants.OPERATION, MSKOperations.listClusters);
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+        
+        ListClustersResult resultGet = (ListClustersResult) exchange.getIn().getBody();
+        assertEquals(1, resultGet.getClusterInfoList().size());
+        assertEquals("test-kafka", resultGet.getClusterInfoList().get(0).getClusterName());
+    }
+    
+    @Test
+    public void mskCreateClusterTest() throws Exception {
+
+        mock.expectedMessageCount(1);
+        Exchange exchange = template.request("direct:createCluster", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(MSKConstants.OPERATION, MSKOperations.createCluster);
+                exchange.getIn().setHeader(MSKConstants.CLUSTER_NAME, "test-kafka");
+                exchange.getIn().setHeader(MSKConstants.CLUSTER_KAFKA_VERSION, "2.1.1");
+                exchange.getIn().setHeader(MSKConstants.BROKER_NODES_NUMBER, 2);
+                BrokerNodeGroupInfo groupInfo = new BrokerNodeGroupInfo();
+                exchange.getIn().setHeader(MSKConstants.BROKER_NODES_GROUP_INFO, groupInfo);
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+        
+        CreateClusterResult resultGet = (CreateClusterResult) exchange.getIn().getBody();
+        assertEquals("test-kafka", resultGet.getClusterName());
+        assertEquals(ClusterState.CREATING.name(), resultGet.getState());
+    }
+    
+    @Test
+    public void mskDeleteClusterTest() throws Exception {
+
+        mock.expectedMessageCount(1);
+        Exchange exchange = template.request("direct:deleteCluster", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setHeader(MSKConstants.OPERATION, MSKOperations.deleteCluster);
+                exchange.getIn().setHeader(MSKConstants.CLUSTER_ARN, "test-kafka");
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+        
+        DeleteClusterResult resultGet = (DeleteClusterResult) exchange.getIn().getBody();
+        assertEquals("test-kafka", resultGet.getClusterArn());
+        assertEquals(ClusterState.DELETING.name(), resultGet.getState());
+    }
+    
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+        
+        AmazonMSKClientMock clientMock = new AmazonMSKClientMock();
+        
+        registry.bind("amazonMskClient", clientMock);
+        
+        return registry;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:listClusters")
+                    .to("aws-msk://test?mskClient=#amazonMskClient&operation=listClusters")
+                    .to("mock:result");
+                from("direct:createCluster")
+                    .to("aws-msk://test?mskClient=#amazonMskClient&operation=createCluster")
+                    .to("mock:result");
+                from("direct:deleteCluster")
+                    .to("aws-msk://test?mskClient=#amazonMskClient&operation=deleteCluster")
+                    .to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/components/camel-aws-msk/src/test/resources/log4j2.properties b/components/camel-aws-msk/src/test/resources/log4j2.properties
new file mode 100644
index 0000000..986f470
--- /dev/null
+++ b/components/camel-aws-msk/src/test/resources/log4j2.properties
@@ -0,0 +1,28 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+appender.file.type = File
+appender.file.name = file
+appender.file.fileName = target/camel-aws-kms-test.log
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+appender.out.type = Console
+appender.out.name = out
+appender.out.layout.type = PatternLayout
+appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+rootLogger.level = INFO
+rootLogger.appenderRef.file.ref = file
diff --git a/components/camel-aws-msk/src/test/resources/org/apache/camel/component/aws/msk/MSKComponentSpringTest-context.xml b/components/camel-aws-msk/src/test/resources/org/apache/camel/component/aws/msk/MSKComponentSpringTest-context.xml
new file mode 100644
index 0000000..af87975
--- /dev/null
+++ b/components/camel-aws-msk/src/test/resources/org/apache/camel/component/aws/msk/MSKComponentSpringTest-context.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+    http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+        <route>
+            <from uri="direct:listClusters"/>
+            <to uri="aws-msk://Test?mskClient=#amazonMskClient&amp;operation=listClusters"/>
+            <to uri="mock:result"/>
+        </route>
+
+        <route>
+            <from uri="direct:createCluster"/>
+            <to uri="aws-msk://Test?mskClient=#amazonMskClient&amp;operation=createCluster"/>
+            <to uri="mock:result"/>
+        </route>
+        
+        <route>
+            <from uri="direct:deleteCluster"/>
+            <to uri="aws-msk://Test?mskClient=#amazonMskClient&amp;operation=deleteCluster"/>
+            <to uri="mock:result"/>
+        </route>
+    </camelContext>
+
+    <bean id="amazonMskClient" class="org.apache.camel.component.aws.msk.AmazonMSKClientMock"/>
+</beans>
\ No newline at end of file
diff --git a/platforms/spring-boot/components-starter/pom.xml b/platforms/spring-boot/components-starter/pom.xml
index 2bc538c..a1dde11 100644
--- a/platforms/spring-boot/components-starter/pom.xml
+++ b/platforms/spring-boot/components-starter/pom.xml
@@ -124,6 +124,7 @@
     <module>camel-aws-kms-starter</module>
     <module>camel-aws-lambda-starter</module>
     <module>camel-aws-mq-starter</module>
+    <module>camel-aws-msk-starter</module>
     <module>camel-aws-s3-starter</module>
     <module>camel-aws-sdb-starter</module>
     <module>camel-aws-ses-starter</module>