You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/12/08 11:59:52 UTC
[camel] 02/03: (chores) camel-kafka: improve the ability to test for connection state
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 222fcd02f70c8698b3ea5c9769d14898c21736d3
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Dec 7 16:17:02 2022 +0100
(chores) camel-kafka: improve the ability to test for connection state
---
.../integration/AbstractKafkaTestSupport.java | 3 +-
.../BaseEmbeddedKafkaAuthTestSupport.java | 39 +++++++++++++++++++++-
.../kafka/integration/KafkaConsumerAuthIT.java | 37 +++++++++++++-------
3 files changed, 65 insertions(+), 14 deletions(-)
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/AbstractKafkaTestSupport.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/AbstractKafkaTestSupport.java
index f02c28e1b6d..120dcb86c63 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/AbstractKafkaTestSupport.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/AbstractKafkaTestSupport.java
@@ -25,6 +25,7 @@ import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.test.infra.kafka.services.KafkaService;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
@@ -40,7 +41,7 @@ public abstract class AbstractKafkaTestSupport extends CamelTestSupport {
public static AdminClient createAdminClient(KafkaService service) {
final Properties properties = new Properties();
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, service.getBootstrapServers());
+ properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, service.getBootstrapServers());
return KafkaAdminClient.create(properties);
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaAuthTestSupport.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaAuthTestSupport.java
index c888ff8c4bf..adf2b12068e 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaAuthTestSupport.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaAuthTestSupport.java
@@ -16,17 +16,29 @@
*/
package org.apache.camel.component.kafka.integration;
+import java.util.Collections;
+import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.camel.CamelContext;
import org.apache.camel.test.infra.kafka.services.ContainerLocalAuthKafkaService;
+import org.apache.camel.test.infra.kafka.services.KafkaService;
import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.junit.Assert;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.extension.RegisterExtension;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
@Tags({ @Tag("non-abstract") })
public abstract class BaseEmbeddedKafkaAuthTestSupport extends AbstractKafkaTestSupport {
@RegisterExtension
@@ -39,6 +51,17 @@ public abstract class BaseEmbeddedKafkaAuthTestSupport extends AbstractKafkaTest
AbstractKafkaTestSupport.setServiceProperties(service);
}
+ public static AdminClient createAuthAdminClient(KafkaService service) {
+ final Properties properties = new Properties();
+ properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, service.getBootstrapServers());
+ properties.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+ properties.put(SaslConfigs.SASL_JAAS_CONFIG,
+ ContainerLocalAuthKafkaService.generateSimpleSaslJaasConfig("admin", "admin-secret"));
+
+ return AdminClient.create(properties);
+ }
+
@BeforeEach
public void setKafkaAdminClient() {
if (kafkaAdminClient == null) {
@@ -60,6 +83,20 @@ public abstract class BaseEmbeddedKafkaAuthTestSupport extends AbstractKafkaTest
}
private static AdminClient createAdminClient() {
- return createAdminClient(service);
+ return createAuthAdminClient(service);
+ }
+
+ protected static Map<String, ConsumerGroupDescription> getConsumerGroupInfo(String groupId)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return kafkaAdminClient.describeConsumerGroups(Collections.singletonList(groupId)).all().get(30, TimeUnit.SECONDS);
+ }
+
+ protected static void assertGroupIsConnected(String groupId) {
+ final Map<String, ConsumerGroupDescription> allGroups = assertDoesNotThrow(() -> getConsumerGroupInfo(groupId));
+
+ Assert.assertTrue("There should be at least one group named " + groupId, allGroups.size() >= 1);
+
+ final ConsumerGroupDescription groupInfo = allGroups.get(groupId);
+ Assert.assertNotNull("There should be at least one group named " + groupId, groupInfo);
}
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAuthIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAuthIT.java
index 1af2198c793..7df5402aa34 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAuthIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAuthIT.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.kafka.integration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
import java.util.stream.StreamSupport;
import org.apache.camel.EndpointInject;
@@ -28,6 +29,7 @@ import org.apache.camel.component.kafka.MockConsumerInterceptor;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.infra.kafka.services.ContainerLocalAuthKafkaService;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -61,9 +63,10 @@ public class KafkaConsumerAuthIT extends BaseEmbeddedKafkaAuthTestSupport {
@BeforeEach
public void before() {
Properties props = getDefaultProperties();
- props.put("sasl.jaas.config", ContainerLocalAuthKafkaService.generateSimpleSaslJaasConfig("camel", "camel-secret"));
+ props.put(SaslConfigs.SASL_JAAS_CONFIG,
+ ContainerLocalAuthKafkaService.generateSimpleSaslJaasConfig("camel", "camel-secret"));
props.put("security.protocol", "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");
+ props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
try {
producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
@@ -94,7 +97,7 @@ public class KafkaConsumerAuthIT extends BaseEmbeddedKafkaAuthTestSupport {
fromF("kafka:%s"
+ "?groupId=%s&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
- + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+ + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer&clientId=camel-kafka-auth-test"
+ "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=%s"
+ "&saslMechanism=PLAIN&securityProtocol=SASL_PLAINTEXT&saslJaasConfig=%s", TOPIC,
"KafkaConsumerAuthIT", "org.apache.camel.component.kafka.MockConsumerInterceptor", simpleSaslJaasConfig)
@@ -105,11 +108,17 @@ public class KafkaConsumerAuthIT extends BaseEmbeddedKafkaAuthTestSupport {
};
}
+ @Order(1)
+ @Test
+ public void isConnected() {
+ assertGroupIsConnected("KafkaConsumerAuthIT");
+ }
+
@DisplayName("Tests that Camel can adequately connect and consume from an authenticated Kafka instance")
@Timeout(30)
- @Order(3)
+ @Order(2)
@Test
- public void kafkaMessageIsConsumedByCamel() throws InterruptedException {
+ public void kafkaMessageIsConsumedByCamel() throws InterruptedException, ExecutionException {
String propagatedHeaderKey = "PropagatedCustomHeader";
byte[] propagatedHeaderValue = "propagated header value".getBytes();
String skippedHeaderKey = "CamelSkippedHeader";
@@ -120,13 +129,7 @@ public class KafkaConsumerAuthIT extends BaseEmbeddedKafkaAuthTestSupport {
to.expectedHeaderValuesReceivedInAnyOrder(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, null, null, null, null, null);
to.expectedHeaderReceived(propagatedHeaderKey, propagatedHeaderValue);
- for (int k = 0; k < 5; k++) {
- String msg = "message-" + k;
- ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
- data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped header value".getBytes()));
- data.headers().add(new RecordHeader(propagatedHeaderKey, propagatedHeaderValue));
- producer.send(data);
- }
+ populateKafkaTopic(propagatedHeaderKey, propagatedHeaderValue);
to.assertIsSatisfied(3000);
@@ -138,4 +141,14 @@ public class KafkaConsumerAuthIT extends BaseEmbeddedKafkaAuthTestSupport {
assertTrue(headers.containsKey(propagatedHeaderKey), "Should receive propagated header");
}
+ private void populateKafkaTopic(String propagatedHeaderKey, byte[] propagatedHeaderValue) {
+ for (int k = 0; k < 5; k++) {
+ String msg = "message-" + k;
+ ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
+ data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped header value".getBytes()));
+ data.headers().add(new RecordHeader(propagatedHeaderKey, propagatedHeaderValue));
+ producer.send(data);
+ }
+ }
+
}