You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ge...@apache.org on 2015/11/13 15:43:46 UTC
camel git commit: CAMEL-9311: Fix concurrency when using
@DynamicRouter
Repository: camel
Updated Branches:
refs/heads/master 9c6bf4689 -> 8a770bfee
CAMEL-9311: Fix concurrency when using @DynamicRouter
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8a770bfe
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8a770bfe
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8a770bfe
Branch: refs/heads/master
Commit: 8a770bfeec6e2134eec12be6ef49a3458e99f7cc
Parents: 9c6bf46
Author: Gert Vanthienen <ge...@apache.org>
Authored: Fri Nov 13 15:41:08 2015 +0100
Committer: Gert Vanthienen <ge...@apache.org>
Committed: Fri Nov 13 15:41:19 2015 +0100
----------------------------------------------------------------------
.../apache/camel/processor/DynamicRouter.java | 2 +-
.../org/apache/camel/processor/RoutingSlip.java | 14 ++--
.../DynamicRouterConcurrentPOJOTest.java | 77 ++++++++++++++++++++
3 files changed, 85 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/8a770bfe/camel-core/src/main/java/org/apache/camel/processor/DynamicRouter.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/DynamicRouter.java b/camel-core/src/main/java/org/apache/camel/processor/DynamicRouter.java
index 0e1c117..565c64e 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/DynamicRouter.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/DynamicRouter.java
@@ -43,7 +43,7 @@ public class DynamicRouter extends RoutingSlip {
}
@Override
- protected RoutingSlipIterator createRoutingSlipIterator(Exchange exchange) throws Exception {
+ protected RoutingSlipIterator createRoutingSlipIterator(Exchange exchange, Expression expression) throws Exception {
return new DynamicRoutingSlipIterator(expression);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/8a770bfe/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index c20742c..c684593 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -183,25 +183,25 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
return true;
}
- return doRoutingSlip(exchange, callback);
+ return doRoutingSlipWithExpression(exchange, this.expression, callback);
}
public boolean doRoutingSlip(Exchange exchange, Object routingSlip, AsyncCallback callback) {
if (routingSlip instanceof Expression) {
- this.expression = (Expression) routingSlip;
+ return doRoutingSlipWithExpression(exchange, (Expression) routingSlip, callback);
} else {
- this.expression = ExpressionBuilder.constantExpression(routingSlip);
+ return doRoutingSlipWithExpression(exchange, ExpressionBuilder.constantExpression(routingSlip), callback);
}
- return doRoutingSlip(exchange, callback);
}
/**
* Creates the route slip iterator to be used.
*
* @param exchange the exchange
+ * @param expression the expression
* @return the iterator, should never be <tt>null</tt>
*/
- protected RoutingSlipIterator createRoutingSlipIterator(final Exchange exchange) throws Exception {
+ protected RoutingSlipIterator createRoutingSlipIterator(final Exchange exchange, final Expression expression) throws Exception {
Object slip = expression.evaluate(exchange, Object.class);
if (exchange.getException() != null) {
// force any exceptions occurred during evaluation to be thrown
@@ -221,11 +221,11 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
};
}
- private boolean doRoutingSlip(final Exchange exchange, final AsyncCallback callback) {
+ private boolean doRoutingSlipWithExpression(final Exchange exchange, final Expression expression, final AsyncCallback callback) {
Exchange current = exchange;
RoutingSlipIterator iter;
try {
- iter = createRoutingSlipIterator(exchange);
+ iter = createRoutingSlipIterator(exchange, expression);
} catch (Exception e) {
exchange.setException(e);
callback.done(true);
http://git-wip-us.apache.org/repos/asf/camel/blob/8a770bfe/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentPOJOTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentPOJOTest.java b/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentPOJOTest.java
new file mode 100644
index 0000000..eb6db04
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentPOJOTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.dynamicrouter;
+
+import org.apache.camel.*;
+import org.apache.camel.builder.RouteBuilder;
+
+public class DynamicRouterConcurrentPOJOTest extends ContextTestSupport {
+
+ private static final int COUNT = 100;
+
+ public void testConcurrentDynamicRouter() throws Exception {
+ getMockEndpoint("mock:a").expectedMessageCount(COUNT);
+ getMockEndpoint("mock:b").expectedMessageCount(COUNT);
+
+ Thread sendToSedaA = createSedaSenderThread("seda:a");
+ Thread sendToSedaB = createSedaSenderThread("seda:b");
+
+ sendToSedaA.start();
+ sendToSedaB.start();
+
+ assertMockEndpointsSatisfied();
+ }
+
+ private Thread createSedaSenderThread(final String seda) {
+ return new Thread(new Runnable() {
+ @Override
+ public void run() {
+ for (int i = 0; i < COUNT; i++) {
+ template.sendBody(seda, "Message from " + seda);
+ }
+ }
+ });
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("seda:a").bean(new MyDynamicRouterPojo("mock:a"));
+ from("seda:b").bean(new MyDynamicRouterPojo("mock:b"));
+ }
+ };
+ }
+
+ public class MyDynamicRouterPojo {
+
+ private final String target;
+
+ public MyDynamicRouterPojo(String target) {
+ super();
+ this.target = target;
+ }
+
+ @DynamicRouter
+ public String route(@Header(Exchange.SLIP_ENDPOINT) String previous) {
+ if (previous == null) {
+ return target;
+ } else {
+ return null;
+ }
+ }
+ }
+}