You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/21 12:40:38 UTC

[camel-kafka-connector] 01/02: core: use EndpointUriFactory to compute endpoint urls instead of the runtime catalog

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit c8e93dfb043f9586508f7cf2a86a7436592f0ad9
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Oct 21 09:22:44 2020 +0200

    core: use EndpointUriFactory to compute endpoint urls instead of the runtime catalog
---
 core/pom.xml                                       |  4 --
 .../apache/camel/kafkaconnector/CamelSinkTask.java |  3 +-
 .../camel/kafkaconnector/CamelSourceTask.java      |  4 +-
 .../camel/kafkaconnector/utils/TaskHelper.java     | 57 ++++++++--------------
 .../camel/kafkaconnector/CamelSinkTaskTest.java    |  2 +
 .../camel/kafkaconnector/utils/TaskHelperTest.java |  7 +--
 6 files changed, 27 insertions(+), 50 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index 61454c1..92787b1 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -41,10 +41,6 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-core-catalog</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.camel</groupId>
             <artifactId>camel-main</artifactId>
         </dependency>
         <dependency>
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index f410c50..4d66219 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -24,7 +24,6 @@ import java.util.Objects;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.impl.DefaultCamelContext;
@@ -86,7 +85,7 @@ public class CamelSinkTask extends SinkTask {
 
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
-                remoteUrl = TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(),
+                remoteUrl = TaskHelper.buildUrl(camelContext,
                                                 actualProps,
                                                 config.getString(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF),
                                                 CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX,
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 9845013..aef600f 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -26,7 +26,6 @@ import java.util.Map;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.impl.DefaultCamelContext;
@@ -96,7 +95,8 @@ public class CamelSourceTask extends SourceTask {
 
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
-                remoteUrl = TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(), actualProps,
+                remoteUrl = TaskHelper.buildUrl(camelContext,
+                                                actualProps,
                                                 config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF), CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX,
                                                 CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
             }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
index 87f545e..0251d66 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
@@ -21,12 +21,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.LoggingLevel;
-import org.apache.camel.catalog.RuntimeCamelCatalog;
-import org.apache.camel.tooling.model.BaseOptionModel;
-import org.apache.camel.tooling.model.ComponentModel;
-import org.apache.camel.tooling.model.JsonMapper;
+import org.apache.camel.spi.EndpointUriFactory;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.slf4j.Logger;
 
@@ -38,7 +38,7 @@ public final class TaskHelper {
     /**
      * Try to build a url of a Camel {@link org.apache.camel.Endpoint}.
      *
-     * @param rcc RuntimeCamelCatalog used to build the url.
+     * @param camelContext the {@link CamelContext} used to retrieve an instance of an EndpointUriFactory for the given component schema.
      * @param props properties used to build the url in the form of a key -> value {@link Map}.
      * @param componentSchema the schema name of the Camel {@link org.apache.camel.Component} used to build the Camel {@link org.apache.camel.Endpoint} url.
      * @param endpointPropertiesPrefix prefix of all the Camel {@link org.apache.camel.Endpoint} properties.
@@ -47,34 +47,25 @@ public final class TaskHelper {
      * @return A String representing the built url.
      * @throws {@link URISyntaxException} in case of uri build failure.
      */
-    public static String buildUrl(RuntimeCamelCatalog rcc, Map<String, String> props, String componentSchema, String endpointPropertiesPrefix, String pathPropertiesPrefix) throws URISyntaxException {
-        ComponentModel cm = null;
-        if (componentSchema != null) {
-            String json = rcc.componentJSonSchema(componentSchema);
-            if (json != null) {
-                cm = JsonMapper.generateComponentModel(json);
-            }
+    public static String buildUrl(CamelContext camelContext, Map<String, String> props, String componentSchema, String endpointPropertiesPrefix, String pathPropertiesPrefix) throws URISyntaxException {
+        Map<String, Object> filteredProps = props.entrySet().stream()
+            .filter(e -> e.getKey().startsWith(endpointPropertiesPrefix) || e.getKey().startsWith(pathPropertiesPrefix))
+            .collect(Collectors.toMap(
+                e -> e.getKey().replace(endpointPropertiesPrefix, "").replace(pathPropertiesPrefix, ""),
+                Map.Entry::getValue
+            ));
+
+        EndpointUriFactory factory = camelContext.adapt(ExtendedCamelContext.class).getEndpointUriFactory(componentSchema);
+        if (factory == null) {
+            throw new IllegalStateException("Unable to compute endpoint uri. Reason: uri factory for schema `" + componentSchema + "` not found");
+        }
+        if (!factory.isEnabled(componentSchema)) {
+            throw new IllegalStateException("Unable to compute endpoint uri. Reason: uri factory for schema `" + componentSchema + "` not enabled");
         }
 
-        Map<String, String> filteredProps = new HashMap<>();
-        props.keySet().stream()
-                .filter(k -> k.startsWith(endpointPropertiesPrefix) || k.startsWith(pathPropertiesPrefix))
-                .forEach(k -> filteredProps.put(k.replace(endpointPropertiesPrefix, "").replace(pathPropertiesPrefix, ""), props.get(k)));
 
-        if (cm != null) {
-            // secret options should have their values in RAW mode so we can preseve credentials/passwords etc in uri encodings
-            for (String k : filteredProps.keySet()) {
-                if (isSecretOption(rcc, cm, k)) {
-                    String value = filteredProps.get(k);
-                    if (value != null && !value.startsWith("#") && !value.startsWith("RAW(")) {
-                        value = "RAW(" + value + ")";
-                        filteredProps.put(k, value);
-                    }
-                }
-            }
-        }
 
-        return rcc.asEndpointUri(componentSchema, filteredProps, false);
+        return factory.buildUri(componentSchema, filteredProps);
     }
 
     /**
@@ -163,12 +154,4 @@ public final class TaskHelper {
                 break;
         }
     }
-
-    private static boolean isSecretOption(RuntimeCamelCatalog rcc, ComponentModel cm, String endpointName) {
-        return cm.getEndpointOptions().stream()
-                .filter(o -> o.getName().equals(endpointName))
-                .findFirst()
-                .map(BaseOptionModel::isSecret).orElse(false);
-    }
-
 }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 4a8fc9e..6559456 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -670,6 +670,7 @@ public class CamelSinkTaskTest {
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put("camel.sink.endpoint.secretKey", "se+ret");
         props.put("camel.sink.endpoint.accessKey", "MoreSe+ret$");
+        props.put("camel.sink.endpoint.queueNameOrArn", "test");
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-sqs");
 
         CamelSinkTask sinkTask = new CamelSinkTask();
@@ -684,6 +685,7 @@ public class CamelSinkTaskTest {
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put("camel.sink.endpoint.secretKey", "#bean:mySecretKey");
         props.put("camel.sink.endpoint.accessKey", "#property:myAccessKey");
+        props.put("camel.sink.endpoint.queueNameOrArn", "test");
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-sqs");
         props.put("myAccessKey", "MoreSe+ret$");
 
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
index 1f53322..ee39b7f 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
@@ -22,9 +22,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.LoggingLevel;
-import org.apache.camel.catalog.RuntimeCamelCatalog;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.jupiter.api.Test;
@@ -114,13 +112,12 @@ public class TaskHelperTest {
     @Test
     public void testBuildUrlWithRuntimeCatalog() throws URISyntaxException {
         DefaultCamelContext dcc = new DefaultCamelContext();
-        RuntimeCamelCatalog rcc = dcc.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog();
         Map<String, String> props = mapOf(
             "camel.source.path.name", "test",
             "camel.source.endpoint.synchronous", "true"
         );
 
-        String result = TaskHelper.buildUrl(rcc, props, "direct", "camel.source.endpoint.", "camel.source.path.");
+        String result = TaskHelper.buildUrl(dcc, props, "direct", "camel.source.endpoint.", "camel.source.path.");
 
         assertEquals("direct:test?synchronous=true", result);
 
@@ -130,7 +127,7 @@ public class TaskHelperTest {
             "camel.source.path.hosts", "localhost"
         );
 
-        result = TaskHelper.buildUrl(rcc, props, "cql", "camel.source.endpoint.", "camel.source.path.");
+        result = TaskHelper.buildUrl(dcc, props, "cql", "camel.source.endpoint.", "camel.source.path.");
 
         assertEquals("cql:localhost:8080/test", result);
     }