You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/09/14 09:07:00 UTC

[camel-kafka-connector] branch topics.regex created (now 936ffc0)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch topics.regex
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


      at 936ffc0  Removed TOPICS_CONF from SinkConnectorConfig it's already coming from Kafka Connect Config

This branch includes the following new commits:

     new 14261c2  Support topics.regex
     new 936ffc0  Removed TOPICS_CONF from SinkConnectorConfig it's already coming from Kafka Connect Config

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel-kafka-connector] 02/02: Removed TOPICS_CONF from SinkConnectorConfig it's already coming from Kafka Connect Config

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch topics.regex
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 936ffc061ffb4b731a3027a19ee09c482cd01bfe
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Sep 14 09:17:19 2020 +0200

    Removed TOPICS_CONF from SinkConnectorConfig it's already coming from Kafka Connect Config
---
 .../kafkaconnector/CamelSinkConnectorConfig.java   |  5 ----
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 33 +++++++++++-----------
 2 files changed, 17 insertions(+), 21 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index 57f626a..9b4b2df 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -43,10 +43,6 @@ public class CamelSinkConnectorConfig extends AbstractConfig {
     public static final String CAMEL_SINK_URL_DOC = "The camel url to configure the destination. If this is set " + CAMEL_SINK_COMPONENT_CONF
             + " and all the properties starting with " + CamelSinkTask.getCamelSinkEndpointConfigPrefix() + ".<" + CAMEL_SINK_COMPONENT_CONF + " value> are ignored.";
 
-    public static final String TOPIC_DEFAULT = null;
-    public static final String TOPIC_CONF = "topics";
-    public static final String TOPIC_DOC = "A list of topics to use as input for this connector";
-
     public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT = LoggingLevel.OFF.toString();
     public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_CONF = "camel.sink.contentLogLevel";
     public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_DOC = "Log level for the record's content (default: " + CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT + "). Valid values: TRACE, DEBUG, INFO, WARN, ERROR, OFF.";
@@ -65,7 +61,6 @@ public class CamelSinkConnectorConfig extends AbstractConfig {
 
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT, Importance.HIGH, CAMEL_SINK_URL_DOC)
-        .define(TOPIC_CONF, Type.STRING, TOPIC_DEFAULT, Importance.HIGH, TOPIC_DOC)
         .define(CAMEL_SINK_MARSHAL_CONF, Type.STRING, CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC)
         .define(CAMEL_SINK_UNMARSHAL_CONF, Type.STRING, CAMEL_SINK_UNMARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_UNMARSHAL_DOC)
         .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC)
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index f4bb812..84df83d 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -42,11 +42,12 @@ public class CamelSinkTaskTest {
     private static final String SEDA_URI = "seda:test";
     private static final String TOPIC_NAME = "my-topic";
     private static final long RECEIVE_TIMEOUT = 1_000;
+    private static final String TOPIC_CONF = "topics";
 
     @Test
     public void testOnlyBody() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
         CamelSinkTask sinkTask = new CamelSinkTask();
@@ -70,7 +71,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testBodyAndHeaders() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
         CamelSinkTask sinkTask = new CamelSinkTask();
@@ -117,7 +118,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testBodyAndProperties() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
         CamelSinkTask sinkTask = new CamelSinkTask();
@@ -160,7 +161,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testBodyAndPropertiesHeadersMixed() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
         CamelSinkTask sinkTask = new CamelSinkTask();
@@ -217,7 +218,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testBodyAndHeadersMap() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
         CamelSinkTask sinkTask = new CamelSinkTask();
@@ -271,7 +272,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testBodyAndPropertiesHeadersMapMixed() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
         CamelSinkTask sinkTask = new CamelSinkTask();
@@ -346,7 +347,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testBodyAndHeadersList() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
         CamelSinkTask sinkTask = new CamelSinkTask();
@@ -396,7 +397,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testBodyAndPropertiesHeadersListMixed() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
         CamelSinkTask sinkTask = new CamelSinkTask();
@@ -465,7 +466,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testUrlPrecedenceOnComponentProperty() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "shouldNotBeUsed");
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "endpointProperty", "shouldNotBeUsed");
@@ -490,7 +491,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testOnlyBodyUsingComponentProperty() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "bridgeErrorHandler", "true");
         props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test");
@@ -516,7 +517,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testOnlyBodyUsingMultipleComponentProperties() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "bridgeErrorHandler", "true");
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "size", "50");
@@ -544,7 +545,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testIfExchangeFailsShouldThrowConnectException() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         // we use a dummy component sink in order to fail the exchange delivery
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "direct");
         props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test");
@@ -564,7 +565,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testAggregationBody() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF, "5");
@@ -599,7 +600,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testAggregationBodyAndTimeout() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF, "5");
@@ -635,7 +636,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testSecretRaw() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put("camel.sink.endpoint.secretKey", "se+ret");
         props.put("camel.sink.endpoint.accessKey", "MoreSe+ret$");
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-sqs");
@@ -649,7 +650,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testSecretRawReference() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put("camel.sink.endpoint.secretKey", "#bean:mySecretKey");
         props.put("camel.sink.endpoint.accessKey", "#property:myAccessKey");
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-sqs");


[camel-kafka-connector] 01/02: Support topics.regex

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch topics.regex
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 14261c250fd86ed28b97e23e9199e732b5d81b06
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Sep 14 08:46:52 2020 +0200

    Support topics.regex
---
 .../java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index bfd5bbf..57f626a 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -43,7 +43,7 @@ public class CamelSinkConnectorConfig extends AbstractConfig {
     public static final String CAMEL_SINK_URL_DOC = "The camel url to configure the destination. If this is set " + CAMEL_SINK_COMPONENT_CONF
             + " and all the properties starting with " + CamelSinkTask.getCamelSinkEndpointConfigPrefix() + ".<" + CAMEL_SINK_COMPONENT_CONF + " value> are ignored.";
 
-    public static final String TOPIC_DEFAULT = "test";
+    public static final String TOPIC_DEFAULT = null;
     public static final String TOPIC_CONF = "topics";
     public static final String TOPIC_DOC = "A list of topics to use as input for this connector";