You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2020/03/25 15:23:06 UTC
[camel] 08/11: Support lightweight mode in enrichers
This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 8a30982f6c5a908292834089b11c493bbf836a77
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Thu Mar 19 08:14:22 2020 +0100
Support lightweight mode in enrichers
---
.../java/org/apache/camel/processor/Enricher.java | 2 +-
.../org/apache/camel/processor/PollEnricher.java | 43 ++++--
.../org/apache/camel/model/ExpressionNode.java | 20 +--
.../apache/camel/reifier/PollEnrichReifier.java | 15 +-
.../camel/impl/lw/EnricherLightweightTest.java | 59 ++++++++
.../camel/impl/lw/PollEnricherLightweightTest.java | 152 +++++++++++++++++++++
.../org/apache/camel/support/ExchangeHelper.java | 36 ++++-
7 files changed, 304 insertions(+), 23 deletions(-)
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
index 9a5615e..186ff1c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
@@ -360,7 +360,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
if (recipient != null) {
if (recipient instanceof NormalizedEndpointUri) {
NormalizedEndpointUri nu = (NormalizedEndpointUri) recipient;
- ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext();
+ ExtendedCamelContext ecc = exchange.getContext().adapt(ExtendedCamelContext.class);
return ecc.hasEndpoint(nu);
} else {
String uri = recipient.toString();
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
index 9483665..659bbf9 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -69,6 +69,7 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
private String routeId;
private AggregationStrategy aggregationStrategy;
private final Expression expression;
+ private final String destination;
private long timeout;
private boolean aggregateOnException;
private int cacheSize;
@@ -82,6 +83,19 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
*/
public PollEnricher(Expression expression, long timeout) {
this.expression = expression;
+ this.destination = null;
+ this.timeout = timeout;
+ }
+
+ /**
+ * Creates a new {@link PollEnricher}.
+ *
+ * @param destination the endpoint to poll from.
+ * @param timeout timeout in millis
+ */
+ public PollEnricher(String destination, long timeout) {
+ this.expression = null;
+ this.destination = destination;
this.timeout = timeout;
}
@@ -183,6 +197,18 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
}
+ @Override
+ protected void doInit() throws Exception {
+ if (destination != null) {
+ Endpoint endpoint = getExistingEndpoint(camelContext, destination);
+ if (endpoint == null) {
+ endpoint = resolveEndpoint(camelContext, destination, cacheSize < 0);
+ }
+ } else if (expression != null) {
+ expression.init(camelContext);
+ }
+ }
+
/**
* Enriches the input data (<code>exchange</code>) by first obtaining
* additional data from an endpoint represented by an endpoint
@@ -213,11 +239,11 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
Object recipient = null;
boolean prototype = cacheSize < 0;
try {
- recipient = expression.evaluate(exchange, Object.class);
+ recipient = destination != null ? destination : expression.evaluate(exchange, Object.class);
recipient = prepareRecipient(exchange, recipient);
- Endpoint existing = getExistingEndpoint(exchange, recipient);
+ Endpoint existing = getExistingEndpoint(camelContext, recipient);
if (existing == null) {
- endpoint = resolveEndpoint(exchange, recipient, prototype);
+ endpoint = resolveEndpoint(camelContext, recipient, prototype);
} else {
endpoint = existing;
// we have an existing endpoint then its not a prototype scope
@@ -369,25 +395,26 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
return null;
}
- protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) {
+ protected static Endpoint getExistingEndpoint(CamelContext context, Object recipient) {
if (recipient instanceof Endpoint) {
return (Endpoint) recipient;
}
if (recipient != null) {
if (recipient instanceof NormalizedEndpointUri) {
NormalizedEndpointUri nu = (NormalizedEndpointUri) recipient;
- ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext();
+ ExtendedCamelContext ecc = context.adapt(ExtendedCamelContext.class);
return ecc.hasEndpoint(nu);
} else {
String uri = recipient.toString();
- return exchange.getContext().hasEndpoint(uri);
+ return context.hasEndpoint(uri);
}
}
return null;
}
- protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) {
- return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
+ protected static Endpoint resolveEndpoint(CamelContext camelContext, Object recipient, boolean prototype) {
+ return prototype ? ExchangeHelper.resolvePrototypeEndpoint(camelContext, recipient)
+ : ExchangeHelper.resolveEndpoint(camelContext, recipient);
}
/**
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/ExpressionNode.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/ExpressionNode.java
index 92aec4e..5a0bdf8 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/model/ExpressionNode.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/ExpressionNode.java
@@ -48,19 +48,15 @@ public abstract class ExpressionNode extends ProcessorDefinition<ExpressionNode>
}
public ExpressionNode(ExpressionDefinition expression) {
- this.expression = expression;
+ setExpression(expression);
}
public ExpressionNode(Expression expression) {
- if (expression != null) {
- setExpression(ExpressionNodeHelper.toExpressionDefinition(expression));
- }
+ setExpression(expression);
}
public ExpressionNode(Predicate predicate) {
- if (predicate != null) {
- setExpression(ExpressionNodeHelper.toExpressionDefinition(predicate));
- }
+ setExpression(predicate);
}
public ExpressionDefinition getExpression() {
@@ -68,7 +64,15 @@ public abstract class ExpressionNode extends ProcessorDefinition<ExpressionNode>
}
public void setExpression(Expression expression) {
- setExpression(new ExpressionDefinition(expression));
+ if (expression != null) {
+ setExpression(ExpressionNodeHelper.toExpressionDefinition(expression));
+ }
+ }
+
+ private void setExpression(Predicate predicate) {
+ if (predicate != null) {
+ setExpression(ExpressionNodeHelper.toExpressionDefinition(predicate));
+ }
}
public void setExpression(ExpressionDefinition expression) {
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java
index ec310c4..ea411b5 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java
@@ -18,13 +18,16 @@ package org.apache.camel.reifier;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.model.PollEnrichDefinition;
import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.language.ConstantExpression;
import org.apache.camel.processor.PollEnricher;
import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
+import org.apache.camel.support.DefaultExchange;
public class PollEnrichReifier extends ProcessorReifier<PollEnrichDefinition> {
@@ -38,9 +41,17 @@ public class PollEnrichReifier extends ProcessorReifier<PollEnrichDefinition> {
// if no timeout then we should block, and there use a negative timeout
long time = definition.getTimeout() != null ? parseLong(definition.getTimeout()) : -1;
boolean isIgnoreInvalidEndpoint = parseBoolean(definition.getIgnoreInvalidEndpoint(), false);
- Expression exp = createExpression(definition.getExpression());
- PollEnricher enricher = new PollEnricher(exp, time);
+ PollEnricher enricher;
+ if (definition.getExpression() instanceof ConstantExpression) {
+ Expression exp = createExpression(definition.getExpression());
+ Exchange ex = new DefaultExchange(camelContext);
+ String dest = exp.evaluate(ex, String.class);
+ enricher = new PollEnricher(dest, time);
+ } else {
+ Expression exp = createExpression(definition.getExpression());
+ enricher = new PollEnricher(exp, time);
+ }
AggregationStrategy strategy = createAggregationStrategy();
if (strategy == null) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/lw/EnricherLightweightTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/lw/EnricherLightweightTest.java
new file mode 100644
index 0000000..b1ca671
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/lw/EnricherLightweightTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.lw;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Before;
+import org.junit.Test;
+
+public class EnricherLightweightTest extends ContextTestSupport {
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ setUseLightweightContext(true);
+ super.setUp();
+ }
+
+ @Test
+ public void testEnrich() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:enriched");
+ mock.expectedBodiesReceived("res-1", "res-2", "res-3");
+
+ template.sendBody("direct:start", 1);
+ template.sendBody("direct:start", 2);
+ template.sendBody("direct:start", 3);
+
+ mock.assertIsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").enrichWith("direct:resource")
+ .body(Integer.class, String.class, (o, n) -> n + o).to("mock:enriched");
+
+ // set an empty message
+ from("direct:resource").transform().body(b -> "res-");
+ }
+ };
+ }
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/lw/PollEnricherLightweightTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/lw/PollEnricherLightweightTest.java
new file mode 100644
index 0000000..e7142bc
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/lw/PollEnricherLightweightTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.lw;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.enricher.SampleAggregator;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PollEnricherLightweightTest extends ContextTestSupport {
+
+ private static SampleAggregator aggregationStrategy = new SampleAggregator();
+
+ protected MockEndpoint mock;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ setUseLightweightContext(true);
+ super.setUp();
+ mock = getMockEndpoint("mock:mock");
+ }
+
+ // -------------------------------------------------------------
+ // InOnly routes
+ // -------------------------------------------------------------
+
+ @Test
+ public void testPollEnrichInOnly() throws InterruptedException {
+ template.sendBody("seda:foo1", "blah");
+
+ mock.expectedBodiesReceived("test:blah");
+ mock.expectedHeaderReceived(Exchange.TO_ENDPOINT, "seda://foo1");
+
+ template.sendBody("direct:enricher-test-1", "test");
+
+ mock.assertIsSatisfied();
+ }
+
+ @Test
+ public void testPollEnrichInOnlyWaitWithTimeout() throws InterruptedException {
+ // this first try there is no data so we timeout
+ mock.expectedBodiesReceived("test:blah");
+ mock.expectedHeaderReceived(Exchange.TO_ENDPOINT, "seda://foo2");
+ template.sendBody("direct:enricher-test-2", "test");
+ // not expected data so we are not happy
+ mock.assertIsNotSatisfied();
+
+ // now send it and try again
+ mock.reset();
+ template.sendBody("seda:foo2", "blah");
+ template.sendBody("direct:enricher-test-2", "test");
+ mock.assertIsSatisfied();
+ }
+
+ @Test
+ public void testPollEnrichInOnlyWaitNoTimeout() throws InterruptedException {
+ // use another thread to send it a bit later
+ Thread t = new Thread(new Runnable() {
+ public void run() {
+ try {
+ Thread.sleep(250);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ template.sendBody("seda:foo3", "blah");
+ }
+ });
+
+ long start = System.currentTimeMillis();
+ mock.expectedBodiesReceived("test:blah");
+ mock.expectedHeaderReceived(Exchange.TO_ENDPOINT, "seda://foo3");
+ t.start();
+ template.sendBody("direct:enricher-test-3", "test");
+ // should take approx 1 sec to complete as the other thread is sending a
+ // bit later and we wait
+ mock.assertIsSatisfied();
+ long delta = System.currentTimeMillis() - start;
+ assertTrue("Should take approx 0.25 sec: was " + delta, delta > 150);
+ }
+
+ // -------------------------------------------------------------
+ // InOut routes
+ // -------------------------------------------------------------
+
+ @Test
+ public void testPollEnrichInOut() throws InterruptedException {
+ template.sendBody("seda:foo4", "blah");
+
+ String result = (String)template.sendBody("direct:enricher-test-4", ExchangePattern.InOut, "test");
+ assertEquals("test:blah", result);
+ }
+
+ @Test
+ public void testPollEnrichInOutPlusHeader() throws InterruptedException {
+ template.sendBody("seda:foo4", "blah");
+
+ Exchange exchange = template.request("direct:enricher-test-4", new Processor() {
+ public void process(Exchange exchange) {
+ exchange.getIn().setHeader("foo", "bar");
+ exchange.getIn().setBody("test");
+ }
+ });
+ assertEquals("bar", exchange.getIn().getHeader("foo"));
+ assertEquals("test:blah", exchange.getIn().getBody());
+ assertEquals("seda://foo4", exchange.getMessage().getHeader(Exchange.TO_ENDPOINT));
+ assertNull(exchange.getException());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ // -------------------------------------------------------------
+ // InOnly routes
+ // -------------------------------------------------------------
+
+ from("direct:enricher-test-1").pollEnrich("seda:foo1", aggregationStrategy).to("mock:mock");
+
+ from("direct:enricher-test-2").pollEnrich("seda:foo2", 1000, aggregationStrategy).to("mock:mock");
+
+ from("direct:enricher-test-3").pollEnrich("seda:foo3", -1, aggregationStrategy).to("mock:mock");
+
+ // -------------------------------------------------------------
+ // InOut routes
+ // -------------------------------------------------------------
+
+ from("direct:enricher-test-4").pollEnrich("seda:foo4", aggregationStrategy);
+ }
+ };
+ }
+
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 70d79fa..0bf9076 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -89,6 +89,20 @@ public final class ExchangeHelper {
* @throws NoSuchEndpointException if the endpoint cannot be resolved
*/
public static Endpoint resolveEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException {
+ return resolveEndpoint(exchange.getContext(), value);
+ }
+
+ /**
+ * Attempts to resolve the endpoint for the given value
+ *
+ * @param context the camel context
+ * @param value the value which can be an {@link Endpoint} or an object
+ * which provides a String representation of an endpoint via
+ * {@link #toString()}
+ * @return the endpoint
+ * @throws NoSuchEndpointException if the endpoint cannot be resolved
+ */
+ public static Endpoint resolveEndpoint(CamelContext context, Object value) throws NoSuchEndpointException {
if (value == null) {
throw new NoSuchEndpointException("null");
}
@@ -97,10 +111,10 @@ public final class ExchangeHelper {
endpoint = (Endpoint) value;
} else if (value instanceof NormalizedEndpointUri) {
NormalizedEndpointUri nu = (NormalizedEndpointUri) value;
- endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), nu);
+ endpoint = CamelContextHelper.getMandatoryEndpoint(context, nu);
} else {
String uri = value.toString().trim();
- endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), uri);
+ endpoint = CamelContextHelper.getMandatoryEndpoint(context, uri);
}
return endpoint;
}
@@ -116,6 +130,20 @@ public final class ExchangeHelper {
* @throws NoSuchEndpointException if the endpoint cannot be resolved
*/
public static Endpoint resolvePrototypeEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException {
+ return resolvePrototypeEndpoint(exchange.getContext(), value);
+ }
+
+ /**
+ * Attempts to resolve the endpoint (prototype scope) for the given value
+ *
+ * @param context the camel context
+ * @param value the value which can be an {@link Endpoint} or an object
+ * which provides a String representation of an endpoint via
+ * {@link #toString()}
+ * @return the endpoint
+ * @throws NoSuchEndpointException if the endpoint cannot be resolved
+ */
+ public static Endpoint resolvePrototypeEndpoint(CamelContext context, Object value) throws NoSuchEndpointException {
if (value == null) {
throw new NoSuchEndpointException("null");
}
@@ -124,10 +152,10 @@ public final class ExchangeHelper {
endpoint = (Endpoint) value;
} else if (value instanceof NormalizedEndpointUri) {
NormalizedEndpointUri nu = (NormalizedEndpointUri) value;
- endpoint = CamelContextHelper.getMandatoryPrototypeEndpoint(exchange.getContext(), nu);
+ endpoint = CamelContextHelper.getMandatoryPrototypeEndpoint(context, nu);
} else {
String uri = value.toString().trim();
- endpoint = CamelContextHelper.getMandatoryPrototypeEndpoint(exchange.getContext(), uri);
+ endpoint = CamelContextHelper.getMandatoryPrototypeEndpoint(context, uri);
}
return endpoint;
}