You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/09/14 09:10:09 UTC
[camel-kafka-connector] branch topics.regex updated: Topics.regex:
Added unit test
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch topics.regex
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/topics.regex by this push:
new 0618b2e Topics.regex: Added unit test
0618b2e is described below
commit 0618b2ee6ac5c513628d9b71356f52d4a9f452d3
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Sep 14 11:09:36 2020 +0200
Topics.regex: Added unit test
---
.../camel/kafkaconnector/CamelSinkTaskTest.java | 29 ++++++++++++++++++++++
1 file changed, 29 insertions(+)
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 84df83d..a050943 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -67,6 +67,35 @@ public class CamelSinkTaskTest {
sinkTask.stop();
}
+
+ /**
+ * Verifies that a sink task configured through {@code topics.regex} rather than an
+ * explicit topic list starts correctly and forwards records coming from different
+ * topics to the Camel endpoint, preserving body, record key header and put order.
+ */
+ @Test
+ public void testTopicsRegex() {
+ Map<String, String> props = new HashMap<>();
+ // The value is a Java regular expression, not a glob: "topic1*" would mean "topic"
+ // followed by zero or more '1' characters and would not match "topic12" used below.
+ props.put("topics.regex", "topic1.*");
+ props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+
+ CamelSinkTask sinkTask = new CamelSinkTask();
+ sinkTask.start(props);
+
+ // Two records from two different topics, both covered by the regex above.
+ List<SinkRecord> records = new ArrayList<>();
+ SinkRecord record = new SinkRecord("topic1", 1, null, "test", null, "camel", 42);
+ SinkRecord record1 = new SinkRecord("topic12", 1, null, "test", null, "cameltopicregex", 42);
+ records.add(record);
+ records.add(record1);
+ sinkTask.put(records);
+
+ ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+ // First exchange must carry the record put first.
+ Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+ assertEquals("camel", exchange.getMessage().getBody());
+ assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+ // props does not set a content log level, so the test expects the config to report OFF.
+ assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
+ .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
+ // Second exchange must carry the record from the second topic.
+ Exchange exchange1 = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+ assertEquals("cameltopicregex", exchange1.getMessage().getBody());
+ assertEquals("test", exchange1.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+
+ sinkTask.stop();
+ }
@Test
public void testBodyAndHeaders() {