You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/12 15:50:52 UTC

camel git commit: CAMEL-7609: Added shareUnitOfWork option to enrich. Thanks to metatech for the patch which I adjusted.

Repository: camel
Updated Branches:
  refs/heads/master 5e5a4d6db -> 111c01ad7


CAMEL-7609: Added shareUnitOfWork option to enrich. Thanks to metatech for the patch which I adjusted.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/111c01ad
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/111c01ad
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/111c01ad

Branch: refs/heads/master
Commit: 111c01ad76d9930f0540f83ff5d3adb8c0279788
Parents: 5e5a4d6
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Jul 12 14:49:13 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Jul 12 15:57:26 2015 +0200

----------------------------------------------------------------------
 .../apache/camel/model/EnrichDefinition.java    |  21 +++-
 .../apache/camel/model/ProcessorDefinition.java |  49 +++++++++
 .../org/apache/camel/processor/Enricher.java    |  23 +++-
 .../processor/EnrichSubUnitOfWorkTest.java      | 108 +++++++++++++++++++
 4 files changed, 196 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/111c01ad/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
index 5a3662f..6f0e358 100644
--- a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
@@ -57,7 +57,9 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple
     private Boolean aggregateOnException;
     @XmlTransient
     private AggregationStrategy aggregationStrategy;
-    
+    @XmlAttribute
+    private Boolean shareUnitOfWork;
+
     public EnrichDefinition() {
         this(null, null);
     }
@@ -107,8 +109,9 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple
         } else {
             endpoint = routeContext.resolveEndpoint(null, resourceRef);
         }
+        boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork();
 
