You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/09/14 09:10:09 UTC

[camel-kafka-connector] branch topics.regex updated: Topics.regex: Added unit test

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch topics.regex
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/topics.regex by this push:
     new 0618b2e  Topics.regex: Added unit test
0618b2e is described below

commit 0618b2ee6ac5c513628d9b71356f52d4a9f452d3
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Sep 14 11:09:36 2020 +0200

    Topics.regex: Added unit test
---
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 29 ++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 84df83d..a050943 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -67,6 +67,35 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
+    
+    /** Checks that records from topics matching {@code topics.regex} are delivered to the sink endpoint. */
+    @Test
+    public void testTopicsRegex() {
+        Map<String, String> props = new HashMap<>();
+        // Regex, not a glob: "topic1*" would only match "topic", "topic1", "topic11", ... and never "topic12".
+        props.put("topics.regex", "topic1.*");
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
+
+        List<SinkRecord> records = new ArrayList<>();
+        records.add(new SinkRecord("topic1", 1, null, "test", null, "camel", 42));
+        records.add(new SinkRecord("topic12", 1, null, "test", null, "cameltopicregex", 42));
+        sinkTask.put(records);
+
+        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+        assertEquals("camel", exchange.getMessage().getBody());
+        assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
+            .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
+        Exchange exchange1 = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+        assertEquals("cameltopicregex", exchange1.getMessage().getBody());
+        assertEquals("test", exchange1.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+
+        sinkTask.stop();
+    }
 
     @Test
     public void testBodyAndHeaders() {