You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ge...@apache.org on 2015/11/13 15:43:46 UTC

camel git commit: CAMEL-9311: Fix concurrency when using @DynamicRouter

Repository: camel
Updated Branches:
  refs/heads/master 9c6bf4689 -> 8a770bfee


CAMEL-9311: Fix concurrency when using @DynamicRouter


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8a770bfe
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8a770bfe
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8a770bfe

Branch: refs/heads/master
Commit: 8a770bfeec6e2134eec12be6ef49a3458e99f7cc
Parents: 9c6bf46
Author: Gert Vanthienen <ge...@apache.org>
Authored: Fri Nov 13 15:41:08 2015 +0100
Committer: Gert Vanthienen <ge...@apache.org>
Committed: Fri Nov 13 15:41:19 2015 +0100

----------------------------------------------------------------------
 .../apache/camel/processor/DynamicRouter.java   |  2 +-
 .../org/apache/camel/processor/RoutingSlip.java | 14 ++--
 .../DynamicRouterConcurrentPOJOTest.java        | 77 ++++++++++++++++++++
 3 files changed, 85 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8a770bfe/camel-core/src/main/java/org/apache/camel/processor/DynamicRouter.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/DynamicRouter.java b/camel-core/src/main/java/org/apache/camel/processor/DynamicRouter.java
index 0e1c117..565c64e 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/DynamicRouter.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/DynamicRouter.java
@@ -43,7 +43,7 @@ public class DynamicRouter extends RoutingSlip {
     }
 
     @Override
-    protected RoutingSlipIterator createRoutingSlipIterator(Exchange exchange) throws Exception {
+    protected RoutingSlipIterator createRoutingSlipIterator(Exchange exchange, Expression expression) throws Exception {
         return new DynamicRoutingSlipIterator(expression);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/8a770bfe/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index c20742c..c684593 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -183,25 +183,25 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
             return true;
         }
 
-        return doRoutingSlip(exchange, callback);
+        return doRoutingSlipWithExpression(exchange, this.expression, callback);
     }
 
     public boolean doRoutingSlip(Exchange exchange, Object routingSlip, AsyncCallback callback) {
         if (routingSlip instanceof Expression) {
-            this.expression = (Expression) routingSlip;
+            return doRoutingSlipWithExpression(exchange, (Expression) routingSlip, callback);
         } else {
-            this.expression = ExpressionBuilder.constantExpression(routingSlip);
+            return doRoutingSlipWithExpression(exchange, ExpressionBuilder.constantExpression(routingSlip), callback);
         }
-        return doRoutingSlip(exchange, callback);
     }
 
     /**
      * Creates the route slip iterator to be used.
      *
      * @param exchange the exchange
+     * @param expression the expression
      * @return the iterator, should never be <tt>null</tt>
      */
-    protected RoutingSlipIterator createRoutingSlipIterator(final Exchange exchange) throws Exception {
+    protected RoutingSlipIterator createRoutingSlipIterator(final Exchange exchange, final Expression expression) throws Exception {
         Object slip = expression.evaluate(exchange, Object.class);
         if (exchange.getException() != null) {
             // force any exceptions occurred during evaluation to be thrown
@@ -221,11 +221,11 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace
         };
     }
 
-    private boolean doRoutingSlip(final Exchange exchange, final AsyncCallback callback) {
+    private boolean doRoutingSlipWithExpression(final Exchange exchange, final Expression expression, final AsyncCallback callback) {
         Exchange current = exchange;
         RoutingSlipIterator iter;
         try {
-            iter = createRoutingSlipIterator(exchange);
+            iter = createRoutingSlipIterator(exchange, expression);
         } catch (Exception e) {
             exchange.setException(e);
             callback.done(true);

http://git-wip-us.apache.org/repos/asf/camel/blob/8a770bfe/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentPOJOTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentPOJOTest.java b/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentPOJOTest.java
new file mode 100644
index 0000000..eb6db04
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/dynamicrouter/DynamicRouterConcurrentPOJOTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.dynamicrouter;
+
+import org.apache.camel.*;
+import org.apache.camel.builder.RouteBuilder;
+
+public class DynamicRouterConcurrentPOJOTest extends ContextTestSupport { // CAMEL-9311: drives two @DynamicRouter bean routes from two threads at once
+
+    private static final int COUNT = 100; // number of messages each sender thread sends
+
+    public void testConcurrentDynamicRouter() throws Exception { // mock:a and mock:b are each expected to receive COUNT messages
+        getMockEndpoint("mock:a").expectedMessageCount(COUNT);
+        getMockEndpoint("mock:b").expectedMessageCount(COUNT);
+
+        Thread sendToSedaA = createSedaSenderThread("seda:a");
+        Thread sendToSedaB = createSedaSenderThread("seda:b");
+
+        sendToSedaA.start(); // both senders are started before asserting, so the two routes are fed side by side
+        sendToSedaB.start();
+
+        assertMockEndpointsSatisfied(); // NOTE(review): sender threads are never joined; presumably this call waits for the expected counts - confirm
+    }
+
+    private Thread createSedaSenderThread(final String seda) { // returns an unstarted thread that sends COUNT bodies to the given endpoint
+        return new Thread(new Runnable() {
+            @Override
+            public void run() {
+                for (int i = 0; i < COUNT; i++) {
+                    template.sendBody(seda, "Message from " + seda); // body names the endpoint it was sent to
+                }
+            }
+        });
+    }
+
+    protected RouteBuilder createRouteBuilder() { // one route per seda endpoint, each with its own router POJO and its own target mock
+        return new RouteBuilder() {
+            public void configure() {
+                from("seda:a").bean(new MyDynamicRouterPojo("mock:a"));
+                from("seda:b").bean(new MyDynamicRouterPojo("mock:b"));
+            }
+        };
+    }
+
+    public class MyDynamicRouterPojo { // router bean bound to a single, fixed target endpoint URI
+
+        private final String target; // endpoint URI returned by route() when no previous slip endpoint is present
+
+        public MyDynamicRouterPojo(String target) {
+            super();
+            this.target = target;
+        }
+
+        @DynamicRouter // marks route() as the dynamic-router method of this bean
+        public String route(@Header(Exchange.SLIP_ENDPOINT) String previous) { // previous: value of the SLIP_ENDPOINT header, may be null
+            if (previous == null) { // header absent: presumably the first invocation for this exchange - confirm
+                return target;
+            } else {
+                return null; // presumably null ends dynamic routing for this exchange - see the Camel Dynamic Router docs
+            }
+        }
+    }
+}