-        Enricher enricher = new Enricher(null, endpoint.createProducer());
+        Enricher enricher = new Enricher(null, endpoint.createProducer(), isShareUnitOfWork);
         AggregationStrategy strategy = createAggregationStrategy(routeContext);
         if (strategy == null) {
             enricher.setDefaultAggregationStrategy();
@@ -232,4 +235,18 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple
     public void setAggregateOnException(Boolean aggregateOnException) {
         this.aggregateOnException = aggregateOnException;
     }
+
+    public Boolean getShareUnitOfWork() {
+        return shareUnitOfWork;
+    }
+
+    /**
+     * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and the resource exchange.
+     * Enrich will by default not share unit of work between the parent exchange and the resource exchange.
+     * This means the resource exchange has its own individual unit of work.
+     */
+    public void setShareUnitOfWork(Boolean shareUnitOfWork) {
+        this.shareUnitOfWork = shareUnitOfWork;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/111c01ad/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 8aec8b9..c84ea47 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -3153,6 +3153,8 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      *
      * @param resourceUri           URI of resource endpoint for obtaining additional data.
      * @param aggregationStrategy   aggregation strategy to aggregate input data and additional data.
+     * @param aggregateOnException   whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if
+     *                               an exception was thrown.
      * @return the builder
      * @see org.apache.camel.processor.Enricher
      */
@@ -3167,6 +3169,27 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
     /**
      * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
      * enriches an exchange with additional data obtained from a <code>resourceUri</code>.
+     *
+     * @param resourceUri           URI of resource endpoint for obtaining additional data.
+     * @param aggregationStrategy   aggregation strategy to aggregate input data and additional data.
+     * @param aggregateOnException  whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if
+     *                              an exception was thrown.
+     * @param shareUnitOfWork       whether to share unit of work
+     * @return the builder
+     * @see org.apache.camel.processor.Enricher
+     */
+    @SuppressWarnings("unchecked")
+    public Type enrich(String resourceUri, AggregationStrategy aggregationStrategy, boolean aggregateOnException, boolean shareUnitOfWork) {
+        EnrichDefinition enrich = new EnrichDefinition(aggregationStrategy, resourceUri);
+        enrich.setAggregateOnException(aggregateOnException);
+        enrich.setShareUnitOfWork(shareUnitOfWork);
+        addOutput(enrich);
+        return (Type) this;
+    }
+
+    /**
+     * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+     * enriches an exchange with additional data obtained from a <code>resourceUri</code>.
      * <p/>
      * The difference between this and {@link #pollEnrich(String)} is that this uses a producer
      * to obatin the additional data, where as pollEnrich uses a polling consumer.
@@ -3228,6 +3251,32 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
 
     /**
      * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+     * enriches an exchange with additional data obtained from a <code>resourceUri</code>.
+     * <p/>
+     * The difference between this and {@link #pollEnrich(String)} is that this uses a producer
+     * to obtain the additional data, where as pollEnrich uses a polling consumer.
+     *
+     * @param resourceRef            Reference of resource endpoint for obtaining additional data.
+     * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data.
+     * @param aggregateOnException   whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if
+     *                               an exception was thrown.
+     * @param shareUnitOfWork        whether to share unit of work
+     * @return the builder
+     * @see org.apache.camel.processor.Enricher
+     */
+    @SuppressWarnings("unchecked")
+    public Type enrichRef(String resourceRef, String aggregationStrategyRef, boolean aggregateOnException, boolean shareUnitOfWork) {
+        EnrichDefinition enrich = new EnrichDefinition();
+        enrich.setResourceRef(resourceRef);
+        enrich.setAggregationStrategyRef(aggregationStrategyRef);
+        enrich.setAggregateOnException(aggregateOnException);
+        enrich.setShareUnitOfWork(shareUnitOfWork);
+        addOutput(enrich);
+        return (Type) this;
+    }
+
+    /**
+     * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
      * enriches an exchange with additional data obtained from a <code>resourceUri</code>
      * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
      * <p/>

http://git-wip-us.apache.org/repos/asf/camel/blob/111c01ad/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
index 75d2429..b0532ba 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
@@ -23,10 +23,13 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorConverterHelper;
 import org.apache.camel.util.AsyncProcessorHelper;
@@ -57,6 +60,7 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
     private AggregationStrategy aggregationStrategy;
     private Producer producer;
     private boolean aggregateOnException;
+    private boolean shareUnitOfWork;
 
     /**
      * Creates a new {@link Enricher}. The default aggregation strategy is to
@@ -67,7 +71,7 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
      * @param producer producer to resource endpoint.
      */
     public Enricher(Producer producer) {
-        this(defaultAggregationStrategy(), producer);
+        this(defaultAggregationStrategy(), producer, false);
     }
 
     /**
@@ -75,10 +79,12 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
      * 
      * @param aggregationStrategy  aggregation strategy to aggregate input data and additional data.
      * @param producer producer to resource endpoint.
+     * @param shareUnitOfWork whether to share unit of work
      */
-    public Enricher(AggregationStrategy aggregationStrategy, Producer producer) {
+    public Enricher(AggregationStrategy aggregationStrategy, Producer producer, boolean shareUnitOfWork) {
         this.aggregationStrategy = aggregationStrategy;
         this.producer = producer;
+        this.shareUnitOfWork = shareUnitOfWork;
     }
 
     public String getId() {
@@ -144,7 +150,7 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
         final Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut);
         final Endpoint destination = producer.getEndpoint();
-        
+
         EventHelper.notifyExchangeSending(exchange.getContext(), resourceExchange, destination);
         // record timing for sending the exchange using the producer
         final StopWatch watch = new StopWatch();
@@ -247,6 +253,13 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
         // copy exchange, and do not share the unit of work
         Exchange target = ExchangeHelper.createCorrelatedCopy(source, false);
         target.setPattern(pattern);
+
+        // if we share unit of work, we need to prepare the resource exchange
+        if (isShareUnitOfWork()) {
+            target.setProperty(Exchange.PARENT_UNIT_OF_WORK, source.getUnitOfWork());
+            // and then share the unit of work
+            target.setUnitOfWork(source.getUnitOfWork());
+        }
         return target;
     }
 
@@ -260,6 +273,10 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
         return new CopyAggregationStrategy();
     }
 
+    public boolean isShareUnitOfWork() {
+        return shareUnitOfWork;
+    }
+
     @Override
     public String toString() {
         return "Enrich[" + producer.getEndpoint() + "]";

http://git-wip-us.apache.org/repos/asf/camel/blob/111c01ad/camel-core/src/test/java/org/apache/camel/processor/EnrichSubUnitOfWorkTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/EnrichSubUnitOfWorkTest.java b/camel-core/src/test/java/org/apache/camel/processor/EnrichSubUnitOfWorkTest.java
new file mode 100644
index 0000000..0db19e6
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/EnrichSubUnitOfWorkTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ *
+ */
+public class EnrichSubUnitOfWorkTest extends ContextTestSupport {
+
+    private static int counter;
+
+    public void testOK() throws Exception {
+        counter = 0;
+
+        getMockEndpoint("mock:dead").expectedMessageCount(0);
+        getMockEndpoint("mock:start").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:b").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testError() throws Exception {
+        counter = 0;
+
+        // the DLC should receive the original message which is Bye World
+        getMockEndpoint("mock:dead").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:start").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:b").expectedMessageCount(0);
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+
+        template.sendBody("direct:start", "Bye World");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(4, counter); // 1 first + 3 redeliveries
+    }
+    
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead").useOriginalMessage()
+                        .maximumRedeliveries(3).redeliveryDelay(0));
+
+                from("direct:start")
+                    .to("mock:start")
+                    .process(new MyPreProcessor())
+                    .enrich("direct:b", null, false, true)
+                    .to("mock:result");
+
+                from("direct:b")
+                    .process(new MyProcessor())
+                    .to("mock:b");
+            }
+        };
+    }
+
+    public static class MyPreProcessor implements Processor {
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            // if its a bye message then alter it to something with
+            // Donkey to cause a failure in the sub unit of work
+            // but the DLC should still receive the original input
+            String body = exchange.getIn().getBody(String.class);
+            if (body.startsWith("Bye")) {
+                exchange.getIn().setBody("Donkey was here");
+            }
+        }
+    }
+
+    public static class MyProcessor implements Processor {
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            String body = exchange.getIn().getBody(String.class);
+            if (body.contains("Donkey")) {
+                counter++;
+                throw new IllegalArgumentException("Donkey not allowed");
+            }
+        }
+    }
+
+
+}