You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/02/03 13:08:58 UTC

[camel-kafka-connector] 03/05: Added idempotency test for SJMS2 using header expressions

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-master-master-align
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 6e57e058d712e9184687cb961e25500c7605780f
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 09:11:34 2021 +0100

    Added idempotency test for SJMS2 using header expressions
---
 .../sjms2/sink/CamelSinkIdempotentJMSITCase.java   | 80 +++++++++++++++++++---
 1 file changed, 69 insertions(+), 11 deletions(-)

diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
index 566e823..d2f06a7 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
@@ -17,6 +17,8 @@
 
 package org.apache.camel.kafkaconnector.sjms2.sink;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -28,6 +30,7 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.TextMessage;
 
+import org.apache.camel.kafkaconnector.CamelSinkTask;
 import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
@@ -53,6 +56,11 @@ import static org.junit.jupiter.api.Assertions.fail;
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
+    @FunctionalInterface
+    interface Producer {
+        void producerMessages();
+    }
+
     @RegisterExtension
     public static MessagingService jmsService = MessagingServiceBuilder
             .newBuilder(DispatchRouterContainer::new)
@@ -84,7 +92,8 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
     public void setUp() {
         LOG.info("JMS service running at {}", jmsService.defaultEndpoint());
         received = 0;
-        topic = TestUtils.getDefaultTestTopic(this.getClass());
+
+        topic = TestUtils.getDefaultTestTopic(this.getClass()) + TestUtils.randomWithRange(0, 100);
         destinationName = SJMS2Common.DEFAULT_JMS_QUEUE + "-" + TestUtils.randomWithRange(0, 100);
     }
 
@@ -142,7 +151,7 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
         }
     }
 
-    private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+    private void runTest(ConnectorPropertyFactory connectorPropertyFactory, Producer producer) throws ExecutionException, InterruptedException {
         connectorPropertyFactory.log();
         getKafkaConnectService().initializeConnector(connectorPropertyFactory);
 
@@ -151,14 +160,7 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
         LOG.debug("Creating the consumer ...");
         service.submit(() -> consumeJMSMessages());
 
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-        for (int i = 0; i < expect; i++) {
-            LOG.debug("Sending message 1/2");
-            kafkaClient.produce(topic, "Sink test message " + i);
-            LOG.debug("Sending message 2/2");
-            kafkaClient.produce(topic, "Sink test message " + i);
-        }
+        producer.producerMessages();
 
         LOG.debug("Waiting for the messages to be processed");
         service.shutdown();
@@ -170,6 +172,39 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
         }
     }
 
+    private void produceMessagesNoProperties() {
+        try {
+            KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+            for (int i = 0; i < expect; i++) {
+                LOG.debug("Sending message 1/2");
+                kafkaClient.produce(topic, "Sink test message " + i);
+                LOG.debug("Sending message 2/2");
+                kafkaClient.produce(topic, "Sink test message " + i);
+            }
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+    }
+
+    private void produceMessagesWithProperties() {
+        try {
+            KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+            for (int i = 0; i < expect; i++) {
+                Map<String, String> headers = new HashMap<>();
+                int randomNumber = TestUtils.randomWithRange(1, 1000);
+
+                headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "MessageNumber", String.valueOf(i));
+
+                kafkaClient.produce(topic, "Sink test message " + randomNumber, headers);
+                kafkaClient.produce(topic, "Sink test message " + randomNumber + 1, headers);
+            }
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+    }
+
     @Test
     @Timeout(90)
     public void testIdempotentBodySendReceive() {
@@ -184,7 +219,30 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
                         .withExpressionType("body")
                         .end();
 
-            runTest(connectorPropertyFactory);
+            runTest(connectorPropertyFactory, this::produceMessagesNoProperties);
+
+        } catch (Exception e) {
+            LOG.error("JMS test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    @Timeout(90)
+    public void testIdempotentHeaderSendReceive() {
+        try {
+            ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+                    .basic()
+                    .withTopics(topic)
+                    .withConnectionProperties(connectionProperties())
+                    .withDestinationName(destinationName)
+                    .withIdempotency()
+                    .withRepositoryType("memory")
+                    .withExpressionType("header")
+                    .withExpressionHeader("MessageNumber")
+                    .end();
+
+            runTest(connectorPropertyFactory, this::produceMessagesWithProperties);
 
         } catch (Exception e) {
             LOG.error("JMS test failed: {}", e.getMessage(), e);