You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/12 15:50:52 UTC
camel git commit: CAMEL-7609: Added shareUnitOfWork option to enrich.
Thanks to metatech for the patch which I adjusted.
Repository: camel
Updated Branches:
refs/heads/master 5e5a4d6db -> 111c01ad7
CAMEL-7609: Added shareUnitOfWork option to enrich. Thanks to metatech for the patch which I adjusted.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/111c01ad
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/111c01ad
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/111c01ad
Branch: refs/heads/master
Commit: 111c01ad76d9930f0540f83ff5d3adb8c0279788
Parents: 5e5a4d6
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Jul 12 14:49:13 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Jul 12 15:57:26 2015 +0200
----------------------------------------------------------------------
.../apache/camel/model/EnrichDefinition.java | 21 +++-
.../apache/camel/model/ProcessorDefinition.java | 49 +++++++++
.../org/apache/camel/processor/Enricher.java | 23 +++-
.../processor/EnrichSubUnitOfWorkTest.java | 108 +++++++++++++++++++
4 files changed, 196 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/111c01ad/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
index 5a3662f..6f0e358 100644
--- a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
@@ -57,7 +57,9 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple
private Boolean aggregateOnException;
@XmlTransient
private AggregationStrategy aggregationStrategy;
-
+ @XmlAttribute
+ private Boolean shareUnitOfWork;
+
public EnrichDefinition() {
this(null, null);
}
@@ -107,8 +109,9 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple
} else {
endpoint = routeContext.resolveEndpoint(null, resourceRef);
}
+ boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork();
- Enricher enricher = new Enricher(null, endpoint.createProducer());
+ Enricher enricher = new Enricher(null, endpoint.createProducer(), isShareUnitOfWork);
AggregationStrategy strategy = createAggregationStrategy(routeContext);
if (strategy == null) {
enricher.setDefaultAggregationStrategy();
@@ -232,4 +235,18 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple
public void setAggregateOnException(Boolean aggregateOnException) {
this.aggregateOnException = aggregateOnException;
}
+
+ public Boolean getShareUnitOfWork() {
+ return shareUnitOfWork;
+ }
+
+ /**
+ * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and the resource exchange.
+ * Enrich will by default not share unit of work between the parent exchange and the resource exchange.
+ * This means the resource exchange has its own individual unit of work.
+ */
+ public void setShareUnitOfWork(Boolean shareUnitOfWork) {
+ this.shareUnitOfWork = shareUnitOfWork;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/111c01ad/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 8aec8b9..c84ea47 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -3153,6 +3153,8 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
*
* @param resourceUri URI of resource endpoint for obtaining additional data.
* @param aggregationStrategy aggregation strategy to aggregate input data and additional data.
+ * @param aggregateOnException whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if
+ * an exception was thrown.
* @return the builder
* @see org.apache.camel.processor.Enricher
*/
@@ -3167,6 +3169,27 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
/**
* The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
* enriches an exchange with additional data obtained from a <code>resourceUri</code>.
+ *
+ * @param resourceUri URI of resource endpoint for obtaining additional data.
+ * @param aggregationStrategy aggregation strategy to aggregate input data and additional data.
+ * @param aggregateOnException whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if
+ * an exception was thrown.
+ * @param shareUnitOfWork whether to share the unit of work between the parent exchange and the resource exchange
+ * @return the builder
+ * @see org.apache.camel.processor.Enricher
+ */
+ @SuppressWarnings("unchecked")
+ public Type enrich(String resourceUri, AggregationStrategy aggregationStrategy, boolean aggregateOnException, boolean shareUnitOfWork) {
+ EnrichDefinition enrich = new EnrichDefinition(aggregationStrategy, resourceUri);
+ enrich.setAggregateOnException(aggregateOnException);
+ enrich.setShareUnitOfWork(shareUnitOfWork);
+ addOutput(enrich);
+ return (Type) this;
+ }
+
+ /**
+ * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+ * enriches an exchange with additional data obtained from a <code>resourceUri</code>.
* <p/>
* The difference between this and {@link #pollEnrich(String)} is that this uses a producer
* to obtain the additional data, whereas pollEnrich uses a polling consumer.
@@ -3228,6 +3251,32 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
/**
* The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
+ * enriches an exchange with additional data obtained from a <code>resourceUri</code>.
+ * <p/>
+ * The difference between this and {@link #pollEnrich(String)} is that this uses a producer
+ * to obtain the additional data, whereas pollEnrich uses a polling consumer.
+ *
+ * @param resourceRef Reference of resource endpoint for obtaining additional data.
+ * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data.
+ * @param aggregateOnException whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if
+ * an exception was thrown.
+ * @param shareUnitOfWork whether to share the unit of work between the parent exchange and the resource exchange
+ * @return the builder
+ * @see org.apache.camel.processor.Enricher
+ */
+ @SuppressWarnings("unchecked")
+ public Type enrichRef(String resourceRef, String aggregationStrategyRef, boolean aggregateOnException, boolean shareUnitOfWork) {
+ EnrichDefinition enrich = new EnrichDefinition();
+ enrich.setResourceRef(resourceRef);
+ enrich.setAggregationStrategyRef(aggregationStrategyRef);
+ enrich.setAggregateOnException(aggregateOnException);
+ enrich.setShareUnitOfWork(shareUnitOfWork);
+ addOutput(enrich);
+ return (Type) this;
+ }
+
+ /**
+ * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
* enriches an exchange with additional data obtained from a <code>resourceUri</code>
* using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
* <p/>
http://git-wip-us.apache.org/repos/asf/camel/blob/111c01ad/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
index 75d2429..b0532ba 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
@@ -23,10 +23,13 @@ import org.apache.camel.Endpoint;
import org.apache.camel.EndpointAware;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorConverterHelper;
import org.apache.camel.util.AsyncProcessorHelper;
@@ -57,6 +60,7 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
private AggregationStrategy aggregationStrategy;
private Producer producer;
private boolean aggregateOnException;
+ private boolean shareUnitOfWork;
/**
* Creates a new {@link Enricher}. The default aggregation strategy is to
@@ -67,7 +71,7 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
* @param producer producer to resource endpoint.
*/
public Enricher(Producer producer) {
- this(defaultAggregationStrategy(), producer);
+ this(defaultAggregationStrategy(), producer, false);
}
/**
@@ -75,10 +79,12 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
*
* @param aggregationStrategy aggregation strategy to aggregate input data and additional data.
* @param producer producer to resource endpoint.
+ * @param shareUnitOfWork whether to share the unit of work between the parent exchange and the resource exchange
*/
- public Enricher(AggregationStrategy aggregationStrategy, Producer producer) {
+ public Enricher(AggregationStrategy aggregationStrategy, Producer producer, boolean shareUnitOfWork) {
this.aggregationStrategy = aggregationStrategy;
this.producer = producer;
+ this.shareUnitOfWork = shareUnitOfWork;
}
public String getId() {
@@ -144,7 +150,7 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
public boolean process(final Exchange exchange, final AsyncCallback callback) {
final Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut);
final Endpoint destination = producer.getEndpoint();
-
+
EventHelper.notifyExchangeSending(exchange.getContext(), resourceExchange, destination);
// record timing for sending the exchange using the producer
final StopWatch watch = new StopWatch();
@@ -247,6 +253,13 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
// copy exchange, and do not share the unit of work
Exchange target = ExchangeHelper.createCorrelatedCopy(source, false);
target.setPattern(pattern);
+
+ // if we share unit of work, we need to prepare the resource exchange
+ if (isShareUnitOfWork()) {
+ target.setProperty(Exchange.PARENT_UNIT_OF_WORK, source.getUnitOfWork());
+ // and then share the unit of work
+ target.setUnitOfWork(source.getUnitOfWork());
+ }
return target;
}
@@ -260,6 +273,10 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
return new CopyAggregationStrategy();
}
+ public boolean isShareUnitOfWork() {
+ return shareUnitOfWork;
+ }
+
@Override
public String toString() {
return "Enrich[" + producer.getEndpoint() + "]";
http://git-wip-us.apache.org/repos/asf/camel/blob/111c01ad/camel-core/src/test/java/org/apache/camel/processor/EnrichSubUnitOfWorkTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/EnrichSubUnitOfWorkTest.java b/camel-core/src/test/java/org/apache/camel/processor/EnrichSubUnitOfWorkTest.java
new file mode 100644
index 0000000..0db19e6
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/EnrichSubUnitOfWorkTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * Unit test for enrich with the shareUnitOfWork option enabled.
+ */
+public class EnrichSubUnitOfWorkTest extends ContextTestSupport {
+
+ private static int counter;
+
+ public void testOK() throws Exception {
+ counter = 0;
+
+ getMockEndpoint("mock:dead").expectedMessageCount(0);
+ getMockEndpoint("mock:start").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:b").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testError() throws Exception {
+ counter = 0;
+
+ // the DLC should receive the original message which is Bye World
+ getMockEndpoint("mock:dead").expectedBodiesReceived("Bye World");
+ getMockEndpoint("mock:start").expectedBodiesReceived("Bye World");
+ getMockEndpoint("mock:b").expectedMessageCount(0);
+ getMockEndpoint("mock:result").expectedMessageCount(0);
+
+ template.sendBody("direct:start", "Bye World");
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals(4, counter); // 1 first + 3 redeliveries
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ errorHandler(deadLetterChannel("mock:dead").useOriginalMessage()
+ .maximumRedeliveries(3).redeliveryDelay(0));
+
+ from("direct:start")
+ .to("mock:start")
+ .process(new MyPreProcessor())
+ .enrich("direct:b", null, false, true)
+ .to("mock:result");
+
+ from("direct:b")
+ .process(new MyProcessor())
+ .to("mock:b");
+ }
+ };
+ }
+
+ public static class MyPreProcessor implements Processor {
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ // if it's a bye message then alter it to something with
+ // Donkey to cause a failure in the sub unit of work
+ // but the DLC should still receive the original input
+ String body = exchange.getIn().getBody(String.class);
+ if (body.startsWith("Bye")) {
+ exchange.getIn().setBody("Donkey was here");
+ }
+ }
+ }
+
+ public static class MyProcessor implements Processor {
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ String body = exchange.getIn().getBody(String.class);
+ if (body.contains("Donkey")) {
+ counter++;
+ throw new IllegalArgumentException("Donkey not allowed");
+ }
+ }
+ }
+
+
+}