You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/10 10:11:48 UTC

[camel-kafka-connector] 08/14: Converted the SJMS2 source test cases to use the reusable source base class

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit e95b9fb3bd2f1874625fa81587a0400b52ce505f
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Feb 8 21:21:28 2021 +0100

    Converted the SJMS2 source test cases to use the reusable source base class
---
 .../common/test/AbstractTestMessageConsumer.java   |   8 +-
 .../common/test/CamelSourceTestSupport.java        |  45 +++++++-
 .../common/test/IntegerMessageConsumer.java        |  29 +++++
 .../sjms2/source/CamelSourceJMSITCase.java         | 127 ++++++++-------------
 .../source/CamelSourceJMSWithAggregation.java      | 110 +++++++++++-------
 5 files changed, 186 insertions(+), 133 deletions(-)

diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageConsumer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageConsumer.java
index 2fcf42f..744abbf 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageConsumer.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageConsumer.java
@@ -28,8 +28,8 @@ import org.slf4j.LoggerFactory;
 public abstract class AbstractTestMessageConsumer<T> implements TestMessageConsumer<T> {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractTestMessageConsumer.class);
 
-    private final KafkaClient<String, T> kafkaClient;
-    private final String topicName;
+    protected final KafkaClient<String, T> kafkaClient;
+    protected final String topicName;
     private final int count;
     private final List<ConsumerRecord<String, T>> receivedMessages;
     private volatile int received;
@@ -42,7 +42,7 @@ public abstract class AbstractTestMessageConsumer<T> implements TestMessageConsu
         receivedMessages = new ArrayList<>(count);
     }
 
