You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/12/08 11:59:52 UTC

[camel] 02/03: (chores) camel-kafka: improve the ability to test for connection state

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 222fcd02f70c8698b3ea5c9769d14898c21736d3
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Dec 7 16:17:02 2022 +0100

    (chores) camel-kafka: improve the ability to test for connection state
---
 .../integration/AbstractKafkaTestSupport.java      |  3 +-
 .../BaseEmbeddedKafkaAuthTestSupport.java          | 39 +++++++++++++++++++++-
 .../kafka/integration/KafkaConsumerAuthIT.java     | 37 +++++++++++++-------
 3 files changed, 65 insertions(+), 14 deletions(-)

diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/AbstractKafkaTestSupport.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/AbstractKafkaTestSupport.java
index f02c28e1b6d..120dcb86c63 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/AbstractKafkaTestSupport.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/AbstractKafkaTestSupport.java
@@ -25,6 +25,7 @@ import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.test.infra.kafka.services.KafkaService;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.slf4j.Logger;
@@ -40,7 +41,7 @@ public abstract class AbstractKafkaTestSupport extends CamelTestSupport {
 
     public static AdminClient createAdminClient(KafkaService service) {
         final Properties properties = new Properties();
-        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, service.getBootstrapServers());
+        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, service.getBootstrapServers());
 
         return KafkaAdminClient.create(properties);
     }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaAuthTestSupport.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaAuthTestSupport.java
index c888ff8c4bf..adf2b12068e 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaAuthTestSupport.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseEmbeddedKafkaAuthTestSupport.java
@@ -16,17 +16,29 @@
  */
 package org.apache.camel.component.kafka.integration;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.test.infra.kafka.services.ContainerLocalAuthKafkaService;
+import org.apache.camel.test.infra.kafka.services.KafkaService;
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.junit.Assert;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Tags;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
 @Tags({ @Tag("non-abstract") })
 public abstract class BaseEmbeddedKafkaAuthTestSupport extends AbstractKafkaTestSupport {
     @RegisterExtension
@@ -39,6 +51,17 @@ public abstract class BaseEmbeddedKafkaAuthTestSupport extends AbstractKafkaTest
         AbstractKafkaTestSupport.setServiceProperties(service);
     }
 
