You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/12/13 13:10:34 UTC

(camel) branch main updated: CAMEL-20232: camel-core - Kamelets with Enrich and PollEnrich dynamic endpoints with template parameters (#12443)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 096329ddfa8 CAMEL-20232: camel-core - Kamelets with Enrich and PollEnrich dynamic endpoints with template parameters (#12443)
096329ddfa8 is described below

commit 096329ddfa893e47d74413ca54767abee27e62b5
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Dec 13 14:10:28 2023 +0100

    CAMEL-20232: camel-core - Kamelets with Enrich and PollEnrich dynamic endpoints with template parameters (#12443)
---
 .../camel/component/kamelet/KameletEnrichTest.java | 84 ++++++++++++++++++++++
 .../component/kamelet/KameletPollEnrichTest.java   | 66 +++++++++++++++++
 .../org/apache/camel/reifier/EnrichReifier.java    | 33 +++++----
 .../apache/camel/reifier/PollEnrichReifier.java    | 35 +++++----
 4 files changed, 190 insertions(+), 28 deletions(-)

diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEnrichTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEnrichTest.java
new file mode 100644
index 00000000000..efcbd2e16ad
--- /dev/null
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEnrichTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class KameletEnrichTest extends CamelTestSupport {
+
+    @Test
+    public void testEnrich() throws Exception {
+        getMockEndpoint("mock:foo").expectedBodiesReceived("A");
+        getMockEndpoint("mock:bar").expectedBodiesReceived("B");
+
+        getMockEndpoint("mock:foo").whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                String b = exchange.getMessage().getBody(String.class);
+                exchange.getMessage().setBody(b + b); // body repeated twice, e.g. "A" -> "AA"
+            }
+        });
+
+        getMockEndpoint("mock:bar").whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                String b = exchange.getMessage().getBody(String.class);
+                exchange.getMessage().setBody("Hello " + b); // body prefixed with "Hello "
+            }
+        });
+
+        String out = template.requestBody("direct:foo", "A", String.class);
+        Assertions.assertEquals("AA", out);
+
+        out = template.requestBody("direct:bar", "B", String.class);
+        Assertions.assertEquals("Hello B", out);
+
+        MockEndpoint.assertIsSatisfied(context);
+    }
+
+    // **********************************************
+    // test set-up: the "broker" route template enriches from mock:{{queue}},
+    // where queue is a template parameter; direct:foo and direct:bar invoke
+    // the template as a kamelet with queue=foo and queue=bar respectively
+    // **********************************************
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                routeTemplate("broker")
+                        .templateParameter("queue")
+                        .from("kamelet:source")
+                        .enrich().simple("mock:{{queue}}"); // uri built from the queue parameter
+
+                from("direct:foo")
+                        .kamelet("broker?queue=foo");
+
+                from("direct:bar")
+                        .kamelet("broker?queue=bar");
+            }
+        };
+    }
+}
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPollEnrichTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPollEnrichTest.java
new file mode 100644
index 00000000000..33e5b846c05
--- /dev/null
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPollEnrichTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class KameletPollEnrichTest extends CamelTestSupport {
+
+    @Test
+    public void testPollEnrich() throws Exception {
+        template.sendBody("seda:foo", "AA"); // queued up front for the queue=foo kamelet
+        template.sendBody("seda:bar", "Hello B"); // queued up front for the queue=bar kamelet
+
+        String out = template.requestBody("direct:foo", "A", String.class);
+        Assertions.assertEquals("AA", out);
+
+        out = template.requestBody("direct:bar", "B", String.class);
+        Assertions.assertEquals("Hello B", out);
+
+        MockEndpoint.assertIsSatisfied(context);
+    }
+
+    // **********************************************
+    // test set-up: the "broker" route template poll-enriches from seda:{{queue}}
+    // (timeout 5000), where queue is a template parameter; direct:foo and
+    // direct:bar invoke the template as a kamelet with queue=foo and queue=bar
+    // **********************************************
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                routeTemplate("broker")
+                        .templateParameter("queue")
+                        .from("kamelet:source")
+                        .pollEnrich().simple("seda:{{queue}}").timeout(5000);
+
+                from("direct:foo")
+                        .kamelet("broker?queue=foo");
+
+                from("direct:bar")
+                        .kamelet("broker?queue=bar");
+            }
+        };
+    }
+}
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/EnrichReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/EnrichReifier.java
index 337b99c5c58..2302628c569 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/EnrichReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/EnrichReifier.java
@@ -23,9 +23,12 @@ import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.model.EnrichDefinition;
 import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.ProcessorDefinitionHelper;
+import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.model.language.ConstantExpression;
 import org.apache.camel.processor.Enricher;
 import org.apache.camel.support.DefaultExchange;
