You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/08/12 16:39:59 UTC
git commit: CAMEL-5286: More flexible onCompletion allow to configure
mode before/after consumer. And whether to use async/sync with thread pool or
not.
Repository: camel
Updated Branches:
refs/heads/master c475fb661 -> f4d9d3c33
CAMEL-5286: More flexible onCompletion allow to configure mode before/after consumer. And whether to use async/sync with thread pool or not.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f4d9d3c3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f4d9d3c3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f4d9d3c3
Branch: refs/heads/master
Commit: f4d9d3c3397dc91aaccc980c6afc753d3df28e09
Parents: c475fb6
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Aug 12 16:39:48 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Aug 12 16:39:48 2014 +0200
----------------------------------------------------------------------
.../camel/model/OnCompletionDefinition.java | 71 +++++++++-
.../apache/camel/model/OnCompletionMode.java | 28 ++++
.../camel/processor/OnCompletionProcessor.java | 135 +++++++++++++++----
.../resources/org/apache/camel/model/jaxb.index | 1 +
.../camel/processor/OnCompletionAsyncTest.java | 10 +-
.../camel/processor/OnCompletionModeTest.java | 69 ++++++++++
.../OnCompletionParallelProcessingTest.java | 49 +++++++
.../OnCompletionUseOriginalBodyTest.java | 2 +-
...ompletionAndInterceptAndOnExceptionTest.java | 4 +-
.../component/jms/JmsOnCompletionTest.java | 2 +-
.../scala/dsl/SOnCompletionDefinition.scala | 8 +-
.../camel/scala/dsl/SOnCompletionModeTest.scala | 48 +++++++
.../dsl/SOnCompletionOnCompleteOnlyTest.scala | 2 +-
.../dsl/SOnCompletionOnFailureOnlyTest.scala | 2 +-
.../processor/SpringOnCompletionModeTest.java | 33 +++++
.../processor/SpringOnCompletionModeTest.xml | 59 ++++++++
.../SpringOnCompletionUseOriginalBodyTest.xml | 2 +-
17 files changed, 480 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java b/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
index 001c0fe..1e2bb7e 100644
--- a/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/OnCompletionDefinition.java
@@ -46,12 +46,16 @@ import org.apache.camel.spi.RouteContext;
@XmlAccessorType(XmlAccessType.FIELD)
public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefinition> implements ExecutorServiceAwareDefinition<OnCompletionDefinition> {
@XmlAttribute
+ private OnCompletionMode mode;
+ @XmlAttribute
private Boolean onCompleteOnly;
@XmlAttribute
private Boolean onFailureOnly;
@XmlElement(name = "onWhen")
private WhenDefinition onWhen;
@XmlAttribute
+ private Boolean parallelProcessing;
+ @XmlAttribute
private String executorServiceRef;
@XmlAttribute(name = "useOriginalMessage")
private Boolean useOriginalMessagePolicy;
@@ -137,14 +141,16 @@ public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefi
when = onWhen.getExpression().createPredicate(routeContext);
}
- // executor service is mandatory for on completion
- boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true);
- ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "OnCompletion", this, true);
+ boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing());
+ ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "OnCompletion", this, isParallelProcessing());
+
+ // should be after consumer by default
+ boolean afterConsumer = mode == null || mode == OnCompletionMode.AfterConsumer;
// should be false by default
boolean original = getUseOriginalMessagePolicy() != null ? getUseOriginalMessagePolicy() : false;
OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), internal,
- threadPool, shutdownThreadPool, isOnCompleteOnly(), isOnFailureOnly(), when, original);
+ threadPool, shutdownThreadPool, isOnCompleteOnly(), isOnFailureOnly(), when, original, afterConsumer);
return answer;
}
@@ -173,6 +179,32 @@ public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefi
}
/**
+ * Sets the mode to be after route is done (default due backwards compatible).
+ * <p/>
+ * This executes the on completion work <i>after</i> the route consumer have written response
+ * back to the callee (if its InOut mode).
+ *
+ * @return the builder
+ */
+ public OnCompletionDefinition modeAfterConsumer() {
+ setMode(OnCompletionMode.AfterConsumer);
+ return this;
+ }
+
+ /**
+ * Sets the mode to be before consumer is done.
+ * <p/>
+ * This allows the on completion work to execute <i>before</i> the route consumer, writes any response
+ * back to the callee (if its InOut mode).
+ *
+ * @return the builder
+ */
+ public OnCompletionDefinition modeBeforeConsumer() {
+ setMode(OnCompletionMode.BeforeConsumer);
+ return this;
+ }
+
+ /**
* Will only synchronize when the {@link org.apache.camel.Exchange} completed successfully (no errors).
*
* @return the builder
@@ -239,6 +271,16 @@ public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefi
return this;
}
+ /**
+ * Doing the on completion work in parallel
+ *
+ * @return the builder
+ */
+ public OnCompletionDefinition parallelProcessing() {
+ setParallelProcessing(true);
+ return this;
+ }
+
public List<ProcessorDefinition<?>> getOutputs() {
return outputs;
}
@@ -251,6 +293,14 @@ public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefi
return true;
}
+ public OnCompletionMode getMode() {
+ return mode;
+ }
+
+ public void setMode(OnCompletionMode mode) {
+ this.mode = mode;
+ }
+
public Boolean getOnCompleteOnly() {
return onCompleteOnly;
}
@@ -307,4 +357,17 @@ public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefi
this.useOriginalMessagePolicy = useOriginalMessagePolicy;
}
+ public Boolean getParallelProcessing() {
+ return parallelProcessing;
+ }
+
+ public void setParallelProcessing(Boolean parallelProcessing) {
+ this.parallelProcessing = parallelProcessing;
+ }
+
+ public boolean isParallelProcessing() {
+ return parallelProcessing != null && parallelProcessing;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/main/java/org/apache/camel/model/OnCompletionMode.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/OnCompletionMode.java b/camel-core/src/main/java/org/apache/camel/model/OnCompletionMode.java
new file mode 100644
index 0000000..1d6a800
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/model/OnCompletionMode.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import javax.xml.bind.annotation.XmlEnum;
+import javax.xml.bind.annotation.XmlType;
+
+@XmlType
+@XmlEnum(String.class)
+public enum OnCompletionMode {
+
+ AfterConsumer, BeforeConsumer
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
index 3604a5c..35b0a52 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
@@ -28,6 +28,7 @@ import org.apache.camel.Message;
import org.apache.camel.Ordered;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
+import org.apache.camel.Route;
import org.apache.camel.Traceable;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.support.SynchronizationAdapter;
@@ -53,9 +54,10 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces
private final boolean onFailureOnly;
private final Predicate onWhen;
private final boolean useOriginalBody;
+ private final boolean afterConsumer;
public OnCompletionProcessor(CamelContext camelContext, Processor processor, ExecutorService executorService, boolean shutdownExecutorService,
- boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen, boolean useOriginalBody) {
+ boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen, boolean useOriginalBody, boolean afterConsumer) {
notNull(camelContext, "camelContext");
notNull(processor, "processor");
this.camelContext = camelContext;
@@ -66,6 +68,7 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces
this.onFailureOnly = onFailureOnly;
this.onWhen = onWhen;
this.useOriginalBody = useOriginalBody;
+ this.afterConsumer = afterConsumer;
}
@Override
@@ -97,13 +100,22 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces
public boolean process(Exchange exchange, AsyncCallback callback) {
if (processor != null) {
// register callback
- exchange.getUnitOfWork().addSynchronization(new OnCompletionSynchronization());
+ if (afterConsumer) {
+ exchange.getUnitOfWork().addSynchronization(new OnCompletionSynchronizationAfterConsumer());
+ } else {
+ exchange.getUnitOfWork().addSynchronization(new OnCompletionSynchronizationBeforeConsumer());
+ }
}
callback.done(true);
return true;
}
+ protected boolean isCreateCopy() {
+ // we need to create a correlated copy if we run in parallel mode
+ return executorService != null;
+ }
+
/**
* Processes the exchange by the processors
*
@@ -127,17 +139,22 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces
protected Exchange prepareExchange(Exchange exchange) {
Exchange answer;
- // for asynchronous routing we must use a copy as we dont want it
- // to cause side effects of the original exchange
- // (the original thread will run in parallel)
- answer = ExchangeHelper.createCorrelatedCopy(exchange, false);
- if (answer.hasOut()) {
- // move OUT to IN (pipes and filters)
- answer.setIn(answer.getOut());
- answer.setOut(null);
+ if (isCreateCopy()) {
+ // for asynchronous routing we must use a copy as we dont want it
+ // to cause side effects of the original exchange
+ // (the original thread will run in parallel)
+ answer = ExchangeHelper.createCorrelatedCopy(exchange, false);
+ if (answer.hasOut()) {
+ // move OUT to IN (pipes and filters)
+ answer.setIn(answer.getOut());
+ answer.setOut(null);
+ }
+ // set MEP to InOnly as this wire tap is a fire and forget
+ answer.setPattern(ExchangePattern.InOnly);
+ } else {
+ // use the exchange as-is
+ answer = exchange;
}
- // set MEP to InOnly as this wire tap is a fire and forget
- answer.setPattern(ExchangePattern.InOnly);
if (useOriginalBody) {
LOG.trace("Using the original IN message instead of current");
@@ -152,7 +169,7 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces
return answer;
}
- private final class OnCompletionSynchronization extends SynchronizationAdapter implements Ordered {
+ private final class OnCompletionSynchronizationAfterConsumer extends SynchronizationAdapter implements Ordered {
public int getOrder() {
// we want to be last
@@ -173,13 +190,19 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces
// must use a copy as we dont want it to cause side effects of the original exchange
final Exchange copy = prepareExchange(exchange);
- executorService.submit(new Callable<Exchange>() {
- public Exchange call() throws Exception {
- LOG.debug("Processing onComplete: {}", copy);
- doProcess(processor, copy);
- return copy;
- }
- });
+ if (executorService != null) {
+ executorService.submit(new Callable<Exchange>() {
+ public Exchange call() throws Exception {
+ LOG.debug("Processing onComplete: {}", copy);
+ doProcess(processor, copy);
+ return copy;
+ }
+ });
+ } else {
+ // run without thread-pool
+ LOG.debug("Processing onComplete: {}", copy);
+ doProcess(processor, copy);
+ }
}
public void onFailure(final Exchange exchange) {
@@ -192,19 +215,31 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces
return;
}
+
// must use a copy as we dont want it to cause side effects of the original exchange
final Exchange copy = prepareExchange(exchange);
+ final Exception original = copy.getException();
// must remove exception otherwise onFailure routing will fail as well
// the caused exception is stored as a property (Exchange.EXCEPTION_CAUGHT) on the exchange
copy.setException(null);
- executorService.submit(new Callable<Exchange>() {
- public Exchange call() throws Exception {
- LOG.debug("Processing onFailure: {}", copy);
- doProcess(processor, copy);
- return null;
- }
- });
+ if (executorService != null) {
+ executorService.submit(new Callable<Exchange>() {
+ public Exchange call() throws Exception {
+ LOG.debug("Processing onFailure: {}", copy);
+ doProcess(processor, copy);
+ // restore exception after processing
+ copy.setException(original);
+ return null;
+ }
+ });
+ } else {
+ // run without thread-pool
+ LOG.debug("Processing onFailure: {}", copy);
+ doProcess(processor, copy);
+ // restore exception after processing
+ copy.setException(original);
+ }
}
@Override
@@ -219,6 +254,52 @@ public class OnCompletionProcessor extends ServiceSupport implements AsyncProces
}
}
+ private final class OnCompletionSynchronizationBeforeConsumer extends SynchronizationAdapter implements Ordered {
+
+ public int getOrder() {
+ // we want to be last
+ return Ordered.LOWEST;
+ }
+
+ @Override
+ public void onAfterRoute(Route route, Exchange exchange) {
+ if (exchange.isFailed() && onCompleteOnly) {
+ return;
+ }
+
+ if (!exchange.isFailed() && onFailureOnly) {
+ return;
+ }
+
+ if (onWhen != null && !onWhen.matches(exchange)) {
+ // predicate did not match so do not route the onComplete
+ return;
+ }
+
+ // must use a copy as we dont want it to cause side effects of the original exchange
+ final Exchange copy = prepareExchange(exchange);
+
+ if (executorService != null) {
+ executorService.submit(new Callable<Exchange>() {
+ public Exchange call() throws Exception {
+ LOG.debug("Processing onAfterRoute: {}", copy);
+ doProcess(processor, copy);
+ return copy;
+ }
+ });
+ } else {
+ // run without thread-pool
+ LOG.debug("Processing onAfterRoute: {}", copy);
+ doProcess(processor, copy);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "onAfterRoute";
+ }
+ }
+
@Override
public String toString() {
return "OnCompletionProcessor[" + processor + "]";
http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
----------------------------------------------------------------------
diff --git a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
index f0d43ae..ea0d2b9 100644
--- a/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
+++ b/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
@@ -43,6 +43,7 @@ LoopDefinition
MarshalDefinition
MulticastDefinition
OnCompletionDefinition
+OnCompletionMode
OnExceptionDefinition
OptimisticLockRetryPolicyDefinition
OptionalIdentifiedDefinition
http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAsyncTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAsyncTest.java b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAsyncTest.java
index 5c4ef2d..386e402 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAsyncTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionAsyncTest.java
@@ -37,7 +37,7 @@ public class OnCompletionAsyncTest extends ContextTestSupport {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
- onCompletion()
+ onCompletion().parallelProcessing()
.to("mock:before")
.delay(1000)
.setBody(simple("OnComplete:${body}"))
@@ -67,7 +67,7 @@ public class OnCompletionAsyncTest extends ContextTestSupport {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
- onCompletion()
+ onCompletion().parallelProcessing()
.to("mock:before")
.delay(1000)
.setBody(simple("OnComplete:${body}"))
@@ -101,7 +101,7 @@ public class OnCompletionAsyncTest extends ContextTestSupport {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
- onCompletion().useOriginalBody()
+ onCompletion().useOriginalBody().parallelProcessing()
.to("mock:before")
.delay(1000)
.setBody(simple("OnComplete:${body}"))
@@ -131,7 +131,7 @@ public class OnCompletionAsyncTest extends ContextTestSupport {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
- onCompletion().useOriginalBody()
+ onCompletion().useOriginalBody().parallelProcessing()
.to("mock:before")
.delay(1000)
.setBody(simple("OnComplete:${body}"))
@@ -166,7 +166,7 @@ public class OnCompletionAsyncTest extends ContextTestSupport {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
- onCompletion()
+ onCompletion().parallelProcessing()
.to("mock:before")
.delay(1000)
.setBody(simple("OnComplete:${body}"))
http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/test/java/org/apache/camel/processor/OnCompletionModeTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/OnCompletionModeTest.java b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionModeTest.java
new file mode 100644
index 0000000..e958ef2
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionModeTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class OnCompletionModeTest extends ContextTestSupport {
+
+ public void testOnCompletionScopeBefore() throws Exception {
+ getMockEndpoint("mock:input").expectedBodiesReceived("Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("I was here Hello Camel");
+
+ String out = template.requestBody("seda:foo", "Camel", String.class);
+ assertEquals("I was here Hello Camel", out);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testOnCompletionScopeAfter() throws Exception {
+ getMockEndpoint("mock:input").expectedBodiesReceived("World");
+ getMockEndpoint("mock:after").expectedBodiesReceived("I was here Hello World");
+
+ String out = template.requestBody("seda:bar", "World", String.class);
+ assertEquals("Hello World", out);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("seda:foo")
+ // we do not want parallel as we want to change the message before the consumer writes the response
+ .onCompletion().modeBeforeConsumer()
+ .transform(body().prepend("I was here "))
+ .to("mock:after")
+ .end()
+ .to("mock:input")
+ .transform(body().prepend("Hello ")).to("log:foo");
+
+ from("seda:bar")
+ // need to use parallel to make copy so we do not do side-effects
+ .onCompletion().modeAfterConsumer().parallelProcessing()
+ .transform(body().prepend("I was here "))
+ .to("mock:after")
+ .end()
+ .to("mock:input")
+ .transform(body().prepend("Hello ")).to("log:bar");
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/test/java/org/apache/camel/processor/OnCompletionParallelProcessingTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/OnCompletionParallelProcessingTest.java b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionParallelProcessingTest.java
new file mode 100644
index 0000000..da99971
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionParallelProcessingTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class OnCompletionParallelProcessingTest extends ContextTestSupport {
+
+ public void testOnCompletionParallel() throws Exception {
+ getMockEndpoint("mock:input").expectedBodiesReceived("World");
+ getMockEndpoint("mock:after").expectedBodiesReceived("I was here Hello World");
+
+ String out = template.requestBody("seda:bar", "World", String.class);
+ assertEquals("Hello World", out);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("seda:bar")
+ .onCompletion().parallelProcessing()
+ .transform(body().prepend("I was here "))
+ .to("mock:after")
+ .end()
+ .to("mock:input")
+ .transform(body().prepend("Hello ")).to("log:bar");
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/camel-core/src/test/java/org/apache/camel/processor/OnCompletionUseOriginalBodyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/OnCompletionUseOriginalBodyTest.java b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionUseOriginalBodyTest.java
index ade4fb5..dc6d31f 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/OnCompletionUseOriginalBodyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/OnCompletionUseOriginalBodyTest.java
@@ -46,7 +46,7 @@ public class OnCompletionUseOriginalBodyTest extends ContextTestSupport {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- onCompletion().useOriginalBody()
+ onCompletion().useOriginalBody().parallelProcessing()
.to("mock:before")
.delay(1000)
.setBody(simple("OnComplete:${body}"))
http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionAndInterceptAndOnExceptionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionAndInterceptAndOnExceptionTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionAndInterceptAndOnExceptionTest.java
index 30eb66d..67a3b45 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionAndInterceptAndOnExceptionTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionAndInterceptAndOnExceptionTest.java
@@ -69,10 +69,10 @@ public class JmsOnCompletionAndInterceptAndOnExceptionTest extends CamelTestSupp
public void configure() throws Exception {
intercept().to("mock:intercept");
- // define a global on completion that is invoked when the exchage is complete
+ // define a global on completion that is invoked when the exchange is complete
onCompletion().to("log:global").to("mock:sync");
- // define an on excpetion
+ // define an on exception
onException(Exception.class).to("mock:exception");
from("activemq:queue:start")
http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionTest.java
index 171be60..3d51861 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsOnCompletionTest.java
@@ -73,7 +73,7 @@ public class JmsOnCompletionTest extends CamelTestSupport {
.to("mock:sync")
// must use end to denote the end of the onCompletion route
.end()
- // here the original route contiues
+ // here the original route continues
.process(new MyProcessor())
.to("mock:result");
// END SNIPPET: e1
http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SOnCompletionDefinition.scala
----------------------------------------------------------------------
diff --git a/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SOnCompletionDefinition.scala b/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SOnCompletionDefinition.scala
index 94659f4..9277819 100644
--- a/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SOnCompletionDefinition.scala
+++ b/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SOnCompletionDefinition.scala
@@ -32,8 +32,12 @@ case class SOnCompletionDefinition(override val target : OnCompletionDefinition)
def onFailureOnly = wrap(target.onFailureOnly)
def onCompleteOnly = wrap(target.onCompleteOnly)
- def useOriginalBody = wrap(target.useOriginalBody())
-
+ def useOriginalBody = wrap(target.useOriginalBody)
+
+ def modeBeforeConsumer = wrap(target.modeBeforeConsumer)
+ def modeAfterConsumer = wrap(target.modeAfterConsumer)
+
+ def parallelProcessing = wrap(target.parallelProcessing)
def executorService(executorService: ExecutorService) = wrap(target.setExecutorService(executorService))
def executorServiceRef(ref: String) = wrap(target.setExecutorServiceRef(ref))
http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionModeTest.scala
----------------------------------------------------------------------
diff --git a/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionModeTest.scala b/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionModeTest.scala
new file mode 100644
index 0000000..8e981bb
--- /dev/null
+++ b/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionModeTest.scala
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.scala.dsl
+
+import org.apache.camel.scala.dsl.builder.{RouteBuilderSupport, RouteBuilder}
+import org.apache.camel.processor.OnCompletionModeTest
+
+class SOnCompletionModeTest extends OnCompletionModeTest with RouteBuilderSupport {
+
+ override def createRouteBuilder = new RouteBuilder {
+
+ "seda:foo" ==> {
+ onCompletion.modeBeforeConsumer {
+ transform(simple("I was here ${body}"))
+ to("mock:after")
+ }
+ to("mock:input")
+ transform(simple("Hello ${body}"))
+ to("log:foo")
+ }
+
+ "seda:bar" ==> {
+ onCompletion.modeAfterConsumer {
+ transform(simple("I was here ${body}"))
+ to("mock:after")
+ }
+ to("mock:input")
+ transform(simple("Hello ${body}"))
+ to("log:bar")
+ }
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnCompleteOnlyTest.scala
----------------------------------------------------------------------
diff --git a/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnCompleteOnlyTest.scala b/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnCompleteOnlyTest.scala
index 837c997..354c34d 100644
--- a/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnCompleteOnlyTest.scala
+++ b/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnCompleteOnlyTest.scala
@@ -26,7 +26,7 @@ class SOnCompletionOnCompleteOnlyTest extends OnCompletionOnCompleteOnlyTest wit
override def createRouteBuilder = new RouteBuilder {
"direct:start" ==> {
- onCompletion(completeOnly) {
+ onCompletion(completeOnly).parallelProcessing {
to("mock:sync")
}
process(new MyProcessor())
http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnFailureOnlyTest.scala
----------------------------------------------------------------------
diff --git a/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnFailureOnlyTest.scala b/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnFailureOnlyTest.scala
index e4ce4cb..f645d54 100644
--- a/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnFailureOnlyTest.scala
+++ b/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SOnCompletionOnFailureOnlyTest.scala
@@ -26,7 +26,7 @@ class SOnCompletionOnFailureOnlyTest extends OnCompletionOnFailureOnlyTest with
override def createRouteBuilder = new RouteBuilder {
"direct:start" ==> {
- onCompletion(failureOnly) {
+ onCompletion(failureOnly).parallelProcessing {
to("mock:sync")
}
process(new MyProcessor())
http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringOnCompletionModeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringOnCompletionModeTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringOnCompletionModeTest.java
new file mode 100644
index 0000000..eee48ee
--- /dev/null
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringOnCompletionModeTest.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.OnCompletionModeTest;
+
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+/**
+ * @version
+ */
+public class SpringOnCompletionModeTest extends OnCompletionModeTest {
+
+ protected CamelContext createCamelContext() throws Exception {
+ return createSpringCamelContext(this, "org/apache/camel/spring/processor/SpringOnCompletionModeTest.xml");
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionModeTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionModeTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionModeTest.xml
new file mode 100644
index 0000000..2bc1211
--- /dev/null
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionModeTest.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+ ">
+
+ <camelContext xmlns="http://camel.apache.org/schema/spring">
+
+ <route>
+ <from uri="seda:foo"/>
+ <onCompletion mode="BeforeConsumer">
+ <transform>
+ <simple>I was here ${body}</simple>
+ </transform>
+ <to uri="mock:after"/>
+ </onCompletion>
+ <to uri="mock:input"/>
+ <transform>
+ <simple>Hello ${body}</simple>
+ </transform>
+ <to uri="log:foo"/>
+ </route>
+
+ <route>
+ <from uri="seda:bar"/>
+ <onCompletion mode="AfterConsumer" parallelProcessing="true">
+ <transform>
+ <simple>I was here ${body}</simple>
+ </transform>
+ <to uri="mock:after"/>
+ </onCompletion>
+ <to uri="mock:input"/>
+ <transform>
+ <simple>Hello ${body}</simple>
+ </transform>
+ <to uri="log:bar"/>
+ </route>
+
+ </camelContext>
+
+</beans>
http://git-wip-us.apache.org/repos/asf/camel/blob/f4d9d3c3/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.xml
index f125623..83b3a42 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringOnCompletionUseOriginalBodyTest.xml
@@ -25,7 +25,7 @@
<bean id="myProcessor" class="org.apache.camel.processor.OnCompletionUseOriginalBodyTest$MyProcessor"/>
<camelContext xmlns="http://camel.apache.org/schema/spring">
- <onCompletion useOriginalMessage="true">
+ <onCompletion useOriginalMessage="true" parallelProcessing="true">
<to uri="mock:before"/>
<delay>
<constant>1000</constant>