You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/12/09 07:42:41 UTC
[camel-kafka-connector] branch master updated: Added another test about removal of headers based on reg exp
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 314eee9 Added another test about removal of headers based on reg exp
314eee9 is described below
commit 314eee947e6f9c979a9c1ddbcd5516c3f51b6539
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Wed Dec 9 07:31:47 2020 +0100
Added another test about removal of headers based on reg exp
---
.../camel/kafkaconnector/CamelSinkTaskTest.java | 48 ++++++++++++++++++++++
1 file changed, 48 insertions(+)
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 74ebd32..047d858 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -195,6 +195,54 @@ public class CamelSinkTaskTest {
sinkTask.stop();
}
+
+ @Test
+ public void testBodyAndHeadersExclusionsRegex() { // Checks that the My... headers are absent from the delivered exchange while body and record key are kept.
+ Map<String, String> props = new HashMap<>();
+ props.put(TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+ props.put(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF, "My*"); // NOTE(review): as a strict regex this is 'M' plus zero or more 'y'; presumably it is handled as a wildcard/prefix match - confirm.
+ // Start a sink task with the configuration built above.
+ CamelSinkTask sinkTask = new CamelSinkTask();
+ sinkTask.start(props);
+ // Header values, one per header type added to the record below.
+ Byte myByte = new Byte("100"); // NOTE(review): the boxed-type String constructors used in this group are deprecated since Java 9; valueOf(...) is the replacement.
+ Float myFloat = new Float("100");
+ Short myShort = new Short("100");
+ Double myDouble = new Double("100");
+ int myInteger = 100;
+ Long myLong = new Long("100");
+ BigDecimal myBigDecimal = new BigDecimal(1234567890);
+ Schema schema = Decimal.schema(myBigDecimal.scale()); // Decimal schema built from the scale of the value above
+ // One record (key "test", value "camel") carrying eight headers named HEADER_CAMEL_PREFIX + "My...".
+ List<SinkRecord> records = new ArrayList<SinkRecord>();
+ SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
+ record.headers().addBoolean(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBoolean", true);
+ record.headers().addByte(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyByte", myByte);
+ record.headers().addFloat(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyFloat", myFloat);
+ record.headers().addShort(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyShort", myShort);
+ record.headers().addDouble(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDouble", myDouble);
+ record.headers().addInt(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyInteger", myInteger);
+ record.headers().addLong(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyLong", myLong);
+ record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBigDecimal", Decimal.fromLogical(schema, myBigDecimal), schema); // BigDecimal is added in its Decimal.fromLogical form together with its schema
+ records.add(record);
+ sinkTask.put(records);
+ // Read the resulting exchange back from SEDA_URI, then check body, record key, and header removal.
+ ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+ Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); // NOTE(review): exchange is not null-checked; if receive(...) yields null after RECEIVE_TIMEOUT the next line throws a NullPointerException - confirm.
+ assertEquals("camel", exchange.getMessage().getBody());
+ assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+ assertNull(exchange.getIn().getHeader("MyBoolean", Boolean.class)); // lookups below use the bare names, without HEADER_CAMEL_PREFIX
+ assertNull(exchange.getIn().getHeader("MyByte", Byte.class));
+ assertNull(exchange.getIn().getHeader("MyFloat", Float.class));
+ assertNull(exchange.getIn().getHeader("MyShort", Short.class));
+ assertNull(exchange.getIn().getHeader("MyDouble", Double.class));
+ assertNull(exchange.getIn().getHeader("MyInteger"));
+ assertNull(exchange.getIn().getHeader("MyLong", Long.class));
+ assertNull(exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class));
+ // NOTE(review): stop() below is not reached when an assertion above throws; consider try/finally.
+ sinkTask.stop();
+ }
@Test
public void testBodyAndProperties() {