+    public static AdminClient createAuthAdminClient(KafkaService service) {
+        // Builds an admin client that logs in over SASL_PLAINTEXT / PLAIN with the "admin" credentials
+        final Properties adminConfig = new Properties();
+        adminConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, service.getBootstrapServers());
+        adminConfig.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+        adminConfig.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+        final String jaasConfig = ContainerLocalAuthKafkaService.generateSimpleSaslJaasConfig("admin", "admin-secret");
+        adminConfig.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
+        return AdminClient.create(adminConfig);
+    }
+
     @BeforeEach
     public void setKafkaAdminClient() {
         if (kafkaAdminClient == null) {
@@ -60,6 +83,20 @@ public abstract class BaseEmbeddedKafkaAuthTestSupport extends AbstractKafkaTest
     }
 
     private static AdminClient createAdminClient() {
-        return createAdminClient(service);
+        return createAuthAdminClient(service);
+    }
+
+    protected static Map<String, ConsumerGroupDescription> getConsumerGroupInfo(String groupId)
+            throws InterruptedException, ExecutionException, TimeoutException { // waits up to 30 s
+        return kafkaAdminClient.describeConsumerGroups(Collections.singletonList(groupId)).all().get(30, TimeUnit.SECONDS);
+    }
+
+    protected static void assertGroupIsConnected(String groupId) {
+        // Fails the test if describing groupId throws or times out, or if the result has no entry for groupId
+        final Map<String, ConsumerGroupDescription> allGroups = assertDoesNotThrow(() -> getConsumerGroupInfo(groupId));
+        Assert.assertTrue("There should be at least one group named " + groupId, allGroups.size() >= 1);
+
+        final ConsumerGroupDescription groupInfo = allGroups.get(groupId);
+        Assert.assertNotNull("There should be at least one group named " + groupId, groupInfo);
     }
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAuthIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAuthIT.java
index 1af2198c793..7df5402aa34 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAuthIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAuthIT.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.kafka.integration;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.StreamSupport;
 
 import org.apache.camel.EndpointInject;
@@ -28,6 +29,7 @@ import org.apache.camel.component.kafka.MockConsumerInterceptor;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.infra.kafka.services.ContainerLocalAuthKafkaService;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -61,9 +63,10 @@ public class KafkaConsumerAuthIT extends BaseEmbeddedKafkaAuthTestSupport {
     @BeforeEach
     public void before() {
         Properties props = getDefaultProperties();
-        props.put("sasl.jaas.config", ContainerLocalAuthKafkaService.generateSimpleSaslJaasConfig("camel", "camel-secret"));
+        props.put(SaslConfigs.SASL_JAAS_CONFIG,
+                ContainerLocalAuthKafkaService.generateSimpleSaslJaasConfig("camel", "camel-secret"));
         props.put("security.protocol", "SASL_PLAINTEXT");
-        props.put("sasl.mechanism", "PLAIN");
+        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
 
         try {
             producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
@@ -94,7 +97,7 @@ public class KafkaConsumerAuthIT extends BaseEmbeddedKafkaAuthTestSupport {
 
                 fromF("kafka:%s"
                       + "?groupId=%s&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
-                      + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                      + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer&clientId=camel-kafka-auth-test"
                       + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=%s"
                       + "&saslMechanism=PLAIN&securityProtocol=SASL_PLAINTEXT&saslJaasConfig=%s", TOPIC,
                         "KafkaConsumerAuthIT", "org.apache.camel.component.kafka.MockConsumerInterceptor", simpleSaslJaasConfig)
@@ -105,11 +108,17 @@ public class KafkaConsumerAuthIT extends BaseEmbeddedKafkaAuthTestSupport {
         };
     }
 
+    @Order(1)
+    @Test
+    public void isConnected() { // "KafkaConsumerAuthIT" is the groupId the consumer route is built with
+        assertGroupIsConnected("KafkaConsumerAuthIT");
+    }
+
     @DisplayName("Tests that Camel can adequately connect and consume from an authenticated Kafka instance")
     @Timeout(30)
-    @Order(3)
+    @Order(2)
     @Test
-    public void kafkaMessageIsConsumedByCamel() throws InterruptedException {
+    public void kafkaMessageIsConsumedByCamel() throws InterruptedException, ExecutionException {
         String propagatedHeaderKey = "PropagatedCustomHeader";
         byte[] propagatedHeaderValue = "propagated header value".getBytes();
         String skippedHeaderKey = "CamelSkippedHeader";
@@ -120,13 +129,7 @@ public class KafkaConsumerAuthIT extends BaseEmbeddedKafkaAuthTestSupport {
         to.expectedHeaderValuesReceivedInAnyOrder(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, null, null, null, null, null);
         to.expectedHeaderReceived(propagatedHeaderKey, propagatedHeaderValue);
 
-        for (int k = 0; k < 5; k++) {
-            String msg = "message-" + k;
-            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
-            data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped header value".getBytes()));
-            data.headers().add(new RecordHeader(propagatedHeaderKey, propagatedHeaderValue));
-            producer.send(data);
-        }
+        populateKafkaTopic(propagatedHeaderKey, propagatedHeaderValue);
 
         to.assertIsSatisfied(3000);
 
@@ -138,4 +141,14 @@ public class KafkaConsumerAuthIT extends BaseEmbeddedKafkaAuthTestSupport {
         assertTrue(headers.containsKey(propagatedHeaderKey), "Should receive propagated header");
     }
 
+    private void populateKafkaTopic(String propagatedHeaderKey, byte[] propagatedHeaderValue) {
+        // Sends five records keyed "1", each with a "CamelSkippedHeader" header and the caller-supplied header
+        for (int index = 0; index < 5; index++) {
+            ProducerRecord<String, String> outgoing = new ProducerRecord<>(TOPIC, "1", "message-" + index);
+            outgoing.headers().add(new RecordHeader("CamelSkippedHeader", "skipped header value".getBytes()));
+            outgoing.headers().add(new RecordHeader(propagatedHeaderKey, propagatedHeaderValue));
+            producer.send(outgoing);
+        }
+    }
+
 }