You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/08 04:39:01 UTC
[camel-kafka-connector] branch master updated: [core] use Endpoint
instead of URI string to send exchanges using the producer template
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new aa8dfb9 [core] use Endpoint instead of URI string to send exchanges using the producer template
aa8dfb9 is described below
commit aa8dfb994ab51fc8192562328453a52ef852fc6f
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Oct 7 23:10:44 2020 +0200
[core] use Endpoint instead of URI string to send exchanges using the producer template
---
.../main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 3ac39e6..2a97de4 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.ProducerTemplate;
@@ -57,6 +58,7 @@ public class CamelSinkTask extends SinkTask {
private CamelKafkaConnectMain cms;
private ProducerTemplate producer;
private CamelSinkConnectorConfig config;
+ private Endpoint localEndpoint;
@Override
public String version() {
@@ -93,9 +95,12 @@ public class CamelSinkTask extends SinkTask {
.withAggregationTimeout(timeout)
.build(camelContext);
- producer = cms.getProducerTemplate();
cms.start();
+
+ producer = cms.getProducerTemplate();
+ localEndpoint = cms.getCamelContext().getEndpoint(LOCAL_URL);
+
LOG.info("CamelSinkTask connector task started");
} catch (Exception e) {
throw new ConnectException("Failed to create and start Camel context", e);
@@ -136,7 +141,7 @@ public class CamelSinkTask extends SinkTask {
exchange.getMessage().setBody(record.value());
LOG.debug("Sending exchange {} to {}", exchange.getExchangeId(), LOCAL_URL);
- producer.send(LOCAL_URL, exchange);
+ producer.send(localEndpoint, exchange);
if (exchange.isFailed()) {
throw new ConnectException("Exchange delivery has failed!", exchange.getException());