-    private boolean checkRecord(ConsumerRecord<String, T> record) {
+    public boolean checkRecord(ConsumerRecord<String, T> record) {
         LOG.debug("Received: {}", record.value());
         received++;
         receivedMessages.add(record);
@@ -63,4 +63,6 @@ public abstract class AbstractTestMessageConsumer<T> implements TestMessageConsu
     public List<ConsumerRecord<String, T>> consumedMessages() {
         return receivedMessages;
     }
+
+
 }
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
index 35626a3..7f8b03c 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
@@ -33,7 +33,7 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
     protected abstract void verifyMessages(TestMessageConsumer<?> consumer);
 
     /**
-     * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
+     * A simple test runner that follows the steps: initialize, start producer, consume messages, verify results
      *
      * @param connectorPropertyFactory A factory for connector properties
      * @param topic the topic to send the messages to
@@ -49,7 +49,7 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
 
 
     /**
-     * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
+     * A simple test runner that follows the steps: initialize, start producer, consume messages, verify results
      *
      * @param connectorPropertyFactory A factory for connector properties
      * @param consumer A Kafka consumer consumer for the test messages
@@ -60,7 +60,7 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
     }
 
     /**
-     * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
+     * A simple test runner that follows the steps: initialize, start producer, consume messages, verify results
      *
      * @param connectorPropertyFactory A factory for connector properties
      * @param consumer A Kafka consumer consumer for the test messages
@@ -71,7 +71,6 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
                         FunctionalTestMessageProducer producer) throws ExecutionException, InterruptedException {
         connectorPropertyFactory.log();
         LOG.debug("Initialized the connector and put the data for the test execution");
-//        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
         getKafkaConnectService().initializeConnector(connectorPropertyFactory);
 
         LOG.debug("Producing test data to be collected by the connector and sent to Kafka");
@@ -86,4 +85,42 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
         LOG.debug("Verified messages");
     }
 
+    /**
+     * A simple blocking test runner (initialize, start producer, consume messages, verify results) that produces the test data via {@code produceTestData}
+     * @param connectorPropertyFactory A factory for connector properties
+     * @param consumer A Kafka consumer for the test messages
+     * @throws ExecutionException propagated from the three-argument overload this method delegates to
+     * @throws InterruptedException propagated from the three-argument overload this method delegates to
+     */
+    public void runTestBlocking(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer) throws ExecutionException, InterruptedException {
+        runTestBlocking(connectorPropertyFactory, consumer, this::produceTestData);
+    }
+
+    /**
+     * A simple blocking test runner that follows the steps: initialize (via {@code initializeConnectorBlocking}), start producer, consume messages, verify results
+     * @param connectorPropertyFactory A factory for connector properties
+     * @param consumer A Kafka consumer for the test messages
+     * @param producer A producer for the test messages
+     * @throws ExecutionException declared by this runner; which step raises it is not visible here (presumably the connector initialization, verify)
+     * @throws InterruptedException declared by this runner; which step raises it is not visible here (presumably a blocking wait, verify)
+     */
+    public void runTestBlocking(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer,
+                        FunctionalTestMessageProducer producer) throws ExecutionException, InterruptedException {
+        connectorPropertyFactory.log();
+        LOG.debug("Initialized the connector and put the data for the test execution");
+        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+        // NOTE(review): the meaning of the literal 1 above is not visible here (presumably an expected task count) - confirm
+        LOG.debug("Producing test data to be collected by the connector and sent to Kafka");
+        producer.produceMessages();
+
+        LOG.debug("Creating the Kafka consumer ...");
+        consumer.consumeMessages();
+        LOG.debug("Ran the Kafka consumer ...");
+
+        LOG.debug("Verifying messages");
+        verifyMessages(consumer);
+        LOG.debug("Verified messages");
+    }
+
+
 }
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/IntegerMessageConsumer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/IntegerMessageConsumer.java
new file mode 100644
index 0000000..a49e00e
--- /dev/null
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/IntegerMessageConsumer.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.common.test;
+
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+
+/**
+ * A consumer that receives the 'count' amount of integer messages from the Kafka broker
+ */
+public class IntegerMessageConsumer extends AbstractTestMessageConsumer<Integer> {
+    public IntegerMessageConsumer(KafkaClient<String, Integer> kafkaClient, String topicName, int count) {
+        super(kafkaClient, topicName, count);
+    }
+}
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
index 5729c15..781e029 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
@@ -20,26 +20,24 @@ package org.apache.camel.kafkaconnector.sjms2.source;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.IntegerMessageConsumer;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
 import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
 import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer;
 import org.apache.camel.test.infra.messaging.services.MessagingService;
 import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.fail;
 
 
 /**
@@ -47,16 +45,14 @@ import static org.junit.jupiter.api.Assertions.fail;
  * messages
  */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceJMSITCase extends AbstractKafkaTest {
+public class CamelSourceJMSITCase extends CamelSourceTestSupport {
     @RegisterExtension
     public static MessagingService jmsService = MessagingServiceBuilder
             .newBuilder(DispatchRouterContainer::new)
             .withEndpointProvider(DispatchRouterContainer::defaultEndpoint)
             .build();
 
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceJMSITCase.class);
-
-    private int received;
+    private String topicName;
     private final int expect = 10;
     private JMSClient jmsClient;
 
@@ -76,101 +72,68 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
-        received = 0;
-        jmsClient = JMSClient.newClient(jmsService.defaultEndpoint());
-    }
-
-    private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
-        LOG.debug("Received: {}", record.value());
-        received++;
-
-        if (received == expect) {
-            return false;
-        }
+        topicName = getTopicForTest(this);
 
-        return true;
     }
 
+    @BeforeAll
+    public void setupClient() { // non-static @BeforeAll: the class is annotated with @TestInstance(PER_CLASS)
+        jmsClient = JMSClient.newClient(jmsService.defaultEndpoint());
+    }
 
-
-    public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
+    @Override
+    protected void produceTestData() {
         JMSClient.produceMessages(jmsClient, SJMS2Common.DEFAULT_JMS_QUEUE, expect, "Test string message");
+    }
 
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the consumer ...");
-
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
         assertEquals(received, expect, "Didn't process the expected amount of messages");
     }
 
