You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/12/13 12:57:48 UTC

(camel) branch kamelet-poll-enrich created (now 4168dbc8f4c)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch kamelet-poll-enrich
in repository https://gitbox.apache.org/repos/asf/camel.git


      at 4168dbc8f4c CAMEL-20232: camel-core - Kamelets with Enrich and PollEnrich dynamic endpoints with template parameters

This branch includes the following new commits:

     new 4168dbc8f4c CAMEL-20232: camel-core - Kamelets with Enrich and PollEnrich dynamic endpoints with template parameters

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



(camel) 01/01: CAMEL-20232: camel-core - Kamelets with Enrich and PollEnrich dynamic endpoints with template parameters

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch kamelet-poll-enrich
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 4168dbc8f4cdc0bca6107723ef10f873c34fed61
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Dec 13 13:55:26 2023 +0100

    CAMEL-20232: camel-core - Kamelets with Enrich and PollEnrich dynamic endpoints with template parameters
---
 .../camel/component/kamelet/KameletEnrichTest.java | 84 ++++++++++++++++++++++
 .../component/kamelet/KameletPollEnrichTest.java   | 66 +++++++++++++++++
 .../org/apache/camel/reifier/EnrichReifier.java    | 33 +++++----
 .../apache/camel/reifier/PollEnrichReifier.java    | 35 +++++----
 4 files changed, 190 insertions(+), 28 deletions(-)

diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEnrichTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEnrichTest.java
new file mode 100644
index 00000000000..efcbd2e16ad
--- /dev/null
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEnrichTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class KameletEnrichTest extends CamelTestSupport {
+
+    @Test
+    public void testEnrich() throws Exception {
+        getMockEndpoint("mock:foo").expectedBodiesReceived("A");
+        getMockEndpoint("mock:bar").expectedBodiesReceived("B");
+
+        getMockEndpoint("mock:foo").whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                String b = exchange.getMessage().getBody(String.class);
+                exchange.getMessage().setBody(b + b);
+            }
+        });
+
+        getMockEndpoint("mock:bar").whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                String b = exchange.getMessage().getBody(String.class);
+                exchange.getMessage().setBody("Hello " + b);
+            }
+        });
+
+        String out = template.requestBody("direct:foo", "A", String.class);
+        Assertions.assertEquals("AA", out);
+
+        out = template.requestBody("direct:bar", "B", String.class);
+        Assertions.assertEquals("Hello B", out);
+
+        MockEndpoint.assertIsSatisfied(context);
+    }
+
+    // **********************************************
+    //
+    // test set-up
+    //
+    // **********************************************
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                routeTemplate("broker")
+                        .templateParameter("queue")
+                        .from("kamelet:source")
+                        .enrich().simple("mock:{{queue}}");
+
+                from("direct:foo")
+                        .kamelet("broker?queue=foo");
+
+                from("direct:bar")
+                        .kamelet("broker?queue=bar");
+            }
+        };
+    }
+}
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPollEnrichTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPollEnrichTest.java
new file mode 100644
index 00000000000..33e5b846c05
--- /dev/null
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPollEnrichTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class KameletPollEnrichTest extends CamelTestSupport {
+
+    @Test
+    public void testPollEnrich() throws Exception {
+        template.sendBody("seda:foo", "AA");
+        template.sendBody("seda:bar", "Hello B");
+
+        String out = template.requestBody("direct:foo", "A", String.class);
+        Assertions.assertEquals("AA", out);
+
+        out = template.requestBody("direct:bar", "B", String.class);
+        Assertions.assertEquals("Hello B", out);
+
+        MockEndpoint.assertIsSatisfied(context);
+    }
+
+    // **********************************************
+    //
+    // test set-up
+    //
+    // **********************************************
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                routeTemplate("broker")
+                        .templateParameter("queue")
+                        .from("kamelet:source")
+                        .pollEnrich().simple("seda:{{queue}}").timeout(5000);
+
+                from("direct:foo")
+                        .kamelet("broker?queue=foo");
+
+                from("direct:bar")
+                        .kamelet("broker?queue=bar");
+            }
+        };
+    }
+}
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/EnrichReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/EnrichReifier.java
index 337b99c5c58..2302628c569 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/EnrichReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/EnrichReifier.java
@@ -23,9 +23,12 @@ import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.model.EnrichDefinition;
 import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.ProcessorDefinitionHelper;
+import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.model.language.ConstantExpression;
 import org.apache.camel.processor.Enricher;
 import org.apache.camel.support.DefaultExchange;
