You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/21 12:40:37 UTC

[camel-kafka-connector] branch master updated (fe39ef8 -> 0801cd4)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


    from fe39ef8  Updated CHANGELOG.md
     new c8e93df  core: use EndpointUriFactory to compute endpoint urls instead of the runtime catalog
     new 0801cd4  itest-cassandra: add workaround for CAMEL-15722

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 core/pom.xml                                       |  4 --
 .../apache/camel/kafkaconnector/CamelSinkTask.java |  3 +-
 .../camel/kafkaconnector/CamelSourceTask.java      |  4 +-
 .../camel/kafkaconnector/utils/TaskHelper.java     | 57 ++++++++--------------
 .../camel/kafkaconnector/CamelSinkTaskTest.java    |  2 +
 .../camel/kafkaconnector/utils/TaskHelperTest.java |  7 +--
 .../sink/CamelCassandraPropertyFactory.java        |  4 +-
 7 files changed, 30 insertions(+), 51 deletions(-)


[camel-kafka-connector] 02/02: itest-cassandra: add workaround for CAMEL-15722

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 0801cd41ca213a122c6fa24b270a6ed552d7e250
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Oct 21 11:28:31 2020 +0200

    itest-cassandra: add workaround for CAMEL-15722
---
 .../kafkaconnector/cassandra/sink/CamelCassandraPropertyFactory.java  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelCassandraPropertyFactory.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelCassandraPropertyFactory.java