+import org.apache.camel.support.EndpointHelper;
 
 public class EnrichReifier extends ExpressionReifier<EnrichDefinition> {
 
@@ -35,25 +38,27 @@ public class EnrichReifier extends ExpressionReifier<EnrichDefinition> {
 
     @Override
     public Processor createProcessor() throws Exception {
-        boolean isShareUnitOfWork = parseBoolean(definition.getShareUnitOfWork(), false);
-        boolean isIgnoreInvalidEndpoint = parseBoolean(definition.getIgnoreInvalidEndpoint(), false);
-        boolean isAggregateOnException = parseBoolean(definition.getAggregateOnException(), false);
-
-        Enricher enricher;
+        Expression exp;
+        String uri;
         if (definition.getExpression() instanceof ConstantExpression) {
-            Expression exp = createExpression(definition.getExpression());
+            exp = createExpression(definition.getExpression());
             Exchange ex = new DefaultExchange(camelContext);
-            String uri = exp.evaluate(ex, String.class);
-            enricher = new Enricher(exp, uri);
+            uri = exp.evaluate(ex, String.class);
         } else {
-            Expression exp = createExpression(definition.getExpression());
-            String uri = definition.getExpression().getExpression();
-            enricher = new Enricher(exp, uri);
+            exp = createExpression(definition.getExpression());
+            uri = definition.getExpression().getExpression();
+        }
+
+        // route templates should pre parse uri as they have dynamic values as part of their template parameters
+        RouteDefinition rd = ProcessorDefinitionHelper.getRoute(definition);
+        if (rd != null && rd.isTemplate() != null && rd.isTemplate()) {
+            uri = EndpointHelper.resolveEndpointUriPropertyPlaceholders(camelContext, uri);
         }
 
-        enricher.setShareUnitOfWork(isShareUnitOfWork);
-        enricher.setIgnoreInvalidEndpoint(isIgnoreInvalidEndpoint);
-        enricher.setAggregateOnException(isAggregateOnException);
+        Enricher enricher = new Enricher(exp, uri);
+        enricher.setShareUnitOfWork(parseBoolean(definition.getShareUnitOfWork(), false));
+        enricher.setIgnoreInvalidEndpoint(parseBoolean(definition.getIgnoreInvalidEndpoint(), false));
+        enricher.setAggregateOnException(parseBoolean(definition.getAggregateOnException(), false));
         Integer num = parseInt(definition.getCacheSize());
         if (num != null) {
             enricher.setCacheSize(num);
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java
index 36d43a27205..4ddcbcc6ed8 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java
@@ -23,9 +23,12 @@ import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.model.PollEnrichDefinition;
 import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.ProcessorDefinitionHelper;
+import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.model.language.ConstantExpression;
 import org.apache.camel.processor.PollEnricher;
 import org.apache.camel.support.DefaultExchange;
+import org.apache.camel.support.EndpointHelper;
 
 public class PollEnrichReifier extends ProcessorReifier<PollEnrichDefinition> {
 
@@ -35,23 +38,27 @@ public class PollEnrichReifier extends ProcessorReifier<PollEnrichDefinition> {
 
     @Override
     public Processor createProcessor() throws Exception {
-        // if no timeout then we should block, and there use a negative timeout
-        long time = parseDuration(definition.getTimeout(), -1);
-        boolean isIgnoreInvalidEndpoint = parseBoolean(definition.getIgnoreInvalidEndpoint(), false);
-        boolean isAggregateOnException = parseBoolean(definition.getAggregateOnException(), false);
-
-        PollEnricher enricher;
+        Expression exp;
+        String uri;
         if (definition.getExpression() instanceof ConstantExpression) {
-            Expression exp = createExpression(definition.getExpression());
+            exp = createExpression(definition.getExpression());
             Exchange ex = new DefaultExchange(camelContext);
-            String uri = exp.evaluate(ex, String.class);
-            enricher = new PollEnricher(uri, time);
+            uri = exp.evaluate(ex, String.class);
         } else {
-            Expression exp = createExpression(definition.getExpression());
-            String uri = definition.getExpression().getExpression();
-            enricher = new PollEnricher(exp, uri, time);
+            exp = createExpression(definition.getExpression());
+            uri = definition.getExpression().getExpression();
+        }
+
+        // route templates should pre parse uri as they have dynamic values as part of their template parameters
+        RouteDefinition rd = ProcessorDefinitionHelper.getRoute(definition);
+        if (rd != null && rd.isTemplate() != null && rd.isTemplate()) {
+            uri = EndpointHelper.resolveEndpointUriPropertyPlaceholders(camelContext, uri);
         }
 
+        // if no timeout then we should block, and therefore use a negative timeout
+        long timeout = parseDuration(definition.getTimeout(), -1);
+
+        PollEnricher enricher = new PollEnricher(exp, uri, timeout);
         AggregationStrategy strategy = getConfiguredAggregationStrategy(definition);
         if (strategy != null) {
             enricher.setAggregationStrategy(strategy);
@@ -60,8 +67,8 @@ public class PollEnrichReifier extends ProcessorReifier<PollEnrichDefinition> {
         if (num != null) {
             enricher.setCacheSize(num);
         }
-        enricher.setIgnoreInvalidEndpoint(isIgnoreInvalidEndpoint);
-        enricher.setAggregateOnException(isAggregateOnException);
+        enricher.setIgnoreInvalidEndpoint(parseBoolean(definition.getIgnoreInvalidEndpoint(), false));
+        enricher.setAggregateOnException(parseBoolean(definition.getAggregateOnException(), false));
         if (definition.getAutoStartComponents() != null) {
             enricher.setAutoStartupComponents(parseBoolean(definition.getAutoStartComponents(), true));
         }