You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/02/26 11:28:08 UTC

[camel] branch master updated: Removed brokers property from kafka component (can still be set in configurations) details here: https://issues.apache.org/jira/browse/CAMEL-14615 fixed CAMEL-14615: camel kafka starter for spring does not consider camel.component.kafka.brokers property.

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new eb790e4  Removed brokers property from kafka component (can still be set in configurations) details here: https://issues.apache.org/jira/browse/CAMEL-14615 fixed CAMEL-14615: camel kafka starter for spring does not consider camel.component.kafka.brokers property.
     new 0be7fe4  Merge pull request #3600 from valdar/CAMEL-14615
eb790e4 is described below

commit eb790e49a6297746e6d2b628e4fb4d1d1aab3f02
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Tue Feb 25 17:49:12 2020 +0100

    Removed brokers property from kafka component (can still be set in configurations) details here: https://issues.apache.org/jira/browse/CAMEL-14615
    fixed CAMEL-14615: camel kafka starter for spring does not consider camel.component.kafka.brokers property.
---
 .../component/kafka/KafkaComponentConfigurer.java   |  1 -
 .../org/apache/camel/component/kafka/kafka.json     |  1 -
 .../camel-kafka/src/main/docs/kafka-component.adoc  |  3 +--
 .../camel/component/kafka/KafkaComponent.java       | 21 +--------------------
 .../apache/camel/component/kafka/KafkaConsumer.java | 10 +---------
 .../apache/camel/component/kafka/KafkaProducer.java |  6 +-----
 .../component/kafka/BaseEmbeddedKafkaTest.java      |  2 +-
 .../camel/component/kafka/KafkaComponentTest.java   |  4 ++--
 .../camel/component/kafka/KafkaProducerTest.java    |  2 +-
 .../component/dsl/KafkaComponentBuilderFactory.java | 15 ---------------
 .../component/ComponentsBuilderFactoryTest.java     | 15 ++++++++-------
 11 files changed, 16 insertions(+), 64 deletions(-)

diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
index 99427f8..550a57c 100644
--- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
+++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java
@@ -15,7 +15,6 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen
     public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) {
         KafkaComponent target = (KafkaComponent) obj;
         switch (ignoreCase ? name.toLowerCase() : name) {
-        case "brokers": target.setBrokers(property(camelContext, java.lang.String.class, value)); return true;
         case "configuration": target.setConfiguration(property(camelContext, org.apache.camel.component.kafka.KafkaConfiguration.class, value)); return true;
         case "allowmanualcommit":
         case "allowManualCommit": target.setAllowManualCommit(property(camelContext, boolean.class, value)); return true;
diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
index 8049724..eb37010 100644
--- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
+++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json
@@ -19,7 +19,6 @@
     "version": "3.1.0-SNAPSHOT"
   },
   "componentProperties": {
-    "brokers": { "kind": "property", "displayName": "Brokers", "group": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "secret": false, "description": "URL of the Kafka brokers to use. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as bootstrap.servers in the Kafka documentation." },
     "configuration": { "kind": "property", "displayName": "Configuration", "group": "common", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaConfiguration", "deprecated": false, "secret": false, "description": "Allows to pre-configure the Kafka component with common options that the endpoints will reuse." },
     "allowManualCommit": { "kind": "property", "displayName": "Allow Manual Commit", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "false", "description": "Whether to allow doing manual commits via KafkaManualCommit. If this option is enabled then an instance of KafkaManualCommit is stored on the Exchange message header, which allows end users to access this API and perform manu [...]
     "breakOnFirstError": { "kind": "property", "displayName": "Break On First Error", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": "false", "description": "This options controls what happens when a consumer is processing an exchange and it fails. If the option is false then the consumer continues to the next message and processes it. If the option is true then the consumer brea [...]
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 7317930..9193e42 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -37,14 +37,13 @@ kafka:topic[?options]
 
 
 // component options: START
-The Kafka component supports 10 options, which are listed below.
+The Kafka component supports 9 options, which are listed below.
 
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
-| *brokers* (common) | URL of the Kafka brokers to use. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers. This option is known as bootstrap.servers in the Kafka documentation. |  | String
 | *configuration* (common) | Allows to pre-configure the Kafka component with common options that the endpoints will reuse. |  | KafkaConfiguration
 | *allowManualCommit* (consumer) | Whether to allow doing manual commits via KafkaManualCommit. If this option is enabled then an instance of KafkaManualCommit is stored on the Exchange message header, which allows end users to access this API and perform manual offset commits via the Kafka consumer. | false | boolean
 | *breakOnFirstError* (consumer) | This options controls what happens when a consumer is processing an exchange and it fails. If the option is false then the consumer continues to the next message and processes it. If the option is true then the consumer breaks out, and will seek back to offset of the message that caused a failure, and then re-attempt to process this message. However this can lead to endless processing of the same message if its bound to fail every time, eg a poison mess [...]
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index e6d53da..b5c7334 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -31,7 +31,7 @@ import org.apache.camel.util.PropertiesHelper;
 @Component("kafka")
 public class KafkaComponent extends DefaultComponent implements SSLContextParametersAware {
 
-    private KafkaConfiguration configuration;
+    private KafkaConfiguration configuration = new KafkaConfiguration();
 
     @Metadata(label = "advanced")
     private ExecutorService workerPool;
@@ -102,25 +102,6 @@ public class KafkaComponent extends DefaultComponent implements SSLContextParame
         this.configuration = configuration;
     }
 
-    public String getBrokers() {
-        return configuration != null ? configuration.getBrokers() : null;
-    }
-
-    /**
-     * URL of the Kafka brokers to use. The format is host1:port1,host2:port2,
-     * and the list can be a subset of brokers or a VIP pointing to a subset of
-     * brokers.
-     * <p/>
-     * This option is known as <tt>bootstrap.servers</tt> in the Kafka
-     * documentation.
-     */
-    public void setBrokers(String brokers) {
-        if (configuration == null) {
-            configuration = new KafkaConfiguration();
-        }
-        configuration.setBrokers(brokers);
-    }
-
     public ExecutorService getWorkerPool() {
         return workerPool;
     }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 01ec0e0..622f0c7 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -72,11 +72,7 @@ public class KafkaConsumer extends DefaultConsumer {
         this.processor = processor;
         this.pollTimeoutMs = endpoint.getConfiguration().getPollTimeoutMs();
 
-        // brokers can be configured on endpoint or component level
         String brokers = endpoint.getConfiguration().getBrokers();
-        if (brokers == null) {
-            brokers = endpoint.getComponent().getBrokers();
-        }
         if (ObjectHelper.isEmpty(brokers)) {
             throw new IllegalArgumentException("Brokers must be configured");
         }
@@ -86,13 +82,9 @@ public class KafkaConsumer extends DefaultConsumer {
         Properties props = endpoint.getConfiguration().createConsumerProperties();
         endpoint.updateClassProperties(props);
 
-        // brokers can be configured on endpoint or component level
         String brokers = endpoint.getConfiguration().getBrokers();
         if (brokers == null) {
-            brokers = endpoint.getComponent().getBrokers();
-        }
-        if (brokers == null) {
-            throw new IllegalArgumentException("URL to the Kafka brokers must be configured with the brokers option on either the component or endpoint.");
+            throw new IllegalArgumentException("URL to the Kafka brokers must be configured with the brokers option.");
         }
 
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 61af9e0..dd15bba 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -70,13 +70,9 @@ public class KafkaProducer extends DefaultAsyncProducer {
         Properties props = endpoint.getConfiguration().createProducerProperties();
         endpoint.updateClassProperties(props);
 
-        // brokers can be configured on endpoint or component level
         String brokers = endpoint.getConfiguration().getBrokers();
         if (brokers == null) {
-            brokers = endpoint.getComponent().getBrokers();
-        }
-        if (brokers == null) {
-            throw new IllegalArgumentException("URL to the Kafka brokers must be configured with the brokers option on either the component or endpoint.");
+            throw new IllegalArgumentException("URL to the Kafka brokers must be configured with the brokers option.");
         }
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
index 3fc0750..3ced492 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
@@ -62,7 +62,7 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport {
 
         KafkaComponent kafka = new KafkaComponent(context);
         kafka.init();
-        kafka.setBrokers(kafkaBroker.getBootstrapServers());
+        kafka.getConfiguration().setBrokers(kafkaBroker.getBootstrapServers());
         context.addComponent("kafka", kafka);
 
         return context;
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
index 324bf2a..ad2aaeb 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java
@@ -51,13 +51,13 @@ public class KafkaComponentTest extends CamelTestSupport {
     @Test
     public void testBrokersOnComponent() throws Exception {
         KafkaComponent kafka = context.getComponent("kafka", KafkaComponent.class);
-        kafka.setBrokers("broker1:12345,broker2:12566");
+        kafka.getConfiguration().setBrokers("broker1:12345,broker2:12566");
 
         String uri = "kafka:mytopic?partitioner=com.class.Party";
 
         KafkaEndpoint endpoint = context.getEndpoint(uri, KafkaEndpoint.class);
         assertEquals("broker1:12345,broker2:12566", endpoint.getConfiguration().getBrokers());
-        assertEquals("broker1:12345,broker2:12566", endpoint.getComponent().getBrokers());
+        assertEquals("broker1:12345,broker2:12566", endpoint.getComponent().getConfiguration().getBrokers());
         assertEquals("mytopic", endpoint.getConfiguration().getTopic());
         assertEquals("com.class.Party", endpoint.getConfiguration().getPartitioner());
     }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 6bfe695..57ee16c 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -72,7 +72,7 @@ public class KafkaProducerTest {
     @SuppressWarnings({"unchecked"})
     public KafkaProducerTest() throws Exception {
         KafkaComponent kafka = new KafkaComponent(new DefaultCamelContext());
-        kafka.setBrokers("broker1:1234,broker2:4567");
+        kafka.getConfiguration().setBrokers("broker1:1234,broker2:4567");
         kafka.init();
 
         endpoint = kafka.createEndpoint("kafka:sometopic", "sometopic", new HashMap());
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
index 7ec62e9..63fbebe 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java
@@ -49,20 +49,6 @@ public interface KafkaComponentBuilderFactory {
      */
     interface KafkaComponentBuilder extends ComponentBuilder<KafkaComponent> {
         /**
-         * URL of the Kafka brokers to use. The format is
-         * host1:port1,host2:port2, and the list can be a subset of brokers or a
-         * VIP pointing to a subset of brokers. This option is known as
-         * bootstrap.servers in the Kafka documentation.
-         * 
-         * The option is a: <code>java.lang.String</code> type.
-         * 
-         * Group: common
-         */
-        default KafkaComponentBuilder brokers(java.lang.String brokers) {
-            doSetProperty("brokers", brokers);
-            return this;
-        }
-        /**
          * Allows to pre-configure the Kafka component with common options that
          * the endpoints will reuse.
          * 
@@ -232,7 +218,6 @@ public interface KafkaComponentBuilderFactory {
                 String name,
                 Object value) {
             switch (name) {
-            case "brokers": ((KafkaComponent) component).setBrokers((java.lang.String) value); return true;
             case "configuration": ((KafkaComponent) component).setConfiguration((org.apache.camel.component.kafka.KafkaConfiguration) value); return true;
             case "allowManualCommit": ((KafkaComponent) component).setAllowManualCommit((boolean) value); return true;
             case "breakOnFirstError": ((KafkaComponent) component).setBreakOnFirstError((boolean) value); return true;
diff --git a/core/camel-componentdsl/src/test/java/org/apache/camel/builder/component/ComponentsBuilderFactoryTest.java b/core/camel-componentdsl/src/test/java/org/apache/camel/builder/component/ComponentsBuilderFactoryTest.java
index 1e08f76..9df4c38 100644
--- a/core/camel-componentdsl/src/test/java/org/apache/camel/builder/component/ComponentsBuilderFactoryTest.java
+++ b/core/camel-componentdsl/src/test/java/org/apache/camel/builder/component/ComponentsBuilderFactoryTest.java
@@ -46,15 +46,16 @@ public class ComponentsBuilderFactoryTest extends ContextTestSupport {
     public void testIfResolvePlaceholdersCorrectly() {
         context.getPropertiesComponent().setLocation("classpath:application.properties");
 
+        //TODO: this test needs to be reverted after resolving issues.apache.org/jira/browse/CAMEL-14568
+        final KafkaConfiguration kafkaConfiguration = new KafkaConfiguration();
+        kafkaConfiguration.setBrokers("{{kafka.host}}:{{kafka.port}}");
+
         final KafkaComponent kafkaComponent = ComponentsBuilderFactory.kafka()
-                .brokers("{{kafka.host}}:{{kafka.port}}")
+                .configuration(kafkaConfiguration)
                 .build(context);
 
-        final KafkaConfiguration kafkaConfiguration = kafkaComponent.getConfiguration();
-
-        assertNotNull(kafkaComponent);
+        assertNotNull(kafkaComponent.getConfiguration());
         assertEquals("localhost:9092", kafkaConfiguration.getBrokers());
-        assertEquals("localhost:9092", kafkaComponent.getBrokers());
     }
 
     @Test
@@ -62,16 +63,16 @@ public class ComponentsBuilderFactoryTest extends ContextTestSupport {
         final KafkaConfiguration kafkaConfiguration = new KafkaConfiguration();
         kafkaConfiguration.setGroupId("testGroup");
         kafkaConfiguration.setConsumerRequestTimeoutMs(5000);
+        kafkaConfiguration.setBrokers("localhost:9092");
 
         final KafkaComponent kafkaComponent = ComponentsBuilderFactory.kafka()
                 .allowManualCommit(true)
                 .configuration(kafkaConfiguration)
-                .brokers("localhost:9092")
                 .build();
 
         assertNotNull(kafkaComponent);
 
-        assertEquals("localhost:9092", kafkaComponent.getBrokers());
+        assertEquals("localhost:9092", kafkaComponent.getConfiguration().getBrokers());
         assertTrue(kafkaComponent.isAllowManualCommit());
 
         assertEquals("testGroup", kafkaComponent.getConfiguration().getGroupId());