You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/02/10 10:11:48 UTC
[camel-kafka-connector] 08/14: Converted the SJMS2 source test
cases to use the reusable source base class
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit e95b9fb3bd2f1874625fa81587a0400b52ce505f
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Feb 8 21:21:28 2021 +0100
Converted the SJMS2 source test cases to use the reusable source base class
---
.../common/test/AbstractTestMessageConsumer.java | 8 +-
.../common/test/CamelSourceTestSupport.java | 45 +++++++-
.../common/test/IntegerMessageConsumer.java | 29 +++++
.../sjms2/source/CamelSourceJMSITCase.java | 127 ++++++++-------------
.../source/CamelSourceJMSWithAggregation.java | 110 +++++++++++-------
5 files changed, 186 insertions(+), 133 deletions(-)
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageConsumer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageConsumer.java
index 2fcf42f..744abbf 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageConsumer.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageConsumer.java
@@ -28,8 +28,8 @@ import org.slf4j.LoggerFactory;
public abstract class AbstractTestMessageConsumer<T> implements TestMessageConsumer<T> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTestMessageConsumer.class);
- private final KafkaClient<String, T> kafkaClient;
- private final String topicName;
+ protected final KafkaClient<String, T> kafkaClient;
+ protected final String topicName;
private final int count;
private final List<ConsumerRecord<String, T>> receivedMessages;
private volatile int received;
@@ -42,7 +42,7 @@ public abstract class AbstractTestMessageConsumer<T> implements TestMessageConsu
receivedMessages = new ArrayList<>(count);
}
- private boolean checkRecord(ConsumerRecord<String, T> record) {
+ public boolean checkRecord(ConsumerRecord<String, T> record) {
LOG.debug("Received: {}", record.value());
received++;
receivedMessages.add(record);
@@ -63,4 +63,6 @@ public abstract class AbstractTestMessageConsumer<T> implements TestMessageConsu
public List<ConsumerRecord<String, T>> consumedMessages() {
return receivedMessages;
}
+
+
}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
index 35626a3..7f8b03c 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/CamelSourceTestSupport.java
@@ -33,7 +33,7 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
protected abstract void verifyMessages(TestMessageConsumer<?> consumer);
/**
- * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
+ * A simple test runner that follows the steps: initialize, start producer, consume messages, verify results
*
* @param connectorPropertyFactory A factory for connector properties
* @param topic the topic to send the messages to
@@ -49,7 +49,7 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
/**
- * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
+ * A simple test runner that follows the steps: initialize, start producer, consume messages, verify results
*
* @param connectorPropertyFactory A factory for connector properties
* @param consumer A Kafka consumer consumer for the test messages
@@ -60,7 +60,7 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
}
/**
- * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
+ * A simple test runner that follows the steps: initialize, start producer, consume messages, verify results
*
* @param connectorPropertyFactory A factory for connector properties
* @param consumer A Kafka consumer consumer for the test messages
@@ -71,7 +71,6 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
FunctionalTestMessageProducer producer) throws ExecutionException, InterruptedException {
connectorPropertyFactory.log();
LOG.debug("Initialized the connector and put the data for the test execution");
-// getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
getKafkaConnectService().initializeConnector(connectorPropertyFactory);
LOG.debug("Producing test data to be collected by the connector and sent to Kafka");
@@ -86,4 +85,42 @@ public abstract class CamelSourceTestSupport extends AbstractKafkaTest {
LOG.debug("Verified messages");
}
+ /**
+ * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
+ *
+ * @param connectorPropertyFactory A factory for connector properties
+ * @param consumer A Kafka consumer for the test messages
+ * @throws Exception For test-specific exceptions
+ */
+ public void runTestBlocking(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer) throws ExecutionException, InterruptedException {
+ runTestBlocking(connectorPropertyFactory, consumer, this::produceTestData);
+ }
+
+ /**
+ * A simple blocking test runner that follows the steps: initialize, start producer, consume messages, verify results
+ *
+ * @param connectorPropertyFactory A factory for connector properties
+ * @param consumer A Kafka consumer for the test messages
+ * @param producer A producer for the test messages
+ * @throws Exception For test-specific exceptions
+ */
+ public void runTestBlocking(ConnectorPropertyFactory connectorPropertyFactory, TestMessageConsumer<?> consumer,
+ FunctionalTestMessageProducer producer) throws ExecutionException, InterruptedException {
+ connectorPropertyFactory.log();
+ LOG.debug("Initialized the connector and put the data for the test execution");
+ getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+
+ LOG.debug("Producing test data to be collected by the connector and sent to Kafka");
+ producer.produceMessages();
+
+ LOG.debug("Creating the Kafka consumer ...");
+ consumer.consumeMessages();
+ LOG.debug("Ran the Kafka consumer ...");
+
+ LOG.debug("Verifying messages");
+ verifyMessages(consumer);
+ LOG.debug("Verified messages");
+ }
+
+
}
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/IntegerMessageConsumer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/IntegerMessageConsumer.java
new file mode 100644
index 0000000..a49e00e
--- /dev/null
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/IntegerMessageConsumer.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.common.test;
+
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+
+/**
+ * A consumer that receives the 'count' amount of integer messages from the Kafka broker
+ */
+public class IntegerMessageConsumer extends AbstractTestMessageConsumer<Integer> {
+ public IntegerMessageConsumer(KafkaClient<String, Integer> kafkaClient, String topicName, int count) {
+ super(kafkaClient, topicName, count);
+ }
+}
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
index 5729c15..781e029 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
@@ -20,26 +20,24 @@ package org.apache.camel.kafkaconnector.sjms2.source;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.IntegerMessageConsumer;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer;
import org.apache.camel.test.infra.messaging.services.MessagingService;
import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.fail;
/**
@@ -47,16 +45,14 @@ import static org.junit.jupiter.api.Assertions.fail;
* messages
*/
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceJMSITCase extends AbstractKafkaTest {
+public class CamelSourceJMSITCase extends CamelSourceTestSupport {
@RegisterExtension
public static MessagingService jmsService = MessagingServiceBuilder
.newBuilder(DispatchRouterContainer::new)
.withEndpointProvider(DispatchRouterContainer::defaultEndpoint)
.build();
- private static final Logger LOG = LoggerFactory.getLogger(CamelSourceJMSITCase.class);
-
- private int received;
+ private String topicName;
private final int expect = 10;
private JMSClient jmsClient;
@@ -76,101 +72,68 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest {
@BeforeEach
public void setUp() {
- received = 0;
- jmsClient = JMSClient.newClient(jmsService.defaultEndpoint());
- }
-
- private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
- LOG.debug("Received: {}", record.value());
- received++;
-
- if (received == expect) {
- return false;
- }
+ topicName = getTopicForTest(this);
- return true;
}
+ @BeforeAll
+ public void setupClient() {
+ jmsClient = JMSClient.newClient(jmsService.defaultEndpoint());
+ }
-
- public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
- connectorPropertyFactory.log();
- getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
+ @Override
+ protected void produceTestData() {
JMSClient.produceMessages(jmsClient, SJMS2Common.DEFAULT_JMS_QUEUE, expect, "Test string message");
+ }
- LOG.debug("Creating the consumer ...");
- KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
- kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
- LOG.debug("Created the consumer ...");
-
+ @Override
+ protected void verifyMessages(TestMessageConsumer<?> consumer) {
+ int received = consumer.consumedMessages().size();
assertEquals(received, expect, "Didn't process the expected amount of messages");
}
+
@Test
@Timeout(90)
- public void testBasicSendReceive() {
- try {
- ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
- .basic()
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
- .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE)
- .withConnectionProperties(connectionProperties());
-
- runBasicStringTest(connectorPropertyFactory);
- } catch (Exception e) {
- LOG.error("JMS test failed: {}", e.getMessage(), e);
- fail(e.getMessage());
- }
+ public void testBasicSendReceive() throws ExecutionException, InterruptedException {
+ ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+ .basic()
+ .withKafkaTopic(topicName)
+ .withDestinationName(SJMS2Common.DEFAULT_JMS_QUEUE)
+ .withConnectionProperties(connectionProperties());
+
+ runTest(connectorPropertyFactory, topicName, expect);
}
@Test
@Timeout(90)
- public void testBasicSendReceiveUsingUrl() {
- try {
- ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
- .basic()
- .withConnectionProperties(connectionProperties())
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
- .withUrl(SJMS2Common.DEFAULT_JMS_QUEUE)
- .buildUrl();
-
- runBasicStringTest(connectorPropertyFactory);
- } catch (Exception e) {
- LOG.error("JMS test failed: {}", e.getMessage(), e);
- fail(e.getMessage());
- }
+ public void testBasicSendReceiveUsingUrl() throws ExecutionException, InterruptedException {
+ ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+ .basic()
+ .withConnectionProperties(connectionProperties())
+ .withKafkaTopic(topicName)
+ .withUrl(SJMS2Common.DEFAULT_JMS_QUEUE)
+ .buildUrl();
+
+ runTest(connectorPropertyFactory, topicName, expect);
}
@Test
@Timeout(90)
- public void testIntSendReceive() {
- try {
- final String jmsQueueName = "testIntSendReceive";
-
- ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
- .basic()
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()) + jmsQueueName)
- .withDestinationName(jmsQueueName)
- .withConnectionProperties(connectionProperties());
-
- connectorPropertyFactory.log();
- getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
- JMSClient.produceMessages(jmsClient, jmsQueueName, expect);
+ public void testIntSendReceive() throws ExecutionException, InterruptedException {
+ final String jmsQueueName = "testIntSendReceive";
- LOG.debug("Creating the consumer ...");
- KafkaClient<String, Integer> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
- kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()) + "testIntSendReceive", this::checkRecord);
- LOG.debug("Created the consumer ...");
+ ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+ .basic()
+ .withKafkaTopic(topicName)
+ .withDestinationName(jmsQueueName)
+ .withConnectionProperties(connectionProperties());
- assertEquals(received, expect, "Didn't process the expected amount of messages");
- } catch (Exception e) {
- LOG.error("JMS test failed: {}", e.getMessage(), e);
- fail(e.getMessage());
- }
+ KafkaClient<String, Integer> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+ IntegerMessageConsumer consumer = new IntegerMessageConsumer(kafkaClient, topicName, expect);
+ runTest(connectorPropertyFactory, consumer, () -> JMSClient.produceMessages(jmsClient, jmsQueueName, expect));
}
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java
index 0603a78..6b95304 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSWithAggregation.java
@@ -20,44 +20,65 @@ package org.apache.camel.kafkaconnector.sjms2.source;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport;
+import org.apache.camel.kafkaconnector.common.test.StringMessageConsumer;
+import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
import org.apache.camel.test.infra.dispatch.router.services.DispatchRouterContainer;
import org.apache.camel.test.infra.messaging.services.MessagingService;
import org.apache.camel.test.infra.messaging.services.MessagingServiceBuilder;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class CamelSourceJMSWithAggregation extends AbstractKafkaTest {
+public class CamelSourceJMSWithAggregation extends CamelSourceTestSupport {
@RegisterExtension
public static MessagingService jmsService = MessagingServiceBuilder
.newBuilder(DispatchRouterContainer::new)
.withEndpointProvider(DispatchRouterContainer::defaultEndpoint)
.build();
- private static final Logger LOG = LoggerFactory.getLogger(CamelSourceJMSITCase.class);
-
- private int received;
private final int sentSize = 10;
private final int expect = 1;
private JMSClient jmsClient;
- private String receivedMessage = "";
private String expectedMessage = "";
private String queueName;
+ private String topicName;
+
+ /** Polls the topic until a record arrives or the retry budget runs out, instead of looping forever. */
+ class GreedyConsumer extends StringMessageConsumer {
+ public GreedyConsumer(KafkaClient<String, String> kafkaClient, String topicName, int count) {
+ super(kafkaClient, topicName, count);
+ }
+
+ @Override
+ public void consumeMessages() {
+ int retries = 10; // upper bound on polls so an empty topic cannot spin indefinitely
+
+ do {
+ kafkaClient.consumeAvailable(super.topicName, super::checkRecord);
+ if (consumedMessages().isEmpty()) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+ break;
+ }
+ }
+ } while (consumedMessages().isEmpty() && --retries > 0);
+ }
+ }
private Properties connectionProperties() {
Properties properties = new Properties();
@@ -73,9 +94,8 @@ public class CamelSourceJMSWithAggregation extends AbstractKafkaTest {
return new String[] {"camel-sjms2-kafka-connector"};
}
- @BeforeEach
- public void setUp() {
- received = 0;
+ @BeforeAll
+ public void setupClient() {
jmsClient = JMSClient.newClient(jmsService.defaultEndpoint());
for (int i = 0; i < sentSize - 1; i++) {
@@ -83,53 +103,55 @@ public class CamelSourceJMSWithAggregation extends AbstractKafkaTest {
}
expectedMessage += "hello;";
- queueName = SJMS2Common.DEFAULT_JMS_QUEUE + "." + TestUtils.randomWithRange(1, 100);
}
- private void checkRecord(ConsumerRecord<String, String> record) {
- receivedMessage += record.value();
- LOG.debug("Received: {}", receivedMessage);
+ @BeforeEach
+ public void setUp() {
+ topicName = getTopicForTest(this);
- received++;
+ queueName = SJMS2Common.DEFAULT_JMS_QUEUE + "." + TestUtils.randomWithRange(1, 100);
}
- private static String textToSend(Integer i) {
- return "hello;";
+ @Override
+ protected void produceTestData() {
+ JMSClient.produceMessages(jmsClient, queueName, sentSize,
+ CamelSourceJMSWithAggregation::textToSend);
}
+ @Override
+ protected void verifyMessages(TestMessageConsumer<?> consumer) {
+ int received = consumer.consumedMessages().size();
- public void runBasicStringTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
- connectorPropertyFactory.log();
- getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
-
- JMSClient.produceMessages(jmsClient, queueName, sentSize,
- CamelSourceJMSWithAggregation::textToSend);
+ Object receivedObject = consumer.consumedMessages().get(0).value();
+ if (!(receivedObject instanceof String)) {
+ fail("Unexpected message type");
+ }
- LOG.debug("Creating the consumer ...");
- KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
- kafkaClient.consumeAvailable(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
- LOG.debug("Created the consumer ...");
+ String receivedMessage = (String) receivedObject;
assertEquals(expect, received, "Didn't process the expected amount of messages");
assertEquals(expectedMessage, receivedMessage, "The messages don't match");
}
+
+ private static String textToSend(Integer i) {
+ return "hello;";
+ }
+
@Test
@Timeout(90)
- public void testBasicSendReceive() {
- try {
- ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
- .basic()
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
- .withDestinationName(queueName)
- .withConnectionProperties(connectionProperties())
- .withAggregate("org.apache.camel.kafkaconnector.aggregator.StringAggregator", sentSize,
- 1000);
-
- runBasicStringTest(connectorPropertyFactory);
- } catch (Exception e) {
- LOG.error("JMS test failed: {}", e.getMessage(), e);
- fail(e.getMessage());
- }
+ public void testBasicSendReceive() throws ExecutionException, InterruptedException {
+ ConnectorPropertyFactory connectorPropertyFactory = CamelJMSPropertyFactory
+ .basic()
+ .withKafkaTopic(topicName)
+ .withDestinationName(queueName)
+ .withConnectionProperties(connectionProperties())
+ .withAggregate("org.apache.camel.kafkaconnector.aggregator.StringAggregator", sentSize,
+ 1000);
+
+ KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+ GreedyConsumer greedyConsumer = new GreedyConsumer(kafkaClient, topicName, expect);
+
+ runTestBlocking(connectorPropertyFactory, greedyConsumer);
}
}