You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/08 04:39:01 UTC

[camel-kafka-connector] branch master updated: [core] use Endpoint instead of URI string to send exchanges using the producer template

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new aa8dfb9  [core] use Endpoint instead of URI string to send exchanges using the producer template
aa8dfb9 is described below

commit aa8dfb994ab51fc8192562328453a52ef852fc6f
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Oct 7 23:10:44 2020 +0200

    [core] use Endpoint instead of URI string to send exchanges using the producer template
---
 .../main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 3ac39e6..2a97de4 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.ProducerTemplate;
@@ -57,6 +58,7 @@ public class CamelSinkTask extends SinkTask {
     private CamelKafkaConnectMain cms;
     private ProducerTemplate producer;
     private CamelSinkConnectorConfig config;
+    private Endpoint localEndpoint;
 
     @Override
     public String version() {
@@ -93,9 +95,12 @@ public class CamelSinkTask extends SinkTask {
                 .withAggregationTimeout(timeout)
                 .build(camelContext);
 
-            producer = cms.getProducerTemplate();
 
             cms.start();
+
+            producer = cms.getProducerTemplate();
+            localEndpoint = cms.getCamelContext().getEndpoint(LOCAL_URL);
+
             LOG.info("CamelSinkTask connector task started");
         } catch (Exception e) {
             throw new ConnectException("Failed to create and start Camel context", e);
@@ -136,7 +141,7 @@ public class CamelSinkTask extends SinkTask {
             exchange.getMessage().setBody(record.value());
 
             LOG.debug("Sending exchange {} to {}", exchange.getExchangeId(), LOCAL_URL);
-            producer.send(LOCAL_URL, exchange);
+            producer.send(localEndpoint, exchange);
 
             if (exchange.isFailed()) {
                 throw new ConnectException("Exchange delivery has failed!", exchange.getException());