You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by zb...@apache.org on 2021/12/01 09:30:28 UTC
[camel-quarkus] branch main updated: :white_check_mark: Kafka Oauth Integration test with Strimzi and Keycloak (#3336)
This is an automated email from the ASF dual-hosted git repository.
zbendhiba pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push:
new cb9c920 :white_check_mark: Kafka Oauth Integration test with Strimzi and Keycloak (#3336)
cb9c920 is described below
commit cb9c92008820642be374fa136f9b53a4677777d6
Author: Zineb BENDHIBA <be...@gmail.com>
AuthorDate: Wed Dec 1 10:30:18 2021 +0100
:white_check_mark: Kafka Oauth Integration test with Strimzi and Keycloak (#3336)
Fixes #2872
---
.../main/resources/META-INF/quarkus-extension.yaml | 3 +-
integration-tests/kafka-oauth/pom.xml | 209 ++++++++++++++
.../quarkus/kafka/oauth/it/KafkaOauthResource.java | 36 +++
.../camel/quarkus/kafka/oauth/it/Routes.java | 41 +++
.../src/main/resources/application.properties | 50 ++++
.../camel/quarkus/kafka/oauth/it/KafkaIT.java | 23 ++
.../kafka/oauth/it/KafkaKeycloakTestResource.java | 66 +++++
.../camel/quarkus/kafka/oauth/it/KafkaTest.java | 39 +++
.../kafka/oauth/it/container/KafkaContainer.java | 106 +++++++
.../oauth/it/container/KeycloakContainer.java | 80 +++++
.../src/test/resources/certificates/README.md | 44 ++-
.../test/resources/certificates/ca-truststore.p12 | Bin 0 -> 1639 bytes
.../src/test/resources/certificates/gen-ca.sh | 29 +-
.../resources/certificates/gen-keycloak-certs.sh | 31 ++
.../certificates/keycloak.server.keystore.p12 | Bin 0 -> 5565 bytes
.../src/test/resources/kafkaServer.properties | 177 ++++++++++++
.../keycloak/realms/kafka-authz-realm.json | 321 +++++++++++++++++++++
.../resources/keycloak/scripts/keycloak-ssl.cli | 20 ++
integration-tests/pom.xml | 1 +
pom.xml | 1 +
tooling/scripts/test-categories.yaml | 1 +
21 files changed, 1247 insertions(+), 31 deletions(-)
diff --git a/extensions/jfr/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/extensions/jfr/runtime/src/main/resources/META-INF/quarkus-extension.yaml
index a6a058f..6545559 100644
--- a/extensions/jfr/runtime/src/main/resources/META-INF/quarkus-extension.yaml
+++ b/extensions/jfr/runtime/src/main/resources/META-INF/quarkus-extension.yaml
@@ -24,9 +24,8 @@
name: "Camel Jfr"
description: "Diagnose Camel applications with Java Flight Recorder"
metadata:
- unlisted: true
guide: "https://camel.apache.org/camel-quarkus/latest/reference/extensions/jfr.html"
categories:
- "integration"
status:
- - "preview"
+ - "stable"
diff --git a/integration-tests/kafka-oauth/pom.xml b/integration-tests/kafka-oauth/pom.xml
new file mode 100644
index 0000000..7382b99
--- /dev/null
+++ b/integration-tests/kafka-oauth/pom.xml
@@ -0,0 +1,209 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-build-parent-it</artifactId>
+ <version>2.6.0-SNAPSHOT</version>
+ <relativePath>../../poms/build-parent-it/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>camel-quarkus-integration-test-kafka-oauth</artifactId>
+ <name>Camel Quarkus :: Integration Tests :: Kafka Oauth</name>
+ <description>Integration tests for Camel Quarkus Kafka extension</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-kafka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-log</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-timer</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-seda</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-resteasy-jackson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.strimzi</groupId>
+ <artifactId>kafka-oauth-client</artifactId>
+ </dependency>
+
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-junit5</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.rest-assured</groupId>
+ <artifactId>rest-assured</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.strimzi</groupId>
+ <artifactId>strimzi-test-container</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <id>full</id>
+ <activation>
+ <property>
+ <name>!quickly</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <!-- Configure the app to test to resolve "keycloak" hostname to the Docker hostname -->
+ <jdk.net.hosts.file>target/hosts</jdk.net.hosts.file>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>native</id>
+ <activation>
+ <property>
+ <name>native</name>
+ </property>
+ </activation>
+ <properties>
+ <quarkus.package.type>native</quarkus.package.type>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ <configuration>
+ <rerunFailingTestsCount>${rerun.failing.test.count}</rerunFailingTestsCount>
+ <systemPropertyVariables>
+ <quarkus.test.arg-line>-Djdk.net.hosts.file=${basedir}/target/hosts</quarkus.test.arg-line>
+ </systemPropertyVariables>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>virtualDependencies</id>
+ <activation>
+ <property>
+ <name>!noVirtualDependencies</name>
+ </property>
+ </activation>
+ <dependencies>
+ <!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-kafka-deployment</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-log-deployment</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-seda-deployment</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-timer-deployment</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+</project>
\ No newline at end of file
diff --git a/integration-tests/kafka-oauth/src/main/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaOauthResource.java b/integration-tests/kafka-oauth/src/main/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaOauthResource.java
new file mode 100644
index 0000000..e11ba83
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/main/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaOauthResource.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.kafka.oauth.it;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+
+import org.apache.camel.ConsumerTemplate;
+
+@ApplicationScoped
+@Path("kafka-oauth")
+public class KafkaOauthResource {
+ @Inject
+ ConsumerTemplate consumerTemplate;
+
+ @GET
+ public String getMessages() {
+ return consumerTemplate.receiveBody("seda:kafka-messages", 10000, String.class);
+ }
+}
diff --git a/integration-tests/kafka-oauth/src/main/java/org/apache/camel/quarkus/kafka/oauth/it/Routes.java b/integration-tests/kafka-oauth/src/main/java/org/apache/camel/quarkus/kafka/oauth/it/Routes.java
new file mode 100644
index 0000000..8b3de8f
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/main/java/org/apache/camel/quarkus/kafka/oauth/it/Routes.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.kafka.oauth.it;
+
+import javax.enterprise.context.ApplicationScoped;
+
+import org.apache.camel.builder.RouteBuilder;
+
+@ApplicationScoped
+public class Routes extends RouteBuilder {
+
+ @Override
+ public void configure() throws Exception {
+ // produces messages to kafka
+ from("timer:foo?period={{timer.period}}&delay={{timer.delay}}")
+ .routeId("FromTimer2Kafka")
+ .setBody().simple("Message #${exchangeProperty.CamelTimerCounter}")
+ .to("kafka:{{kafka.topic.name}}")
+ .log("Message sent correctly to the topic! : \"${body}\" ");
+
+ // kafka consumer
+ from("kafka:{{kafka.topic.name}}")
+ .routeId("FromKafka2Seda")
+ .log("Received : \"${body}\"")
+ .to("seda:kafka-messages");
+ }
+}
diff --git a/integration-tests/kafka-oauth/src/main/resources/application.properties b/integration-tests/kafka-oauth/src/main/resources/application.properties
new file mode 100644
index 0000000..315c071
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/main/resources/application.properties
@@ -0,0 +1,50 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+
+#Kafka topic Name
+kafka.topic.name=test
+
+# How often should the messages be generated and pushed to Kafka Topic
+timer.period = 100
+timer.delay = 100
+
+camel.component.kafka.security-protocol = SASL_PLAINTEXT
+camel.component.kafka.sasl-mechanism = OAUTHBEARER
+camel.component.kafka.sasl-jaas-config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
+ oauth.client.id="kafka-client" \
+ oauth.client.secret="kafka-client-secret" \
+ oauth.token.endpoint.uri="http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/token" ;
+camel.component.kafka.additional-properties[sasl.login.callback.handler.class] = io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
+
+# enable health check
+quarkus.kafka.health.enabled=true
+
+# using QuarkusTestResourceLifecycleManager in this test: Kafka configuration needs to be tuned to work with Keycloak
+quarkus.kafka.devservices.enabled=false
+
+
+#
+# Quarkus - Log
+#
+quarkus.log.category."org.apache.camel.quarkus.core.deployment".level = INFO
+quarkus.log.category."org.apache.camel.quarkus.component.kafka".level = DEBUG
+quarkus.log.category."org.apache.zookeeper".level = WARNING
+quarkus.log.category."org.apache.kafka".level = WARNING
+
+%quiet.quarkus.log.category."kafka".level = WARNING
+%quiet.quarkus.log.category."kafka.log".level = FATAL
\ No newline at end of file
diff --git a/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaIT.java b/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaIT.java
new file mode 100644
index 0000000..7a5062a
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaIT.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.kafka.oauth.it;
+
+import io.quarkus.test.junit.NativeImageTest;
+
+@NativeImageTest
+public class KafkaIT extends KafkaTest {
+}
diff --git a/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaKeycloakTestResource.java b/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaKeycloakTestResource.java
new file mode 100644
index 0000000..549ddb7
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaKeycloakTestResource.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.kafka.oauth.it;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+import org.apache.camel.quarkus.kafka.oauth.it.container.KafkaContainer;
+import org.apache.camel.quarkus.kafka.oauth.it.container.KeycloakContainer;
+import org.jboss.logging.Logger;
+
+/**
+ * Inspired from https://github.com/quarkusio/quarkus/tree/main/integration-tests/kafka-oauth-keycloak/
+ */
+public class KafkaKeycloakTestResource implements QuarkusTestResourceLifecycleManager {
+
+ private static final Logger log = Logger.getLogger(KafkaKeycloakTestResource.class);
+ private KafkaContainer kafka;
+ private KeycloakContainer keycloak;
+
+ @Override
+ public Map<String, String> start() {
+
+ Map<String, String> properties = new HashMap<>();
+
+ //Start keycloak container
+ keycloak = new KeycloakContainer();
+ keycloak.start();
+ log.info(keycloak.getLogs());
+ keycloak.createHostsFile();
+
+ //Start kafka container
+ kafka = new KafkaContainer();
+ kafka.start();
+ log.info(kafka.getLogs());
+ properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
+ properties.put("camel.component.kafka.brokers", kafka.getBootstrapServers());
+
+ return properties;
+ }
+
+ @Override
+ public void stop() {
+ if (kafka != null) {
+ kafka.stop();
+ }
+ if (keycloak != null) {
+ keycloak.stop();
+ }
+ }
+}
diff --git a/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaTest.java b/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaTest.java
new file mode 100644
index 0000000..a858d76
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/KafkaTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.kafka.oauth.it;
+
+import java.util.concurrent.TimeUnit;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+import io.restassured.RestAssured;
+import org.junit.jupiter.api.Test;
+
+import static org.awaitility.Awaitility.await;
+
+@QuarkusTest
+@QuarkusTestResource(KafkaKeycloakTestResource.class)
+public class KafkaTest {
+
+ @Test
+ public void testKafka() {
+ await().atMost(10, TimeUnit.SECONDS).until(() -> {
+ String message = RestAssured.get("/kafka-oauth").asString();
+ return message != null && message.contains("Message #");
+ });
+ }
+}
diff --git a/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/container/KafkaContainer.java b/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/container/KafkaContainer.java
new file mode 100644
index 0000000..31885bf
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/container/KafkaContainer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.kafka.oauth.it.container;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import io.strimzi.StrimziKafkaContainer;
+import org.jboss.logging.Logger;
+import org.testcontainers.containers.FixedHostPortGenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.utility.MountableFile;
+
+/**
+ * Inspired from https://github.com/quarkusio/quarkus/tree/main/integration-tests/kafka-oauth-keycloak/
+ */
+public class KafkaContainer extends FixedHostPortGenericContainer<KafkaContainer> {
+
+ private static final Logger LOGGER = Logger.getLogger(KafkaContainer.class);
+
+ private static final String STARTER_SCRIPT = "/testcontainers_start.sh";
+ private static final int KAFKA_PORT = 9092;
+ private static final String LATEST_KAFKA_VERSION;
+
+ private static final List<String> supportedKafkaVersions = new ArrayList<>(3);
+
+ static {
+ InputStream inputStream = StrimziKafkaContainer.class.getResourceAsStream("/kafka-versions.txt");
+ InputStreamReader streamReader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
+
+ try (BufferedReader bufferedReader = new BufferedReader(streamReader)) {
+ String kafkaVersion;
+ while ((kafkaVersion = bufferedReader.readLine()) != null) {
+ supportedKafkaVersions.add(kafkaVersion);
+ }
+ } catch (IOException e) {
+ LOGGER.error("Unable to load the supported Kafka versions", e);
+ }
+
+ // sort kafka version from low to high
+ Collections.sort(supportedKafkaVersions);
+
+ LATEST_KAFKA_VERSION = supportedKafkaVersions.get(supportedKafkaVersions.size() - 1);
+ }
+
+ public KafkaContainer() {
+ super("quay.io/strimzi/kafka:" + "latest-kafka-" + LATEST_KAFKA_VERSION);
+
+ withExposedPorts(KAFKA_PORT);
+ withFixedExposedPort(KAFKA_PORT, KAFKA_PORT);
+ withCopyFileToContainer(MountableFile.forClasspathResource("kafkaServer.properties"),
+ "/opt/kafka/config/server.properties");
+ waitingFor(Wait.forLogMessage(".*Kafka startTimeMs:.*", 1));
+ withNetwork(Network.SHARED);
+ withNetworkAliases("kafka");
+ withEnv("LOG_DIR", "/tmp");
+ }
+
+ @Override
+ protected void doStart() {
+ // we need it for the startZookeeper(); and startKafka(); to run container before...
+ withCommand("sh", "-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
+ super.doStart();
+ }
+
+ @Override
+ protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) {
+ super.containerIsStarting(containerInfo, reused);
+ LOGGER.info("Kafka servers :: " + getBootstrapServers());
+ String command = "#!/bin/bash \n";
+ command += "bin/zookeeper-server-start.sh ./config/zookeeper.properties &\n";
+ command += "export CLASSPATH=\"/opt/kafka/libs/strimzi/*:$CLASSPATH\" \n";
+ command += "bin/kafka-server-start.sh ./config/server.properties" +
+ " --override listeners=JWT://:" + KAFKA_PORT +
+ " --override advertised.listeners=" + getBootstrapServers();
+ copyFileToContainer(Transferable.of(command.getBytes(StandardCharsets.UTF_8), 700), STARTER_SCRIPT);
+ }
+
+ public String getBootstrapServers() {
+ return String.format("JWT://%s:%s", getHost(), KAFKA_PORT);
+ }
+
+}
diff --git a/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/container/KeycloakContainer.java b/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/container/KeycloakContainer.java
new file mode 100644
index 0000000..0b46077
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/test/java/org/apache/camel/quarkus/kafka/oauth/it/container/KeycloakContainer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.kafka.oauth.it.container;
+
+import java.io.FileWriter;
+
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import org.testcontainers.containers.FixedHostPortGenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.MountableFile;
+
+/**
+ * Inspired from https://github.com/quarkusio/quarkus/tree/main/integration-tests/kafka-oauth-keycloak/
+ */
+public class KeycloakContainer extends FixedHostPortGenericContainer<KeycloakContainer> {
+
+ public KeycloakContainer() {
+ super("quay.io/keycloak/keycloak:15.0.2");
+ withExposedPorts(8443);
+ withFixedExposedPort(8080, 8080);
+ withEnv("KEYCLOAK_USER", "admin");
+ withEnv("KEYCLOAK_PASSWORD", "admin");
+ withEnv("KEYCLOAK_HTTPS_PORT", "8443");
+ withEnv("PROXY_ADDRESS_FORWARDING", "true");
+ withEnv("KEYCLOAK_IMPORT", "/opt/jboss/keycloak/realms/kafka-authz-realm.json");
+ waitingFor(Wait.forLogMessage(".*WFLYSRV0025.*", 1));
+ withNetwork(Network.SHARED);
+ withNetworkAliases("keycloak");
+ withCreateContainerCmdModifier(cmd -> {
+ cmd.withEntrypoint("");
+ cmd.withCmd("/bin/bash", "-c", "cd /opt/jboss/keycloak " +
+ "&& bin/jboss-cli.sh --file=ssl/keycloak-ssl.cli " +
+ "&& rm -rf standalone/configuration/standalone_xml_history/current " +
+ "&& cd .. " +
+ "&& /opt/jboss/tools/docker-entrypoint.sh -Dkeycloak.profile.feature.upload_scripts=enabled -b 0.0.0.0");
+ });
+ }
+
+ @Override
+ protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) {
+ super.containerIsStarting(containerInfo);
+ copyFileToContainer(MountableFile.forClasspathResource("certificates/ca-truststore.p12"),
+ "/opt/jboss/keycloak/standalone/configuration/certs/ca-truststore.p12");
+ copyFileToContainer(MountableFile.forClasspathResource("certificates/keycloak.server.keystore.p12"),
+ "/opt/jboss/keycloak/standalone/configuration/certs/keycloak.server.keystore.p12");
+ copyFileToContainer(MountableFile.forClasspathResource("keycloak/scripts/keycloak-ssl.cli"),
+ "/opt/jboss/keycloak/ssl/keycloak-ssl.cli");
+ copyFileToContainer(MountableFile.forClasspathResource("keycloak/realms/kafka-authz-realm.json"),
+ "/opt/jboss/keycloak/realms/kafka-authz-realm.json");
+ }
+
+ public void createHostsFile() {
+ try (FileWriter fileWriter = new FileWriter("target/hosts")) {
+ String dockerHost = this.getHost();
+ if ("localhost".equals(dockerHost)) {
+ fileWriter.write("127.0.0.1 keycloak");
+ } else {
+ fileWriter.write(dockerHost + " keycloak");
+ }
+ fileWriter.flush();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/extensions/jfr/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/integration-tests/kafka-oauth/src/test/resources/certificates/README.md
similarity index 56%
copy from extensions/jfr/runtime/src/main/resources/META-INF/quarkus-extension.yaml
copy to integration-tests/kafka-oauth/src/test/resources/certificates/README.md
index a6a058f..5056a5b 100644
--- a/extensions/jfr/runtime/src/main/resources/META-INF/quarkus-extension.yaml
+++ b/integration-tests/kafka-oauth/src/test/resources/certificates/README.md
@@ -1,3 +1,4 @@
+#!/bin/sh
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -15,18 +16,33 @@
# limitations under the License.
#
-# This is a generated file. Do not edit directly!
-# To re-generate, run the following command from the top level directory:
+##
+# Inspired from https://github.com/quarkusio/quarkus/tree/main/integration-tests/kafka-oauth-keycloak/
#
-# mvn -N cq:update-quarkus-metadata
-#
----
-name: "Camel Jfr"
-description: "Diagnose Camel applications with Java Flight Recorder"
-metadata:
- unlisted: true
- guide: "https://camel.apache.org/camel-quarkus/latest/reference/extensions/jfr.html"
- categories:
- - "integration"
- status:
- - "preview"
+# Generating the certificates and keystore
+
+## Creating a self-signed CA certificate and truststore
+
+```bash
+./gen-ca.sh
+```
+
+This creates `crt.ca` and adds the certificate to the keystore `ca-truststore.p12`.
+
+## Creating a server certificate and add it to keystore
+
+```bash
+./gen-keycloak-certs.sh
+```
+
+This creates server certificate for Keycloak, signs it and adds it to keystore `keycloak.server.keystore.p12`.
+
+## Cleanup
+
+```bash
+rm ca.srl
+rm ca.crt
+rm ca.key
+rm cert-file
+rm cert-signed
+```
\ No newline at end of file
diff --git a/integration-tests/kafka-oauth/src/test/resources/certificates/ca-truststore.p12 b/integration-tests/kafka-oauth/src/test/resources/certificates/ca-truststore.p12
new file mode 100644
index 0000000..f3fdb05
Binary files /dev/null and b/integration-tests/kafka-oauth/src/test/resources/certificates/ca-truststore.p12 differ
diff --git a/extensions/jfr/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/integration-tests/kafka-oauth/src/test/resources/certificates/gen-ca.sh
old mode 100644
new mode 100755
similarity index 64%
copy from extensions/jfr/runtime/src/main/resources/META-INF/quarkus-extension.yaml
copy to integration-tests/kafka-oauth/src/test/resources/certificates/gen-ca.sh
index a6a058f..c5c5af3
--- a/extensions/jfr/runtime/src/main/resources/META-INF/quarkus-extension.yaml
+++ b/integration-tests/kafka-oauth/src/test/resources/certificates/gen-ca.sh
@@ -1,3 +1,4 @@
+#!/bin/sh
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -15,18 +16,16 @@
# limitations under the License.
#
-# This is a generated file. Do not edit directly!
-# To re-generate, run the following command from the top level directory:
-#
-# mvn -N cq:update-quarkus-metadata
-#
----
-name: "Camel Jfr"
-description: "Diagnose Camel applications with Java Flight Recorder"
-metadata:
- unlisted: true
- guide: "https://camel.apache.org/camel-quarkus/latest/reference/extensions/jfr.html"
- categories:
- - "integration"
- status:
- - "preview"
+set -e
+
+# create CA key
+openssl genrsa -out ca.key 4096
+
+# create CA certificate
+openssl req -x509 -new -nodes -sha256 -days 3650 -subj "/CN=quarkus.io" -key ca.key -out ca.crt
+
+
+PASSWORD=changeit
+
+# create p12 truststore
+keytool -keystore ca-truststore.p12 -storetype pkcs12 -alias ca -storepass $PASSWORD -keypass $PASSWORD -import -file ca.crt -noprompt
diff --git a/integration-tests/kafka-oauth/src/test/resources/certificates/gen-keycloak-certs.sh b/integration-tests/kafka-oauth/src/test/resources/certificates/gen-keycloak-certs.sh
new file mode 100755
index 0000000..2b921ea
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/test/resources/certificates/gen-keycloak-certs.sh
@@ -0,0 +1,31 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+set -e
+
+PASSWORD=changeit
+
+echo "#### Create server certificate for Keycloak"
+keytool -keystore keycloak.server.keystore.p12 -storetype pkcs12 -keyalg RSA -alias keycloak -validity 3650 -genkey -storepass $PASSWORD -keypass $PASSWORD -dname CN=keycloak -ext SAN=DNS:keycloak
+
+echo "#### Sign server certificate (export, sign, add signed to keystore)"
+keytool -keystore keycloak.server.keystore.p12 -storetype pkcs12 -alias keycloak -storepass $PASSWORD -keypass $PASSWORD -certreq -file cert-file
+openssl x509 -req -CA ca.crt -CAkey ca.key -in cert-file -out cert-signed -days 3650 -CAcreateserial -passin pass:$PASSWORD
+keytool -keystore keycloak.server.keystore.p12 -alias CARoot -storepass $PASSWORD -keypass $PASSWORD -import -file ca.crt -noprompt
+keytool -keystore keycloak.server.keystore.p12 -alias keycloak -storepass $PASSWORD -keypass $PASSWORD -import -file cert-signed -noprompt
diff --git a/integration-tests/kafka-oauth/src/test/resources/certificates/keycloak.server.keystore.p12 b/integration-tests/kafka-oauth/src/test/resources/certificates/keycloak.server.keystore.p12
new file mode 100644
index 0000000..406c335
Binary files /dev/null and b/integration-tests/kafka-oauth/src/test/resources/certificates/keycloak.server.keystore.p12 differ
diff --git a/integration-tests/kafka-oauth/src/test/resources/kafkaServer.properties b/integration-tests/kafka-oauth/src/test/resources/kafkaServer.properties
new file mode 100644
index 0000000..efc0bb8
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/test/resources/kafkaServer.properties
@@ -0,0 +1,177 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=1
+
+############################# Socket Server Settings #############################
+
+# The address the socket server listens on. It will get the value returned from
+# java.net.InetAddress.getCanonicalHostName() if not configured.
+# FORMAT:
+# listeners = listener_name://host_name:port
+# EXAMPLE:
+# listeners = PLAINTEXT://your.host.name:9092
+#listeners=PLAINTEXT://:9092
+listeners=JWT://:9092
+#advertised.listeners=SASL_PLAINTEXT://localhost:9092
+
+
+
+# Hostname and port the broker will advertise to producers and consumers. If not set,
+# it uses the value for "listeners" if configured. Otherwise, it will use the value
+# returned from java.net.InetAddress.getCanonicalHostName().
+#advertised.listeners=PLAINTEXT://your.host.name:9092
+#advertised.listeners=SASL_PLAINTEXT://localhost:9092
+
+# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
+#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+listener.security.protocol.map=JWT:SASL_PLAINTEXT
+
+
+# The number of threads that the server uses for receiving requests from the network and sending responses to the network
+num.network.threads=3
+
+# The number of threads that the server uses for processing requests, which may include disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+inter.broker.listener.name=JWT
+
+
+#### SASL ####
+
+sasl.enabled.mechanisms=OAUTHBEARER
+
+sasl.mechanism.inter.broker.protocol=OAUTHBEARER
+
+oauth.username.claim=preferred_username
+principal.builder.class=io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder
+
+listener.name.jwt.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN
+listener.name.jwt.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
+ oauth.jwks.endpoint.uri="http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/certs" \
+ oauth.valid.issuer.uri="http://keycloak:8080/auth/realms/kafka-authz" \
+ oauth.token.endpoint.uri="http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/token" \
+ oauth.client.id="kafka" \
+ oauth.client.secret="kafka-secret";
+
+listener.name.jwt.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
+listener.name.jwt.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
+listener.name.jwt.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+ oauth.jwks.endpoint.uri="http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/certs" \
+ oauth.valid.issuer.uri="http://keycloak:8080/auth/realms/kafka-authz" \
+ oauth.token.endpoint.uri="http://keycloak:8080/auth/realms/kafka-authz/protocol/openid-connect/token" \
+ oauth.client.id="kafka" \
+ oauth.client.secret="kafka-secret" \
+ unsecuredLoginStringClaim_sub="admin";
+
+listener.name.jwt.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs=/tmp/kafka-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Internal Topic Settings #############################
+# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data may be lost if you are not using replication.
+# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion due to age
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
+# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=45000
+
+
+############################# Group Coordinator Settings #############################
+
+# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
+# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
+# The default value for this is 3 seconds.
+# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
+# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
+group.initial.rebalance.delay.ms=0
\ No newline at end of file
diff --git a/integration-tests/kafka-oauth/src/test/resources/keycloak/realms/kafka-authz-realm.json b/integration-tests/kafka-oauth/src/test/resources/keycloak/realms/kafka-authz-realm.json
new file mode 100644
index 0000000..cbd2d87
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/test/resources/keycloak/realms/kafka-authz-realm.json
@@ -0,0 +1,321 @@
+{
+ "realm": "kafka-authz",
+ "accessTokenLifespan": 300,
+ "ssoSessionIdleTimeout": 864000,
+ "ssoSessionMaxLifespan": 864000,
+ "enabled": true,
+ "sslRequired": "external",
+ "roles": {
+ "realm": [],
+ "client": {
+ "kafka-cli": [],
+ "kafka": [
+ {
+ "name": "uma_protection",
+ "clientRole": true
+ },
+ {
+ "name": "kafka-user",
+ "clientRole": true
+ }
+ ]
+ }
+ },
+ "groups" : [],
+ "users": [
+ {
+ "username": "service-account-kafka-producer-client",
+ "enabled": true,
+ "realmRoles" : [ "offline_access" ],
+ "email": "service-account-kafka-producer-client@placeholder.org",
+ "serviceAccountClientId": "kafka-producer-client"
+ },
+ {
+ "username": "service-account-kafka-consumer-client",
+ "enabled": true,
+ "realmRoles" : [ "offline_access" ],
+ "email": "service-account-kafka-consumer-client@placeholder.org",
+ "serviceAccountClientId": "kafka-consumer-client"
+ }
+ ],
+ "clients": [
+ {
+ "clientId": "kafka",
+ "enabled": true,
+ "clientAuthenticatorType": "client-secret",
+ "secret": "kafka-secret",
+ "bearerOnly": false,
+ "consentRequired": false,
+ "standardFlowEnabled": false,
+ "implicitFlowEnabled": false,
+ "directAccessGrantsEnabled": true,
+ "serviceAccountsEnabled": true,
+ "authorizationServicesEnabled": true,
+ "publicClient": false,
+ "fullScopeAllowed": true,
+ "protocolMappers": [
+ {
+ "name": "kafka audience",
+ "protocol": "openid-connect",
+ "protocolMapper": "oidc-audience-mapper",
+ "consentRequired": false,
+ "config": {
+ "included.client.audience": "kafka",
+ "id.token.claim": "false",
+ "access.token.claim": "true"
+ }
+ }
+ ],
+ "authorizationSettings": {
+ "allowRemoteResourceManagement": true,
+ "policyEnforcementMode": "ENFORCING",
+ "resources": [
+ {
+ "name": "Group:*",
+ "type": "Group",
+ "ownerManagedAccess": false,
+ "displayName": "Any group",
+ "attributes": {},
+ "uris": [],
+ "scopes": [
+ {
+ "name": "Describe"
+ },
+ {
+ "name": "Read"
+ },
+ {
+ "name": "DescribeConfigs"
+ },
+ {
+ "name": "AlterConfigs"
+ }
+ ]
+ },
+ {
+ "name": "Topic:*",
+ "type": "Topic",
+ "ownerManagedAccess": false,
+ "displayName": "Any topic",
+ "attributes": {},
+ "uris": [],
+ "scopes": [
+ {
+ "name": "Create"
+ },
+ {
+ "name": "Delete"
+ },
+ {
+ "name": "Describe"
+ },
+ {
+ "name": "Write"
+ },
+ {
+ "name": "Read"
+ },
+ {
+ "name": "Alter"
+ },
+ {
+ "name": "DescribeConfigs"
+ },
+ {
+ "name": "AlterConfigs"
+ }
+ ]
+ },
+ {
+ "name" : "Cluster:*",
+ "type" : "Cluster",
+ "ownerManagedAccess" : false,
+ "attributes" : { },
+ "uris" : [ ]
+ },
+ {
+ "name": "Topic:messages",
+ "type": "topic",
+ "scopes": [
+ {
+ "name": "Delete"
+ },
+ {
+ "name": "Describe"
+ },
+ {
+ "name": "Create"
+ },
+ {
+ "name": "Write"
+ },
+ {
+ "name": "Alter"
+ },
+ {
+ "name": "Read"
+ },
+ {
+ "name": "DescribeConfigs"
+ },
+ {
+ "name": "AlterConfigs"
+ }
+ ]
+ }
+ ],
+ "policies": [
+ {
+ "name": "Producer Client",
+ "type": "client",
+ "logic": "POSITIVE",
+ "decisionStrategy": "UNANIMOUS",
+ "config": {
+ "clients": "[\"kafka-producer-client\", \"kafka-client\"]"
+ }
+ },
+ {
+ "name": "Consumer Client",
+ "type": "client",
+ "logic": "POSITIVE",
+ "decisionStrategy": "UNANIMOUS",
+ "config": {
+ "clients": "[\"kafka-consumer-client\", \"kafka-client\"]"
+ }
+ },
+ {
+ "name": "Producer Client can write to topic 'messages'",
+ "type": "scope",
+ "logic": "POSITIVE",
+ "decisionStrategy": "UNANIMOUS",
+ "config": {
+ "resources": "[\"Topic:messages\"]",
+ "scopes": "[\"Delete\",\"Describe\",\"Create\",\"Write\"]",
+ "applyPolicies": "[\"Producer Client\"]"
+ }
+ },
+ {
+ "name": "Consumer Client can read from topic 'messages'",
+ "type": "scope",
+ "logic": "POSITIVE",
+ "decisionStrategy": "UNANIMOUS",
+ "config": {
+ "resources": "[\"Topic:messages\"]",
+ "scopes": "[\"Describe\",\"Read\"]",
+ "applyPolicies": "[\"Consumer Client\"]"
+ }
+ },
+ {
+ "name": "Consumer Client can use any group",
+ "type": "scope",
+ "logic": "POSITIVE",
+ "decisionStrategy": "UNANIMOUS",
+ "config": {
+ "resources": "[\"Group:*\"]",
+ "scopes": "[\"Describe\",\"Write\",\"Read\"]",
+ "applyPolicies": "[\"Consumer Client\"]"
+ }
+ }
+ ],
+ "scopes": [
+ {
+ "name": "Create"
+ },
+ {
+ "name": "Read"
+ },
+ {
+ "name": "Write"
+ },
+ {
+ "name": "Delete"
+ },
+ {
+ "name": "Alter"
+ },
+ {
+ "name": "Describe"
+ },
+ {
+ "name": "ClusterAction"
+ },
+ {
+ "name": "DescribeConfigs"
+ },
+ {
+ "name": "AlterConfigs"
+ },
+ {
+ "name": "IdempotentWrite"
+ }
+ ],
+ "decisionStrategy": "AFFIRMATIVE"
+ }
+ },
+ {
+ "clientId": "kafka-cli",
+ "enabled": true,
+ "clientAuthenticatorType": "client-secret",
+ "secret": "kafka-cli-secret",
+ "bearerOnly": false,
+ "consentRequired": false,
+ "standardFlowEnabled": false,
+ "implicitFlowEnabled": false,
+ "directAccessGrantsEnabled": true,
+ "serviceAccountsEnabled": false,
+ "publicClient": true,
+ "fullScopeAllowed": true
+ },
+ {
+ "clientId": "kafka-producer-client",
+ "enabled": true,
+ "clientAuthenticatorType": "client-secret",
+ "secret": "kafka-producer-client-secret",
+ "publicClient": false,
+ "bearerOnly": false,
+ "standardFlowEnabled": false,
+ "implicitFlowEnabled": false,
+ "directAccessGrantsEnabled": true,
+ "serviceAccountsEnabled": true,
+ "consentRequired" : false,
+ "fullScopeAllowed" : false,
+ "attributes": {
+ "access.token.lifespan": "36000"
+ }
+ },
+ {
+ "clientId": "kafka-consumer-client",
+ "enabled": true,
+ "clientAuthenticatorType": "client-secret",
+ "secret": "kafka-consumer-client-secret",
+ "publicClient": false,
+ "bearerOnly": false,
+ "standardFlowEnabled": false,
+ "implicitFlowEnabled": false,
+ "directAccessGrantsEnabled": true,
+ "serviceAccountsEnabled": true,
+ "consentRequired" : false,
+ "fullScopeAllowed" : false,
+ "attributes": {
+ "access.token.lifespan": "36000"
+ }
+ },
+ {
+ "clientId": "kafka-client",
+ "enabled": true,
+ "clientAuthenticatorType": "client-secret",
+ "secret": "kafka-client-secret",
+ "publicClient": false,
+ "bearerOnly": false,
+ "standardFlowEnabled": false,
+ "implicitFlowEnabled": false,
+ "directAccessGrantsEnabled": true,
+ "serviceAccountsEnabled": true,
+ "consentRequired" : false,
+ "fullScopeAllowed" : false,
+ "attributes": {
+ "access.token.lifespan": "36000"
+ }
+ }
+ ]
+}
\ No newline at end of file
diff --git a/integration-tests/kafka-oauth/src/test/resources/keycloak/scripts/keycloak-ssl.cli b/integration-tests/kafka-oauth/src/test/resources/keycloak/scripts/keycloak-ssl.cli
new file mode 100644
index 0000000..405f1c8
--- /dev/null
+++ b/integration-tests/kafka-oauth/src/test/resources/keycloak/scripts/keycloak-ssl.cli
@@ -0,0 +1,20 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+embed-server --server-config=standalone-ha.xml
+/core-service=management/security-realm=UndertowRealm:add()
+/core-service=management/security-realm=UndertowRealm/server-identity=ssl:add(keystore-path=certs/keycloak.server.keystore.p12, keystore-relative-to=jboss.server.config.dir, keystore-password=changeit)
+/subsystem=undertow/server=default-server/https-listener=https:write-attribute(name=security-realm, value=UndertowRealm)
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 3393e3a..883adb3 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -131,6 +131,7 @@
<module>jsonpath</module>
<module>jta</module>
<module>kafka</module>
+ <module>kafka-oauth</module>
<module>kafka-sasl</module>
<module>kafka-sasl-ssl</module>
<module>kafka-ssl</module>
diff --git a/pom.xml b/pom.xml
index 5f2cff6..cd15b4b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -511,6 +511,7 @@
<exclude>**/k8s-sb/**</exclude>
</excludes>
<mapping>
+ <cli>CAMEL_PROPERTIES_STYLE</cli>
<groovy>SLASHSTAR_STYLE</groovy>
<java>SLASHSTAR_STYLE</java>
<Jenkinsfile>SLASHSTAR_STYLE</Jenkinsfile>
diff --git a/tooling/scripts/test-categories.yaml b/tooling/scripts/test-categories.yaml
index d13ec86..8b643d3 100644
--- a/tooling/scripts/test-categories.yaml
+++ b/tooling/scripts/test-categories.yaml
@@ -39,6 +39,7 @@ group-02:
- jackson-avro
- jackson-protobuf
- jfr
+ - kafka-oauth
- oaipmh
- pubnub
- protobuf