You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ra...@apache.org on 2013/08/25 16:30:49 UTC
git commit: CAMEL-6667 Loop EIP doesn't honour copy option in some
circumstances;
also add 'append' option to MyAsyncComponent for testing, such that it retains
the input body for assertion purposes
Updated Branches:
refs/heads/master 53575ff76 -> 1fc7bd7a7
CAMEL-6667 Loop EIP doesn't honour copy option in some circumstances; also add 'append' option to MyAsyncComponent for testing, such that it retains the input body for assertion purposes
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1fc7bd7a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1fc7bd7a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1fc7bd7a
Branch: refs/heads/master
Commit: 1fc7bd7a74e3533fb04141fd3343edcd2cc43261
Parents: 53575ff
Author: Raúl Kripalani <ra...@apache.org>
Authored: Sun Aug 25 15:26:55 2013 +0100
Committer: Raúl Kripalani <ra...@apache.org>
Committed: Sun Aug 25 15:30:32 2013 +0100
----------------------------------------------------------------------
.../apache/camel/processor/LoopProcessor.java | 25 ++--
.../camel/processor/AsyncLoopCopyTest.java | 77 +++++++++++
.../apache/camel/processor/AsyncLoopTest.java | 137 +++++++++++++++++++
.../camel/processor/async/MyAsyncEndpoint.java | 9 ++
.../camel/processor/async/MyAsyncProducer.java | 1 +
5 files changed, 240 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/1fc7bd7a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
index df2baed..89649b1 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -60,7 +60,11 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable {
callback.done(true);
return true;
}
-
+
+ // we hold on to the original Exchange in case it's needed for copies
+ final Exchange original = exchange;
+
+ // per-iteration exchange
Exchange target = exchange;
// set the size before we start
@@ -70,8 +74,9 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable {
while (index.get() < count.get()) {
// and prepare for next iteration
- target = prepareExchange(exchange, index.get());
- boolean sync = process(target, callback, index, count);
+ // if (!copy) target = exchange; else copy of original
+ target = prepareExchange(exchange, index.get(), original);
+ boolean sync = process(target, callback, index, count, original);
if (!sync) {
LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId());
@@ -94,12 +99,13 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable {
}
protected boolean process(final Exchange exchange, final AsyncCallback callback,
- final AtomicInteger index, final AtomicInteger count) {
+ final AtomicInteger index, final AtomicInteger count,
+ final Exchange original) {
// set current index as property
LOG.debug("LoopProcessor: iteration #{}", index.get());
exchange.setProperty(Exchange.LOOP_INDEX, index.get());
-
+
boolean sync = processor.process(exchange, new AsyncCallback() {
public void done(boolean doneSync) {
// we only have to handle async completion of the routing slip
@@ -116,10 +122,10 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable {
while (index.get() < count.get()) {
// and prepare for next iteration
- target = prepareExchange(exchange, index.get());
+ target = prepareExchange(exchange, index.get(), original);
// process again
- boolean sync = process(target, callback, index, count);
+ boolean sync = process(target, callback, index, count, original);
if (!sync) {
LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId());
// the remainder of the routing slip will be completed async
@@ -148,10 +154,11 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable {
* @param index the index of the next iteration
* @return the exchange to use
*/
- protected Exchange prepareExchange(Exchange exchange, int index) {
+ protected Exchange prepareExchange(Exchange exchange, int index, Exchange original) {
if (copy) {
// use a copy but let it reuse the same exchange id so it appear as one exchange
- return ExchangeHelper.createCopy(exchange, true);
+ // use the original exchange rather than the looping exchange (esp. with the async routing engine)
+ return ExchangeHelper.createCopy(original, true);
} else {
ExchangeHelper.prepareOutToIn(exchange);
return exchange;
http://git-wip-us.apache.org/repos/asf/camel/blob/1fc7bd7a/camel-core/src/test/java/org/apache/camel/processor/AsyncLoopCopyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/AsyncLoopCopyTest.java b/camel-core/src/test/java/org/apache/camel/processor/AsyncLoopCopyTest.java
new file mode 100644
index 0000000..65bffe3
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/AsyncLoopCopyTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.async.MyAsyncComponent;
+
+/**
+ * @version
+ */
+public class AsyncLoopCopyTest extends ContextTestSupport {
+
+ private static String beforeThreadName;
+ private static String afterThreadName;
+
+ public void testAsyncLoopCopy() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:loopIterationStart").expectedBodiesReceived("Hello Camel", "Hello Camel");
+ getMockEndpoint("mock:loopIterationEnd").expectedBodiesReceived("Bye Camel", "Bye Camel");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello Camel");
+
+ String reply = template.requestBodyAndHeader("direct:start", "Hello Camel", "NumberIterations", 2, String.class);
+ assertEquals("Hello Camel", reply);
+
+ assertMockEndpointsSatisfied();
+
+ assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent());
+
+ from("direct:start")
+ .to("mock:before") // Should receive Hello Camel
+ .to("log:before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ beforeThreadName = Thread.currentThread().getName();
+ }
+ })
+ .loop(header("NumberIterations")).copy()
+ .to("mock:loopIterationStart") // Should receive 2x Hello Camel
+ .to("async:bye:camel") // Will transform the body to Bye Camel
+ .to("mock:loopIterationEnd") // Should receive 2x Bye Camel
+ .end()
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ afterThreadName = Thread.currentThread().getName();
+ }
+ })
+ .to("log:after")
+ .to("mock:result"); // Should receive 1x Hello Camel (original message)
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1fc7bd7a/camel-core/src/test/java/org/apache/camel/processor/AsyncLoopTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/AsyncLoopTest.java b/camel-core/src/test/java/org/apache/camel/processor/AsyncLoopTest.java
new file mode 100644
index 0000000..9839dd7
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/AsyncLoopTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.async.MyAsyncComponent;
+
+/**
+ * @version
+ */
+public class AsyncLoopTest extends ContextTestSupport {
+
+ private static final String BASE_PAYLOAD = "<Hello n='4'/>";
+ MockEndpoint resultEndpoint;
+
+ public void testCounterLoop() throws Exception {
+ performLoopTest("direct:a", 8);
+ }
+
+ public void testExpressionLoop() throws Exception {
+ performLoopTest("direct:b", 6);
+ }
+
+ public void testExpressionClauseLoop() throws Exception {
+ performLoopTest("direct:c", 4);
+ }
+
+ public void testLoopAsBlock() throws Exception {
+ MockEndpoint lastEndpoint = resolveMandatoryEndpoint("mock:last", MockEndpoint.class);
+ lastEndpoint.expectedMessageCount(1);
+ lastEndpoint.expectedBodiesReceived(BASE_PAYLOAD + new String(new char[2]).replace("\0", " Hello Camel"));
+ performLoopTest("direct:d", 2);
+ lastEndpoint.assertIsSatisfied();
+ }
+
+ public void testLoopWithInvalidExpression() throws Exception {
+ try {
+ performLoopTest("direct:b", 4, "invalid");
+ fail("Exception expected for invalid expression");
+ } catch (RuntimeCamelException e) {
+ // expected
+ }
+ }
+
+ public void testLoopProperties() throws Exception {
+ MockEndpoint lastEndpoint = resolveMandatoryEndpoint("mock:last", MockEndpoint.class);
+ lastEndpoint.expectedMessageCount(1);
+ lastEndpoint.expectedBodiesReceived(BASE_PAYLOAD + new String(new char[10]).replace("\0", " Hello Camel"));
+ performLoopTest("direct:e", 10);
+ lastEndpoint.assertIsSatisfied();
+ }
+
+ private void performLoopTest(String endpointUri, int expectedIterations, String header) throws InterruptedException {
+ resultEndpoint.expectedMessageCount(expectedIterations);
+ List<String> results = new ArrayList<String>(expectedIterations);
+ for (int i = 0; i < expectedIterations; i++) {
+ results.add(BASE_PAYLOAD + new String(new char[i + 1]).replace("\0", " Hello Camel"));
+ }
+ resultEndpoint.expectedBodiesReceived(results);
+
+ template.sendBodyAndHeader(endpointUri, BASE_PAYLOAD, "loop", header);
+ resultEndpoint.assertIsSatisfied();
+ }
+
+ private void performLoopTest(String endpointUri, int expectedIterations) throws InterruptedException {
+ performLoopTest(endpointUri, expectedIterations, "6");
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+ resultEndpoint.reset();
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ final Processor loopTest = new LoopTestProcessor(10);
+
+ return new RouteBuilder() {
+ public void configure() {
+ context.addComponent("async", new MyAsyncComponent());
+
+ from("direct:a")
+ .loop(8)
+ .to("async:hello:camel?append=true")
+ .to("mock:result");
+
+ from("direct:b")
+ .loop(header("loop"))
+ .to("async:hello:camel?append=true")
+ .to("mock:result");
+
+ from("direct:c")
+ .loop().xpath("/Hello/@n")
+ .to("async:hello:camel?append=true")
+ .to("mock:result");
+
+ from("direct:d")
+ .loop(2)
+ .to("async:hello:camel?append=true")
+ .to("mock:result")
+ .end()
+ .to("mock:last");
+
+ from("direct:e")
+ .loop(10)
+ .to("async:hello:camel?append=true")
+ .process(loopTest)
+ .to("mock:result")
+ .end()
+ .to("mock:last");
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1fc7bd7a/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncEndpoint.java b/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncEndpoint.java
index 04d0184..5bc9eff 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncEndpoint.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncEndpoint.java
@@ -28,6 +28,7 @@ import org.apache.camel.impl.SynchronousDelegateProducer;
*/
public class MyAsyncEndpoint extends DefaultEndpoint {
+ private boolean append;
private String reply;
private long delay = 25;
private int failFirstAttempts;
@@ -77,4 +78,12 @@ public class MyAsyncEndpoint extends DefaultEndpoint {
public void setFailFirstAttempts(int failFirstAttempts) {
this.failFirstAttempts = failFirstAttempts;
}
+
+ public boolean isAppend() {
+ return append;
+ }
+
+ public void setAppend(boolean append) {
+ this.append = append;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/1fc7bd7a/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java b/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java
index bedb4e0..c087999 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/MyAsyncProducer.java
@@ -58,6 +58,7 @@ public class MyAsyncProducer extends DefaultAsyncProducer {
exchange.setException(new CamelExchangeException("Simulated error at attempt " + count, exchange));
} else {
String reply = getEndpoint().getReply();
+ reply = getEndpoint().isAppend() ? exchange.getIn().getBody() + " " + reply : reply;
exchange.getOut().setBody(reply);
// propagate headers
exchange.getOut().setHeaders(exchange.getIn().getHeaders());