+
     @Test
     @Timeout(90)
-    public void testBasicSendReceive() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
-                    .basic()
-                    .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE)
-                    .withConnectionProperties(connectionProperties());
-
-            runBasicStringTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("JMS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+    public void testBasicSendReceive() throws ExecutionException, InterruptedException {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+                .basic()
+                .withKafkaTopic(topicName)
+                .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE)
+                .withConnectionProperties(connectionProperties());
+
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
     @Test
     @Timeout(90)
-    public void testBasicSendReceiveUsingUrl() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
-                    .basic()
-                    .withConnectionProperties(connectionProperties())
-                    .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withUrl(SJMS2Common.DEFAULT_JMS_QUEUE)
-                        .buildUrl();
-
-            runBasicStringTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("JMS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+    public void testBasicSendReceiveUsingUrl() throws ExecutionException, InterruptedException {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+                .basic()
+                .withConnectionProperties(connectionProperties())
+                .withKafkaTopic(topicName)
+                .withUrl(SJMS2Common.DEFAULT_JMS_QUEUE)
+                    .buildUrl();
+
+        runTest(connectorPropertyFactory, topicName, expect);
     }
 
 
     @Test
     @Timeout(90)
-    public void testIntSendReceive() {
-        try {
-            final String jmsQueueName = "testIntSendReceive";
-
-            ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
-                    .basic()
-                    .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()) + jmsQueueName)
-                    .withDestinationName(jmsQueueName)
-                    .withConnectionProperties(connectionProperties());
-
-            connectorPropertyFactory.log();
-            getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-            JMSClient.produceMessages(jmsClient, jmsQueueName, expect);
+    public void testIntSendReceive() throws ExecutionException, InterruptedException {
+        final String jmsQueueName = "testIntSendReceive";
 
-            LOG.debug("Creating the consumer ...");
-            KafkaClient<String, Integer> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-            kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()) + "testIntSendReceive", this::checkRecord);
-            LOG.debug("Created the consumer ...");
+        ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+                .basic()
+                .withKafkaTopic(topicName)
+                .withDestinationName(jmsQueueName)
+                .withConnectionProperties(connectionProperties());
 
