You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/09/14 09:07:02 UTC

[camel-kafka-connector] 02/02: Removed TOPIC_CONF from CamelSinkConnectorConfig; it's already coming from the Kafka Connect config

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch topics.regex
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 936ffc061ffb4b731a3027a19ee09c482cd01bfe
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Sep 14 09:17:19 2020 +0200

    Removed TOPIC_CONF from CamelSinkConnectorConfig; it's already coming from the Kafka Connect config
---
 .../kafkaconnector/CamelSinkConnectorConfig.java   |  5 ----
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 33 +++++++++++-----------
 2 files changed, 17 insertions(+), 21 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index 57f626a..9b4b2df 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -43,10 +43,6 @@ public class CamelSinkConnectorConfig extends AbstractConfig {
     public static final String CAMEL_SINK_URL_DOC = "The camel url to configure the destination. If this is set " + CAMEL_SINK_COMPONENT_CONF
             + " and all the properties starting with " + CamelSinkTask.getCamelSinkEndpointConfigPrefix() + ".<" + CAMEL_SINK_COMPONENT_CONF + " value> are ignored.";
 
-    public static final String TOPIC_DEFAULT = null;
-    public static final String TOPIC_CONF = "topics";
-    public static final String TOPIC_DOC = "A list of topics to use as input for this connector";
-
     public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT = LoggingLevel.OFF.toString();
     public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_CONF = "camel.sink.contentLogLevel";
     public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_DOC = "Log level for the record's content (default: " + CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT + "). Valid values: TRACE, DEBUG, INFO, WARN, ERROR, OFF.";
@@ -65,7 +61,6 @@ public class CamelSinkConnectorConfig extends AbstractConfig {
 
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT, Importance.HIGH, CAMEL_SINK_URL_DOC)
-        .define(TOPIC_CONF, Type.STRING, TOPIC_DEFAULT, Importance.HIGH, TOPIC_DOC)
         .define(CAMEL_SINK_MARSHAL_CONF, Type.STRING, CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC)
         .define(CAMEL_SINK_UNMARSHAL_CONF, Type.STRING, CAMEL_SINK_UNMARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_UNMARSHAL_DOC)
         .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC)
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index f4bb812..84df83d 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -42,11 +42,12 @@ public class CamelSinkTaskTest {
     private static final String SEDA_URI = "seda:test";
     private static final String TOPIC_NAME = "my-topic";
     private static final long RECEIVE_TIMEOUT = 1_000;
+    private static final String TOPIC_CONF = "topics";
 
     @Test
     public void testOnlyBody() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
         CamelSinkTask sinkTask = new CamelSinkTask();
@@ -70,7 +71,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testBodyAndHeaders() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
         CamelSinkTask sinkTask = new CamelSinkTask();
@@ -117,7 +118,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testBodyAndProperties() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
         CamelSinkTask sinkTask = new CamelSinkTask();
@@ -160,7 +161,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testBodyAndPropertiesHeadersMixed() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
         CamelSinkTask sinkTask = new CamelSinkTask();
@@ -217,7 +218,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testBodyAndHeadersMap() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
         CamelSinkTask sinkTask = new CamelSinkTask();
@@ -271,7 +272,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testBodyAndPropertiesHeadersMapMixed() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
         CamelSinkTask sinkTask = new CamelSinkTask();
@@ -346,7 +347,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testBodyAndHeadersList() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
         CamelSinkTask sinkTask = new CamelSinkTask();
@@ -396,7 +397,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testBodyAndPropertiesHeadersListMixed() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
 
         CamelSinkTask sinkTask = new CamelSinkTask();
@@ -465,7 +466,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testUrlPrecedenceOnComponentProperty() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "shouldNotBeUsed");
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "endpointProperty", "shouldNotBeUsed");
@@ -490,7 +491,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testOnlyBodyUsingComponentProperty() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "bridgeErrorHandler", "true");
         props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test");
@@ -516,7 +517,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testOnlyBodyUsingMultipleComponentProperties() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "bridgeErrorHandler", "true");
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "size", "50");
@@ -544,7 +545,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testIfExchangeFailsShouldThrowConnectException() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         // we use a dummy component sink in order fail the exchange delivery
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "direct");
         props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test");
@@ -564,7 +565,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testAggregationBody() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF, "5");
@@ -599,7 +600,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testAggregationBodyAndTimeout() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF, "5");
@@ -635,7 +636,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testSecretRaw() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put("camel.sink.endpoint.secretKey", "se+ret");
         props.put("camel.sink.endpoint.accessKey", "MoreSe+ret$");
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-sqs");
@@ -649,7 +650,7 @@ public class CamelSinkTaskTest {
     @Test
     public void testSecretRawReference() {
         Map<String, String> props = new HashMap<>();
-        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(TOPIC_CONF, TOPIC_NAME);
         props.put("camel.sink.endpoint.secretKey", "#bean:mySecretKey");
         props.put("camel.sink.endpoint.accessKey", "#property:myAccessKey");
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-sqs");