You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/12/13 13:10:34 UTC
(camel) branch main updated: CAMEL-20232: camel-core - Kamelets with Enrich and PollEnrich dynamic endpoints with template parameters (#12443)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 096329ddfa8 CAMEL-20232: camel-core - Kamelets with Enrich and PollEnrich dynamic endpoints with template parameters (#12443)
096329ddfa8 is described below
commit 096329ddfa893e47d74413ca54767abee27e62b5
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Dec 13 14:10:28 2023 +0100
CAMEL-20232: camel-core - Kamelets with Enrich and PollEnrich dynamic endpoints with template parameters (#12443)
---
.../camel/component/kamelet/KameletEnrichTest.java | 84 ++++++++++++++++++++++
.../component/kamelet/KameletPollEnrichTest.java | 66 +++++++++++++++++
.../org/apache/camel/reifier/EnrichReifier.java | 33 +++++----
.../apache/camel/reifier/PollEnrichReifier.java | 35 +++++----
4 files changed, 190 insertions(+), 28 deletions(-)
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEnrichTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEnrichTest.java
new file mode 100644
index 00000000000..efcbd2e16ad
--- /dev/null
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEnrichTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class KameletEnrichTest extends CamelTestSupport {
+
+ @Test
+ public void testEnrich() throws Exception {
+ getMockEndpoint("mock:foo").expectedBodiesReceived("A");
+ getMockEndpoint("mock:bar").expectedBodiesReceived("B");
+
+ getMockEndpoint("mock:foo").whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ String b = exchange.getMessage().getBody(String.class);
+ exchange.getMessage().setBody(b + b);
+ }
+ });
+
+ getMockEndpoint("mock:bar").whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ String b = exchange.getMessage().getBody(String.class);
+ exchange.getMessage().setBody("Hello " + b);
+ }
+ });
+
+ String out = template.requestBody("direct:foo", "A", String.class);
+ Assertions.assertEquals("AA", out);
+
+ out = template.requestBody("direct:bar", "B", String.class);
+ Assertions.assertEquals("Hello B", out);
+
+ MockEndpoint.assertIsSatisfied(context);
+ }
+
+ // **********************************************
+ //
+ // test set-up
+ //
+ // **********************************************
+
+ @Override
+ protected RoutesBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ routeTemplate("broker")
+ .templateParameter("queue")
+ .from("kamelet:source")
+ .enrich().simple("mock:{{queue}}");
+
+ from("direct:foo")
+ .kamelet("broker?queue=foo");
+
+ from("direct:bar")
+ .kamelet("broker?queue=bar");
+ }
+ };
+ }
+}
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPollEnrichTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPollEnrichTest.java
new file mode 100644
index 00000000000..33e5b846c05
--- /dev/null
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPollEnrichTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class KameletPollEnrichTest extends CamelTestSupport {
+
+ @Test
+ public void testPollEnrich() throws Exception {
+ template.sendBody("seda:foo", "AA");
+ template.sendBody("seda:bar", "Hello B");
+
+ String out = template.requestBody("direct:foo", "A", String.class);
+ Assertions.assertEquals("AA", out);
+
+ out = template.requestBody("direct:bar", "B", String.class);
+ Assertions.assertEquals("Hello B", out);
+
+ MockEndpoint.assertIsSatisfied(context);
+ }
+
+ // **********************************************
+ //
+ // test set-up
+ //
+ // **********************************************
+
+ @Override
+ protected RoutesBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ routeTemplate("broker")
+ .templateParameter("queue")
+ .from("kamelet:source")
+ .pollEnrich().simple("seda:{{queue}}").timeout(5000);
+
+ from("direct:foo")
+ .kamelet("broker?queue=foo");
+
+ from("direct:bar")
+ .kamelet("broker?queue=bar");
+ }
+ };
+ }
+}
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/EnrichReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/EnrichReifier.java
index 337b99c5c58..2302628c569 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/EnrichReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/EnrichReifier.java
@@ -23,9 +23,12 @@ import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.model.EnrichDefinition;
import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.ProcessorDefinitionHelper;
+import org.apache.camel.model.RouteDefinition;
import org.apache.camel.model.language.ConstantExpression;
import org.apache.camel.processor.Enricher;
import org.apache.camel.support.DefaultExchange;
+import org.apache.camel.support.EndpointHelper;
public class EnrichReifier extends ExpressionReifier<EnrichDefinition> {
@@ -35,25 +38,27 @@ public class EnrichReifier extends ExpressionReifier<EnrichDefinition> {
@Override
public Processor createProcessor() throws Exception {
- boolean isShareUnitOfWork = parseBoolean(definition.getShareUnitOfWork(), false);
- boolean isIgnoreInvalidEndpoint = parseBoolean(definition.getIgnoreInvalidEndpoint(), false);
- boolean isAggregateOnException = parseBoolean(definition.getAggregateOnException(), false);
-
- Enricher enricher;
+ Expression exp;
+ String uri;
if (definition.getExpression() instanceof ConstantExpression) {
- Expression exp = createExpression(definition.getExpression());
+ exp = createExpression(definition.getExpression());
Exchange ex = new DefaultExchange(camelContext);
- String uri = exp.evaluate(ex, String.class);
- enricher = new Enricher(exp, uri);
+ uri = exp.evaluate(ex, String.class);
} else {
- Expression exp = createExpression(definition.getExpression());
- String uri = definition.getExpression().getExpression();
- enricher = new Enricher(exp, uri);
+ exp = createExpression(definition.getExpression());
+ uri = definition.getExpression().getExpression();
+ }
+
+ // route templates should pre-parse the uri, as it may contain dynamic values from the template parameters
+ RouteDefinition rd = ProcessorDefinitionHelper.getRoute(definition);
+ if (rd != null && rd.isTemplate() != null && rd.isTemplate()) {
+ uri = EndpointHelper.resolveEndpointUriPropertyPlaceholders(camelContext, uri);
}
- enricher.setShareUnitOfWork(isShareUnitOfWork);
- enricher.setIgnoreInvalidEndpoint(isIgnoreInvalidEndpoint);
- enricher.setAggregateOnException(isAggregateOnException);
+ Enricher enricher = new Enricher(exp, uri);
+ enricher.setShareUnitOfWork(parseBoolean(definition.getShareUnitOfWork(), false));
+ enricher.setIgnoreInvalidEndpoint(parseBoolean(definition.getIgnoreInvalidEndpoint(), false));
+ enricher.setAggregateOnException(parseBoolean(definition.getAggregateOnException(), false));
Integer num = parseInt(definition.getCacheSize());
if (num != null) {
enricher.setCacheSize(num);
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java
index 36d43a27205..4ddcbcc6ed8 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java
@@ -23,9 +23,12 @@ import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.model.PollEnrichDefinition;
import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.ProcessorDefinitionHelper;
+import org.apache.camel.model.RouteDefinition;
import org.apache.camel.model.language.ConstantExpression;
import org.apache.camel.processor.PollEnricher;
import org.apache.camel.support.DefaultExchange;
+import org.apache.camel.support.EndpointHelper;
public class PollEnrichReifier extends ProcessorReifier<PollEnrichDefinition> {
@@ -35,23 +38,27 @@ public class PollEnrichReifier extends ProcessorReifier<PollEnrichDefinition> {
@Override
public Processor createProcessor() throws Exception {
- // if no timeout then we should block, and there use a negative timeout
- long time = parseDuration(definition.getTimeout(), -1);
- boolean isIgnoreInvalidEndpoint = parseBoolean(definition.getIgnoreInvalidEndpoint(), false);
- boolean isAggregateOnException = parseBoolean(definition.getAggregateOnException(), false);
-
- PollEnricher enricher;
+ Expression exp;
+ String uri;
if (definition.getExpression() instanceof ConstantExpression) {
- Expression exp = createExpression(definition.getExpression());
+ exp = createExpression(definition.getExpression());
Exchange ex = new DefaultExchange(camelContext);
- String uri = exp.evaluate(ex, String.class);
- enricher = new PollEnricher(uri, time);
+ uri = exp.evaluate(ex, String.class);
} else {
- Expression exp = createExpression(definition.getExpression());
- String uri = definition.getExpression().getExpression();
- enricher = new PollEnricher(exp, uri, time);
+ exp = createExpression(definition.getExpression());
+ uri = definition.getExpression().getExpression();
+ }
+
+ // route templates should pre-parse the uri, as it may contain dynamic values from the template parameters
+ RouteDefinition rd = ProcessorDefinitionHelper.getRoute(definition);
+ if (rd != null && rd.isTemplate() != null && rd.isTemplate()) {
+ uri = EndpointHelper.resolveEndpointUriPropertyPlaceholders(camelContext, uri);
}
+ // if no timeout then we should block, and therefore use a negative timeout
+ long timeout = parseDuration(definition.getTimeout(), -1);
+
+ PollEnricher enricher = new PollEnricher(exp, uri, timeout);
AggregationStrategy strategy = getConfiguredAggregationStrategy(definition);
if (strategy != null) {
enricher.setAggregationStrategy(strategy);
@@ -60,8 +67,8 @@ public class PollEnrichReifier extends ProcessorReifier<PollEnrichDefinition> {
if (num != null) {
enricher.setCacheSize(num);
}
- enricher.setIgnoreInvalidEndpoint(isIgnoreInvalidEndpoint);
- enricher.setAggregateOnException(isAggregateOnException);
+ enricher.setIgnoreInvalidEndpoint(parseBoolean(definition.getIgnoreInvalidEndpoint(), false));
+ enricher.setAggregateOnException(parseBoolean(definition.getAggregateOnException(), false));
if (definition.getAutoStartComponents() != null) {
enricher.setAutoStartupComponents(parseBoolean(definition.getAutoStartComponents(), true));
}