You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/12/09 06:35:37 UTC

[camel-kafka-connector] branch test-regex created (now 55c9c3a)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch test-regex
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


      at 55c9c3a  Added another test about removal of headers based on reg exp

This branch includes the following new commits:

     new 55c9c3a  Added another test about removal of headers based on reg exp

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel-kafka-connector] 01/01: Added another test about removal of headers based on reg exp

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch test-regex
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 55c9c3a4f4722b9f91430efc78ed94f58df530e2
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Wed Dec 9 07:31:47 2020 +0100

    Added another test about removal of headers based on reg exp
---
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 48 ++++++++++++++++++++++
 1 file changed, 48 insertions(+)

diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 74ebd32..047d858 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -195,6 +195,54 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
+    
+    @Test
+    public void testBodyAndHeadersExclusionsRegex() {
+        Map<String, String> props = new HashMap<>();
+        props.put(TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+        props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, "My*");
+
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
+
+        Byte myByte = new Byte("100");
+        Float myFloat = new Float("100");
+        Short myShort = new Short("100");
+        Double myDouble = new Double("100");
+        int myInteger = 100;
+        Long myLong = new Long("100");
+        BigDecimal myBigDecimal = new BigDecimal(1234567890);
+        Schema schema = Decimal.schema(myBigDecimal.scale());
+
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+        record.headers().addBoolean(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBoolean", true);
+        record.headers().addByte(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyByte", myByte);
+        record.headers().addFloat(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyFloat", myFloat);
+        record.headers().addShort(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyShort", myShort);
+        record.headers().addDouble(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDouble", myDouble);
+        record.headers().addInt(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyInteger", myInteger);
+        record.headers().addLong(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyLong", myLong);
+        record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBigDecimal", Decimal.fromLogical(schema, myBigDecimal), schema);
+        records.add(record);
+        sinkTask.put(records);
+
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+        assertEquals("camel", exchange.getMessage().getBody());
+        assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertNull(exchange.getIn().getHeader("MyBoolean", Boolean.class));
+        assertNull(exchange.getIn().getHeader("MyByte", Byte.class));
+        assertNull(exchange.getIn().getHeader("MyFloat", Float.class));
+        assertNull(exchange.getIn().getHeader("MyShort", Short.class));
+        assertNull(exchange.getIn().getHeader("MyDouble", Double.class));
+        assertNull(exchange.getIn().getHeader("MyInteger"));
+        assertNull(exchange.getIn().getHeader("MyLong", Long.class));
+        assertNull(exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class));
+
+        sinkTask.stop();
+    }
 
     @Test
     public void testBodyAndProperties() {