You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/02/03 08:58:04 UTC

[camel-kafka-connector] branch master updated (3f1a3be -> 3ffb61a)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


    from 3f1a3be  Updated CHANGELOG.md
     new cc371d7  Avoid reusing the same destination name for the sjms2 idempotency tests
     new 3ffb61a  Added idempotency test for SJMS2 using header expressions

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../sjms2/sink/CamelSinkIdempotentJMSITCase.java   | 86 ++++++++++++++++++----
 1 file changed, 73 insertions(+), 13 deletions(-)


[camel-kafka-connector] 02/02: Added idempotency test for SJMS2 using header expressions

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 3ffb61a4d1f88a35832a60822b31ca563d7c5a12
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 09:11:34 2021 +0100

    Added idempotency test for SJMS2 using header expressions
---
 .../sjms2/sink/CamelSinkIdempotentJMSITCase.java   | 80 +++++++++++++++++++---
 1 file changed, 69 insertions(+), 11 deletions(-)

diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
index 566e823..d2f06a7 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
@@ -17,6 +17,8 @@
 
 package org.apache.camel.kafkaconnector.sjms2.sink;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -28,6 +30,7 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.TextMessage;
 
+import org.apache.camel.kafkaconnector.CamelSinkTask;
 import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
@@ -53,6 +56,11 @@ import static org.junit.jupiter.api.Assertions.fail;
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
+    @FunctionalInterface
+    interface Producer { // callback that sends the test records; handed to runTest as its second argument
+        void producerMessages(); // called by runTest after the JMS consumer task has been submitted
+    }
+
     @RegisterExtension
     public static MessagingService jmsService = MessagingServiceBuilder
             .newBuilder(DispatchRouterContainer::new)
@@ -84,7 +92,8 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
     public void setUp() {
         LOG.info("JMS service running at {}", jmsService.defaultEndpoint());
         received = 0;
-        topic = TestUtils.getDefaultTestTopic(this.getClass());
+
+        topic = TestUtils.getDefaultTestTopic(this.getClass()) + TestUtils.randomWithRange(0, 100);
         destinationName = SJMS2Common.DEFAULT_JMS_QUEUE + "-" + TestUtils.randomWithRange(0, 100);
     }
 
@@ -142,7 +151,7 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
         }
     }
 
-    private void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
+    private void runTest(ConnectorPropertyFactory connectorPropertyFactory, Producer producer) throws ExecutionException, InterruptedException {
         connectorPropertyFactory.log();
         getKafkaConnectService().initializeConnector(connectorPropertyFactory);
 
@@ -151,14 +160,7 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
         LOG.debug("Creating the consumer ...");
         service.submit(() -> consumeJMSMessages());
 
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-
-        for (int i = 0; i < expect; i++) {
-            LOG.debug("Sending message 1/2");
-            kafkaClient.produce(topic, "Sink test message " + i);
-            LOG.debug("Sending message 2/2");
-            kafkaClient.produce(topic, "Sink test message " + i);
-        }
+        producer.producerMessages();
 
         LOG.debug("Waiting for the messages to be processed");
         service.shutdown();
@@ -170,6 +172,39 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
         }
     }
 
+    private void produceMessagesNoProperties() { // sends every body twice, without headers (2 * expect records)
+        try {
+            KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+            for (int i = 0; i < expect; i++) {
+                LOG.debug("Sending message 1/2");
+                kafkaClient.produce(topic, "Sink test message " + i);
+                LOG.debug("Sending message 2/2");
+                kafkaClient.produce(topic, "Sink test message " + i); // same body again: the duplicate
+            }
+        } catch (Exception e) {
+            fail(e.getMessage(), e); // keep the original exception as the failure cause
+        }
+    }
+
+    private void produceMessagesWithProperties() { // sends two records per MessageNumber header value
+        try {
+            KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+            for (int i = 0; i < expect; i++) {
+                Map<String, String> headers = new HashMap<>();
+                int randomNumber = TestUtils.randomWithRange(1, 1000);
+                // Both records of this iteration carry the same header value; only their bodies differ.
+                headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "MessageNumber", String.valueOf(i));
+
+                kafkaClient.produce(topic, "Sink test message " + randomNumber, headers);
+                kafkaClient.produce(topic, "Sink test message " + (randomNumber + 1), headers); // add 1, not append "1"
+            }
+        } catch (Exception e) {
+            fail(e.getMessage(), e); // keep the original exception as the failure cause
+        }
+    }
+
     @Test
     @Timeout(90)
     public void testIdempotentBodySendReceive() {
@@ -184,7 +219,30 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
                         .withExpressionType("body")
                         .end();
 
-            runTest(connectorPropertyFactory);
+            runTest(connectorPropertyFactory, this::produceMessagesNoProperties);
+
+        } catch (Exception e) {
+            LOG.error("JMS test failed: {}", e.getMessage(), e);
+            fail(e.getMessage());
+        }
+    }
+
+    @Test
+    @Timeout(90)
+    public void testIdempotentHeaderSendReceive() {
+        try {
+            ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+                    .basic()
+                    .withTopics(topic)
+                    .withConnectionProperties(connectionProperties())
+                    .withDestinationName(destinationName)
+                    .withIdempotency()
+                    .withRepositoryType("memory")
+                    .withExpressionType("header")
+                    .withExpressionHeader("MessageNumber")
+                    .end();
+
+            runTest(connectorPropertyFactory, this::produceMessagesWithProperties);
 
         } catch (Exception e) {
             LOG.error("JMS test failed: {}", e.getMessage(), e);


[camel-kafka-connector] 01/02: Avoid reusing the same destination name for the sjms2 idempotency tests

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit cc371d7167e406e4b41f2d47d97b41a7706868a7
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Feb 3 08:46:56 2021 +0100

    Avoid reusing the same destination name for the sjms2 idempotency tests
---
 .../kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java     | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
index 8eceee2..566e823 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkIdempotentJMSITCase.java
@@ -62,6 +62,7 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
     private static final Logger LOG = LoggerFactory.getLogger(CamelSinkIdempotentJMSITCase.class);
 
     private String topic;
+    private String destinationName;
     private int received;
     private final int expect = 10;
 
@@ -84,6 +85,7 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
         LOG.info("JMS service running at {}", jmsService.defaultEndpoint());
         received = 0;
         topic = TestUtils.getDefaultTestTopic(this.getClass());
+        destinationName = SJMS2Common.DEFAULT_JMS_QUEUE + "-" + TestUtils.randomWithRange(0, 100);
     }
 
     private boolean checkRecord(Message jmsMessage) {
@@ -111,7 +113,7 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
             jmsClient = JMSClient.newClient(jmsService.defaultEndpoint());
             jmsClient.start();
 
-            try (MessageConsumer consumer = jmsClient.createConsumer(SJMS2Common.DEFAULT_JMS_QUEUE)) {
+            try (MessageConsumer consumer = jmsClient.createConsumer(destinationName)) {
                 // number of retries until stale
                 int retries = 10;
 
@@ -176,7 +178,7 @@ public class CamelSinkIdempotentJMSITCase extends AbstractKafkaTest {
                     .basic()
                     .withTopics(topic)
                     .withConnectionProperties(connectionProperties())
-                    .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE)
+                    .withDestinationName(destinationName)
                     .withIdempotency()
                         .withRepositoryType("memory")
                         .withExpressionType("body")