-            assertEquals(received, expect, "Didn't process the expected amount of messages");
-        } catch (Exception e) {
-            LOG.error("JMS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+        KafkaClient<String, Integer> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+        IntegerMessageConsumer consumer = new IntegerMessageConsumer(kafkaClient, topicName, expect);
 
+        runTest(connectorPropertyFactory, consumer, () -> JMSClient.produceMessages(jmsClient, jmsQueueName, expect));
     }
 
 
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java
index 0603a78..6b95304 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java
@@ -20,44 +20,65 @@ package org.apache.camel.kafkaconnector.sjms2.source;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageConsumer;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
 import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
 import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer;
 import org.apache.camel.test.infra.messaging.services.MessagingService;
 import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceJMSWithAggregation extends AbstractKafkaTest {
+public class CamelSourceJMSWithAggregation extends CamelSourceTestSupport {
     @RegisterExtension
     public static MessagingService jmsService = MessagingServiceBuilder
             .newBuilder(DispatchRouterContainer::new)
             .withEndpointProvider(DispatchRouterContainer::defaultEndpoint)
             .build();
 
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceJMSITCase.class);
-
-    private int received;
     private final int sentSize = 10;
     private final int expect = 1;
     private JMSClient jmsClient;
-    private String receivedMessage = "";
     private String expectedMessage = "";
     private String queueName;
+    private String topicName;
+
+    /** Polls the topic until at least one record was consumed; there is no retry limit, the calling test relies on its {@code @Timeout}. */
+    class GreedyConsumer extends StringMessageConsumer {
+        public GreedyConsumer(KafkaClient<String, String> kafkaClient, String topicName, int count) {
+            super(kafkaClient, topicName, count);
+        }
+
+        /** Repeatedly calls {@code consumeAvailable}, sleeping one second between attempts while nothing has been consumed. */
+        @Override
+        public void consumeMessages() {
+            do {
+                kafkaClient.consumeAvailable(super.topicName, super::checkRecord);
+                if (consumedMessages().isEmpty()) {
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        // Restore the interrupt flag so the caller can observe the interruption, then stop polling
+                        Thread.currentThread().interrupt();
+                        break;
+                    }
+                }
+            } while (consumedMessages().isEmpty());
+        }
+    }
 
     private Properties connectionProperties() {
         Properties properties = new Properties();
@@ -73,9 +94,8 @@ public class CamelSourceJMSWithAggregation extends AbstractKafkaTest {
         return new String[] {"camel-sjms2-kafka-connector"};
     }
 
-    @BeforeEach
-    public void setUp() {
-        received = 0;
+    @BeforeAll
+    public void setupClient() {
         jmsClient = JMSClient.newClient(jmsService.defaultEndpoint());
 
         for (int i = 0; i < sentSize - 1; i++) {
@@ -83,53 +103,55 @@ public class CamelSourceJMSWithAggregation extends AbstractKafkaTest {
         }
 
         expectedMessage += "hello;";
-        queueName = SJMS2Common.DEFAULT_JMS_QUEUE + "." + TestUtils.randomWithRange(1, 100);
     }
 
-    private void checkRecord(ConsumerRecord<String, String> record) {
-        receivedMessage += record.value();
-        LOG.debug("Received: {}", receivedMessage);
+    @BeforeEach
+    public void setUp() {
+        topicName = getTopicForTest(this);
 
-        received++;
+        queueName = SJMS2Common.DEFAULT_JMS_QUEUE + "." + TestUtils.randomWithRange(1, 100);
     }
 
-    private static String textToSend(Integer i) {
-        return "hello;";
+    @Override
+    protected void produceTestData() {
+        JMSClient.produceMessages(jmsClient, queueName, sentSize,
+                CamelSourceJMSWithAggregation::textToSend);
     }
 
+    @Override
+    protected void verifyMessages(TestMessageConsumer<?> consumer) {
+        int received = consumer.consumedMessages().size();
 
-    public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
-
-        JMSClient.produceMessages(jmsClient, queueName, sentSize,
-                CamelSourceJMSWithAggregation::textToSend);
+        Object receivedObject = consumer.consumedMessages().get(0).value();
+        if (!(receivedObject instanceof String)) {
+            fail("Unexpected message type");
+        }
 
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consumeAvailable(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
-        LOG.debug("Created the consumer ...");
+        String receivedMessage = (String) receivedObject;
 
         assertEquals(expect, received, "Didn't process the expected amount of messages");
         assertEquals(expectedMessage, receivedMessage, "The messages don't match");
     }
 
+
+    private static String textToSend(Integer i) {
+        return "hello;";
+    }
+
     @Test
     @Timeout(90)
-    public void testBasicSendReceive() {
-        try {
-            ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
-                    .basic()
-                    .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
-                    .withDestinationName(queueName)
-                    .withConnectionProperties(connectionProperties())
-                    .withAggregate("org.apache.camel.kafkaconnector.aggregator.StringAggregator", sentSize,
-                            1000);
-
-            runBasicStringTest(connectorPropertyFactory);
-        } catch (Exception e) {
-            LOG.error("JMS test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
+    public void testBasicSendReceive() throws ExecutionException, InterruptedException {
+        ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+                .basic()
+                .withKafkaTopic(topicName)
+                .withDestinationName(queueName)
+                .withConnectionProperties(connectionProperties())
+                .withAggregate("org.apache.camel.kafkaconnector.aggregator.StringAggregator", sentSize,
+                        1000);
+
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+        GreedyConsumer greedyConsumer = new GreedyConsumer(kafkaClient, topicName, expect);
+
+        runTestBlocking(connectorPropertyFactory, greedyConsumer);
     }
 }