You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2020/03/25 15:23:06 UTC

[camel] 08/11: Support lightweight mode in enrichers

This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 8a30982f6c5a908292834089b11c493bbf836a77
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Thu Mar 19 08:14:22 2020 +0100

    Support lightweight mode in enrichers
---
 .../java/org/apache/camel/processor/Enricher.java  |   2 +-
 .../org/apache/camel/processor/PollEnricher.java   |  43 ++++--
 .../org/apache/camel/model/ExpressionNode.java     |  20 +--
 .../apache/camel/reifier/PollEnrichReifier.java    |  15 +-
 .../camel/impl/lw/EnricherLightweightTest.java     |  59 ++++++++
 .../camel/impl/lw/PollEnricherLightweightTest.java | 152 +++++++++++++++++++++
 .../org/apache/camel/support/ExchangeHelper.java   |  36 ++++-
 7 files changed, 304 insertions(+), 23 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
index 9a5615e..186ff1c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
@@ -360,7 +360,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA
         if (recipient != null) {
             if (recipient instanceof NormalizedEndpointUri) {
                 NormalizedEndpointUri nu = (NormalizedEndpointUri) recipient;
-                ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext();
+                ExtendedCamelContext ecc = exchange.getContext().adapt(ExtendedCamelContext.class);
                 return ecc.hasEndpoint(nu);
             } else {
                 String uri = recipient.toString();
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
index 9483665..659bbf9 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -69,6 +69,7 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
     private String routeId;
     private AggregationStrategy aggregationStrategy;
     private final Expression expression;
+    private final String destination;
     private long timeout;
     private boolean aggregateOnException;
     private int cacheSize;
@@ -82,6 +83,19 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
      */
     public PollEnricher(Expression expression, long timeout) {
         this.expression = expression;
+        this.destination = null;
+        this.timeout = timeout;
+    }
+
+    /**
+     * Creates a new {@link PollEnricher}.
+     *
+     * @param destination the endpoint to poll from.
+     * @param timeout timeout in millis
+     */
+    public PollEnricher(String destination, long timeout) {
+        this.expression = null;
+        this.destination = destination;
         this.timeout = timeout;
     }
 
@@ -183,6 +197,18 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
         this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
     }
 
+    @Override
+    protected void doInit() throws Exception {
+        if (destination != null) {
+            Endpoint endpoint = getExistingEndpoint(camelContext, destination);
+            if (endpoint == null) {
+                endpoint = resolveEndpoint(camelContext, destination, cacheSize < 0);
+            }
+        } else if (expression != null) {
+            expression.init(camelContext);
+        }
+    }
+
     /**
      * Enriches the input data (<code>exchange</code>) by first obtaining
      * additional data from an endpoint represented by an endpoint
@@ -213,11 +239,11 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
         Object recipient = null;
         boolean prototype = cacheSize < 0;
         try {
-            recipient = expression.evaluate(exchange, Object.class);
+            recipient = destination != null ? destination : expression.evaluate(exchange, Object.class);
             recipient = prepareRecipient(exchange, recipient);
-            Endpoint existing = getExistingEndpoint(exchange, recipient);
+            Endpoint existing = getExistingEndpoint(camelContext, recipient);
             if (existing == null) {
-                endpoint = resolveEndpoint(exchange, recipient, prototype);
+                endpoint = resolveEndpoint(camelContext, recipient, prototype);
             } else {
                 endpoint = existing;
                 // we have an existing endpoint then its not a prototype scope
@@ -369,25 +395,26 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout
         return null;
     }
 
-    protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) {
+    protected static Endpoint getExistingEndpoint(CamelContext context, Object recipient) {
         if (recipient instanceof Endpoint) {
             return (Endpoint) recipient;
         }
         if (recipient != null) {
             if (recipient instanceof NormalizedEndpointUri) {
                 NormalizedEndpointUri nu = (NormalizedEndpointUri) recipient;
-                ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext();
+                ExtendedCamelContext ecc = context.adapt(ExtendedCamelContext.class);
                 return ecc.hasEndpoint(nu);
             } else {
                 String uri = recipient.toString();
-                return exchange.getContext().hasEndpoint(uri);
+                return context.hasEndpoint(uri);
             }
         }
         return null;
     }
 
-    protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) {
-        return prototype ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient) : ExchangeHelper.resolveEndpoint(exchange, recipient);
+    protected static Endpoint resolveEndpoint(CamelContext camelContext, Object recipient, boolean prototype) {
+        return prototype ? ExchangeHelper.resolvePrototypeEndpoint(camelContext, recipient)
+                : ExchangeHelper.resolveEndpoint(camelContext, recipient);
     }
 
     /**
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/model/ExpressionNode.java b/core/camel-core-engine/src/main/java/org/apache/camel/model/ExpressionNode.java
index 92aec4e..5a0bdf8 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/model/ExpressionNode.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/model/ExpressionNode.java
@@ -48,19 +48,15 @@ public abstract class ExpressionNode extends ProcessorDefinition<ExpressionNode>
     }
 
     public ExpressionNode(ExpressionDefinition expression) {
-        this.expression = expression;
+        setExpression(expression);
     }
 
     public ExpressionNode(Expression expression) {
-        if (expression != null) {
-            setExpression(ExpressionNodeHelper.toExpressionDefinition(expression));
-        }
+        setExpression(expression);
     }
 
     public ExpressionNode(Predicate predicate) {
-        if (predicate != null) {
-            setExpression(ExpressionNodeHelper.toExpressionDefinition(predicate));
-        }
+        setExpression(predicate);
     }
 
     public ExpressionDefinition getExpression() {
@@ -68,7 +64,15 @@ public abstract class ExpressionNode extends ProcessorDefinition<ExpressionNode>
     }
 
     public void setExpression(Expression expression) {
-        setExpression(new ExpressionDefinition(expression));
+        if (expression != null) {
+            setExpression(ExpressionNodeHelper.toExpressionDefinition(expression));
+        }
+    }
+
+    private void setExpression(Predicate predicate) {
+        if (predicate != null) {
+            setExpression(ExpressionNodeHelper.toExpressionDefinition(predicate));
+        }
     }
 
     public void setExpression(ExpressionDefinition expression) {
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java
index ec310c4..ea411b5 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/PollEnrichReifier.java
@@ -18,13 +18,16 @@ package org.apache.camel.reifier;
 
 import org.apache.camel.AggregationStrategy;
 import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.model.PollEnrichDefinition;
 import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.language.ConstantExpression;
 import org.apache.camel.processor.PollEnricher;
 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
+import org.apache.camel.support.DefaultExchange;
 
 public class PollEnrichReifier extends ProcessorReifier<PollEnrichDefinition> {
 
@@ -38,9 +41,17 @@ public class PollEnrichReifier extends ProcessorReifier<PollEnrichDefinition> {
         // if no timeout then we should block, and there use a negative timeout
         long time = definition.getTimeout() != null ? parseLong(definition.getTimeout()) : -1;
         boolean isIgnoreInvalidEndpoint = parseBoolean(definition.getIgnoreInvalidEndpoint(), false);
-        Expression exp = createExpression(definition.getExpression());
 
-        PollEnricher enricher = new PollEnricher(exp, time);
+        PollEnricher enricher;
+        if (definition.getExpression() instanceof ConstantExpression) {
+            Expression exp = createExpression(definition.getExpression());
+            Exchange ex = new DefaultExchange(camelContext);
+            String dest = exp.evaluate(ex, String.class);
+            enricher = new PollEnricher(dest, time);
+        } else {
+            Expression exp = createExpression(definition.getExpression());
+            enricher = new PollEnricher(exp, time);
+        }
 
         AggregationStrategy strategy = createAggregationStrategy();
         if (strategy == null) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/lw/EnricherLightweightTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/lw/EnricherLightweightTest.java
new file mode 100644
index 0000000..b1ca671
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/lw/EnricherLightweightTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.lw;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Before;
+import org.junit.Test;
+
+public class EnricherLightweightTest extends ContextTestSupport {
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        setUseLightweightContext(true);
+        super.setUp();
+    }
+
+    @Test
+    public void testEnrich() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:enriched");
+        mock.expectedBodiesReceived("res-1", "res-2", "res-3");
+
+        template.sendBody("direct:start", 1);
+        template.sendBody("direct:start", 2);
+        template.sendBody("direct:start", 3);
+
+        mock.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").enrichWith("direct:resource")
+                        .body(Integer.class, String.class, (o, n) -> n + o).to("mock:enriched");
+
+                // resource route: replaces the body with the "res-" prefix
+                from("direct:resource").transform().body(b -> "res-");
+            }
+        };
+    }
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/lw/PollEnricherLightweightTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/lw/PollEnricherLightweightTest.java
new file mode 100644
index 0000000..e7142bc
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/lw/PollEnricherLightweightTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.lw;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.enricher.SampleAggregator;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PollEnricherLightweightTest extends ContextTestSupport {
+
+    private static SampleAggregator aggregationStrategy = new SampleAggregator();
+
+    protected MockEndpoint mock;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        setUseLightweightContext(true);
+        super.setUp();
+        mock = getMockEndpoint("mock:mock");
+    }
+
+    // -------------------------------------------------------------
+    // InOnly routes
+    // -------------------------------------------------------------
+
+    @Test
+    public void testPollEnrichInOnly() throws InterruptedException {
+        template.sendBody("seda:foo1", "blah");
+
+        mock.expectedBodiesReceived("test:blah");
+        mock.expectedHeaderReceived(Exchange.TO_ENDPOINT, "seda://foo1");
+
+        template.sendBody("direct:enricher-test-1", "test");
+
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    public void testPollEnrichInOnlyWaitWithTimeout() throws InterruptedException {
+        // on this first try there is no data, so the poll times out
+        mock.expectedBodiesReceived("test:blah");
+        mock.expectedHeaderReceived(Exchange.TO_ENDPOINT, "seda://foo2");
+        template.sendBody("direct:enricher-test-2", "test");
+        // the expected body was not received, so the mock must not be satisfied
+        mock.assertIsNotSatisfied();
+
+        // now send it and try again
+        mock.reset();
+        template.sendBody("seda:foo2", "blah");
+        template.sendBody("direct:enricher-test-2", "test");
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    public void testPollEnrichInOnlyWaitNoTimeout() throws InterruptedException {
+        // use another thread to send it a bit later
+        Thread t = new Thread(new Runnable() {
+            public void run() {
+                try {
+                    Thread.sleep(250);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+                template.sendBody("seda:foo3", "blah");
+            }
+        });
+
+        long start = System.currentTimeMillis();
+        mock.expectedBodiesReceived("test:blah");
+        mock.expectedHeaderReceived(Exchange.TO_ENDPOINT, "seda://foo3");
+        t.start();
+        template.sendBody("direct:enricher-test-3", "test");
+        // should take approx 0.25 sec to complete as the other thread is sending a
+        // bit later and we wait
+        mock.assertIsSatisfied();
+        long delta = System.currentTimeMillis() - start;
+        assertTrue("Should take approx 0.25 sec: was " + delta, delta > 150);
+    }
+
+    // -------------------------------------------------------------
+    // InOut routes
+    // -------------------------------------------------------------
+
+    @Test
+    public void testPollEnrichInOut() throws InterruptedException {
+        template.sendBody("seda:foo4", "blah");
+
+        String result = (String)template.sendBody("direct:enricher-test-4", ExchangePattern.InOut, "test");
+        assertEquals("test:blah", result);
+    }
+
+    @Test
+    public void testPollEnrichInOutPlusHeader() throws InterruptedException {
+        template.sendBody("seda:foo4", "blah");
+
+        Exchange exchange = template.request("direct:enricher-test-4", new Processor() {
+            public void process(Exchange exchange) {
+                exchange.getIn().setHeader("foo", "bar");
+                exchange.getIn().setBody("test");
+            }
+        });
+        assertEquals("bar", exchange.getIn().getHeader("foo"));
+        assertEquals("test:blah", exchange.getIn().getBody());
+        assertEquals("seda://foo4", exchange.getMessage().getHeader(Exchange.TO_ENDPOINT));
+        assertNull(exchange.getException());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                // -------------------------------------------------------------
+                // InOnly routes
+                // -------------------------------------------------------------
+
+                from("direct:enricher-test-1").pollEnrich("seda:foo1", aggregationStrategy).to("mock:mock");
+
+                from("direct:enricher-test-2").pollEnrich("seda:foo2", 1000, aggregationStrategy).to("mock:mock");
+
+                from("direct:enricher-test-3").pollEnrich("seda:foo3", -1, aggregationStrategy).to("mock:mock");
+
+                // -------------------------------------------------------------
+                // InOut routes
+                // -------------------------------------------------------------
+
+                from("direct:enricher-test-4").pollEnrich("seda:foo4", aggregationStrategy);
+            }
+        };
+    }
+
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 70d79fa..0bf9076 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -89,6 +89,20 @@ public final class ExchangeHelper {
      * @throws NoSuchEndpointException if the endpoint cannot be resolved
      */
     public static Endpoint resolveEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException {
+        return resolveEndpoint(exchange.getContext(), value);
+    }
+
+    /**
+     * Attempts to resolve the endpoint for the given value
+     *
+     * @param context  the camel context
+     * @param value    the value which can be an {@link Endpoint} or an object
+     *                 which provides a String representation of an endpoint via
+     *                 {@link #toString()}
+     * @return the endpoint
+     * @throws NoSuchEndpointException if the endpoint cannot be resolved
+     */
+    public static Endpoint resolveEndpoint(CamelContext context, Object value) throws NoSuchEndpointException {
         if (value == null) {
             throw new NoSuchEndpointException("null");
         }
@@ -97,10 +111,10 @@ public final class ExchangeHelper {
             endpoint = (Endpoint) value;
         } else if (value instanceof NormalizedEndpointUri) {
             NormalizedEndpointUri nu = (NormalizedEndpointUri) value;
-            endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), nu);
+            endpoint = CamelContextHelper.getMandatoryEndpoint(context, nu);
         } else {
             String uri = value.toString().trim();
-            endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), uri);
+            endpoint = CamelContextHelper.getMandatoryEndpoint(context, uri);
         }
         return endpoint;
     }