+import org.apache.camel.support.EndpointHelper;
 
 public class EnrichReifier extends ExpressionReifier<EnrichDefinition> {
 
@@ -35,25 +38,27 @@ public class EnrichReifier extends ExpressionReifier<EnrichDefinition> {
 
     @Override
     public Processor createProcessor() throws Exception {
-        boolean isShareUnitOfWork = parseBoolean(definition.getShareUnitOfWork(), false);
-        boolean isIgnoreInvalidEndpoint = parseBoolean(definition.getIgnoreInvalidEndpoint(), false);
-        boolean isAggregateOnException = parseBoolean(definition.getAggregateOnException(), false);
-
-        Enricher enricher;
+        Expression exp;
+        String uri;
         if (definition.getExpression() instanceof ConstantExpression) {
-            Expression exp = createExpression(definition.getExpression());
+            exp = createExpression(definition.getExpression());
             Exchange ex = new DefaultExchange(camelContext);
-            String uri = exp.evaluate(ex, String.class);
-            enricher = new Enricher(exp, uri);
+            uri = exp.evaluate(ex, String.class);
         } else {
-            Expression exp = createExpression(definition.getExpression());
-            String uri = definition.getExpression().getExpression();
-            enricher = new Enricher(exp, uri);
+            exp = createExpression(definition.getExpression());
+            uri = definition.getExpression().getExpression();
+        }
+
+        // route templates should pre parse uri as they have dynamic values as part of their template parameters
+        RouteDefinition rd = ProcessorDefinitionHelper.getRoute(definition);
+        if (rd != null && rd.isTemplate() != null && rd.isTemplate()) {
+            uri = EndpointHelper.resolveEndpointUriPropertyPlaceholders(camelContext, uri);
         }
 
-        enricher.setShareUnitOfWork(isShareUnitOfWork);
-        enricher.setIgnoreInvalidEndpoint(isIgnoreInvalidEndpoint);
-        enricher.setAggregateOnException(isAggregateOnException);
+        Enricher enricher = new Enricher(exp, uri);
+        enricher.setShareUnitOfWork(parseBoolean(definition.getShareUnitOfWork(), false));
+        enricher.setIgnoreInvalidEndpoint(parseBoolean(definition.getIgnoreInvalidEndpoint(), false));
+        enricher.setAggregateOnException(parseBoolean(definition.getAggregateOnException(), false));
         Integer num = parseInt(definition.getCacheSize());
         if (num != null) {
             enricher.setCacheSize(num);
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java
index 36d43a27205..4ddcbcc6ed8 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java
@@ -23,9 +23,12 @@ import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.model.PollEnrichDefinition;
 import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.ProcessorDefinitionHelper;
+import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.model.language.ConstantExpression;
 import org.apache.camel.processor.PollEnricher;
 import org.apache.camel.support.DefaultExchange;
+import org.apache.camel.support.EndpointHelper;
 
 public class PollEnrichReifier extends ProcessorReifier<PollEnrichDefinition> {
 
@@ -35,23 +38,27 @@ public class PollEnrichReifier extends ProcessorReifier<PollEnrichDefinition> {
 
     @Override
     public Processor createProcessor() throws Exception {
-        // if no timeout then we should block, and there use a negative timeout
-        long time = parseDuration(definition.getTimeout(), -1);
-        boolean isIgnoreInvalidEndpoint = parseBoolean(definition.getIgnoreInvalidEndpoint(), false);
-        boolean isAggregateOnException = parseBoolean(definition.getAggregateOnException(), false);
-
-        PollEnricher enricher;
+        Expression exp;
+        String uri;
         if (definition.getExpression() instanceof ConstantExpression) {
-            Expression exp = createExpression(definition.getExpression());
+            exp = createExpression(definition.getExpression());
             Exchange ex = new DefaultExchange(camelContext);
-            String uri = exp.evaluate(ex, String.class);
-            enricher = new PollEnricher(uri, time);
+            uri = exp.evaluate(ex, String.class);
         } else {
-            Expression exp = createExpression(definition.getExpression());
-            String uri = definition.getExpression().getExpression();
-            enricher = new PollEnricher(exp, uri, time);
+            exp = createExpression(definition.getExpression());
+            uri = definition.getExpression().getExpression();
+        }
+
+        // route templates should pre parse uri as they have dynamic values as part of their template parameters
+        RouteDefinition rd = ProcessorDefinitionHelper.getRoute(definition);
+        if (rd != null && rd.isTemplate() != null && rd.isTemplate()) {
+            uri = EndpointHelper.resolveEndpointUriPropertyPlaceholders(camelContext, uri);
         }
 
+        // if no timeout then we should block, and there use a negative timeout
+        long timeout = parseDuration(definition.getTimeout(), -1);
+
+        PollEnricher enricher = new PollEnricher(exp, uri, timeout);
         AggregationStrategy strategy = getConfiguredAggregationStrategy(definition);
         if (strategy != null) {
             enricher.setAggregationStrategy(strategy);
@@ -60,8 +67,8 @@ public class PollEnrichReifier extends ProcessorReifier<PollEnrichDefinition> {
         if (num != null) {
             enricher.setCacheSize(num);
         }
-        enricher.setIgnoreInvalidEndpoint(isIgnoreInvalidEndpoint);
-        enricher.setAggregateOnException(isAggregateOnException);
+        enricher.setIgnoreInvalidEndpoint(parseBoolean(definition.getIgnoreInvalidEndpoint(), false));
+        enricher.setAggregateOnException(parseBoolean(definition.getAggregateOnException(), false));
         if (definition.getAutoStartComponents() != null) {
             enricher.setAutoStartupComponents(parseBoolean(definition.getAutoStartComponents(), true));
         }