You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/12/09 07:42:41 UTC

[camel-kafka-connector] branch master updated: Added another test about removal of headers based on reg exp

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 314eee9  Added another test about removal of headers based on reg exp
314eee9 is described below

commit 314eee947e6f9c979a9c1ddbcd5516c3f51b6539
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Wed Dec 9 07:31:47 2020 +0100

    Added another test about removal of headers based on reg exp
---
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 48 ++++++++++++++++++++++
 1 file changed, 48 insertions(+)

diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 74ebd32..047d858 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -195,6 +195,54 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
+    
+    /**
+     * Starts a sink task whose remove-headers pattern is {@code "My*"}, pushes one record carrying
+     * eight typed headers, and checks the body, the record key header and that no {@code My...}
+     * header can be read from the received exchange.
+     */
+    @Test
+    public void testBodyAndHeadersExclusionsRegex() {
+        Map<String, String> props = new HashMap<>();
+        props.put(TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+        props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, "My*");
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
+        try { // stop the task even when an assertion below fails
+            BigDecimal myBigDecimal = new BigDecimal(1234567890);
+            Schema schema = Decimal.schema(myBigDecimal.scale());
+            List<SinkRecord> records = new ArrayList<>();
+            SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+            record.headers().addBoolean(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBoolean", true);
+            record.headers().addByte(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyByte", (byte) 100);
+            record.headers().addFloat(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyFloat", 100f);
+            record.headers().addShort(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyShort", (short) 100);
+            record.headers().addDouble(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDouble", 100d);
+            record.headers().addInt(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyInteger", 100);
+            record.headers().addLong(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyLong", 100L);
+            record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBigDecimal", Decimal.fromLogical(schema, myBigDecimal), schema);
+            records.add(record);
+            sinkTask.put(records);
+
+            ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+            Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+            assertEquals("camel", exchange.getMessage().getBody());
+            assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+            // NOTE(review): headers are added under HEADER_CAMEL_PREFIX + name but looked up by the bare
+            // name here; confirm the task strips the prefix, otherwise these assertNull checks are vacuous.
+            assertNull(exchange.getIn().getHeader("MyBoolean", Boolean.class));
+            assertNull(exchange.getIn().getHeader("MyByte", Byte.class));
+            assertNull(exchange.getIn().getHeader("MyFloat", Float.class));
+            assertNull(exchange.getIn().getHeader("MyShort", Short.class));
+            assertNull(exchange.getIn().getHeader("MyDouble", Double.class));
+            assertNull(exchange.getIn().getHeader("MyInteger"));
+            assertNull(exchange.getIn().getHeader("MyLong", Long.class));
+            assertNull(exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class));
+        } finally {
+            sinkTask.stop();
+        }
+    }
 
     @Test
     public void testBodyAndProperties() {