You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/21 12:40:38 UTC
[camel-kafka-connector] 01/02: core: use EndpointUriFactory to compute endpoint urls instead of the runtime catalog
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit c8e93dfb043f9586508f7cf2a86a7436592f0ad9
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Oct 21 09:22:44 2020 +0200
core: use EndpointUriFactory to compute endpoint urls instead of the runtime catalog
---
core/pom.xml | 4 --
.../apache/camel/kafkaconnector/CamelSinkTask.java | 3 +-
.../camel/kafkaconnector/CamelSourceTask.java | 4 +-
.../camel/kafkaconnector/utils/TaskHelper.java | 57 ++++++++--------------
.../camel/kafkaconnector/CamelSinkTaskTest.java | 2 +
.../camel/kafkaconnector/utils/TaskHelperTest.java | 7 +--
6 files changed, 27 insertions(+), 50 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index 61454c1..92787b1 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -41,10 +41,6 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
- <artifactId>camel-core-catalog</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
<artifactId>camel-main</artifactId>
</dependency>
<dependency>
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index f410c50..4d66219 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -24,7 +24,6 @@ import java.util.Objects;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.LoggingLevel;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultCamelContext;
@@ -86,7 +85,7 @@ public class CamelSinkTask extends SinkTask {
CamelContext camelContext = new DefaultCamelContext();
if (remoteUrl == null) {
- remoteUrl = TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(),
+ remoteUrl = TaskHelper.buildUrl(camelContext,
actualProps,
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF),
CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX,
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 9845013..aef600f 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -26,7 +26,6 @@ import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
-import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.LoggingLevel;
import org.apache.camel.PollingConsumer;
import org.apache.camel.impl.DefaultCamelContext;
@@ -96,7 +95,8 @@ public class CamelSourceTask extends SourceTask {
CamelContext camelContext = new DefaultCamelContext();
if (remoteUrl == null) {
- remoteUrl = TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(), actualProps,
+ remoteUrl = TaskHelper.buildUrl(camelContext,
+ actualProps,
config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF), CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX,
CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
index 87f545e..0251d66 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
@@ -21,12 +21,12 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.LoggingLevel;
-import org.apache.camel.catalog.RuntimeCamelCatalog;
-import org.apache.camel.tooling.model.BaseOptionModel;
-import org.apache.camel.tooling.model.ComponentModel;
-import org.apache.camel.tooling.model.JsonMapper;
+import org.apache.camel.spi.EndpointUriFactory;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.slf4j.Logger;
@@ -38,7 +38,7 @@ public final class TaskHelper {
/**
* Try to build a url of a Camel {@link org.apache.camel.Endpoint}.
*
- * @param rcc RuntimeCamelCatalog used to build the url.
+ * @param camelContext the {@link CamelContext} used to retrieve an instance of a EndpointUriFactory for the given component schema.
* @param props properties used to build the url in the form of a key -> value {@link Map}.
* @param componentSchema the schema name of the Camel {@link org.apache.camel.Component} used to build the Camel {@link org.apache.camel.Endpoint} url.
* @param endpointPropertiesPrefix prefix of all the Camel {@link org.apache.camel.Endpoint} properties.
@@ -47,34 +47,25 @@ public final class TaskHelper {
* @return A String representing the built url.
* @throws {@link URISyntaxException} in case of uri build failure.
*/
- public static String buildUrl(RuntimeCamelCatalog rcc, Map<String, String> props, String componentSchema, String endpointPropertiesPrefix, String pathPropertiesPrefix) throws URISyntaxException {
- ComponentModel cm = null;
- if (componentSchema != null) {
- String json = rcc.componentJSonSchema(componentSchema);
- if (json != null) {
- cm = JsonMapper.generateComponentModel(json);
- }
+ public static String buildUrl(CamelContext camelContext, Map<String, String> props, String componentSchema, String endpointPropertiesPrefix, String pathPropertiesPrefix) throws URISyntaxException {
+ Map<String, Object> filteredProps = props.entrySet().stream()
+ .filter(e -> e.getKey().startsWith(endpointPropertiesPrefix) || e.getKey().startsWith(pathPropertiesPrefix))
+ .collect(Collectors.toMap(
+ e -> e.getKey().replace(endpointPropertiesPrefix, "").replace(pathPropertiesPrefix, ""),
+ Map.Entry::getValue
+ ));
+
+ EndpointUriFactory factory = camelContext.adapt(ExtendedCamelContext.class).getEndpointUriFactory(componentSchema);
+ if (factory == null) {
+ throw new IllegalStateException("Unable to compute endpoint uri. Reason: uri factory for schema `" + componentSchema + "` not found");
+ }
+ if (!factory.isEnabled(componentSchema)) {
+ throw new IllegalStateException("Unable to compute endpoint uri. Reason: uri factory for schema `" + componentSchema + "` not enabled");
}
- Map<String, String> filteredProps = new HashMap<>();
- props.keySet().stream()
- .filter(k -> k.startsWith(endpointPropertiesPrefix) || k.startsWith(pathPropertiesPrefix))
- .forEach(k -> filteredProps.put(k.replace(endpointPropertiesPrefix, "").replace(pathPropertiesPrefix, ""), props.get(k)));
- if (cm != null) {
- // secret options should have their values in RAW mode so we can preseve credentials/passwords etc in uri encodings
- for (String k : filteredProps.keySet()) {
- if (isSecretOption(rcc, cm, k)) {
- String value = filteredProps.get(k);
- if (value != null && !value.startsWith("#") && !value.startsWith("RAW(")) {
- value = "RAW(" + value + ")";
- filteredProps.put(k, value);
- }
- }
- }
- }
- return rcc.asEndpointUri(componentSchema, filteredProps, false);
+ return factory.buildUri(componentSchema, filteredProps);
}
/**
@@ -163,12 +154,4 @@ public final class TaskHelper {
break;
}
}
-
- private static boolean isSecretOption(RuntimeCamelCatalog rcc, ComponentModel cm, String endpointName) {
- return cm.getEndpointOptions().stream()
- .filter(o -> o.getName().equals(endpointName))
- .findFirst()
- .map(BaseOptionModel::isSecret).orElse(false);
- }
-
}
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 4a8fc9e..6559456 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -670,6 +670,7 @@ public class CamelSinkTaskTest {
props.put(TOPIC_CONF, TOPIC_NAME);
props.put("camel.sink.endpoint.secretKey", "se+ret");
props.put("camel.sink.endpoint.accessKey", "MoreSe+ret$");
+ props.put("camel.sink.endpoint.queueNameOrArn", "test");
props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-sqs");
CamelSinkTask sinkTask = new CamelSinkTask();
@@ -684,6 +685,7 @@ public class CamelSinkTaskTest {
props.put(TOPIC_CONF, TOPIC_NAME);
props.put("camel.sink.endpoint.secretKey", "#bean:mySecretKey");
props.put("camel.sink.endpoint.accessKey", "#property:myAccessKey");
+ props.put("camel.sink.endpoint.queueNameOrArn", "test");
props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-sqs");
props.put("myAccessKey", "MoreSe+ret$");
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
index 1f53322..ee39b7f 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
@@ -22,9 +22,7 @@ import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.LoggingLevel;
-import org.apache.camel.catalog.RuntimeCamelCatalog;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.Test;
@@ -114,13 +112,12 @@ public class TaskHelperTest {
@Test
public void testBuildUrlWithRuntimeCatalog() throws URISyntaxException {
DefaultCamelContext dcc = new DefaultCamelContext();
- RuntimeCamelCatalog rcc = dcc.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog();
Map<String, String> props = mapOf(
"camel.source.path.name", "test",
"camel.source.endpoint.synchronous", "true"
);
- String result = TaskHelper.buildUrl(rcc, props, "direct", "camel.source.endpoint.", "camel.source.path.");
+ String result = TaskHelper.buildUrl(dcc, props, "direct", "camel.source.endpoint.", "camel.source.path.");
assertEquals("direct:test?synchronous=true", result);
@@ -130,7 +127,7 @@ public class TaskHelperTest {
"camel.source.path.hosts", "localhost"
);
- result = TaskHelper.buildUrl(rcc, props, "cql", "camel.source.endpoint.", "camel.source.path.");
+ result = TaskHelper.buildUrl(dcc, props, "cql", "camel.source.endpoint.", "camel.source.path.");
assertEquals("cql:localhost:8080/test", result);
}