You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by zb...@apache.org on 2021/12/01 09:30:28 UTC

[camel-quarkus] branch main updated: :white_check_mark: Kafka Oauth Integration test with Strimzi and Keycloak (#3336)

This is an automated email from the ASF dual-hosted git repository.

zbendhiba pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git


The following commit(s) were added to refs/heads/main by this push:
     new cb9c920  :white_check_mark: Kafka Oauth Integration test with Strimzi and Keycloak (#3336)
cb9c920 is described below

commit cb9c92008820642be374fa136f9b53a4677777d6
Author: Zineb BENDHIBA <be...@gmail.com>
AuthorDate: Wed Dec 1 10:30:18 2021 +0100

    :white_check_mark: Kafka Oauth Integration test with Strimzi and Keycloak (#3336)
    
    Fixes #2872
---
 .../main/resources/META-INF/quarkus-extension.yaml |   3 +-
 integration-tests/kafka-oauth/pom.xml              | 209 ++++++++++++++
 .../quarkus/kafka/oauth/it/KafkaOauthResource.java |  36 +++
 .../camel/quarkus/kafka/oauth/it/Routes.java       |  41 +++
 .../src/main/resources/application.properties      |  50 ++++
 .../camel/quarkus/kafka/oauth/it/KafkaIT.java      |  23 ++
 .../kafka/oauth/it/KafkaKeycloakTestResource.java  |  66 +++++
 .../camel/quarkus/kafka/oauth/it/KafkaTest.java    |  39 +++
 .../kafka/oauth/it/container/KafkaContainer.java   | 106 +++++++
 .../oauth/it/container/KeycloakContainer.java      |  80 +++++
 .../src/test/resources/certificates/README.md      |  44 ++-
 .../test/resources/certificates/ca-truststore.p12  | Bin 0 -> 1639 bytes
 .../src/test/resources/certificates/gen-ca.sh      |  29 +-
 .../resources/certificates/gen-keycloak-certs.sh   |  31 ++
 .../certificates/keycloak.server.keystore.p12      | Bin 0 -> 5565 bytes
 .../src/test/resources/kafkaServer.properties      | 177 ++++++++++++
 .../keycloak/realms/kafka-authz-realm.json         | 321 +++++++++++++++++++++
 .../resources/keycloak/scripts/keycloak-ssl.cli    |  20 ++
 integration-tests/pom.xml                          |   1 +
 pom.xml                                            |   1 +
 tooling/scripts/test-categories.yaml               |   1 +
 21 files changed, 1247 insertions(+), 31 deletions(-)

diff --git a/extensions/jfr/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/extensions/jfr/runtime/src/main/resources/META-INF/quarkus-extension.yaml
index a6a058f..6545559 100644
--- a/extensions/jfr/runtime/src/main/resources/META-INF/quarkus-extension.yaml
+++ b/extensions/jfr/runtime/src/main/resources/META-INF/quarkus-extension.yaml
@@ -24,9 +24,8 @@
 name: "Camel Jfr"
 description: "Diagnose Camel applications with Java Flight Recorder"
 metadata:
-  unlisted: true
   guide: "https://camel.apache.org/camel-quarkus/latest/reference/extensions/jfr.html"
   categories:
   - "integration"
   status:
-  - "preview"
+  - "stable"
diff --git a/integration-tests/kafka-oauth/pom.xml b/integration-tests/kafka-oauth/pom.xml
new file mode 100644
index 0000000..7382b99
--- /dev/null
+++ b/integration-tests/kafka-oauth/pom.xml
@@ -0,0 +1,209 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.camel.quarkus</groupId>
+        <artifactId>camel-quarkus-build-parent-it</artifactId>
+        <version>2.6.0-SNAPSHOT</version>
+        <relativePath>../../poms/build-parent-it/pom.xml</relativePath>
+    </parent>
+
+    <artifactId>camel-quarkus-integration-test-kafka-oauth</artifactId>
+    <name>Camel Quarkus :: Integration Tests :: Kafka Oauth</name>
+    <description>Integration tests for Camel Quarkus Kafka extension</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-log</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-timer</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-seda</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-resteasy-jackson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.strimzi</groupId>
+            <artifactId>kafka-oauth-client</artifactId>
+        </dependency>
+
+        <!-- test dependencies -->
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-junit5</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.rest-assured</groupId>
+            <artifactId>rest-assured</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.strimzi</groupId>
+            <artifactId>strimzi-test-container</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <profiles>
+        <profile>
+            <id>full</id>
+            <activation>
+                <property>
+                    <name>!quickly</name>
+                </property>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-surefire-plugin</artifactId>
+                        <configuration>
+                            <systemPropertyVariables>
+                                <!-- Configure the app to test to resolve "keycloak" hostname to the Docker hostname -->
+                                <jdk.net.hosts.file>target/hosts</jdk.net.hosts.file>
+                            </systemPropertyVariables>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+        <profile>
+            <id>native</id>
+            <activation>
+                <property>
+                    <name>native</name>
+                </property>
+            </activation>
+            <properties>
+                <quarkus.package.type>native</quarkus.package.type>
+            </properties>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-failsafe-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <goals>
+                                    <goal>integration-test</goal>
+                                    <goal>verify</goal>
+                                </goals>
+                                <configuration>
+                                    <rerunFailingTestsCount>${rerun.failing.test.count}</rerunFailingTestsCount>
+                                    <systemPropertyVariables>
+                                        <quarkus.test.arg-line>-Djdk.net.hosts.file=${basedir}/target/hosts</quarkus.test.arg-line>
+                                    </systemPropertyVariables>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+        <profile>
+            <id>virtualDependencies</id>
+            <activation>
+                <property>
+                    <name>!noVirtualDependencies</name>
+                </property>
+            </activation>
+            <dependencies>
+                <!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
+                <dependency>
+                    <groupId>org.apache.camel.quarkus</groupId>
+                    <artifactId>camel-quarkus-kafka-deployment</artifactId>
+                    <version>${project.version}</version>
+                    <type>pom</type>
+                    <scope>test</scope>
+                    <exclusions>
+                        <exclusion>
+                            <groupId>*</groupId>
+                            <artifactId>*</artifactId>
+                        </exclusion>
+                    </exclusions>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.camel.quarkus</groupId>
+                    <artifactId>camel-quarkus-log-deployment</artifactId>
+                    <version>${project.version}</version>
+                    <type>pom</type>
+                    <scope>test</scope>
+                    <exclusions>
+                        <exclusion>
+                            <groupId>*</groupId>
+                            <artifactId>*</artifactId>
+                        </exclusion>
+                    </exclusions>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.camel.quarkus</groupId>
+                    <artifactId>camel-quarkus-seda-deployment</artifactId>
+                    <version>${project.version}</version>
+                    <type>pom</type>
+                    <scope>test</scope>
+                    <exclusions>
+                        <exclusion>
+                            <groupId>*</groupId>
+                            <artifactId>*</artifactId>
+                        </exclusion>
+                    </exclusions>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.camel.quarkus</groupId>
+                    <artifactId>camel-quarkus-timer-deployment</artifactId>
+                    <version>${project.version}</version>
+                    <type>pom</type>
+                    <scope>test</scope>
+                    <exclusions>
+                        <exclusion>
+                            <groupId>*</groupId>
+                            <artifactId>*</artifactId>
+                        </exclusion>
+                    </exclusions>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
+</project>
\ No newline at end of file
diff --git a/integration-tests/kafka-oauth/src/main/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaOauthResource.java b/integration-tests/kafka-oauth/src/main/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaOauthResource.java
new file mode 100644
index 0000000..e11ba83
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/main/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaOauthResource.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.kafka.oauth.it;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+
+import org.apache.camel.ConsumerTemplate;
+
+@ApplicationScoped
+@Path("kafka-oauth")
+public class KafkaOauthResource {
+    @Inject
+    ConsumerTemplate consumerTemplate;
+
+    @GET
+    public String getMessages() {
+        return consumerTemplate.receiveBody("seda:kafka-messages", 10000, String.class);
+    }
+}
diff --git a/integration-tests/kafka-oauth/src/main/java/org/apache/camel/quarkus/kafka/oauth/it/Routes.java b/integration-tests/kafka-oauth/src/main/java/org/apache/camel/quarkus/kafka/oauth/it/Routes.java
new file mode 100644
index 0000000..8b3de8f
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/main/java/org/apache/camel/quarkus/kafka/oauth/it/Routes.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.kafka.oauth.it;
+
+import javax.enterprise.context.ApplicationScoped;
+
+import org.apache.camel.builder.RouteBuilder;
+
+@ApplicationScoped
+public class Routes extends RouteBuilder {
+
+    @Override
+    public void configure() throws Exception {
+        // produces messages to kafka
+        from("timer:foo?period={{timer.period}}&delay={{timer.delay}}")
+                .routeId("FromTimer2Kafka")
+                .setBody().simple("Message #${exchangeProperty.CamelTimerCounter}")
+                .to("kafka:{{kafka.topic.name}}")
+                .log("Message sent correctly to the topic! : \"${body}\" ");
+
+        // kafka consumer
+        from("kafka:{{kafka.topic.name}}")
+                .routeId("FromKafka2Seda")
+                .log("Received : \"${body}\"")
+                .to("seda:kafka-messages");
+    }
+}
diff --git a/integration-tests/kafka-oauth/src/main/resources/application.properties b/integration-tests/kafka-oauth/src/main/resources/application.properties
new file mode 100644
index 0000000..315c071
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/main/resources/application.properties
@@ -0,0 +1,50 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+
+#Kafka topic Name
+kafka.topic.name=test
+
+# How often should the messages be generated and pushed to Kafka Topic
+timer.period = 100
+timer.delay = 100
+
+camel.component.kafka.security-protocol = SASL_PLAINTEXT
+camel.component.kafka.sasl-mechanism = OAUTHBEARER
+camel.component.kafka.sasl-jaas-config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
+  oauth.client.id="kafka-client" \
+  oauth.client.secret="kafka-client-secret" \
+  oauth.token.endpoint.uri="http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/token" ;
+camel.component.kafka.additional-properties[sasl.login.callback.handler.class] = io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
+
+# enable health check
+quarkus.kafka.health.enabled=true
+
+# using QuarkusTestResourceLifecycleManager in this test: Kafka configuration needs to be tuned to work with Keycloak
+quarkus.kafka.devservices.enabled=false
+
+
+#
+# Quarkus - Log
+#
+quarkus.log.category."org.apache.camel.quarkus.core.deployment".level = INFO
+quarkus.log.category."org.apache.camel.quarkus.component.kafka".level = DEBUG
+quarkus.log.category."org.apache.zookeeper".level = WARNING
+quarkus.log.category."org.apache.kafka".level = WARNING
+
+%quiet.quarkus.log.category."kafka".level = WARNING
+%quiet.quarkus.log.category."kafka.log".level = FATAL
\ No newline at end of file
diff --git a/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaIT.java b/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaIT.java
new file mode 100644
index 0000000..7a5062a
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaIT.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.kafka.oauth.it;
+
+import io.quarkus.test.junit.NativeImageTest;
+
+@NativeImageTest
+public class KafkaIT extends KafkaTest {
+}
diff --git a/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaKeycloakTestResource.java b/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaKeycloakTestResource.java
new file mode 100644
index 0000000..549ddb7
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaKeycloakTestResource.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.kafka.oauth.it;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+import org.apache.camel.quarkus.kafka.oauth.it.container.KafkaContainer;
+import org.apache.camel.quarkus.kafka.oauth.it.container.KeycloakContainer;
+import org.jboss.logging.Logger;
+
+/**
+ * Inspired from https://github.com/quarkusio/quarkus/tree/main/integration-tests/kafka-oauth-keycloak/
+ */
+public class KafkaKeycloakTestResource implements QuarkusTestResourceLifecycleManager {
+
+    private static final Logger log = Logger.getLogger(KafkaKeycloakTestResource.class);
+    private KafkaContainer kafka;
+    private KeycloakContainer keycloak;
+
+    @Override
+    public Map<String, String> start() {
+
+        Map<String, String> properties = new HashMap<>();
+
+        //Start keycloak container
+        keycloak = new KeycloakContainer();
+        keycloak.start();
+        log.info(keycloak.getLogs());
+        keycloak.createHostsFile();
+
+        //Start kafka container
+        kafka = new KafkaContainer();
+        kafka.start();
+        log.info(kafka.getLogs());
+        properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
+        properties.put("camel.component.kafka.brokers", kafka.getBootstrapServers());
+
+        return properties;
+    }
+
+    @Override
+    public void stop() {
+        if (kafka != null) {
+            kafka.stop();
+        }
+        if (keycloak != null) {
+            keycloak.stop();
+        }
+    }
+}
diff --git a/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaTest.java b/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaTest.java
new file mode 100644
index 0000000..a858d76
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.kafka.oauth.it;
+
+import java.util.concurrent.TimeUnit;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+import io.restassured.RestAssured;
+import org.junit.jupiter.api.Test;
+
+import static org.awaitility.Awaitility.await;
+
+@QuarkusTest
+@QuarkusTestResource(KafkaKeycloakTestResource.class)
+public class KafkaTest {
+
+    @Test
+    public void testKafka() {
+        await().atMost(10, TimeUnit.SECONDS).until(() -> {
+            String message = RestAssured.get("/kafka-oauth").asString();
+            return message != null && message.contains("Message #");
+        });
+    }
+}
diff --git a/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/container/KafkaContainer.java b/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/container/KafkaContainer.java
new file mode 100644
index 0000000..31885bf
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/container/KafkaContainer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.kafka.oauth.it.container;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import io.strimzi.StrimziKafkaContainer;
+import org.jboss.logging.Logger;
+import org.testcontainers.containers.FixedHostPortGenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.utility.MountableFile;
+
+/**
+ * Inspired from https://github.com/quarkusio/quarkus/tree/main/integration-tests/kafka-oauth-keycloak/
+ */
+public class KafkaContainer extends FixedHostPortGenericContainer<KafkaContainer> {
+
+    private static final Logger LOGGER = Logger.getLogger(KafkaContainer.class);
+
+    private static final String STARTER_SCRIPT = "/testcontainers_start.sh";
+    private static final int KAFKA_PORT = 9092;
+    private static final String LATEST_KAFKA_VERSION;
+
+    private static final List<String> supportedKafkaVersions = new ArrayList<>(3);
+
+    static {
+        InputStream inputStream = StrimziKafkaContainer.class.getResourceAsStream("/kafka-versions.txt");
+        InputStreamReader streamReader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
+
+        try (BufferedReader bufferedReader = new BufferedReader(streamReader)) {
+            String kafkaVersion;
+            while ((kafkaVersion = bufferedReader.readLine()) != null) {
+                supportedKafkaVersions.add(kafkaVersion);
+            }
+        } catch (IOException e) {
+            LOGGER.error("Unable to load the supported Kafka versions", e);
+        }
+
+        // sort kafka version from low to high
+        Collections.sort(supportedKafkaVersions);
+
+        LATEST_KAFKA_VERSION = supportedKafkaVersions.get(supportedKafkaVersions.size() - 1);
+    }
+
+    public KafkaContainer() {
+        super("quay.io/strimzi/kafka:" + "latest-kafka-" + LATEST_KAFKA_VERSION);
+
+        withExposedPorts(KAFKA_PORT);
+        withFixedExposedPort(KAFKA_PORT, KAFKA_PORT);
+        withCopyFileToContainer(MountableFile.forClasspathResource("kafkaServer.properties"),
+                "/opt/kafka/config/server.properties");
+        waitingFor(Wait.forLogMessage(".*Kafka startTimeMs:.*", 1));
+        withNetwork(Network.SHARED);
+        withNetworkAliases("kafka");
+        withEnv("LOG_DIR", "/tmp");
+    }
+
+    @Override
+    protected void doStart() {
+        // we need it for the startZookeeper(); and startKafka(); to run container before...
+        withCommand("sh", "-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
+        super.doStart();
+    }
+
+    @Override
+    protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) {
+        super.containerIsStarting(containerInfo, reused);
+        LOGGER.info("Kafka servers :: " + getBootstrapServers());
+        String command = "#!/bin/bash \n";
+        command += "bin/zookeeper-server-start.sh ./config/zookeeper.properties &\n";
+        command += "export CLASSPATH=\"/opt/kafka/libs/strimzi/*:$CLASSPATH\" \n";
+        command += "bin/kafka-server-start.sh ./config/server.properties" +
+                " --override listeners=JWT://:" + KAFKA_PORT +
+                " --override advertised.listeners=" + getBootstrapServers();
+        copyFileToContainer(Transferable.of(command.getBytes(StandardCharsets.UTF_8), 700), STARTER_SCRIPT);
+    }
+
+    public String getBootstrapServers() {
+        return String.format("JWT://%s:%s", getHost(), KAFKA_PORT);
+    }
+
+}
diff --git a/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/container/KeycloakContainer.java b/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/container/KeycloakContainer.java
new file mode 100644
index 0000000..0b46077
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/container/KeycloakContainer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.kafka.oauth.it.container;
+
+import java.io.FileWriter;
+
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import org.testcontainers.containers.FixedHostPortGenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.MountableFile;
+
+/**
+ * Inspired from https://github.com/quarkusio/quarkus/tree/main/integration-tests/kafka-oauth-keycloak/
+ */
+public class KeycloakContainer extends FixedHostPortGenericContainer<KeycloakContainer> {
+
+    public KeycloakContainer() {
+        super("quay.io/keycloak/keycloak:15.0.2");
+        withExposedPorts(8443);
+        withFixedExposedPort(8080, 8080);
+        withEnv("KEYCLOAK_USER", "admin");
+        withEnv("KEYCLOAK_PASSWORD", "admin");
+        withEnv("KEYCLOAK_HTTPS_PORT", "8443");
+        withEnv("PROXY_ADDRESS_FORWARDING", "true");
+        withEnv("KEYCLOAK_IMPORT", "/opt/jboss/keycloak/realms/kafka-authz-realm.json");
+        waitingFor(Wait.forLogMessage(".*WFLYSRV0025.*", 1));
+        withNetwork(Network.SHARED);
+        withNetworkAliases("keycloak");
+        withCreateContainerCmdModifier(cmd -> {
+            cmd.withEntrypoint("");
+            cmd.withCmd("/bin/bash", "-c", "cd /opt/jboss/keycloak " +
+                    "&& bin/jboss-cli.sh --file=ssl/keycloak-ssl.cli " +
+                    "&& rm -rf standalone/configuration/standalone_xml_history/current " +
+                    "&& cd .. " +
+                    "&& /opt/jboss/tools/docker-entrypoint.sh -Dkeycloak.profile.feature.upload_scripts=enabled -b 0.0.0.0");
+        });
+    }
+
+    @Override
+    protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) {
+        super.containerIsStarting(containerInfo);
+        copyFileToContainer(MountableFile.forClasspathResource("certificates/ca-truststore.p12"),
+                "/opt/jboss/keycloak/standalone/configuration/certs/ca-truststore.p12");
+        copyFileToContainer(MountableFile.forClasspathResource("certificates/keycloak.server.keystore.p12"),
+                "/opt/jboss/keycloak/standalone/configuration/certs/keycloak.server.keystore.p12");
+        copyFileToContainer(MountableFile.forClasspathResource("keycloak/scripts/keycloak-ssl.cli"),
+                "/opt/jboss/keycloak/ssl/keycloak-ssl.cli");
+        copyFileToContainer(MountableFile.forClasspathResource("keycloak/realms/kafka-authz-realm.json"),
+                "/opt/jboss/keycloak/realms/kafka-authz-realm.json");
+    }
+
+    public void createHostsFile() {
+        try (FileWriter fileWriter = new FileWriter("target/hosts")) {
+            String dockerHost = this.getHost();
+            if ("localhost".equals(dockerHost)) {
+                fileWriter.write("127.0.0.1 keycloak");
+            } else {
+                fileWriter.write(dockerHost + " keycloak");
+            }
+            fileWriter.flush();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/extensions/jfr/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/integration-tests/kafka-oauth/src/test/resources/certificates/README.md
similarity index 56%
copy from extensions/jfr/runtime/src/main/resources/META-INF/quarkus-extension.yaml
copy to integration-tests/kafka-oauth/src/test/resources/certificates/README.md
index a6a058f..5056a5b 100644
--- a/extensions/jfr/runtime/src/main/resources/META-INF/quarkus-extension.yaml
+++ b/integration-tests/kafka-oauth/src/test/resources/certificates/README.md
@@ -1,3 +1,4 @@
+#!/bin/sh
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -15,18 +16,33 @@
 # limitations under the License.
 #
 
-# This is a generated file. Do not edit directly!
-# To re-generate, run the following command from the top level directory:
+## 
+# Inspired from https://github.com/quarkusio/quarkus/tree/main/integration-tests/kafka-oauth-keycloak/
 #
-#   mvn -N cq:update-quarkus-metadata
-#
----
-name: "Camel Jfr"
-description: "Diagnose Camel applications with Java Flight Recorder"
-metadata:
-  unlisted: true
-  guide: "https://camel.apache.org/camel-quarkus/latest/reference/extensions/jfr.html"
-  categories:
-  - "integration"
-  status:
-  - "preview"
+# Generating the certificates and keystore
+
+## Creating a self-signed CA certificate and truststore
+
+```bash
+./gen-ca.sh
+```
+
+This creates `crt.ca` and adds the certificate to the keystore `ca-truststore.p12`.
+
+## Creating a server certificate and add it to keystore
+
+```bash
+./gen-keycloak-certs.sh
+```
+
+This creates server certificate for Keycloak, signs it and adds it to keystore `keycloak.server.keystore.p12`.
+
+## Cleanup
+
+```bash
+rm ca.srl
+rm ca.crt
+rm ca.key
+rm cert-file
+rm cert-signed
+```
\ No newline at end of file
diff --git a/integration-tests/kafka-oauth/src/test/resources/certificates/ca-truststore.p12 b/integration-tests/kafka-oauth/src/test/resources/certificates/ca-truststore.p12
new file mode 100644
index 0000000..f3fdb05
Binary files /dev/null and b/integration-tests/kafka-oauth/src/test/resources/certificates/ca-truststore.p12 differ
diff --git a/extensions/jfr/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/integration-tests/kafka-oauth/src/test/resources/certificates/gen-ca.sh
old mode 100644
new mode 100755
similarity index 64%
copy from extensions/jfr/runtime/src/main/resources/META-INF/quarkus-extension.yaml
copy to integration-tests/kafka-oauth/src/test/resources/certificates/gen-ca.sh
index a6a058f..c5c5af3
--- a/extensions/jfr/runtime/src/main/resources/META-INF/quarkus-extension.yaml
+++ b/integration-tests/kafka-oauth/src/test/resources/certificates/gen-ca.sh
@@ -1,3 +1,4 @@
+#!/bin/sh
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -15,18 +16,16 @@
 # limitations under the License.
 #
 
-# This is a generated file. Do not edit directly!
-# To re-generate, run the following command from the top level directory:
-#
-#   mvn -N cq:update-quarkus-metadata
-#
----
-name: "Camel Jfr"
-description: "Diagnose Camel applications with Java Flight Recorder"
-metadata:
-  unlisted: true
-  guide: "https://camel.apache.org/camel-quarkus/latest/reference/extensions/jfr.html"
-  categories:
-  - "integration"
-  status:
-  - "preview"
+set -e
+
+# create CA key
+openssl genrsa -out ca.key 4096
+
+# create CA certificate
+openssl req -x509 -new -nodes -sha256 -days 3650 -subj "/CN=quarkus.io" -key ca.key -out ca.crt
+
+
+PASSWORD=changeit
+
+# create p12 truststore
+keytool -keystore ca-truststore.p12 -storetype pkcs12 -alias ca -storepass $PASSWORD -keypass $PASSWORD -import -file ca.crt -noprompt
diff --git a/integration-tests/kafka-oauth/src/test/resources/certificates/gen-keycloak-certs.sh b/integration-tests/kafka-oauth/src/test/resources/certificates/gen-keycloak-certs.sh
new file mode 100755
index 0000000..2b921ea
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/test/resources/certificates/gen-keycloak-certs.sh
@@ -0,0 +1,31 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+set -e
+
+PASSWORD=changeit
+
+echo "#### Create server certificate for Keycloak"
+keytool -keystore keycloak.server.keystore.p12 -storetype pkcs12 -keyalg RSA -alias keycloak -validity 3650 -genkey -storepass $PASSWORD -keypass $PASSWORD -dname CN=keycloak -ext SAN=DNS:keycloak
+
+echo "#### Sign server certificate (export, sign, add signed to keystore)"
+keytool -keystore keycloak.server.keystore.p12 -storetype pkcs12 -alias keycloak -storepass $PASSWORD -keypass $PASSWORD -certreq -file cert-file
+openssl x509 -req -CA ca.crt -CAkey ca.key -in cert-file -out cert-signed -days 3650 -CAcreateserial -passin pass:$PASSWORD
+keytool -keystore keycloak.server.keystore.p12 -alias CARoot -storepass $PASSWORD -keypass $PASSWORD -import -file ca.crt -noprompt
+keytool -keystore keycloak.server.keystore.p12 -alias keycloak -storepass $PASSWORD -keypass $PASSWORD -import -file cert-signed -noprompt
diff --git a/integration-tests/kafka-oauth/src/test/resources/certificates/keycloak.server.keystore.p12 b/integration-tests/kafka-oauth/src/test/resources/certificates/keycloak.server.keystore.p12
new file mode 100644
index 0000000..406c335
Binary files /dev/null and b/integration-tests/kafka-oauth/src/test/resources/certificates/keycloak.server.keystore.p12 differ
diff --git a/integration-tests/kafka-oauth/src/test/resources/kafkaServer.properties b/integration-tests/kafka-oauth/src/test/resources/kafkaServer.properties
new file mode 100644
index 0000000..efc0bb8
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/test/resources/kafkaServer.properties
@@ -0,0 +1,177 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=1
+
+############################# Socket Server Settings #############################
+
+# The address the socket server listens on. It will get the value returned from
+# java.net.InetAddress.getCanonicalHostName() if not configured.
+#   FORMAT:
+#     listeners = listener_name://host_name:port
+#   EXAMPLE:
+#     listeners = PLAINTEXT://your.host.name:9092
+#listeners=PLAINTEXT://:9092
+listeners=JWT://:9092
+#advertised.listeners=SASL_PLAINTEXT://localhost:9092
+
+
+
+# Hostname and port the broker will advertise to producers and consumers. If not set,
+# it uses the value for "listeners" if configured.  Otherwise, it will use the value
+# returned from java.net.InetAddress.getCanonicalHostName().
+#advertised.listeners=PLAINTEXT://your.host.name:9092
+#advertised.listeners=SASL_PLAINTEXT://localhost:9092
+
+# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
+#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+listener.security.protocol.map=JWT:SASL_PLAINTEXT
+
+
+# The number of threads that the server uses for receiving requests from the network and sending responses to the network
+num.network.threads=3
+
+# The number of threads that the server uses for processing requests, which may include disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+inter.broker.listener.name=JWT
+
+
+#### SASL ####
+
+sasl.enabled.mechanisms=OAUTHBEARER
+
+sasl.mechanism.inter.broker.protocol=OAUTHBEARER
+
+oauth.username.claim=preferred_username
+principal.builder.class=io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder
+
+listener.name.jwt.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN
+listener.name.jwt.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
+     oauth.jwks.endpoint.uri="http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/certs" \
+     oauth.valid.issuer.uri="http://keycloak:8080/auth/realms/kafka-authz" \
+     oauth.token.endpoint.uri="http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/token" \
+     oauth.client.id="kafka" \
+     oauth.client.secret="kafka-secret";
+
+listener.name.jwt.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
+listener.name.jwt.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
+listener.name.jwt.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+     oauth.jwks.endpoint.uri="http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/certs" \
+     oauth.valid.issuer.uri="http://keycloak:8080/auth/realms/kafka-authz" \
+     oauth.token.endpoint.uri="http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/token" \
+     oauth.client.id="kafka" \
+     oauth.client.secret="kafka-secret" \
+     unsecuredLoginStringClaim_sub="admin";
+
+listener.name.jwt.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs=/tmp/kafka-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Internal Topic Settings  #############################
+# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion due to age
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
+# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=45000
+
+
+############################# Group Coordinator Settings #############################
+
+# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
+# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
+# The default value for this is 3 seconds.
+# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
+# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
+group.initial.rebalance.delay.ms=0
\ No newline at end of file
diff --git a/integration-tests/kafka-oauth/src/test/resources/keycloak/realms/kafka-authz-realm.json b/integration-tests/kafka-oauth/src/test/resources/keycloak/realms/kafka-authz-realm.json
new file mode 100644
index 0000000..cbd2d87
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/test/resources/keycloak/realms/kafka-authz-realm.json
@@ -0,0 +1,321 @@
+{
+  "realm": "kafka-authz",
+  "accessTokenLifespan": 300,
+  "ssoSessionIdleTimeout": 864000,
+  "ssoSessionMaxLifespan": 864000,
+  "enabled": true,
+  "sslRequired": "external",
+  "roles": {
+    "realm": [],
+    "client": {
+      "kafka-cli": [],
+      "kafka": [
+        {
+          "name": "uma_protection",
+          "clientRole": true
+        },
+        {
+          "name": "kafka-user",
+          "clientRole": true
+        }
+      ]
+    }
+  },
+  "groups" : [],
+  "users": [
+    {
+      "username": "service-account-kafka-producer-client",
+      "enabled": true,
+      "realmRoles" : [ "offline_access" ],
+      "email": "service-account-kafka-producer-client@placeholder.org",
+      "serviceAccountClientId": "kafka-producer-client"
+    },
+    {
+      "username": "service-account-kafka-consumer-client",
+      "enabled": true,
+      "realmRoles" : [ "offline_access" ],
+      "email": "service-account-kafka-consumer-client@placeholder.org",
+      "serviceAccountClientId": "kafka-consumer-client"
+    }
+  ],
+  "clients": [
+    {
+      "clientId": "kafka",
+      "enabled": true,
+      "clientAuthenticatorType": "client-secret",
+      "secret": "kafka-secret",
+      "bearerOnly": false,
+      "consentRequired": false,
+      "standardFlowEnabled": false,
+      "implicitFlowEnabled": false,
+      "directAccessGrantsEnabled": true,
+      "serviceAccountsEnabled": true,
+      "authorizationServicesEnabled": true,
+      "publicClient": false,
+      "fullScopeAllowed": true,
+      "protocolMappers": [
+        {
+          "name": "kafka audience",
+          "protocol": "openid-connect",
+          "protocolMapper": "oidc-audience-mapper",
+          "consentRequired": false,
+          "config": {
+            "included.client.audience": "kafka",
+            "id.token.claim": "false",
+            "access.token.claim": "true"
+          }
+        }
+      ],
+      "authorizationSettings": {
+        "allowRemoteResourceManagement": true,
+        "policyEnforcementMode": "ENFORCING",
+        "resources": [
+          {
+            "name": "Group:*",
+            "type": "Group",
+            "ownerManagedAccess": false,
+            "displayName": "Any group",
+            "attributes": {},
+            "uris": [],
+            "scopes": [
+              {
+                "name": "Describe"
+              },
+              {
+                "name": "Read"
+              },
+              {
+                "name": "DescribeConfigs"
+              },
+              {
+                "name": "AlterConfigs"
+              }
+            ]
+          },
+          {
+            "name": "Topic:*",
+            "type": "Topic",
+            "ownerManagedAccess": false,
+            "displayName": "Any topic",
+            "attributes": {},
+            "uris": [],
+            "scopes": [
+              {
+                "name": "Create"
+              },
+              {
+                "name": "Delete"
+              },
+              {
+                "name": "Describe"
+              },
+              {
+                "name": "Write"
+              },
+              {
+                "name": "Read"
+              },
+              {
+                "name": "Alter"
+              },
+              {
+                "name": "DescribeConfigs"
+              },
+              {
+                "name": "AlterConfigs"
+              }
+            ]
+          },
+          {
+            "name" : "Cluster:*",
+            "type" : "Cluster",
+            "ownerManagedAccess" : false,
+            "attributes" : { },
+            "uris" : [ ]
+          },
+          {
+            "name": "Topic:messages",
+            "type": "topic",
+            "scopes": [
+              {
+                "name": "Delete"
+              },
+              {
+                "name": "Describe"
+              },
+              {
+                "name": "Create"
+              },
+              {
+                "name": "Write"
+              },
+              {
+                "name": "Alter"
+              },
+              {
+                "name": "Read"
+              },
+              {
+                "name": "DescribeConfigs"
+              },
+              {
+                "name": "AlterConfigs"
+              }
+            ]
+          }
+        ],
+        "policies": [
+          {
+            "name": "Producer Client",
+            "type": "client",
+            "logic": "POSITIVE",
+            "decisionStrategy": "UNANIMOUS",
+            "config": {
+              "clients": "[\"kafka-producer-client\", \"kafka-client\"]"
+            }
+          },
+          {
+            "name": "Consumer Client",
+            "type": "client",
+            "logic": "POSITIVE",
+            "decisionStrategy": "UNANIMOUS",
+            "config": {
+              "clients": "[\"kafka-consumer-client\", \"kafka-client\"]"
+            }
+          },
+          {
+            "name": "Producer Client can write to topic 'messages'",
+            "type": "scope",
+            "logic": "POSITIVE",
+            "decisionStrategy": "UNANIMOUS",
+            "config": {
+              "resources": "[\"Topic:messages\"]",
+              "scopes": "[\"Delete\",\"Describe\",\"Create\",\"Write\"]",
+              "applyPolicies": "[\"Producer Client\"]"
+            }
+          },
+          {
+            "name": "Consumer Client can read from topic 'messages'",
+            "type": "scope",
+            "logic": "POSITIVE",
+            "decisionStrategy": "UNANIMOUS",
+            "config": {
+              "resources": "[\"Topic:messages\"]",
+              "scopes": "[\"Describe\",\"Read\"]",
+              "applyPolicies": "[\"Consumer Client\"]"
+            }
+          },
+          {
+            "name": "Consumer Client can use any group",
+            "type": "scope",
+            "logic": "POSITIVE",
+            "decisionStrategy": "UNANIMOUS",
+            "config": {
+              "resources": "[\"Group:*\"]",
+              "scopes": "[\"Describe\",\"Write\",\"Read\"]",
+              "applyPolicies": "[\"Consumer Client\"]"
+            }
+          }
+        ],
+        "scopes": [
+          {
+            "name": "Create"
+          },
+          {
+            "name": "Read"
+          },
+          {
+            "name": "Write"
+          },
+          {
+            "name": "Delete"
+          },
+          {
+            "name": "Alter"
+          },
+          {
+            "name": "Describe"
+          },
+          {
+            "name": "ClusterAction"
+          },
+          {
+            "name": "DescribeConfigs"
+          },
+          {
+            "name": "AlterConfigs"
+          },
+          {
+            "name": "IdempotentWrite"
+          }
+        ],
+        "decisionStrategy": "AFFIRMATIVE"
+      }
+    },
+    {
+      "clientId": "kafka-cli",
+      "enabled": true,
+      "clientAuthenticatorType": "client-secret",
+      "secret": "kafka-cli-secret",
+      "bearerOnly": false,
+      "consentRequired": false,
+      "standardFlowEnabled": false,
+      "implicitFlowEnabled": false,
+      "directAccessGrantsEnabled": true,
+      "serviceAccountsEnabled": false,
+      "publicClient": true,
+      "fullScopeAllowed": true
+    },
+    {
+      "clientId": "kafka-producer-client",
+      "enabled": true,
+      "clientAuthenticatorType": "client-secret",
+      "secret": "kafka-producer-client-secret",
+      "publicClient": false,
+      "bearerOnly": false,
+      "standardFlowEnabled": false,
+      "implicitFlowEnabled": false,
+      "directAccessGrantsEnabled": true,
+      "serviceAccountsEnabled": true,
+      "consentRequired" : false,
+      "fullScopeAllowed" : false,
+      "attributes": {
+        "access.token.lifespan": "36000"
+      }
+    },
+    {
+      "clientId": "kafka-consumer-client",
+      "enabled": true,
+      "clientAuthenticatorType": "client-secret",
+      "secret": "kafka-consumer-client-secret",
+      "publicClient": false,
+      "bearerOnly": false,
+      "standardFlowEnabled": false,
+      "implicitFlowEnabled": false,
+      "directAccessGrantsEnabled": true,
+      "serviceAccountsEnabled": true,
+      "consentRequired" : false,
+      "fullScopeAllowed" : false,
+      "attributes": {
+        "access.token.lifespan": "36000"
+      }
+    },
+    {
+      "clientId": "kafka-client",
+      "enabled": true,
+      "clientAuthenticatorType": "client-secret",
+      "secret": "kafka-client-secret",
+      "publicClient": false,
+      "bearerOnly": false,
+      "standardFlowEnabled": false,
+      "implicitFlowEnabled": false,
+      "directAccessGrantsEnabled": true,
+      "serviceAccountsEnabled": true,
+      "consentRequired" : false,
+      "fullScopeAllowed" : false,
+      "attributes": {
+        "access.token.lifespan": "36000"
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/integration-tests/kafka-oauth/src/test/resources/keycloak/scripts/keycloak-ssl.cli b/integration-tests/kafka-oauth/src/test/resources/keycloak/scripts/keycloak-ssl.cli
new file mode 100644
index 0000000..405f1c8
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/test/resources/keycloak/scripts/keycloak-ssl.cli
@@ -0,0 +1,20 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+embed-server --server-config=standalone-ha.xml
+/core-service=management/security-realm=UndertowRealm:add()
+/core-service=management/security-realm=UndertowRealm/server-identity=ssl:add(keystore-path=certs/keycloak.server.keystore.p12, keystore-relative-to=jboss.server.config.dir, keystore-password=changeit)
+/subsystem=undertow/server=default-server/https-listener=https:write-attribute(name=security-realm, value=UndertowRealm)
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 3393e3a..883adb3 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -131,6 +131,7 @@
         <module>jsonpath</module>
         <module>jta</module>
         <module>kafka</module>
+        <module>kafka-oauth</module>
         <module>kafka-sasl</module>
         <module>kafka-sasl-ssl</module>
         <module>kafka-ssl</module>
diff --git a/pom.xml b/pom.xml
index 5f2cff6..cd15b4b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -511,6 +511,7 @@
                             <exclude>**/k8s-sb/**</exclude>
                         </excludes>
                         <mapping>
+                            <cli>CAMEL_PROPERTIES_STYLE</cli>
                             <groovy>SLASHSTAR_STYLE</groovy>
                             <java>SLASHSTAR_STYLE</java>
                             <Jenkinsfile>SLASHSTAR_STYLE</Jenkinsfile>
diff --git a/tooling/scripts/test-categories.yaml b/tooling/scripts/test-categories.yaml
index d13ec86..8b643d3 100644
--- a/tooling/scripts/test-categories.yaml
+++ b/tooling/scripts/test-categories.yaml
@@ -39,6 +39,7 @@ group-02:
   - jackson-avro
   - jackson-protobuf
   - jfr
+  - kafka-oauth
   - oaipmh
   - pubnub
   - protobuf