@@ -116,6 +130,20 @@ public final class ExchangeHelper {
      * @throws NoSuchEndpointException if the endpoint cannot be resolved
      */
     public static Endpoint resolvePrototypeEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException {
+        return resolvePrototypeEndpoint(exchange.getContext(), value);
+    }
+
+    /**
+     * Attempts to resolve the endpoint (prototype scope) for the given value
+     *
+     * @param context  the camel context
+     * @param value    the value which can be an {@link Endpoint} or an object
+     *                 which provides a String representation of an endpoint via
+     *                 {@link #toString()}
+     * @return the endpoint
+     * @throws NoSuchEndpointException if the endpoint cannot be resolved
+     */
+    public static Endpoint resolvePrototypeEndpoint(CamelContext context, Object value) throws NoSuchEndpointException {
         if (value == null) {
             throw new NoSuchEndpointException("null");
         }
@@ -124,10 +152,10 @@ public final class ExchangeHelper {
             endpoint = (Endpoint) value;
         } else if (value instanceof NormalizedEndpointUri) {
             NormalizedEndpointUri nu = (NormalizedEndpointUri) value;
-            endpoint = CamelContextHelper.getMandatoryPrototypeEndpoint(exchange.getContext(), nu);
+            endpoint = CamelContextHelper.getMandatoryPrototypeEndpoint(context, nu);
         } else {
             String uri = value.toString().trim();
-            endpoint = CamelContextHelper.getMandatoryPrototypeEndpoint(exchange.getContext(), uri);
+            endpoint = CamelContextHelper.getMandatoryPrototypeEndpoint(context, uri);
         }
         return endpoint;
     }