index 1222010..6f8e598 100644
--- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelCassandraPropertyFactory.java
+++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelCassandraPropertyFactory.java
@@ -30,7 +30,9 @@ final class CamelCassandraPropertyFactory extends SinkConnectorPropertyFactory<C
     }
 
     public CamelCassandraPropertyFactory withCql(String cql) {
-        return setProperty("camel.sink.endpoint.cql", cql);
+        // RAW is required as the endpoint URI builder encodes the URI
+        // TODO: remove once https://issues.apache.org/jira/browse/CAMEL-15722 get fixed
+        return setProperty("camel.sink.endpoint.cql", "RAW(" + cql + ")");
     }
 
     public CamelCassandraPropertyFactory withHosts(String hosts) {


[camel-kafka-connector] 01/02: core: use EndpointUriFactory to compute endpoint urls instead of the runtime catalog

Posted by ac...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit c8e93dfb043f9586508f7cf2a86a7436592f0ad9
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Oct 21 09:22:44 2020 +0200

    core: use EndpointUriFactory to compute endpoint urls instead of the runtime catalog
---
 core/pom.xml                                       |  4 --
 .../apache/camel/kafkaconnector/CamelSinkTask.java |  3 +-
 .../camel/kafkaconnector/CamelSourceTask.java      |  4 +-
 .../camel/kafkaconnector/utils/TaskHelper.java     | 57 ++++++++--------------
 .../camel/kafkaconnector/CamelSinkTaskTest.java    |  2 +
 .../camel/kafkaconnector/utils/TaskHelperTest.java |  7 +--
 6 files changed, 27 insertions(+), 50 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index 61454c1..92787b1 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -41,10 +41,6 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-core-catalog</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.camel</groupId>
             <artifactId>camel-main</artifactId>
         </dependency>
         <dependency>
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index f410c50..4d66219 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -24,7 +24,6 @@ import java.util.Objects;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.impl.DefaultCamelContext;
@@ -86,7 +85,7 @@ public class CamelSinkTask extends SinkTask {
 
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
-                remoteUrl = TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(),
+                remoteUrl = TaskHelper.buildUrl(camelContext,
                                                 actualProps,
                                                 config.getString(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF),
                                                 CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX,
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 9845013..aef600f 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -26,7 +26,6 @@ import java.util.Map;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.impl.DefaultCamelContext;
@@ -96,7 +95,8 @@ public class CamelSourceTask extends SourceTask {
 
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
-                remoteUrl = TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(), actualProps,
+                remoteUrl = TaskHelper.buildUrl(camelContext,
+                                                actualProps,
                                                 config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF), CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX,
                                                 CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
             }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
index 87f545e..0251d66 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
@@ -21,12 +21,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.LoggingLevel;
-import org.apache.camel.catalog.RuntimeCamelCatalog;
-import org.apache.camel.tooling.model.BaseOptionModel;
-import org.apache.camel.tooling.model.ComponentModel;
-import org.apache.camel.tooling.model.JsonMapper;
+import org.apache.camel.spi.EndpointUriFactory;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.slf4j.Logger;
 
@@ -38,7 +38,7 @@ public final class TaskHelper {
     /**
      * Try to build a url of a Camel {@link org.apache.camel.Endpoint}.
      *
-     * @param rcc RuntimeCamelCatalog used to build the url.
+     * @param camelContext the {@link CamelContext} used to retrieve an instance of a EndpointUriFactory for the given component schema.
      * @param props properties used to build the url in the form of a key -> value {@link Map}.
      * @param componentSchema the schema name of the Camel {@link org.apache.camel.Component} used to build the Camel {@link org.apache.camel.Endpoint} url.
      * @param endpointPropertiesPrefix prefix of all the Camel {@link org.apache.camel.Endpoint} properties.
@@ -47,34 +47,25 @@ public final class TaskHelper {
      * @return A String representing the built url.
      * @throws {@link URISyntaxException} in case of uri build failure.
      */
-    public static String buildUrl(RuntimeCamelCatalog rcc, Map<String, String> props, String componentSchema, String endpointPropertiesPrefix, String pathPropertiesPrefix) throws URISyntaxException {
-        ComponentModel cm = null;
-        if (componentSchema != null) {
-            String json = rcc.componentJSonSchema(componentSchema);
-            if (json != null) {
-                cm = JsonMapper.generateComponentModel(json);
-            }
+    public static String buildUrl(CamelContext camelContext, Map<String, String> props, String componentSchema, String endpointPropertiesPrefix, String pathPropertiesPrefix) throws URISyntaxException {
+        Map<String, Object> filteredProps = props.entrySet().stream()
+            .filter(e -> e.getKey().startsWith(endpointPropertiesPrefix) || e.getKey().startsWith(pathPropertiesPrefix))
+            .collect(Collectors.toMap(
+                e -> e.getKey().replace(endpointPropertiesPrefix, "").replace(pathPropertiesPrefix, ""),
+                Map.Entry::getValue
+            ));
+
+        EndpointUriFactory factory = camelContext.adapt(ExtendedCamelContext.class).getEndpointUriFactory(componentSchema);
+        if (factory == null) {
+            throw new IllegalStateException("Unable to compute endpoint uri. Reason: uri factory for schema `" + componentSchema + "` not found");
+        }
+        if (!factory.isEnabled(componentSchema)) {
+            throw new IllegalStateException("Unable to compute endpoint uri. Reason: uri factory for schema `" + componentSchema + "` not enabled");
         }
 
-        Map<String, String> filteredProps = new HashMap<>();
-        props.keySet().stream()
-                .filter(k -> k.startsWith(endpointPropertiesPrefix) || k.startsWith(pathPropertiesPrefix))
-                .forEach(k -> filteredProps.put(k.replace(endpointPropertiesPrefix, "").replace(pathPropertiesPrefix, ""), props.get(k)));
 
-        if (cm != null) {
-            // secret options should have their values in RAW mode so we can preseve credentials/passwords etc in uri encodings
-            for (String k : filteredProps.keySet()) {
-                if (isSecretOption(rcc, cm, k)) {
-                    String value = filteredProps.get(k);
-                    if (value != null && !value.startsWith("#") && !value.startsWith("RAW(")) {
-                        value = "RAW(" + value + ")";
-                        filteredProps.put(k, value);
-                    }
-                }
-            }
-        }
 
-        return rcc.asEndpointUri(componentSchema, filteredProps, false);
+        return factory.buildUri(componentSchema, filteredProps);
     }
 
     /**
@@ -163,12 +154,4 @@ public final class TaskHelper {
                 break;
         }
     }
-
-    private static boolean isSecretOption(RuntimeCamelCatalog rcc, ComponentModel cm, String endpointName) {
-        return cm.getEndpointOptions().stream()
-                .filter(o -> o.getName().equals(endpointName))
-                .findFirst()
-                .map(BaseOptionModel::isSecret).orElse(false);
-    }
-
 }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 4a8fc9e..6559456 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -670,6 +670,7 @@ public class CamelSinkTaskTest {
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put("camel.sink.endpoint.secretKey", "se+ret");
         props.put("camel.sink.endpoint.accessKey", "MoreSe+ret$");
+        props.put("camel.sink.endpoint.queueNameOrArn", "test");
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-sqs");
 
         CamelSinkTask sinkTask = new CamelSinkTask();
@@ -684,6 +685,7 @@ public class CamelSinkTaskTest {
         props.put(TOPIC_CONF, TOPIC_NAME);
         props.put("camel.sink.endpoint.secretKey", "#bean:mySecretKey");
         props.put("camel.sink.endpoint.accessKey", "#property:myAccessKey");
+        props.put("camel.sink.endpoint.queueNameOrArn", "test");
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "aws2-sqs");
         props.put("myAccessKey", "MoreSe+ret$");
 
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
index 1f53322..ee39b7f 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
@@ -22,9 +22,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.LoggingLevel;
-import org.apache.camel.catalog.RuntimeCamelCatalog;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.jupiter.api.Test;
@@ -114,13 +112,12 @@ public class TaskHelperTest {
     @Test
     public void testBuildUrlWithRuntimeCatalog() throws URISyntaxException {
         DefaultCamelContext dcc = new DefaultCamelContext();
-        RuntimeCamelCatalog rcc = dcc.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog();
         Map<String, String> props = mapOf(
             "camel.source.path.name", "test",
             "camel.source.endpoint.synchronous", "true"
         );
 
-        String result = TaskHelper.buildUrl(rcc, props, "direct", "camel.source.endpoint.", "camel.source.path.");
+        String result = TaskHelper.buildUrl(dcc, props, "direct", "camel.source.endpoint.", "camel.source.path.");
 
         assertEquals("direct:test?synchronous=true", result);
 
@@ -130,7 +127,7 @@ public class TaskHelperTest {
             "camel.source.path.hosts", "localhost"
         );
 
-        result = TaskHelper.buildUrl(rcc, props, "cql", "camel.source.endpoint.", "camel.source.path.");
+        result = TaskHelper.buildUrl(dcc, props, "cql", "camel.source.endpoint.", "camel.source.path.");
 
         assertEquals("cql:localhost:8080/test", result);
     }